Module: Spark Advanced Internals | Duration: ~20 min | Lesson: 1 of 20
Priya at TheWorldShop is staring at two Spark jobs that ran on the same cluster, against the same Parquet dataset, with the same number of executors. Hers took 8 minutes. The one her teammate Dev wrote took 45 seconds.
She diffs the two scripts line by line. Same reads, same filters, same aggregation, same write. The only difference she can spot is the order in which Dev chained the joins.
The clock said 10x. Her instinct said "that can't be the join order." She's right that the join order isn't the whole story. She's wrong that it can't matter that much. The answer lives in how the DAG scheduler turned each script into stages and where it placed the shuffle boundaries. Why does the boundary placement decide the wall-clock time?
2. Concept Explanation
From code to execution
When you call an action (.count(), .collect(), or a write such as .write.parquet(...)), Spark takes your code through five steps, from building the DAG to recovering from failures:
Step 1: RDD DAG construction
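Every transformation creates an RDD with a reference to its parent RDD(s) and the function applied. Chained together, those references form a directed acyclic graph (DAG) of RDDs. Transformations are lazy: nothing runs until an action asks for a result.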
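A toy model in plain Python shows what lazy lineage means in practice. Python is used so the example runs without a Spark install. ToyRDD, compute, and evaluations are hypothetical names, not Spark's API. Each node keeps only its parents and a function, and the count action is the first thing that actually runs anything.

```python
from typing import Callable, List, Optional


class ToyRDD:
    """Hypothetical stand-in for an RDD: one node in a lineage DAG.

    It records its parents and the function that derives its rows,
    but computes nothing until an action asks for a result.
    """

    def __init__(self, name: str, parents: List["ToyRDD"],
                 fn: Optional[Callable] = None, source: Optional[list] = None):
        self.name = name
        self.parents = parents
        self.fn = fn            # derives this node's rows from its parents' rows
        self.source = source    # only the root holds data (e.g. rows read from files)
        self.evaluations = 0    # how many times this node has actually been computed

    # Transformations: build a new node, run nothing.
    def map(self, f: Callable) -> "ToyRDD":
        return ToyRDD(f"map({self.name})", [self], lambda rows: [f(r) for r in rows])

    def filter(self, pred: Callable) -> "ToyRDD":
        return ToyRDD(f"filter({self.name})", [self], lambda rows: [r for r in rows if pred(r)])

    def compute(self) -> list:
        # Replay the lineage: compute parents first, then apply this node's function.
        self.evaluations += 1
        if self.source is not None:
            return list(self.source)
        return self.fn(*(parent.compute() for parent in self.parents))

    # Action: the only place where work actually happens.
    def count(self) -> int:
        return len(self.compute())


orders = ToyRDD("orders", [], source=[5, 12, 30, 7])
big = orders.filter(lambda amount: amount > 6).map(lambda amount: amount * 2)

print(big.name)            # map(filter(orders))
print(orders.evaluations)  # 0: building the DAG ran nothing
print(big.count())         # 3
print(orders.evaluations)  # 1: the action walked the lineage back to the source
```

The same replay-from-the-source mechanism is what Step 5 relies on when a partition is lost.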
Step 2: Stage cutting (narrow vs. wide dependencies)
The DAGScheduler walks the RDD DAG backwards from the final RDD. Whenever it hits a wide dependency (a shuffle), it inserts a stage boundary.
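Narrow dependency. Each partition of the parent RDD feeds at most one partition of the child RDD.
- Examples: map, filter, flatMap, mapPartitions, union, coalesce.
- Pipelined: consecutive narrow transforms fuse into a single stage and make a single pass over the data.
Wide dependency. A single partition of the parent RDD can feed many partitions of the child RDD, so records must be redistributed across the cluster.
- Examples: groupByKey, reduceByKey, join (unless both sides are already partitioned the same way), distinct, repartition, sortByKey.
- Requires a shuffle: the upstream stage writes its output to local disk as shuffle files, and the next stage reads them.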
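Here is a sketch of the cutting rule for a single linear chain of operations. It is a toy model, not the DAGScheduler's code. cut_stages and the "read" source step are hypothetical. Two-parent operations such as join and union are left out because a linear chain can't represent them.

```python
from typing import List

# Operator classes taken from the lists above ("read" is a hypothetical source step;
# join and union are omitted because they have two parents).
NARROW = {"read", "map", "filter", "flatMap", "mapPartitions", "coalesce"}
WIDE = {"groupByKey", "reduceByKey", "distinct", "repartition", "sortByKey"}


def cut_stages(ops: List[str]) -> List[List[str]]:
    """Split a linear chain of RDD operations into stages.

    Walks backwards from the last operation, as the DAGScheduler walks back
    from the final RDD. Narrow ops join the current stage. The RDD a wide op
    produces reads shuffle files, so it starts the current stage, and
    everything before it lands in a new upstream stage.
    """
    stages: List[List[str]] = [[]]
    for op in reversed(ops):
        if op not in NARROW and op not in WIDE:
            raise ValueError(f"unknown operation: {op}")
        stages[-1].insert(0, op)
        if op in WIDE:
            stages.append([])  # shuffle boundary: open a new upstream stage
    # Stages were discovered last-to-first; return them in execution order,
    # dropping the empty stage left behind if the chain starts with a wide op.
    return [stage for stage in reversed(stages) if stage]


pipeline = ["read", "filter", "map", "reduceByKey", "map", "sortByKey"]
for i, stage in enumerate(cut_stages(pipeline), start=1):
    print(f"Stage {i}: {stage}")
# Stage 1: ['read', 'filter', 'map']
# Stage 2: ['reduceByKey', 'map']
# Stage 3: ['sortByKey']
```

Six operators produce three stages because only two of them are wide.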
Stage dependencies and parallel execution
Stages form their own DAG. The DAGScheduler respects dependencies:
- If Stage 2 depends on Stage 1, Stage 2 cannot start until Stage 1 completes.
- Independent stages (different branches of the DAG) can run concurrently, as long as the cluster has free slots.
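This sketch groups a stage graph into "waves": sets of stages whose parents have all finished, so they could run concurrently. It is plain topological layering (Kahn's algorithm) over a hypothetical stage graph, and it assumes unlimited executor slots. It is not how Spark's scheduler is implemented.

```python
from typing import Dict, List, Set


def schedule_waves(parents: Dict[str, List[str]]) -> List[List[str]]:
    """Group stages into waves of stages that may run at the same time.

    A stage becomes runnable once all of its parent stages have finished.
    Toy model: assumes unlimited executor slots, whereas real Spark runs
    ready stages concurrently only as far as free slots allow.
    """
    remaining: Dict[str, Set[str]] = {stage: set(deps) for stage, deps in parents.items()}
    finished: Set[str] = set()
    waves: List[List[str]] = []
    while remaining:
        ready = sorted(stage for stage, deps in remaining.items() if deps <= finished)
        if not ready:
            raise ValueError("no runnable stage: the stage graph has a cycle or a missing parent")
        waves.append(ready)
        finished.update(ready)
        for stage in ready:
            del remaining[stage]
    return waves


# Hypothetical shuffle join: two scan stages feed a join stage, which feeds an aggregate.
stage_parents = {
    "read_orders": [],
    "read_customers": [],
    "join": ["read_orders", "read_customers"],
    "aggregate": ["join"],
}
print(schedule_waves(stage_parents))
# [['read_customers', 'read_orders'], ['join'], ['aggregate']]
```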
Step 3: TaskSets
Once stages are defined, the DAGScheduler creates one TaskSet per stage:
- One task per partition the stage computes, i.e., per partition of the last RDD in the stage.
- All tasks in a TaskSet run the same computation on different data.
- The TaskSet goes to the TaskScheduler.
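The one-task-per-partition rule has a consequence worth seeing in numbers. A stage that ends in coalesce runs fewer tasks for all of its upstream narrow work, not just for the merge. The sketch below is a toy model: Task, make_task_set, and run_task_set are hypothetical names. The coalesce helper merges neighboring partitions, whereas Spark's coalesce groups them by locality.

```python
from dataclasses import dataclass
from typing import Callable, List


@dataclass(frozen=True)
class Task:
    """Hypothetical task record: which stage it belongs to and which partition it computes."""
    stage_id: int
    partition: int


def make_task_set(stage_id: int, num_partitions: int) -> List[Task]:
    # One task per partition the stage produces. Every task runs the same
    # stage function; only the partition index differs.
    return [Task(stage_id, p) for p in range(num_partitions)]


def run_task_set(tasks: List[Task], stage_fn: Callable[[list], int],
                 partitions: List[list]) -> List[int]:
    # Sequential stand-in for executors running the tasks in parallel.
    return [stage_fn(partitions[t.partition]) for t in tasks]


def coalesce(partitions: List[list], n: int) -> List[list]:
    # Narrow merge: each output partition concatenates a contiguous group of
    # input partitions (up to n groups), so no data is shuffled.
    size = -(-len(partitions) // n)  # ceiling division
    return [sum(partitions[i:i + size], []) for i in range(0, len(partitions), size)]


def revenue(rows: list) -> int:
    # The stage's pipelined narrow work: filter, map, and a per-partition sum.
    return sum(r * 2 for r in rows if r > 6)


orders = [[5, 12], [30, 7], [9, 1], [40, 3]]               # 4 input partitions

tasks = make_task_set(stage_id=1, num_partitions=len(orders))
print(len(tasks), run_task_set(tasks, revenue, orders))   # 4 [24, 74, 18, 80]

merged = coalesce(orders, 2)                              # the stage now ends in coalesce(2)
tasks = make_task_set(stage_id=1, num_partitions=len(merged))
print(len(tasks), run_task_set(tasks, revenue, merged))   # 2 [98, 98]
```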
Step 4: Task placement and data locality
The TaskScheduler (backed by the SchedulerBackend) assigns tasks to executors. It prefers to run a task on the executor that already has the data:
| Locality | Meaning | Approx. latency |
|---|---|---|
| PROCESS_LOCAL | Data is in the executor's JVM (cached RDD) | ~μs |
| NODE_LOCAL | Data is on the same node (local HDFS block) | ~ms |
| NO_PREF | No locality preference (parallelized collections) | ~ms |
| RACK_LOCAL | Data is on the same rack | ~10 ms |
| ANY | Data is anywhere on the network | ~100 ms+ |
The scheduler waits up to spark.locality.wait (default 3 s) for a PROCESS_LOCAL slot before downgrading to NODE_LOCAL, and another 3 s before downgrading further.
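The waiting rule is a form of delay scheduling. The sketch below models it as a pure function of how long a task set has waited. It is a simplified model with stated assumptions. allowed_level is a hypothetical helper, and every level uses the same wait (Spark lets you override each level with spark.locality.wait.process, .node, and .rack). Spark's real timer bookkeeping and its NO_PREF handling are not modeled.

```python
LEVELS = ["PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY"]


def allowed_level(seconds_waited: float, locality_wait: float = 3.0) -> str:
    """Most-local level the scheduler still insists on after waiting.

    Toy delay-scheduling model: each level is held for locality_wait seconds
    (spark.locality.wait, default 3 s) before relaxing to the next one.
    """
    if locality_wait <= 0:
        return "ANY"  # a wait of 0 disables waiting: any free slot is acceptable
    steps = int(seconds_waited // locality_wait)
    return LEVELS[min(steps, len(LEVELS) - 1)]


for waited in [0, 2.9, 3.0, 6.5, 9.0]:
    print(waited, allowed_level(waited))
# 0 PROCESS_LOCAL
# 2.9 PROCESS_LOCAL
# 3.0 NODE_LOCAL
# 6.5 RACK_LOCAL
# 9.0 ANY
```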
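Caching implication. Once an action has materialized a .cache()d RDD, tasks that read it can run PROCESS_LOCAL on the executors holding its partitions, with no disk or network I/O for that input. Iterative algorithms reread the same data on every pass, which is why caching speeds them up so dramatically.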
Step 5: Fault tolerance via lineage
If an executor dies and a task fails:
- Narrow dep failure. Rerun the failed task on another executor. Spark recomputes from the parent RDD; lineage is the recipe.
- Wide dep failure (shuffle data lost). The shuffle output of the previous stage is gone. Spark may need to rerun part of Stage N-1 to regenerate shuffle files before Stage N can continue.
- Too many failures. If any single task fails spark.task.maxFailures times (default 4), Spark aborts the stage and the whole job fails.
- Speculation (optional, off unless spark.speculation=true). If a task runs much slower than its peers, Spark launches a duplicate "speculative" copy on another executor and uses whichever finishes first.
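Both recovery knobs can be sketched as small functions. This is a simplified model, not Spark's implementation. run_with_retries, speculation_candidates, and JobAborted are hypothetical names. The defaults mirror spark.task.maxFailures (4), spark.speculation.quantile (0.75), and spark.speculation.multiplier (1.5). Spark's real speculation check also enforces a minimum task runtime, which is omitted here.

```python
import statistics
from typing import Callable, Dict, List


class JobAborted(Exception):
    """Raised when one task exhausts its attempts (toy stand-in for a failed job)."""


def run_with_retries(attempt: Callable[[int], bool], max_failures: int = 4) -> int:
    # Mirrors spark.task.maxFailures (default 4): a task gets max_failures
    # attempts in total; if all of them fail, the stage and job are aborted.
    for attempt_no in range(max_failures):
        if attempt(attempt_no):
            return attempt_no  # index of the attempt that succeeded
    raise JobAborted(f"task failed {max_failures} times; aborting job")


def speculation_candidates(finished: List[float], running: Dict[int, float],
                           total_tasks: int, quantile: float = 0.75,
                           multiplier: float = 1.5) -> List[int]:
    # Simplified version of Spark's rule: once a quantile of the tasks has
    # finished, any running task slower than multiplier x the median
    # finished duration is a candidate for a speculative copy.
    if len(finished) < quantile * total_tasks:
        return []
    threshold = multiplier * statistics.median(finished)
    return sorted(task for task, elapsed in running.items() if elapsed > threshold)


# A flaky task that fails its first two attempts (e.g. a lost executor), then succeeds.
print(run_with_retries(lambda n: n >= 2))  # 2

# 6 of 8 tasks done (median 11.5 s, threshold 17.25 s); task 6 is a straggler.
print(speculation_candidates([10, 11, 12, 10, 13, 12], {6: 40.0, 7: 14.0}, total_tasks=8))  # [6]
```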
Reading the DAG in the UI
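In the Spark UI, go to Jobs → click a job → expand "DAG Visualization". Each stage is drawn as a box containing its RDD operations. A green dot marks a cached RDD. A grayed-out "skipped" stage means Spark reused output that already existed (typically shuffle files written by an earlier job) instead of recomputing that stage.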
3. Worked Example
Trace TheWorldShop's daily orders pipeline through the DAG scheduler.
Stages produced.
- Stage 1 (400 tasks): read orders (400 partitions), filter, broadcast join with products (broadcast to every executor, no shuffle), compute revenue. All narrow, single pass.
- Stage 2 (200 tasks): read the groupBy shuffle. spark.sql.shuffle.partitions = 200 → 200 tasks (Adaptive Query Execution, if enabled, may coalesce these into fewer).
- Stage 3 (200 tasks): read the orderBy shuffle and sort for the global ordering.
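Why the BroadcastHashJoin doesn't create a boundary. Spark collects the small products table to the driver and broadcasts it to every executor (building it runs as its own small job). Each task then joins its slice of orders against a local copy of products, so no orders rows move between executors. With no shuffle dependency in the plan, the DAGScheduler sees only narrow dependencies.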
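The stage and task counts above are simple arithmetic. Writing that arithmetic down shows why the join strategy matters. This is a toy sketch that assumes spark.sql.shuffle.partitions = 200 and no Adaptive Query Execution. stage_tasks is a hypothetical helper, and the sort-merge variant with an 8-partition products table is an invented comparison, not part of the worked example.

```python
from typing import Dict, List

SHUFFLE_PARTITIONS = 200  # spark.sql.shuffle.partitions default


def stage_tasks(scans: Dict[str, int], post_shuffle_stages: int,
                shuffle_partitions: int = SHUFFLE_PARTITIONS) -> List[int]:
    """Task count per stage for a simple SQL pipeline (toy arithmetic).

    Each scan whose output is shuffled gets its own stage with one task per
    input partition; every stage that reads a shuffle has shuffle_partitions
    tasks. Ignores AQE partition coalescing and the small job that builds a
    broadcast table.
    """
    return list(scans.values()) + [shuffle_partitions] * post_shuffle_stages


# Broadcast join (the worked example): products never shuffles, so only orders
# gets a scan stage; the groupBy and orderBy shuffles each add one stage.
broadcast = stage_tasks({"orders": 400}, post_shuffle_stages=2)

# Hypothetical sort-merge join instead: products (assumed 8 partitions) needs its
# own scan stage, and the join adds a post-shuffle stage (assuming the groupBy
# key differs from the join key, so the aggregate still needs its own shuffle).
shuffle_join = stage_tasks({"orders": 400, "products": 8}, post_shuffle_stages=3)

print(len(broadcast), sum(broadcast))        # 3 800
print(len(shuffle_join), sum(shuffle_join))  # 5 1008
```

The extra tasks are the smaller cost of the shuffle-join variant. The larger cost is that every row of orders must be written to shuffle files and read back across the network before the join can run. The broadcast join skips that work entirely.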
Bring up the lab and run this yourself. Clone the lab repo (one time, shared across all Spark courses):
Verify Spark and the seed dataset are reachable:
If you'd like AI help personalizing the bring-up to your machine (Docker resource limits, port conflicts, M-series vs Intel), copy this prompt into a fresh chat:
Now run the worked example as a Spark SQL session, look at the Spark UI on http://localhost:4040, and find the DAG visualization for your job.
In the physical plan, the *(N) prefix marks whole-stage code generation: every operator tagged *(1) is fused into a single compiled loop (Lesson 5). N is a codegen ID, not the scheduler's stage number.
Aha: Stage count is a function of how many shuffles you've asked for, not how many operators you've written. Three lines of Scala can produce ten stages if they chain nine shuffles. Ten lines can produce two stages if Catalyst can pipeline everything around a single shuffle. When you read a slow job, count the Exchange nodes in the physical plan first (BroadcastExchange doesn't count; it cuts no stage). That's your wall-clock budget, not the number of operators in your DataFrame chain.
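A few lines of text processing over explain() output can automate the Exchange count. This is a sketch with stated assumptions. operator_names, count_shuffles, and estimate_stages are hypothetical helpers. The plan is hand-written and simplified rather than captured from a real run. The shuffles-plus-one estimate assumes no ReusedExchange reuse and no AQE rewrites.

```python
import re
from typing import List

# Tree-drawing prefix (spaces, ":", "|", "+", "-"), an optional "*(N) " codegen
# marker, then the operator name.
_OPERATOR = re.compile(r"^[\s:|+\-]*(?:\*\(\d+\)\s*)?([A-Za-z]+)")


def operator_names(plan_text: str) -> List[str]:
    # Operator name at the start of each plan line, tree prefix stripped.
    names = []
    for line in plan_text.splitlines():
        match = _OPERATOR.match(line)
        if match:
            names.append(match.group(1))
    return names


def count_shuffles(plan_text: str) -> int:
    # Only plain Exchange nodes shuffle. BroadcastExchange ships a table to
    # executors without cutting a stage; ReusedExchange rereads existing output.
    return sum(1 for name in operator_names(plan_text) if name == "Exchange")


def estimate_stages(plan_text: str) -> int:
    # Each shuffle adds a map-side stage on top of the final result stage.
    return count_shuffles(plan_text) + 1


# Hand-written, simplified plan shaped like explain() output (not from a real run).
SAMPLE_PLAN = """\
*(3) Sort [revenue DESC], true, 0
+- Exchange rangepartitioning(revenue DESC, 200)
   +- *(2) HashAggregate(keys=[category], functions=[sum(amount)])
      +- Exchange hashpartitioning(category, 200)
         +- *(1) HashAggregate(keys=[category], functions=[partial_sum(amount)])
            +- *(1) BroadcastHashJoin [product_id], [product_id], Inner, BuildRight
               :- *(1) Filter (amount > 0)
               :  +- *(1) FileScan parquet [product_id, amount]
               +- BroadcastExchange HashedRelationBroadcastMode
                  +- FileScan parquet [product_id, category]
"""

print(count_shuffles(SAMPLE_PLAN), estimate_stages(SAMPLE_PLAN))  # 2 3
```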
4. Your Turn
Exercise: Predict the stages.
Given this query, work out the stage layout before running it:
- How many stages does Spark produce, and where are the shuffle boundaries?
- How many tasks run in Stage 1, and what determines that number?
- The sessions join creates a shuffle. What controls the number of output partitions after that shuffle?
- If you call sessions.cache() and rerun, how does the locality level change for the stage that reads sessions?
5. Real-World Application
Databricks Runtime ships an internal DAG visualization that annotates each node with bytes read and written. Engineering teams use it to spot unnecessary wide dependencies: a distinct() that should have been a dropDuplicates() on fewer columns, a repartition() that nobody removed after a debugging session, a join on the wrong side.
Palantir's Foundry platform builds its pipeline graph on top of Spark's DAG model. Each "transform" in Foundry compiles to a set of Spark stages, and Foundry's dependency tracker is a higher-level DAG laid over Spark's lower-level one.
The pattern repeats. At every shop running Spark at scale, the team eventually builds tooling that lets a non-Spark-expert read a job's stage graph and say "that shuffle shouldn't be there." It's the single highest-leverage piece of internal tooling a Spark platform team can build.
6. Recap + Bridge
The DAGScheduler converts your transformations into stages cut at shuffle boundaries and creates one TaskSet per stage. The TaskScheduler then assigns tasks to executors by locality. Tasks reading cached partitions can run PROCESS_LOCAL; tasks whose data lives elsewhere fall back through NODE_LOCAL and RACK_LOCAL to ANY. Fault tolerance falls out of lineage: a failed task replays from its parent RDD.
Next lesson, we step inside the shuffle itself: what actually lands on disk between stages, how the writer and reader find each other, and why groupByKey OOMs a cluster that reduceByKey handles without breaking a sweat.