Spark
This topic explains Spark, one of the most popular frameworks in the Big Data ecosystem, and covers its concepts, architecture and working in detail.
What is Apache Spark
• Is a fast in-memory data-processing engine
• Ability to efficiently execute streaming, machine-learning or SQL workloads which require fast-iterative access to data-sets
• Can run on top of Apache Hadoop YARN
• Is designed for data-science and its abstraction makes data-science easier
• It can cache data-sets in memory and speed up iterative data-processing
• Includes MLlib
• Is up to 100x faster than MR in benchmark tests
Hadoop Map Reduce Limitations
• It’s based on disk-based computing
• Suitable for single-pass computations, not iterative computations
• Needs a sequence of MR jobs to run iterative tasks
• Needs integration with several other frameworks/tools to solve big data use cases
• Apache Storm for stream data processing
• Apache Mahout for machine learning
Performance
• Spark processes data in memory, while MR persists back to disk after each map-reduce job
• So Spark should out-perform MR
• Nonetheless, Spark needs a lot of memory
• If the data is too big to fit in memory, there will be major performance degradation for Spark
• MR kills its processes as soon as a job is done
• So it can run easily alongside other services with minor performance differences
• Still, Spark has the advantage as long as we are talking about iterative operations on the data
Spark vs. Hadoop MR
Ease of Use
• Spark has APIs for Java, Scala & Python
• Spark SQL (formerly Shark)
• In Spark, it’s easy to write user-defined functions
• Hadoop MR, in Java, is difficult to program
• Pig however makes it easier (though you need to learn its syntax) and Hive adds SQL compatibility
• Unlike Spark, MR doesn’t have an interactive mode
Cost
• Memory in a Spark cluster needs to be at least as large as the data being processed
• So Hadoop, which relies on disk, is the cheaper option
• However, considering Spark benchmarks, less hardware can perform the same task much faster, especially on the cloud where compute power is paid per use
Compatibility
• Spark can run standalone, on top of Hadoop YARN, or on Mesos on premise
• It supports data sources that implement the Hadoop InputFormat
• So it is compatible with all data sources and file formats supported by Hadoop
• It also works with BI tools via JDBC and ODBC
Data Processing
• Spark can do more than just data processing
• It can process graphs and work with existing ml-libraries
• Spark can do real-time as well as batch processing
• Hadoop MR is great for batch processing; but for real-time options on top of it, you will have to use platforms like Storm and Impala, and Giraph for graph processing
• Spark is the swiss army knife of data processing
• Hadoop MR is the commando knife of batch processing
Fault Tolerance
• Spark has retries per task and speculative execution, like MR
• But MR relies on hard drives
• So if a process crashes in the middle of execution, MR can carry on from where it left off
• Spark, however, will have to retry from the beginning
• Both have good failure tolerance
• MR is slightly more tolerant
Security
• Spark can run on top of YARN and use HDFS
• So it can enjoy Kerberos authentication, HDFS file permissions and encryption between nodes
• Hadoop MR can enjoy all Hadoop security benefits and integrate with Hadoop security projects like Knox Gateway and Sentry
RDD-Resilient Distributed Dataset
• It is the primary abstraction in Spark and is the core of Apache Spark
• One could compare RDDs to collections in Scala: an RDD is computed on many JVMs while a Scala collection lives on a single JVM
• Immutable and partitioned collection of records, which can only be created through coarse-grained operations such as map, filter, group-by, etc.
• Can only be created by reading data from stable storage like HDFS or by transformations on existing RDDs
Why RDD?
• For iterative distributed computing, it’s common to reuse and share data among multiple jobs or do parallel ad-hoc queries over a shared dataset
• The persistent issue with data reuse or data sharing in distributed computing systems (like MR) is that you need to store data in some intermediate stable distributed store such as HDFS or Amazon S3
• This makes the overall computation of jobs slower, as it involves multiple IO operations, replications and serializations in the process
Features of RDD
• Resilient, fault-tolerant with the help of the RDD lineage graph and so able to recompute missing or damaged partitions due to node failures
• Distributed, with data residing on multiple nodes in a cluster
• Dataset, a collection of partitioned data with primitive values or values of values, e.g. tuples or other objects
How Are RDD’s Fault Tolerant?
• As RDDs are created over a set of transformations, Spark logs these transformations rather than the actual data
• Graph of transformations to produce one RDD is called a Lineage Graph
For example:
val firstRDD = spark.textFile("hdfs://… ")
val secondRDD = firstRDD.filter(someFunction)
val thirdRDD = secondRDD.map(someFunction)
Lineage graph
• In case we lose some partition of an RDD, we can replay the transformations on that partition from the lineage to achieve the same computation
• This is the biggest benefit of RDDs, because it saves a lot of effort in data management and replication and thus achieves faster computations
Additional Traits of RDD’s
• In-Memory, data inside an RDD is stored in memory as much (size) and as long (time) as possible
• Immutable or Read-Only, it does not change once created and can only be transformed to new RDDs using transformations
• Lazy evaluated, the data inside an RDD is not available or transformed until an action is executed that triggers the execution
• Cacheable, you can hold all the data in a persistent “storage” like memory (default and the most preferred) or disk (the least preferred due to access speed); a small caching sketch follows this list
• Parallel, processes data in parallel
• Typed, RDD records have types, e.g. Long in RDD[Long] or (Int, String) in RDD[(Int, String)]
• Partitioned, records are partitioned (split into logical partitions) and distributed across nodes in a cluster
• Location-Stickiness, an RDD can define placement preferences to compute partitions (as close to the records as possible)
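A minimal sketch of the cacheable and lazy traits, assuming an existing SparkContext named sc (events.txt is a hypothetical input file used only for illustration):
import org.apache.spark.storage.StorageLevel
val lines  = sc.textFile("events.txt")
val errors = lines.filter(_.contains("ERROR"))   // lazy: nothing has run yet
errors.persist(StorageLevel.MEMORY_ONLY)         // memory is the default and preferred level
errors.count()                                   // 1st action: computes the RDD and caches its partitions
errors.count()                                   // 2nd action: answered from the in-memory cache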
RDD Supports Two Kinds of Operations
• Actions - operations that trigger computation and return values
• Transformations - lazy operations that return another RDD
Actions
• Actions are RDD operations that produce non-RDD values
• In other words, an RDD operation that returns a value of any type except RDD[T] is an action
• They trigger execution of RDD transformations to return values
• Simply put, an action evaluates the RDD lineage graph
• You can think of actions as a valve: until an action is fired, the data to be processed is not even in the pipes, i.e. the transformations
• Only actions can materialize the entire processing pipeline with real data
• Actions are one of two ways to send data from executors to the driver (the other being accumulators)
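A minimal sketch of actions materializing a lineage, assuming an existing SparkContext sc (the numbers are illustrative only):
val numbers = sc.parallelize(1 to 10)   // RDD[Int]
val doubled = numbers.map(_ * 2)        // transformation: still lazy
doubled.count()                         // action: returns 10 to the driver
doubled.reduce(_ + _)                   // action: returns 110 to the driver
doubled.take(3)                         // action: returns Array(2, 4, 6)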
Transformations
• Transformations are lazy operations on a RDD that create one or many new RDDs, e.g. map, filter, reduceByKey, join, cogroup, randomSplit
• They are functions that take an RDD as the input and produce one or many RDDs as the output
• They do not change the input RDD (since RDDs are immutable), but always produce one or more new RDDs by applying the computations they represent
• Transformations are lazy: they are not executed immediately, but only after an action is called
• After executing a transformation, the result RDD(s) will always be different from their parents and can be smaller (e.g. filter, distinct, sample), bigger (e.g. flatMap, union, cartesian) or the same size (e.g. map)
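A minimal sketch of this laziness, assuming an existing SparkContext sc (the sample words are illustrative only):
val words   = sc.parallelize(Seq("spark", "is", "fast", "spark"))
val lengths = words.map(_.length)   // new RDD, nothing executed yet
val longer  = lengths.filter(_ > 2) // another new RDD, still nothing executed
println(longer.toDebugString)       // prints the lineage of transformations
longer.collect()                    // only now is the pipeline actually run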
SparkContext
• It is the entry point to Spark Core - the heart of a Spark application
• SparkContext sets up internal services and establishes a connection to a Spark execution environment
• Once a SparkContext is created you can use it to create RDDs, accumulators and broadcast variables, access Spark services and run jobs (until SparkContext is stopped)
• A Spark context is essentially a client of Spark’s execution environment and acts as the master of your Spark application (don’t get confused with the other meaning of Master in Spark, though)
Functions provided by SparkContext
• Get current status of spark application
• Setting configuration
• Creating distributed entities -RDD’s, accumulators, broadcast variables
• Accessing services -TaskScheduler, ShuffleManager, etc
Creating SparkContext Instance
You can create a SparkContext instance with or without creating a SparkConf object first
import org.apache.spark.SparkContext
val sc = SparkContext.getOrCreate()
// Using an explicit SparkConf object
import org.apache.spark.SparkConf
val conf = new SparkConf().setMaster("local[*]").setAppName("Spark App")
val sc = SparkContext.getOrCreate(conf)
Some of The Constructors Available
SparkContext()
SparkContext(conf: SparkConf)
SparkContext(master: String, appName: String, conf: SparkConf)
SparkContext(master: String, appName: String, sparkHome: String = null, jars: Seq[String] = Nil, environment: Map[String, String] = Map())
SQLContext
• SQLContext is the entry point for Spark SQL
• Whatever you do in Spark SQL has to start from creating an instance of SQLContext
• A SQLContext object requires a SparkContext, a CacheManager, and a SQLListener. They are all transient and do not participate in serializing a SQLContext
• You can create a SQLContext using the following constructors:
• SQLContext(sc: SparkContext)
• SQLContext.getOrCreate(sc: SparkContext)
• SQLContext.newSession() allows for creating a new instance of SQLContext with a separate SQL configuration (through a shared SparkContext)
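A minimal sketch of working with a SQLContext, assuming an existing SparkContext sc; people.json is a hypothetical JSON file with name and age fields:
import org.apache.spark.sql.SQLContext
val sqlContext = SQLContext.getOrCreate(sc)
val people = sqlContext.read.json("people.json")   // hypothetical input
people.registerTempTable("people")
val adults = sqlContext.sql("SELECT name FROM people WHERE age >= 18")
adults.show()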
Spark Core Concepts
• Spark is built around the concepts of Resilient Distributed Datasets and a Directed Acyclic Graph (DAG) representing the transformations and dependencies between them
• A Spark application (often referred to as the Driver Program or Application Master) at a high level consists of a SparkContext and user code which interacts with it, creating RDDs and performing a series of transformations to achieve the final result
• These transformations of RDDs are then translated into a DAG and submitted to the Scheduler to be executed on a set of worker nodes
Spark Execution Workflow
• User code containing RDD transformations forms a Directed Acyclic Graph
• The DAG is then split into stages of tasks by the DAGScheduler
• Stages combine tasks which don’t require shuffling/re-partitioning
• Tasks run on workers and the results are then returned to the client
Let Us Analyze a DAG
• Any data processing workflow could be defined as reading the data source, applying a set of transformations, and materializing the result in different ways
Transformations create dependencies between RDDs
The dependencies are usually classified as “narrow” and “wide” (a short sketch follows the list below)
•Narrow (pipelineable)
• each partition of the parent RDD is used by at most one partition of the child RDD
• allow for pipelined execution on one cluster node
• failure recovery is more efficient as only lost parent partitions need to be recomputed
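A minimal sketch contrasting the two dependency types, assuming an existing SparkContext sc:
val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
val upper = pairs.map { case (k, v) => (k.toUpperCase, v) }   // narrow: each child partition depends on one parent partition
val sums  = pairs.reduceByKey(_ + _)                          // wide: requires a shuffle, so it starts a new stage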
Splitting DAG Into Stages
• RDD operations with “narrow” dependencies, like map() and filter(), are pipelined together into one set of tasks in each stage
• Operations with shuffle dependencies require multiple stages (one to write a set of map output files, and another to read those files after a barrier)
• In the end, every stage will have only shuffle dependencies on other stages, and may compute multiple operations inside it
• The actual pipelining of these operations happens in the RDD compute() functions of various RDDs
There are two types of tasks in Spark: ShuffleMapTask which partitions its input for shuffle and ResultTask which sends its output to the driver
Shuffle
During the shuffle, a ShuffleMapTask writes blocks to the local drive, and then tasks in the next stages fetch these blocks over the network
• Shuffle Write
• Redistributes data among partitions and writes files to disk
• Each hash shuffle task creates one file per “reduce” task
• Sort shuffle task creates one file with regions assigned to reducer
• Sort shuffle uses in-memory sorting with spillover to disk to get final result
• Shuffle Read
• Fetches the files and applies the reduce() logic
• If data ordering is needed then it is sorted on “reducer” side for any type of shuffle
Sort Shuffle
• Incoming records are accumulated and sorted in memory according to their target partition IDs
• Sorted records are written to a file, or to multiple files if spilled, and then merged
• An index file stores the offsets of the data blocks in the data file
• Sorting without deserialization is possible under certain conditions
Spark Components
At a high level there are three major components
Spark driver
• a separate process that executes the user application; it creates the SparkContext to schedule job execution and to negotiate with the cluster manager
Executors
• run tasks scheduled by driver
• store computation results in memory, on disk or off-heap
• interact with storage systems
Cluster Manager
• Mesos
• YARN
• Spark Standalone
The Spark driver contains several components responsible for translating user code into actual jobs executed on the cluster:
• SparkContext
• represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster
• DAGScheduler
• computes a DAG of stages for each job and submits them to TaskScheduler
• determines preferred locations for tasks (based on cache status or shuffle files locations) and finds minimum schedule to run the jobs
• TaskScheduler
• responsible for sending tasks to the cluster, running them, retrying if there are failures, and mitigating stragglers
• SchedulerBackend
• backend interface for scheduling systems that allows plugging in different implementations(Mesos, YARN, Standalone, local)
• BlockManager
• provides interfaces for putting and retrieving blocks both local and external
Memory Management
• Execution Memory
• storage for data needed during tasks execution
• shuffle-related data
• Storage Memory
• storage of cached RDDs and broadcast variables
• possible to borrow from execution memory (spill otherwise)
• the safeguard value is 50% of Spark memory, when cached blocks are immune to eviction
• User Memory
• user data structures and internal metadata in Spark
• safeguarding against OOM
• Reserved Memory
• memory needed for running executor itself and not strictly related to Spark
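A hedged configuration sketch for the unified memory manager (Spark 1.6+); the fractions shown are the defaults and spark.executor.memory is an illustrative size only:
import org.apache.spark.SparkConf
val conf = new SparkConf()
  .setAppName("memory-tuning-sketch")
  .set("spark.memory.fraction", "0.6")        // share of (heap - reserved) used for execution + storage
  .set("spark.memory.storageFraction", "0.5") // the 50% safeguard for cached blocks mentioned above
  .set("spark.executor.memory", "4g")         // total executor heap; reserved memory comes out of this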
Parallelize Collections
• Parallelized collections can be created using SparkContext’s parallelize method on any existing collection
• The elements of the collection are copied to form a distributed dataset that can be operated on in parallel
val list_of_elements = List(1, 2, 3, 4, 5)
val parallelized_list = sc.parallelize(list_of_elements)
val parallelized_list_2 = sc.parallelize(list_of_elements, 6)
parallelized_list is a ParallelCollectionRDD
parallelized_list_2 is a RDD where the partition count has been explicitly set to 6
• One of the easiest ways to create an RDD is to use SparkContext’s textFile method to read files
• Let’s assume we have a file input.txt with the content: How are you. This is my first document. Spark is a fun thing to work with
• val fileContent = sc.textFile("input.txt")
fileContent is a MapPartitionsRDD
fileContent.count - provides the count of the number of rows of data
fileContent.getNumPartitions - provides the total number of partitions the data has been divided into
fileContent.collect - this will aggregate the data of the RDD into a collection
Create a RDD From a Text File
• You can define the number of partitions for the data while creating the RDD
• val fileContent = sc.textFile("input.txt", 5)
The output of fileContent.getNumPartitions will be 5
• How will you convert the RDD, which is a list of sentences, to a list of words?
sc.textFile("input.txt").flatMap(x => x.split(" "))
This will create a RDD of strings, where each element is a word
Save a RDD
• val fileContent = sc.textFile("input.txt")
fileContent.saveAsTextFile("output.txt")
Review the output created
• It’s actually a folder containing files -each representing a partition of the RDD
Create RDD of tuples
Given an ordered dataset, how do we fit it into the RDD infrastructure?
Assume input text file :
dave,26,true
mark,25,false
john,28,true
lisa,24,true
val data = sparkContext.textFile("<path_to_input_text_file>")
val rdd = data.map(x => {
  val row = x.split(",").toList
  (row.apply(0), row.apply(1).toInt, row.apply(2).toBoolean)
})
Count
• count -returns the number of elements in the RDD
• countApprox(timeout: Long, confidence: Double) - Approximate version of count() that returns a potentially incomplete result within a timeout, even if not all tasks have finished
• countByValue() - Returns the count of each unique value in this RDD as a local map of (value, count) pairs
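A minimal sketch, assuming an existing SparkContext sc and some illustrative sample data:
val data = sc.parallelize(Seq("a", "b", "a", "c", "a"))
data.count()                      // 5
data.countByValue()               // Map(a -> 3, b -> 1, c -> 1)
data.countApprox(timeout = 1000L) // approximate count within a 1-second timeout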
Distinct
• distinct() -Return a new RDD containing the distinct elements in this RDD
Filter
• filter (function) -Return a new RDD containing only the elements that satisfy a predicate
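A minimal sketch of distinct and filter, assuming an existing SparkContext sc:
val nums = sc.parallelize(Seq(1, 2, 2, 3, 4, 4, 5))
nums.distinct().collect()         // Array(1, 2, 3, 4, 5), in some order
nums.filter(_ % 2 == 0).collect() // Array(2, 2, 4, 4)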
First and Flatmap
• first () -Return the first element in this RDD
• flatMap (function) -Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results
• isEmpty() - Returns true when the RDD contains no elements
• top(num: Int)(implicit ord: Ordering[T]) - Returns the top k (largest) elements from this RDD as defined by the specified implicit Ordering[T] and maintains the ordering
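A minimal sketch, assuming an existing SparkContext sc and illustrative sample data:
val lines = sc.parallelize(Seq("spark is fast", "spark is fun"))
lines.first()                          // "spark is fast"
lines.flatMap(_.split(" ")).collect()  // Array("spark", "is", "fast", "spark", "is", "fun")
lines.isEmpty()                        // false
sc.parallelize(Seq(3, 1, 5, 2)).top(2) // Array(5, 3)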
Foreach
• foreach(function) - Applies a function f to all elements of this RDD
• foreachPartition(function) - Applies a function f to each partition of this RDD
• glom() - Return an RDD created by coalescing all elements within each partition into an array
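A minimal sketch, assuming an existing SparkContext sc:
val nums = sc.parallelize(1 to 6, numSlices = 2)
nums.foreach(n => println(n))                 // runs on the executors, not the driver
nums.foreachPartition(it => println(it.size)) // one call per partition
nums.glom().collect()                         // Array(Array(1, 2, 3), Array(4, 5, 6))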
groupBy
• groupBy(function) - Return an RDD of grouped items
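A minimal sketch, assuming an existing SparkContext sc and illustrative sample data:
val words = sc.parallelize(Seq("apple", "avocado", "banana", "blueberry"))
words.groupBy(_.head).collect() // Array((a, [apple, avocado]), (b, [banana, blueberry])), in some order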
Set Operations
• intersection(other: RDD[T]) - Return the intersection of this RDD and another one
• subtract(other: RDD[T]) - Return an RDD with the elements from this that are not in the other
• union(other: RDD[T]) - Return the union of this RDD and another one
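A minimal sketch, assuming an existing SparkContext sc:
val a = sc.parallelize(Seq(1, 2, 3, 4))
val b = sc.parallelize(Seq(3, 4, 5, 6))
a.intersection(b).collect() // Array(3, 4), in some order
a.subtract(b).collect()     // Array(1, 2), in some order
a.union(b).collect()        // Array(1, 2, 3, 4, 3, 4, 5, 6) - union keeps duplicates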
Map
• map(function) - Return a new RDD by applying a function to all elements of this RDD
• mapPartitions(function) - Return a new RDD by applying a function to each partition of this RDD
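A minimal sketch, assuming an existing SparkContext sc:
val nums = sc.parallelize(1 to 4)
nums.map(_ * 10).collect()                           // Array(10, 20, 30, 40)
nums.mapPartitions(it => Iterator(it.sum)).collect() // one sum per partition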
Max and Min
• max()(implicit ord: Ordering[T]) - Returns the max of this RDD as defined by the implicit Ordering[T]
• min()(implicit ord: Ordering[T]) - Returns the min of this RDD as defined by the implicit Ordering[T]
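A minimal sketch, assuming an existing SparkContext sc:
val nums = sc.parallelize(Seq(7, 2, 9, 4))
nums.max() // 9, using the implicit Ordering[Int]
nums.min() // 2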
Zip
• zip(other: RDD[U]) - Zips this RDD with another one, returning key-value pairs with the first element in each RDD, second element in each RDD, etc.
• zipPartitions - Zip this RDD’s partitions with one (or more) RDD(s) and return a new RDD by applying a function to the zipped partitions
• zipWithIndex() - Zips this RDD with its element indices
• zipWithUniqueId() - Zips this RDD with generated unique Long ids
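A minimal sketch, assuming an existing SparkContext sc (zip requires the same number of partitions and elements in both RDDs):
val letters = sc.parallelize(Seq("a", "b", "c"), 1)
val numbers = sc.parallelize(Seq(1, 2, 3), 1)
letters.zip(numbers).collect()      // Array((a,1), (b,2), (c,3))
letters.zipWithIndex().collect()    // Array((a,0), (b,1), (c,2))
letters.zipWithUniqueId().collect() // unique Long ids, not necessarily consecutive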