The DAG Scheduler Internals

Module: Spark Advanced Internals | Duration: ~20 min | Lesson: 1 of 20


Priya at TheWorldShop is staring at two Spark jobs that ran on the same cluster, against the same parquet dataset, with the same number of executors. Hers took 8 minutes. The one her teammate Dev wrote took 45 seconds.

She diffs the two scripts line by line. Same reads, same filters, same aggregation, same write. The only difference she can spot is the order in which Dev chained the joins.

The clock said 10x. Her instinct said "that can't be the join order." She's right that the join order isn't the whole story. She's wrong that it can't matter that much. The answer lives in how the DAG scheduler turned each script into stages and where it placed the shuffle boundaries. Why does the boundary placement decide the wall-clock time?


2. Concept Explanation

From code to execution

When you call an action (.count(), .write.parquet(...)), Spark runs a five-step pipeline that ends with tasks executing on the cluster:

Your DataFrame/RDD transformations
       ↓
[1] RDD DAG construction      logical dependency graph
       ↓
[2] DAGScheduler stage cuts   insert boundaries at shuffles
       ↓
[3] DAGScheduler TaskSets     one TaskSet per stage
       ↓
[4] TaskScheduler placement   assign tasks to executors by locality
       ↓
[5] Executor task execution   run, report result, handle failures

Step 1: RDD DAG construction

Every transformation creates an RDD with a reference to its parent RDD(s) and the function applied. That builds a Directed Acyclic Graph:

textFile("orders/")         RDD[String]              source (no parent)
       ↓  map(parse)
    MapPartitionsRDD         RDD[Order]               narrow dep
       ↓  filter(completed)
    MapPartitionsRDD         RDD[Order]               narrow dep
       ↓  map(category, revenue)
    MapPartitionsRDD         RDD[(String, Double)]    narrow dep
       ↓  reduceByKey(+)
    ShuffledRDD              RDD[(String, Double)]    wide dep, STAGE CUT
       ↓  sortBy(revenue)
    ShuffledRDD              RDD[(String, Double)]    wide dep, STAGE CUT
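A minimal sketch of inspecting this graph yourself, assuming a local Spark session and a few made-up records standing in for the orders/ files (the names sketchSpark, raw, pairs, and totals are illustrative, not part of the lab). toDebugString prints the lineage Spark recorded, and dependencies exposes each RDD's link to its parent:

```scala
import org.apache.spark.{OneToOneDependency, ShuffleDependency}
import org.apache.spark.sql.SparkSession

// Assumption: local mode and made-up records stand in for the orders/ files.
val sketchSpark = SparkSession.builder().master("local[2]").appName("lineage-sketch").getOrCreate()
val sc = sketchSpark.sparkContext

val raw = sc.parallelize(Seq("books,12.5,completed", "toys,8.0,cancelled", "books,3.0,completed"), 2)

val pairs = raw
  .map(_.split(","))                                 // narrow
  .filter(fields => fields(2) == "completed")        // narrow
  .map(fields => (fields(0), fields(1).toDouble))    // narrow
val totals = pairs.reduceByKey(_ + _)                // wide: introduces a ShuffledRDD

// The lineage as Spark recorded it, from the final RDD back to the source.
val lineage = totals.toDebugString
println(lineage)

// Each RDD holds Dependency objects that point at its parents.
val narrowLink = pairs.dependencies.head.isInstanceOf[OneToOneDependency[_]]
val wideLink   = totals.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]]
println(s"pairs -> parent is narrow: $narrowLink, totals -> parent is shuffle: $wideLink")
```

Nothing here has executed a task yet: building the chain and printing its lineage only touches the driver-side graph.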

Step 2: Stage cutting (narrow vs. wide dependencies)

The DAGScheduler walks the RDD DAG backwards from the final RDD. Whenever it hits a wide dependency (a shuffle), it inserts a stage boundary.

Narrow dependency. Each parent partition feeds at most one child partition, so no shuffle is needed.

  • map, filter, flatMap, mapPartitions, union, coalesce (without shuffle)
  • Pipelined: multiple narrow transforms fuse into a single stage, single pass over data.

Wide dependency. A parent partition can feed many child partitions, so each output partition may need data from every input partition.

  • groupByKey, reduceByKey, join (unless both sides are already co-partitioned), distinct, repartition, sortByKey
  • Requires a shuffle: data is written to disk (shuffle files), then read by the next stage.

Stage 1 (all narrow, pipelined in one pass):
  textFile → map(parse) → filter(completed) → map(category, revenue)

  [no data movement between transforms; everything runs on the same partition]

   ←←← SHUFFLE BOUNDARY (reduceByKey writes shuffle files) ←←←

Stage 2:
  reduceByKey(+)     reads shuffle files, aggregates

   ←←← SHUFFLE BOUNDARY (sortBy writes shuffle files) ←←←

Stage 3:
  sortBy(revenue)    final sort, output
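The backwards walk can be sketched as a toy model in plain Scala. This is not Spark's DAGScheduler code: Node, collectStage, and cutStages are hypothetical names, and the sketch ignores details such as sharing one stage between several children.

```scala
// Toy model only: hypothetical types, not Spark's DAGScheduler classes.
final case class Node(name: String, narrow: List[Node] = Nil, wide: List[Node] = Nil)

// Walk backwards through narrow parents (same stage) and stop at wide parents (shuffle).
// Returns the operators fused into this stage plus the nodes that end upstream stages.
def collectStage(node: Node): (List[String], List[Node]) = {
  val fused = node.narrow.map(collectStage)
  (fused.flatMap(_._1) :+ node.name, fused.flatMap(_._2) ++ node.wide)
}

// Upstream stages are emitted first, so the result is a valid execution order.
def cutStages(last: Node): List[List[String]] = {
  val (members, upstream) = collectStage(last)
  upstream.flatMap(cutStages) :+ members
}

val textFile = Node("textFile")
val parsed   = Node("map(parse)", narrow = List(textFile))
val filtered = Node("filter(completed)", narrow = List(parsed))
val pairs    = Node("map(category, revenue)", narrow = List(filtered))
val reduced  = Node("reduceByKey", wide = List(pairs))
val sorted   = Node("sortBy", wide = List(reduced))

val stages = cutStages(sorted)
stages.zipWithIndex.foreach { case (ops, i) =>
  println(s"Stage ${i + 1}: ${ops.mkString(" → ")}")
}
```

For this chain the sketch yields three stages: the four narrow operators fused together, then reduceByKey, then sortBy.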

Stage dependencies and parallel execution

Stages form their own DAG. The DAGScheduler respects dependencies:

  • If Stage 2 depends on Stage 1, Stage 2 cannot start until Stage 1 completes.
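  • Independent stages (different branches of the DAG) can run in parallel, resources permitting.

val orders  = sc.textFile("s3://orders/")
val returns = sc.textFile("s3://returns/")

// parse is assumed to turn a line into a (key, value) pair.
// Two independent RDD chains. Their map-side stages can run concurrently.
val orderRevenue = orders.map(parse).groupByKey()                             // Stage A
// reduceByKey keeps the counts as an RDD; countByKey() is an action that returns a Map to the driver.
val returnCounts = returns.map(parse).mapValues(_ => 1L).reduceByKey(_ + _)   // Stage B (independent)

val combined = orderRevenue.join(returnCounts)                                // Stage C (depends on A and B)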
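The "wait for every parent" rule can be sketched as a toy model. The stage names match the comments above; the parents map and the readyStages helper are hypothetical and not part of Spark's API.

```scala
// Toy model: which stages may be submitted, given the set of finished stages.
val parents: Map[String, Set[String]] = Map(
  "A" -> Set.empty[String],   // orders map side
  "B" -> Set.empty[String],   // returns map side
  "C" -> Set("A", "B")        // join needs both shuffle outputs
)

// A stage is runnable once every parent stage has finished.
def readyStages(done: Set[String]): Set[String] =
  parents.collect {
    case (stage, deps) if !done(stage) && deps.subsetOf(done) => stage
  }.toSet

println(readyStages(Set.empty))       // A and B are both runnable at the start
println(readyStages(Set("A")))        // B is still runnable; C must keep waiting
println(readyStages(Set("A", "B")))   // only now can C be submitted
```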

Step 3: TaskSets

Once stages are defined, the DAGScheduler creates one TaskSet per stage:

  • One Task per partition of the stage's last RDD (the one whose output is shuffled or returned to the driver).
  • All tasks in a TaskSet run the same computation on different data.
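  • The TaskSet goes to the TaskScheduler.

Stage 1: 400 partitions → TaskSet with 400 tasks
Stage 2: 200 partitions → TaskSet with 200 tasks (e.g. reduceByKey(_ + _, 200); DataFrame shuffles default to spark.sql.shuffle.partitions = 200)
Stage 3: 200 partitions → TaskSet with 200 tasks (global sort is range-partitioned; sortBy keeps the parent's partition count by default)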
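A small sketch of where the first two task counts come from, assuming a local session and synthetic keys (sketchSpark and the sample data are illustrative). The partition count on each side of the shuffle is the task count of the corresponding stage:

```scala
import org.apache.spark.sql.SparkSession

// Assumption: local mode and synthetic data; 400 and 200 mirror the numbers above.
val sketchSpark = SparkSession.builder().master("local[2]").appName("taskset-sketch").getOrCreate()
val sc = sketchSpark.sparkContext

val pairs   = sc.parallelize(1 to 4000, 400).map(n => (n % 10, 1L))
val reduced = pairs.reduceByKey(_ + _, 200)   // explicit partition count for the shuffle output

val mapSideTasks    = pairs.getNumPartitions     // tasks in the stage that writes shuffle files
val reduceSideTasks = reduced.getNumPartitions   // tasks in the stage that reads them
println(s"map-side tasks: $mapSideTasks, reduce-side tasks: $reduceSideTasks")
```

When no count is passed, an RDD shuffle falls back to spark.default.parallelism if it is set, and otherwise to the partition count of its parent.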

Step 4: Task placement and data locality

The TaskScheduler (backed by the SchedulerBackend) assigns tasks to executors. It prefers to run a task on the executor that already has the data:

Locality        Meaning                                        Latency
PROCESS_LOCAL   Data is in the executor's JVM (cached RDD)     ~μs
NODE_LOCAL      Data is on the same node (local HDFS block)    ~ms
NO_PREF         No preference (parallelized collections)       ~ms
RACK_LOCAL      Data is on the same rack                       ~10 ms
ANY             Data is anywhere on the network                ~100 ms+

The scheduler waits up to spark.locality.wait (default 3 s) for a PROCESS_LOCAL slot before downgrading to NODE_LOCAL, and another 3 s before downgrading further.
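The downgrade ladder can be sketched as a toy function. This is a simplification with hypothetical names (levels, allowedLevel): it covers only the levels that have a wait timer, and the real scheduler's timer handling is more involved.

```scala
// Toy model of locality fallback; not Spark's TaskSetManager logic.
val levels = Vector("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")

// Every full localityWaitMs spent waiting allows one step down the ladder.
def allowedLevel(waitedMs: Long, localityWaitMs: Long = 3000L): String = {
  val downgrades = (waitedMs / localityWaitMs).toInt
  levels(math.min(downgrades, levels.size - 1))
}

println(allowedLevel(0L))       // still insisting on the best level
println(allowedLevel(3000L))    // one wait period elapsed
println(allowedLevel(6500L))    // two wait periods elapsed
println(allowedLevel(20000L))   // bottom of the ladder
```

Lowering spark.locality.wait trades locality for faster scheduling, which can help when tasks are short and free slots sit idle.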

Caching implication. Once an RDD marked with .cache() has been materialized by an action, tasks that read it can run at PROCESS_LOCAL on the executors holding its partitions, with no network I/O for the input data. That's why caching dramatically speeds up iterative algorithms.
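A minimal sketch of the caching behavior, assuming a local session and synthetic data (sketchSpark and scores are illustrative names). Note that cache() is lazy: the first action pays the full cost and stores the partitions, and only later jobs benefit.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: local mode and synthetic data.
val sketchSpark = SparkSession.builder().master("local[2]").appName("cache-sketch").getOrCreate()
val sc = sketchSpark.sparkContext

val scores = sc.parallelize(1 to 100000, 8).map(n => (n % 100, n.toDouble))
scores.cache()                         // lazy: only marks the RDD for caching
val firstPass  = scores.count()        // first action computes and stores the partitions
val secondPass = scores.values.sum()   // later jobs can read the cached blocks in-process

println(s"storage level: ${scores.getStorageLevel.description}, rows: $firstPass, sum: $secondPass")
```

In the Spark UI, the task table on a stage's page has a Locality Level column, which shows the level each task actually ran at.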

Step 5: Fault tolerance via lineage

If an executor dies and a task fails:

  1. Narrow dep failure. Rerun the failed task on another executor. Spark recomputes from the parent RDD; lineage is the recipe.
  2. Wide dep failure (shuffle data lost). The shuffle output of the previous stage is gone. Spark may need to rerun part of Stage N-1 to regenerate shuffle files before Stage N can continue.
  3. Too many failures. If any single task fails spark.task.maxFailures (default 4) times, Spark aborts the stage and the whole job fails.
  4. Speculation (optional). If a task is much slower than its peers, Spark launches a duplicate "speculative" task on another executor and uses whichever finishes first.

// Set at launch (session builder or spark-submit --conf); Spark 3.x rejects spark.conf.set() for core settings like these.
val spark = org.apache.spark.sql.SparkSession.builder()
  .config("spark.speculation", "true")
  .config("spark.speculation.multiplier", "1.5")  // slower than 1.5x median → launch dup
  .config("spark.speculation.quantile", "0.75")   // wait until 75% are done
  .getOrCreate()
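How the two thresholds interact can be sketched as a toy predicate. The function shouldSpeculate and its parameters are hypothetical, and Spark's real check applies more conditions than this.

```scala
// Toy predicate with hypothetical names; a simplification of the speculation rule.
def shouldSpeculate(
    finishedMs: Seq[Long],   // durations of tasks that already succeeded
    totalTasks: Int,
    runningMs: Long,         // how long the candidate task has been running
    quantile: Double = 0.75,
    multiplier: Double = 1.5): Boolean = {
  // Do nothing until enough of the stage has finished to trust the median.
  val minFinished = math.max((quantile * totalTasks).floor.toInt, 1)
  if (finishedMs.size < minFinished) false
  else {
    val sorted = finishedMs.sorted
    val median = sorted(sorted.size / 2)
    runningMs > multiplier * median
  }
}

val finished = Seq(9000L, 10000L, 10000L, 10000L, 11000L, 11000L, 12000L, 12000L)
println(shouldSpeculate(finished, totalTasks = 10, runningMs = 40000L))          // straggler
println(shouldSpeculate(finished, totalTasks = 10, runningMs = 15000L))          // within range
println(shouldSpeculate(finished.take(5), totalTasks = 10, runningMs = 40000L))  // too early to judge
```

Speculation helps with slow nodes but not with skew: a task that is slow because its partition is huge will be just as slow on the second executor.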

Reading the DAG in the UI

In the Spark UI → Jobs → click a job → "DAG Visualization":

[Stage 1: textFile → filter → map]
              ↓ (shuffle)
[Stage 2: reduceByKey]
              ↓ (shuffle)
[Stage 3: sortBy → write]

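In the DAG view, a green marker flags a cached RDD and skipped stages are grayed out. A "skipped" stage means its output was already available (shuffle files from an earlier job, or a cached RDD), so Spark did not rerun it.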


3. Worked Example

Trace TheWorldShop's daily orders pipeline through the DAG scheduler.

import org.apache.spark.sql.functions._

val orders   = spark.read.parquet("s3://theworldshop/orders/")
val products = spark.read.parquet("s3://theworldshop/products/")

val joined = orders
  .filter(col("status") === "COMPLETED")             // narrow
  .join(broadcast(products), "product_id")            // BroadcastHashJoin, no shuffle
  .withColumn("revenue", col("price") * col("quantity"))  // narrow
  .groupBy("category")                                // wide → STAGE CUT
  .agg(sum("revenue").as("total"))
  .orderBy(desc("total"))                             // wide → STAGE CUT

joined.write.parquet("s3://theworldshop/reports/")

Stages produced.

  • Stage 1 (400 tasks): read orders (400 partitions), filter, broadcast join with products (broadcast to every executor, no shuffle), compute revenue. All narrow, single pass.
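  • Stage 2 (200 tasks): groupBy shuffle. spark.sql.shuffle.partitions = 200 → 200 tasks, or fewer when adaptive query execution (on by default since Spark 3.2) coalesces small shuffle partitions.
  • Stage 3 (up to 200 tasks): orderBy shuffle (range partitioning) for the global sort.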

Why the BroadcastHashJoin doesn't create a boundary. Spark collects the products table on the driver and ships a full copy to every executor, so each Stage 1 task joins its orders partition against a local in-memory hash table. No shuffle is needed, and the DAGScheduler sees no wide dependency.
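A hedged sketch of the difference the broadcast hint makes, assuming a local session and tiny made-up tables in place of the parquet data (sketchSpark and the sample rows are illustrative). Automatic broadcasting is switched off so that only the explicit hint triggers it:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

// Assumption: local mode and made-up rows stand in for orders and products.
val sketchSpark = SparkSession.builder().master("local[2]").appName("join-sketch").getOrCreate()
import sketchSpark.implicits._

// Disable automatic broadcasting for this session so the hint is the only trigger.
sketchSpark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

val orders   = Seq((1, 2, "COMPLETED"), (2, 1, "COMPLETED")).toDF("product_id", "quantity", "status")
val products = Seq((1, "books", 12.5), (2, "toys", 8.0)).toDF("product_id", "category", "price")

// Compare the printed physical plans of the plain join and the hinted join.
val shuffledPlan  = orders.join(products, "product_id").queryExecution.executedPlan.toString
val broadcastPlan = orders.join(broadcast(products), "product_id").queryExecution.executedPlan.toString

println(s"plain join uses SortMergeJoin:      ${shuffledPlan.contains("SortMergeJoin")}")
println(s"hinted join uses BroadcastHashJoin: ${broadcastPlan.contains("BroadcastHashJoin")}")
```

The plain join shuffles both sides by product_id before merging them, which is exactly the stage boundary the broadcast avoids. The threshold change persists for the session, so set it back if you keep working in the same shell.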

Bring up the lab and run this yourself. Clone the lab repo (one time, shared across all Spark courses):

git clone https://github.com/petascalelabs/petascalelabs-lab-setup.git
cd petascalelabs-lab-setup/compute-engines/spark-core/spark-advanced/
./scripts/setup.sh

Verify Spark and the seed dataset are reachable:

./scripts/verify.sh
# expected: "Spark 3.5 ready, TheWorldShop orders/products parquet mounted at /data"

If you'd like AI help personalizing the bring-up to your machine (Docker resource limits, port conflicts, M-series vs Intel), copy this prompt into a fresh chat:

I'm setting up the lab for the "Spark Advanced Internals" course on data-learning.

The lab artifacts are in petascalelabs-lab-setup/compute-engines/spark-core/spark-advanced/
and include:
- docker-compose.yml that brings up a Spark 3.5 standalone cluster (1 master, 2 workers)
- ./scripts/setup.sh, ./scripts/verify.sh, ./scripts/teardown.sh
- data/orders.parquet and data/products.parquet (TheWorldShop seed datasets)

My environment:
- OS: <fill in macOS / Linux / Windows + version>
- RAM: <fill in total system RAM in GB>
- Docker version: <output of `docker --version`>
- Notes about ports already in use, proxy, corporate firewall, etc.: <fill in>

Please walk me through bringing the lab up successfully, identifying anything I'll
need to change in docker-compose.yml or .env for my specific machine before I run
./scripts/setup.sh. I want the cluster usable and the parquet datasets visible
inside the Spark workers.

Now run the worked example as a Spark SQL session, look at the Spark UI on http://localhost:4040, and find the DAG visualization for your job.

joined.explain()
// Physical Plan:
// *(3) Sort [total DESC]
//   Exchange rangepartitioning(...)        ← Stage 3 boundary
//     *(2) HashAggregate(category, sum)
//       Exchange hashpartitioning(...)     ← Stage 2 boundary
//         *(1) HashAggregate(partial_sum)  [BroadcastHashJoin inside Stage 1]
//           BroadcastHashJoin [product_id]
//             *(1) Filter [status = COMPLETED]
//               *(1) FileScan parquet orders
//             BroadcastExchange
//               *(1) FileScan parquet products

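The *(N) prefix marks whole-stage code generation: operators that share a number are fused into a single compiled loop (Lesson 5). N is a codegen id, not a scheduler stage id; this simplified plan numbers them so the two line up.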

Aha: Stage count is a function of how many shuffles you've asked for, not how many operators you've written. Three lines of Scala can produce ten stages if every line forces a shuffle, and ten lines can produce two stages if Catalyst can pipeline them. When you read a slow job, count the Exchange nodes in the physical plan first. That's your wall-clock budget, not the number of operators in your DataFrame chain.
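Counting Exchange nodes can be automated with a few lines. This is a rough sketch under stated assumptions: countShuffles is a hypothetical helper that matches on the printed plan text (fragile across Spark versions), and the in-memory tables are made-up stand-ins for the parquet data.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{broadcast, col, desc, sum}

// Assumption: local mode and made-up rows stand in for orders and products.
val sketchSpark = SparkSession.builder().master("local[2]").appName("exchange-count").getOrCreate()
import sketchSpark.implicits._

// Hypothetical helper: counts shuffle Exchange lines in the printed physical plan.
// BroadcastExchange and ReusedExchange lines are skipped; they do not add a new shuffle.
def countShuffles(df: DataFrame): Int =
  df.queryExecution.executedPlan.toString
    .split("\n")
    .count(line => line.contains("Exchange ") &&
      !line.contains("BroadcastExchange") && !line.contains("ReusedExchange"))

val orders   = Seq((1, 2, "COMPLETED"), (2, 1, "CANCELLED")).toDF("product_id", "quantity", "status")
val products = Seq((1, "books", 12.5), (2, "toys", 8.0)).toDF("product_id", "category", "price")

val report = orders
  .filter(col("status") === "COMPLETED")
  .join(broadcast(products), "product_id")
  .withColumn("revenue", col("price") * col("quantity"))
  .groupBy("category")
  .agg(sum("revenue").as("total"))
  .orderBy(desc("total"))

val shuffles = countShuffles(report)
println(s"shuffle exchanges: $shuffles")
```

For this query the count covers the groupBy and the orderBy; the broadcast join contributes nothing, which matches the stage layout in the worked example.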


4. Your Turn

Exercise: Predict the stages.

Given this query, work out the stage layout before running it:

val clicks   = spark.read.parquet("s3://theworldshop/clicks/")     // 800 partitions
val sessions = spark.read.parquet("s3://theworldshop/sessions/")   // 300 partitions
val products = spark.read.parquet("s3://theworldshop/products/")   //  10 partitions

val result = clicks
  .filter(col("event_type") === "purchase")
  .join(sessions, "session_id")               // large-large join
  .join(broadcast(products), "product_id")    // small table, broadcast
  .groupBy("brand", "country")
  .agg(count("*").as("purchases"))
  .orderBy(desc("purchases"))

  1. How many stages does Spark produce, and where are the shuffle boundaries?
  2. How many tasks run in Stage 1, and what determines that number?
  3. The sessions join creates a shuffle. What controls the number of output partitions after that shuffle?
  4. If you call sessions.cache() and rerun, how does the locality level change for the stage that reads sessions?

5. Real-World Application

Databricks Runtime ships an internal DAG visualization that annotates each node with bytes read and written. Engineering teams use it to spot unnecessary wide dependencies: a distinct() that should have been a dropDuplicates() on fewer columns, a repartition() that nobody removed after a debugging session, a join on the wrong side.

Palantir's Foundry platform builds its pipeline graph on top of Spark's DAG model. Each "transform" in Foundry compiles to a set of Spark stages, and Foundry's dependency tracker is a higher-level DAG laid over Spark's lower-level one.

The pattern repeats. At every shop running Spark at scale, the team eventually builds tooling that lets a non-Spark-expert read a job's stage graph and say "that shuffle shouldn't be there." It's the single highest-leverage piece of internal tooling a Spark platform team can build.


6. Recap + Bridge

The DAGScheduler converts your transformations into stages cut at shuffle boundaries and creates one TaskSet per stage; the TaskScheduler then assigns tasks to executors by locality. Tasks over cached data can run at PROCESS_LOCAL; uncached, remote data runs at ANY. Fault tolerance falls out of lineage: a failed task replays from its parent RDD.

Next lesson we step inside the shuffle itself. What actually lands on disk between stages, how the writer and reader find each other, and why groupByKey OOMs a cluster that reduceByKey handles without breaking a sweat.