Advanced Apache Spark Training - Sameer Farooqui (Databricks)
Scheduling, Monitoring, Distributing
Spark Ecosystem
- Spark Core (Memory + Disk)
- DataFrames API: Scala, Java, Python, R
- Higher level libraries:
- Spark SQL
- Streaming
- MLlib, GraphX -> machine learning and graph processing libraries
- BlinkDB (approximate queries: aggregates over a sample at a chosen confidence level instead of scanning the whole dataset)
- Tachyon -> memory-based distributed storage system, data sharing across cluster frameworks
- Resource Manager:
- standalone scheduler
- YARN
- local mode
- Mesos (expanded comparison)
- Outer Layer:
- HDFS
- MongoDB
- RDBMS
- Cassandra
RDD (Resilient Distributed Dataset)
Normally 1,000 - 10,000 partitions
Each partition is processed by one task (thread) of computation
An RDD can be created in 2 ways:
1) Parallelize a collection (data already in the memory of the driver JVM; only for prototyping)
2) Read data from an external source (S3, HDFS...) -- the connection is opened/closed only once
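A minimal Scala sketch of both creation paths (the master URL and file path are placeholders, not from the training):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setAppName("rdd-creation").setMaster("local[4]")
val sc = new SparkContext(conf)

// 1) Parallelize an in-memory collection -- prototyping only
val nums = sc.parallelize(1 to 1000, numSlices = 8)

// 2) Read from an external source (hypothetical HDFS path; an s3n:// URL works the same way)
val lines = sc.textFile("hdfs:///data/input.txt")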
DAG (Directed acyclic graph)
Building the DAG is just metadata.
Only when an action is called is the DAG materialized.
Based on the DAG, strategically cache intermediate RDDs (caching is also lazy: only when a downstream action touches the RDD does the cache fill)
External data / collection -> base RDD (input RDD) -> transformations, pipelined into stages -> an action such as count() or collect() kicks off a parallel computation, which is then optimized and executed by Spark
Transformations: filter()...
Actions: collect(), saveToCassandra(), count()...
Types of RDDs: HadoopRDD, FilteredRDD, JoinedRDD, ShuffledRDD, CassandraRDD (optimized for Cassandra, via the Cassandra connector)...
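A small Scala sketch of the lazy DAG, assuming the spark-shell's predefined SparkContext sc and a hypothetical log file path:

// Transformations only build DAG metadata; nothing is read or computed yet.
val lines  = sc.textFile("hdfs:///data/app.log")        // base (input) RDD
val errors = lines.filter(_.contains("ERROR"))          // FilteredRDD, pipelined with the read

errors.cache()              // also lazy: the cache fills on the first action that uses it

val n = errors.count()      // action: kicks off the parallel job and materializes the cache
errors.take(5)              // second action: served from the cached partitions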
Spark Runtime Architecture Integration with Resource Managers
Local, Standalone Scheduler -> static partitioning (fixed number of executors for the life of the Spark job)
YARN, Mesos -> dynamic partitioning (number of executors can grow/shrink live during the job)
MapReduce vs. Spark
MapReduce:
[Master]: JobTracker ("the brain"), NameNode (receives heartbeats from the DataNodes)
[Nodes]:
TaskTracker: spawns several JVMs per machine, each fixed as either Map or Reduce
DataNode: heartbeats to the NameNode; this is the HDFS storage layer
Spark:
One executor JVM per machine, with several slots where tasks can run.
Parallelism in MapReduce: separate processes (JVMs), identified by process IDs
Parallelism in Spark: multiple task threads running inside one JVM
MapReduce slot: hard-coded as either Map or Reduce; it takes time to build/tear down a JVM
Spark slot: generic, any task can use it; much more efficient to reuse a slot
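A hedged config sketch of how the generic slots are sized (spark.executor.cores and spark.task.cpus are real settings; the values are only examples):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("slot-sizing")
  .set("spark.executor.cores", "4")   // 4 generic task slots per executor JVM
  .set("spark.task.cpus", "1")        // each task occupies 1 slot; any task type can reuse it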
Local Mode
JVM: Driver + Executor (RDDs live here)
Cores (slots): number of simultaneous tasks
-- in general, number of task threads = 2 or 3x the number of cores
Internal Threads
Do not need a worker JVM in local mode.
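A local-mode sketch, assuming nothing beyond the core Spark API:

import org.apache.spark.{SparkConf, SparkContext}

// Driver and Executor share this one JVM; local[8] gives 8 task slots (threads),
// local[*] uses one thread per core on the machine.
val conf = new SparkConf().setAppName("local-mode-demo").setMaster("local[8]")
val sc = new SparkContext(conf)

println(sc.parallelize(1 to 100).count())   // runs entirely inside this single JVM
sc.stop()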
Spark Standalone
SPARK_LOCAL_DIRS (local dirs), used for:
1) partitions spilled down to local disk when an RDD persisted with memory + disk does not all fit in memory -- to speed up later reads
2) intermediate shuffle data
Resource Management Layer:
Spark Master JVM
Worker JVM
For each Spark application:
Driver -> talks to the Master JVM; the Master acts as a scheduler and notifies each Worker JVM -> each Worker JVM spins up an Executor
Executor contains RDDs, tasks... (worker cores: how many cores that Worker JVM can hand out to the executors underneath it)
If an Executor crashes, the Worker JVM restarts it; if a Worker JVM crashes, the Master JVM restarts it.
High Availability on Spark with Zookeeper
One Worker can start several executors for different Drivers (Applications)
Applications run in FIFO order
An application's core count and allocated memory should roughly match
(cores are set per application, memory per executor)
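A sketch of matching cores and memory when connecting to a standalone Master (the master URL is a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("standalone-app")
  .setMaster("spark://master-host:7077")   // hypothetical standalone Master
  .set("spark.cores.max", "16")            // cores this application may take cluster-wide
  .set("spark.executor.memory", "8g")      // heap per executor
val sc = new SparkContext(conf)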
Spark UI:
Jobs -- the jobs run by this application
Stages -- each job decomposes into one or more stages, each stage into one or more tasks
Storage -- RDD caches, memory persistence (a deserialized cache is normally 2-3x the data's size in HDFS)
Environment -- configuration, with precedence: default settings in the Scala source code < Spark environment/config files < spark-submit arguments < settings made on the SparkConf/SparkContext in application code
Executors -- thread dump, can be used for profiling
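To illustrate the precedence, a value set directly on the SparkConf in application code wins over the lower layers (a minimal sketch):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.memory", "4g")   // overrides spark-defaults.conf and --executor-memory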
YARN
(Yet Another Resource Negotiator)
- Resource Manager (Scheduler + Applications Manager) -- high availability via ZooKeeper
- Node Manager (heartbeats to the Resource Manager, reporting its core/bandwidth occupation)
Client -> Resource Manager -> starts an App Master in a Node Manager -> Resource Manager gives keys and tokens to the App Master so it can find its Containers -> App Master reports back to the Client
Client Driver also has a Scheduler
Dynamic Allocation
min, max, time-outs
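A sketch of the dynamic allocation settings on YARN (these are real Spark configuration keys; the values are only examples, and the external shuffle service must run on each Node Manager):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")              // keeps shuffle files when executors are released
  .set("spark.dynamicAllocation.minExecutors", "2")          // min
  .set("spark.dynamicAllocation.maxExecutors", "50")         // max
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s") // time-out before releasing an idle executor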
Memory and Persistence
Recommended to use < 75% of a machine's memory for Spark
Executor heap size >= 8 GB
Memory usage is affected by storage level and serialization format
1. Persistence in memory
An evicted partition evaporates in its entirety (LRU eviction); if an executor dies, the DAG recomputes the partition when it is needed again
- Deserialized mode
- Serialized mode -- saves heap space; choose a fast serialization library (Kryo)
2. Persistence in memory + disk
The entire evicted partition goes to SPARK_LOCAL_DIRS
3. Persistence in disk
Tachyon: off-heap storage
If RDD fits in memory, choose MEMORY_ONLY
If not, use MEMORY_ONLY_SER with fast serialization library
Don't spill to disk unless the functions that computed the datasets are very expensive or they filter a large amount of data (recomputing from the DAG may be as fast as reading from disk)
Use replicated storage levels sparingly and only if you want fast fault recovery (maybe to serve requests from a web app)
Intermediate data is automatically persisted during shuffle operations
PySpark: always serializes data, using the Pickle library
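A Scala sketch of picking a storage level with Kryo enabled (the path and values are placeholders):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val conf = new SparkConf()
  .setAppName("persistence-demo")
  .setMaster("local[4]")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)

val rdd = sc.textFile("hdfs:///data/events")
rdd.persist(StorageLevel.MEMORY_ONLY_SER)     // serialized in memory to save heap
// rdd.persist(StorageLevel.MEMORY_AND_DISK)  // evicted partitions spill to SPARK_LOCAL_DIRS
// rdd.persist(StorageLevel.MEMORY_ONLY_2)    // replicated; use sparingly, for fast recovery
rdd.count()                                   // first action materializes the cache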
Default Memory Allocation in Executor JVM:
Cached RDDs: 60%
Shuffle memory: 20%
User programs (remainder): 20%
If an Executor crashes (out of memory), figure out which part of memory ran out.
Cost of GC is proportional to the number of Java objects (not to their size)
GC: Parallel, CMS, G1
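A hedged sketch of tuning the (Spark 1.x) memory split and GC; the fractions shown are the defaults listed above:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.storage.memoryFraction", "0.6")   // cached RDDs
  .set("spark.shuffle.memoryFraction", "0.2")   // shuffle buffers
  .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC -XX:+PrintGCDetails")  // pick/inspect the GC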
Jobs -> Stages -> Tasks
When .collect() is called, a job is created. A job can be split into several stages, each stage into several tasks.
Lineage
Narrow Dependencies
Each partition of the parent RDD is used by at most one partition of the child RDD
e.g. map, filter, union, join with co-partitioned inputs (hashed the same way)
Wide Dependencies
Multiple child partitions may depend on each partition of the parent RDD
e.g. groupByKey, join with inputs not co-partitioned
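A small Scala example of the two dependency types, assuming the spark-shell's predefined sc:

val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))

// Narrow: each child partition reads from exactly one parent partition,
// so these pipeline into a single stage with no shuffle.
val doubled = pairs.mapValues(_ * 2).filter(_._2 > 2)

// Wide: groupByKey needs all values for a key, so child partitions depend on many
// parent partitions and the shuffle introduces a stage boundary.
val grouped = doubled.groupByKey()

grouped.collect()   // one job, two stages: the pipelined map-side stage, then the post-shuffle stage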
Broadcast Variables and Accumulators
PySpark
Shuffle
Spark Streaming