Advanced Spark
Reynold Xin, July 2, 2014 @ Spark Summit Training
This Talk
Formalize RDD concept
Life of a Spark Application
Performance Debugging
Assumes you can write word count and know what a transformation/action is
Mechanical sympathy (Jackie Stewart): a driver does not need to know how to build an engine, but they need to know the fundamentals of how one works to get the best out of it
Reynold Xin
Apache Spark committer (worked on almost every
module: core, sql, mllib, graph)
Product & open-source eng @ Databricks
On leave from PhD @ UC Berkeley AMPLab
Example Application
val sc = new SparkContext(...)
val file = sc.textFile("hdfs://...")             // file, errors: resilient distributed datasets (RDDs)
val errors = file.filter(_.contains("ERROR"))
errors.cache()
errors.count()                                   // action
Quiz: what is an RDD?
A: distributed collection of objects on disk
B: distributed collection of objects in memory
C: distributed collection of objects in Cassandra
Answer: could be any of the above!
Scientific Answer: RDD is an Interface!
1. Set of partitions ("splits" in Hadoop)
2. List of dependencies on parent RDDs
3. Function to compute a partition (as an Iterator) given its parent(s)
4. (Optional) partitioner (hash, range)
5. (Optional) preferred location(s) for each partition
Properties 1-3 capture the lineage; 4-5 enable optimized execution.
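A minimal sketch of this interface in Scala (simplified; the real org.apache.spark.rdd.RDD class adds a TaskContext parameter, ClassTags, and more machinery):

import org.apache.spark.{Dependency, Partition, Partitioner}

// Simplified sketch of the five properties -- not the actual Spark class.
trait SimpleRDD[T] {
  def partitions: Array[Partition]                              // 1. set of partitions
  def dependencies: Seq[Dependency[_]]                          // 2. lineage: parent RDDs
  def compute(split: Partition): Iterator[T]                    // 3. compute one partition
  def partitioner: Option[Partitioner] = None                   // 4. optional partitioner
  def preferredLocations(split: Partition): Seq[String] = Nil   // 5. optional locality hints
}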
Example: HadoopRDD
partitions = one per HDFS block
dependencies = none
compute(part) = read corresponding block
preferredLocations(part) = HDFS block location
partitioner = none
Example: Filtered RDD
partitions = same as parent RDD
dependencies = one-to-one on parent
compute(part) = compute parent and filter it
preferredLocations(part) = none (ask parent)
partitioner = none
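As a rough sketch of how those properties map to code, a filter could be written as an RDD subclass along these lines (a simplified, hypothetical re-implementation, not Spark's internal FilteredRDD):

import scala.reflect.ClassTag
import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD

class MyFilteredRDD[T: ClassTag](prev: RDD[T], pred: T => Boolean)
  extends RDD[T](prev) {  // the one-arg RDD constructor sets up a one-to-one dependency on prev

  // partitions = same as parent RDD
  override def getPartitions: Array[Partition] = prev.partitions

  // compute(part) = compute the parent partition, then filter it
  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    prev.iterator(split, context).filter(pred)

  // partitioner and preferredLocations keep their defaults (none / ask parent)
}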
RDD Graph (DAG of tasks)
Dataset-level view:
> file:   HadoopRDD (path = hdfs://...)
> errors: FilteredRDD (func = _.contains(), shouldCache = true)
Partition-level view:
> Task1, Task2, ...
Example: JoinedRDD
partitions = one per reduce task
dependencies = shuffle on each parent
compute(partition) = read and join shuffled data
preferredLocations(part) = none
partitioner = HashPartitioner(numTasks)
Spark will now know this data is hashed!
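For example (a shell-style sketch assuming an existing SparkContext sc): pre-hashing both sides with the same partitioner lets the later join reuse that partitioning instead of shuffling again.

import org.apache.spark.HashPartitioner

val left  = sc.parallelize(Seq((1, "a"), (2, "b"))).partitionBy(new HashPartitioner(8)).cache()
val right = sc.parallelize(Seq((1, "x"), (2, "y"))).partitionBy(new HashPartitioner(8)).cache()

// both parents share the HashPartitioner, so the join's dependencies are narrow
val joined = left.join(right)
joined.collect()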
Dependency Types
Narrow (pipeline-able):
> map, filter
> union
> join with inputs co-partitioned
Wide (shuffle):
> groupByKey on non-partitioned data
> join with inputs not co-partitioned
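A small shell-style sketch of the difference (assuming an existing SparkContext sc); the lineage printed by toDebugString should show a shuffle boundary only in the wide case:

val pairs  = sc.parallelize(1 to 1000).map(i => (i % 10, i))

val narrow = pairs.filter { case (_, v) => v > 5 }   // narrow: pipelined with the map
val wide   = pairs.groupByKey()                      // wide: requires a shuffle

println(narrow.toDebugString)   // no shuffle in the lineage
println(wide.toDebugString)     // lineage includes a ShuffledRDD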
Recap
Each RDD consists of 5 properties:
1. partitions
2. dependencies
3. compute
4. (optional) partitioner
5. (optional) preferred locations
Life of a Spark Application
Spark Application
Your program (JVM / Python):
> sc = new SparkContext(...)
> f = sc.textFile(...)
> f.filter(...).count()
> ...
Spark driver (app master): RDD graph, scheduler, block tracker, shuffle tracker
Spark executors (multiple): task threads, block manager
Cluster manager
Storage: HDFS, HBase, ...
A single application often contains multiple actions
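A minimal sketch of that last point (assumes the master is supplied by spark-submit; the path is a placeholder): each action becomes a separate job within the same application in the web UI.

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("multi-action-app"))
val logs = sc.textFile("hdfs://...").cache()   // placeholder path

logs.count()                                   // action 1 -> job 1
logs.filter(_.contains("ERROR")).count()       // action 2 -> job 2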
Job Scheduling Process
RDD Objects -> Scheduler (DAGScheduler) -> Executors
> RDD Objects: build the operator DAG, e.g. rdd1.join(rdd2).groupBy().filter().count()
> Scheduler (DAGScheduler): split the graph into stages of tasks; submit each stage as ready
> Executors (task threads + block manager): execute tasks; store and serve blocks
DAG Scheduler
Input: RDD and partitions to compute
Output: output from actions on those partitions
Roles:
> Build stages of tasks
> Submit them to lower level scheduler (e.g. YARN,
Mesos, Standalone) as ready
> Lower level scheduler schedules tasks based on data locality
> Resubmit failed stages if outputs are lost
Scheduler Optimizations
Pipelines operations within a stage
Picks join algorithms based on partitioning (minimize shuffles)
Reuses previously cached data
[Figure: RDD DAG with RDDs A-G grouped into Stage 1 (groupBy), Stage 2 (map, union), and Stage 3 (join); shaded boxes = previously computed partitions.]
Task
Unit of work to execute in an executor thread
Unlike MR, there is no map vs reduce task
Each task either partitions its output for a shuffle, or sends the output back to the driver
Shuffle
Redistributes data among partitions
Partition keys into buckets (user-defined partitioner)
Optimizations:
> Avoided when possible, if data is already properly partitioned
> Partial aggregation reduces data movement (see the sketch below)
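A shell-style sketch of partial aggregation (assuming an existing SparkContext sc): reduceByKey combines values within each partition before the shuffle, while groupByKey ships every record.

val pairs = sc.parallelize(1 to 1000000).map(i => (i % 100, 1))

val viaGroup  = pairs.groupByKey().mapValues(_.sum)   // shuffles every (key, 1) record
val viaReduce = pairs.reduceByKey(_ + _)              // partial sums computed before the shuffle

viaReduce.collect()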
Shuffle
Write intermediate files to disk
Fetched by the next stage of tasks (reduce in MR)
Recap: Job Scheduling
RDD Objects -> Scheduler (DAGScheduler) -> Executors
> RDD Objects: build the operator DAG, e.g. rdd1.join(rdd2).groupBy().filter().count()
> Scheduler (DAGScheduler): split the graph into stages of tasks; submit each stage as ready
> Executors (task threads + block manager): execute tasks; store and serve blocks
Performance Debugging
Performance Debugging
Distributed performance: program slow due to scheduling, coordination, or data distribution
Local performance: program slow because whatever I'm running is just slow on a single node
Two useful tools:
> Application web UI (default port 4040)
> Executor logs (spark/work)
Find Slow Stage(s)
Stragglers?
Some tasks are just slower than others.
Easy to identify from the summary metrics in the web UI.
Stragglers due to slow nodes
sc.parallelize(1 to 15, 15).map { index =>
  val host = java.net.InetAddress.getLocalHost.getHostName
  if (host == "ip-172-31-2-222") {
    Thread.sleep(10000)
  } else {
    Thread.sleep(1000)
  }
}.count()
Stragglers due to slow nodes
Turn speculation on to mitigate this problem.
Speculation: Spark identifies slow tasks (by looking
at runtime distribution), and re-launches those tasks
on other nodes.
spark.speculation true
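The same setting applied programmatically (a sketch; the quantile shown is the documented default, not a recommendation from this talk):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("speculation-demo")
  .set("spark.speculation", "true")            // re-launch suspiciously slow tasks elsewhere
  .set("spark.speculation.quantile", "0.75")   // fraction of tasks that must finish before speculating
val sc = new SparkContext(conf)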
Demo Time: slow node
Stragglers due to data skew
sc.parallelize(1 to 15, 15)
  .flatMap { i => 1 to i }
  .map { i => Thread.sleep(1000) }
  .count()
Speculation is not going to help because the
problem is inherent in the algorithm/data.
Pick a different algorithm or restructure the data.
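One possible restructuring for this toy example (a sketch, at the cost of an extra shuffle): rebalance the skewed flatMap output before doing the expensive per-record work.

sc.parallelize(1 to 15, 15)
  .flatMap { i => 1 to i }
  .repartition(15)                    // spread the skewed output evenly across 15 partitions
  .map { i => Thread.sleep(1000) }
  .count()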
Demo Time
Tasks are just slow
Garbage collection
Performance of the code running in each task
Garbage Collection
Look at the GC Time column in the web UI
What if the task is still running?
To discover whether GC is the problem:
1. Set spark.executor.extraJavaOptions to include:
-XX:+PrintGCDetails -XX:+PrintGCTimeStamps
2. Look at spark/work/app/[n]/stdout on
executors
3. Short GC times are OK. Long ones are bad.
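For example (a sketch in the same programmatic style; equivalently, put the line in spark-defaults.conf):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.extraJavaOptions",
    "-XX:+PrintGCDetails -XX:+PrintGCTimeStamps")   // GC details end up in each executor's stdout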
jmap: heap analysis
jmap -histo [pid]
Gets a histogram of objects in the JVM heap
jmap -histo:live [pid]
Gets a histogram of objects in the heap after GC
(thus live)
Find out which objects are causing the trouble
Demo: GC log & jmap
Reduce GC impact
class DummyObject(var i: Int) {
  def toInt = i
}

// Allocates a new object for every record (heavy GC pressure)
sc.parallelize(1 to 100 * 1000 * 1000, 1).map { i =>
  val obj = new DummyObject(i)  // new object every record
  obj.toInt
}

// Reuses one object per partition (much less GC pressure)
sc.parallelize(1 to 100 * 1000 * 1000, 1).mapPartitions { iter =>
  val obj = new DummyObject(0)  // reuse the same object
  iter.map { i =>
    obj.i = i
    obj.toInt
  }
}
Local Performance
Each Spark executor runs a JVM/Python process
Insert your favorite JVM/Python profiling tool:
> jstack
> YourKit
> VisualVM
> println
> (sorry, I don't know a whole lot about Python)
Example: identify expensive computation
def someCheapComputation(record: Int): Int = record + 1
def someExpensiveComputation(record: Int): String = {
  Thread.sleep(1000)
  record.toString
}

sc.parallelize(1 to 100000).map { record =>
  val step1 = someCheapComputation(record)
  val step2 = someExpensiveComputation(step1)
  step2
}.saveAsTextFile("hdfs:/tmp1")
Demo Time
jstack
Can often pinpoint problems just by running jstack a few times
YourKit (free for open source dev)
Debugging Tip
Local Debugging
Run in local mode (i.e. Spark master local) and
debug with your favorite debugger
> IntelliJ
> Eclipse
> println
With a sample dataset
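A sketch of that setup (the sample path is a placeholder): build the context against master "local[*]" so breakpoints inside transformations hit in a single JVM.

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf().setMaster("local[*]").setAppName("local-debug")
val sc = new SparkContext(conf)

val sample = sc.textFile("data/sample.txt")            // hypothetical small sample dataset
println(sample.filter(_.contains("ERROR")).count())    // set a breakpoint inside the filter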
What have we learned?
RDD abstraction
> lineage info: partitions, dependencies, compute
> optimization info: partitioner, preferred locations
Execution process (from RDD to tasks)
Performance & debugging
Thank You!