A DAG Is a Contract, Not a Script

Module: DAGs, Tasks, and the Scheduler Loop | Duration: ~14 min | Lesson: 1 of 7


Dev writes his first DAG. He opens the file, sees Python, and starts thinking like a Python script writer. He puts a requests.get() at the top to fetch a config. He adds a print("starting daily revenue load") so he can watch the logs. He pushes to staging.

The next morning, his manager's Slack: "the third-party API we're hitting just billed us for 14,400 calls in 24 hours. Did you ship a loop?"

Dev didn't ship a loop. He shipped a file that the scheduler imported every 30 seconds. The requests.get() ran on every import. The print flooded the scheduler logs. The actual task he meant to write ran only once a day. The two had nothing to do with each other.

A DAG file is read by the scheduler over and over. The tasks it declares run on their cadence. Conflating the two is the single most common mistake a learner makes in their first month with any orchestrator.


2. Concept Explanation

Two execution contexts in one file

A DAG file lives in two completely different execution contexts at the same time:

  1. Parse context. The orchestrator's scheduler imports the file every N seconds (Airflow's default: every 30 seconds, per min_file_process_interval). Every top-level statement runs on every import. Functions that are defined but not called are only defined; their bodies do not run.
  2. Task context. The functions or operators referenced by tasks run on their schedule. A daily task's body runs once a day, on a worker process, far away in time and space from the parse context.

The same Python file, two timelines.
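The two timelines can be imitated without Airflow. The sketch below is a toy model, not how the scheduler is implemented: DAG_SOURCE stands in for a DAG file, parse() stands in for one scheduler import, and the calls list is a hypothetical recorder of side effects.

```python
# Toy model of the two timelines. No Airflow required.
# DAG_SOURCE stands in for a DAG file; `calls` records side effects.
DAG_SOURCE = '''
calls.append("top-level")          # parse context: runs on every import

def task_body():
    calls.append("task-body")      # task context: runs only when invoked
'''


def parse(source, calls):
    """Mimic one scheduler parse: execute the file's top level."""
    namespace = {"calls": calls}
    exec(source, namespace)
    return namespace


calls = []
PARSES_PER_DAY = 24 * 60 * 60 // 30   # one parse every 30 seconds

for _ in range(PARSES_PER_DAY):
    namespace = parse(DAG_SOURCE, calls)

# A daily schedule invokes the task body once.
namespace["task_body"]()

top_level_runs = calls.count("top-level")
task_runs = calls.count("task-body")
print(top_level_runs, task_runs)   # 2880 1
```

Same source text, two very different counts. The function definition was executed 2,880 times too, but defining a function costs almost nothing; only its body carries the work.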

What goes where

Belongs in parse context (top level)   |  Belongs in task context (function body)
from airflow import DAG                |  The actual work
Task and DAG declarations              |  API calls, DB writes, file reads
Static config dicts (constants)        |  requests.get(), pd.read_csv(), spark.sql()
                                       |  Anything that costs money or has side effects

The rule: the top level should be cheap and idempotent. The scheduler will run it every 30 seconds across every DAG file on the cluster. If your top level does anything other than declare structure, you've signed the cluster up for a bill.
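To put a rough number on that bill, here is a back-of-envelope sketch. The function name and its parameters are illustrative, and it assumes every file is parsed exactly once per interval by a single parser, which a real cluster will not match precisely.

```python
SECONDS_PER_DAY = 24 * 60 * 60


def parse_time_calls_per_day(parse_interval_s, dag_files=1, calls_per_parse=1):
    """Back-of-envelope count of top-level side effects per day."""
    parses_per_file = SECONDS_PER_DAY // parse_interval_s
    return parses_per_file * dag_files * calls_per_parse


print(parse_time_calls_per_day(30))                 # 2880
print(parse_time_calls_per_day(30, dag_files=200))  # 576000
```

One stray top-level call in one file is already thousands of calls a day. The same habit spread across a few hundred files is where the invoice comes from.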

Why this is a contract, not a script

When you write a normal Python script, you control execution. You hit run, statements execute in order, the script ends. A DAG isn't that. A DAG is a declaration you hand to a scheduler. The declaration says:

  • Here are tasks A, B, C.
  • A runs at 02:00 daily.
  • B runs after A.
  • C runs after B.
  • Retry on failure up to N times.

The scheduler reads the declaration, decides when each task fires, and dispatches them. The "Python file" packaging is incidental. You're writing a config, not a script.
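One way to see the "config, not script" idea is to write the declaration as plain data and let something else decide the order. This is a minimal sketch using only the standard library; TaskSpec, DECLARATION, dispatch_order, and the retry count of 3 are hypothetical names and values, not Airflow's API.

```python
from dataclasses import dataclass
from graphlib import TopologicalSorter


@dataclass(frozen=True)
class TaskSpec:
    name: str
    upstream: tuple = ()
    retries: int = 0


# The "contract": pure data. Building it performs no work.
DECLARATION = {
    "schedule": "0 2 * * *",  # 02:00 daily, in cron syntax
    "tasks": [
        TaskSpec("A", retries=3),
        TaskSpec("B", upstream=("A",), retries=3),
        TaskSpec("C", upstream=("B",), retries=3),
    ],
}


def dispatch_order(declaration):
    """Toy scheduler step: derive a run order from declared dependencies."""
    graph = {task.name: set(task.upstream) for task in declaration["tasks"]}
    return list(TopologicalSorter(graph).static_order())


order = dispatch_order(DECLARATION)
print(order)  # ['A', 'B', 'C']
```

Nothing in DECLARATION runs a task. The order comes out of the dependency graph, not out of the order the lines were written in.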

In some orchestrators this is explicit (Dagster's @asset, Prefect's @flow with deferred execution, Argo's pure-YAML). In Airflow, the Python file disguises it. The disguise is what gets newcomers.

The shape of a clean DAG file

import pandas as pd
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator

# Constants are fine at top level. Cheap. No side effects.
SOURCE_BUCKET = "s3://theworldshop-raw/"
WAREHOUSE_TABLE = "warehouse.orders"

# Functions defined here are not executed. Just defined.
def load_orders(**context):
    # This runs on a worker, on the schedule, not on every parse.
    # Side effects go here.
    df = pd.read_parquet(SOURCE_BUCKET + f"orders/dt={context['ds']}/")
    df.to_sql(WAREHOUSE_TABLE, ...)

with DAG(
    dag_id="daily_orders",
    schedule="@daily",
    start_date=pendulum.datetime(2026, 1, 1, tz="UTC"),
    catchup=False,
) as dag:
    PythonOperator(
        task_id="load_orders",
        python_callable=load_orders,
    )

Notice what's at top level: imports, two constants, a function definition, the DAG declaration. Nothing that touches a network, a disk, or a clock. The function body where the work lives is invoked by the worker, not by every parse.

The smell test

Before merging a DAG, ask: "if the scheduler parsed this file 2,880 times today (once every 30 seconds), what would the side effects be?" If the answer is anything other than "memory and CPU," you've put work in the parse context.

The most common offenders:

  • requests.get(...) at top level to pull a config.
  • pd.read_csv(...) to load a list of dimensions used by the DAG.
  • Variable.get("api_key") (in Airflow this hits the metadata DB on every parse).
  • print(...) calls that flood the scheduler log.
  • datetime.now() at top level (creates "every 30 seconds the schedule is different" bugs).

Fix all of them by moving the side effect into a function body the task invokes.
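The smell test can be partly automated. The sketch below uses Python's ast module to flag suspect calls that sit outside any function body. The denylist and helper names are hypothetical, and it is deliberately naive: it ignores decorators, default arguments, and import aliases it has not been told about.

```python
import ast

# Hypothetical denylist for illustration; extend it for your own codebase.
SUSPECT_CALLS = ("requests.get", "pd.read_csv", "Variable.get",
                 "print", "datetime.now")


def dotted_name(node):
    """Return 'a.b.c' for a Name/Attribute chain, else None."""
    parts = []
    while isinstance(node, ast.Attribute):
        parts.append(node.attr)
        node = node.value
    if isinstance(node, ast.Name):
        parts.append(node.id)
        return ".".join(reversed(parts))
    return None


def is_suspect(name):
    return any(name == s or name.endswith("." + s) for s in SUSPECT_CALLS)


class TopLevelCallFinder(ast.NodeVisitor):
    def __init__(self):
        self.hits = []

    def visit_FunctionDef(self, node):
        # Function bodies run at task time, so do not descend into them.
        pass

    visit_AsyncFunctionDef = visit_FunctionDef
    visit_Lambda = visit_FunctionDef

    def visit_Call(self, node):
        name = dotted_name(node.func)
        if name is not None and is_suspect(name):
            self.hits.append((node.lineno, name))
        self.generic_visit(node)  # catch calls nested inside this one


def find_parse_time_calls(source):
    finder = TopLevelCallFinder()
    finder.visit(ast.parse(source))
    return finder.hits


SAMPLE = '''
import requests
print("loading")
config = requests.get("https://example.invalid/cfg").json()

def task():
    requests.get("https://example.invalid/data")
'''

hits = find_parse_time_calls(SAMPLE)
print(hits)  # [(3, 'print'), (4, 'requests.get')]
```

The call inside task() is not reported, because that is exactly where it belongs. A check like this could run in CI before a DAG merges, as a first filter rather than a guarantee.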


3. Worked Example

Dev's broken DAG, simplified:

import requests, pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator

print("loading daily_revenue DAG")

# This runs every 30 seconds. Across 200 DAGs, that is the bill.
config = requests.get("https://config.theworldshop.com/daily_revenue").json()
dim_table = pd.read_csv("s3://theworldshop/dims/products.csv")

def load_revenue(**context):
    # Uses the config and dim_table closure variables
    ...

with DAG(...) as dag:
    PythonOperator(task_id="load_revenue", python_callable=load_revenue)

What's wrong:

  • print runs on every parse, every 30 seconds.
  • requests.get hits an HTTP endpoint every 30 seconds, all day, across every scheduler process.
  • pd.read_csv reads S3 every 30 seconds. The S3 bill is small per call; over a month it's not.
  • config and dim_table are module-level globals (loosely, "closure variables") that load_revenue reads. They're a snapshot of whenever the file was last imported, not of task time. If config changes after that import, a task running in that process still has the old copy until the file is imported again.
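The snapshot problem in the last bullet is easy to reproduce in isolation. In this sketch, remote_config and fetch_config are hypothetical stand-ins for the config service; no network is involved.

```python
# Stand-in for a remote config service (hypothetical).
remote_config = {"currency": "USD"}


def fetch_config():
    """Pretend network call: returns a copy of the current remote state."""
    return dict(remote_config)


# Parse-time pattern: the value is captured once, when the module is imported.
CONFIG_SNAPSHOT = fetch_config()


def task_using_snapshot():
    return CONFIG_SNAPSHOT["currency"]


def task_fetching_at_run_time():
    return fetch_config()["currency"]


# The remote config changes after the import.
remote_config["currency"] = "EUR"

stale = task_using_snapshot()
fresh = task_fetching_at_run_time()
print(stale, fresh)  # USD EUR
```

The first task answers with whatever was true at import time. The second answers with what is true when it runs, which is what a daily load almost always wants.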

The fixed version:

import requests, pandas as pd
from airflow import DAG
from airflow.operators.python import PythonOperator

CONFIG_URL = "https://config.theworldshop.com/daily_revenue"
DIM_TABLE_PATH = "s3://theworldshop/dims/products.csv"

def load_revenue(**context):
    # Fetched at task time, on a worker
    config = requests.get(CONFIG_URL).json()
    dim_table = pd.read_csv(DIM_TABLE_PATH)
    # actual work
    ...

with DAG(...) as dag:
    PythonOperator(task_id="load_revenue", python_callable=load_revenue)

Now the side effects happen inside the task body, executed once per scheduled run, on a worker. The parse context only knows the names of the constants. The cluster runs flat instead of melting.

Aha: The Python file you wrote is read by the scheduler thousands of times. The functions inside it run on their cadence. If your "I'll just put this at the top" instinct from regular Python kicks in, the scheduler does what you asked it to: that work happens every parse.


4. Real-World Application

This is the single most common ramp-up bug across Airflow, Dagster, and Prefect, even for senior engineers crossing over from a non-orchestrator background. Every Airflow shop has had the "why is the scheduler slow?" investigation that traces back to a requests.get() in someone's DAG file (Lesson 7 of this course goes deeper).

Dagster's design pushes this harder by separating the definition file from any execution side effects with explicit decorators (@asset, @op). Prefect's @flow and @task decorators do the same. Argo Workflows sidesteps the bug entirely by being pure YAML, so there's nothing to "execute" at parse time.

Whichever tool you land on, the underlying fact is the same: the declaration and the execution are two timelines. The declaration is parsed often and should be cheap. The execution runs on the schedule and is where real work happens. Internalize the split and every other orchestrator concept becomes easier.


5. Your Turn

Exercise: Look at this DAG file. Identify every piece of code that runs at parse time and shouldn't.

import boto3, json, datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

s3 = boto3.client("s3")
secrets = json.loads(s3.get_object(Bucket="theworldshop-secrets",
                                   Key="prod.json")["Body"].read())

now = datetime.datetime.now()
print(f"daily_etl loaded at {now}")

def run_etl(**context):
    db_url = secrets["postgres_url"]
    ...

with DAG(dag_id="daily_etl", schedule="@daily", ...) as dag:
    PythonOperator(task_id="run_etl", python_callable=run_etl)

  1. List every line that runs on every parse and explain what cost or correctness issue it creates.
  2. Rewrite the file so that secrets, now, and the print message all behave correctly.
  3. Why is secrets baked into the closure a correctness problem (not just a cost problem)?

6. Recap + Bridge

The DAG file lives in two execution contexts: parse (every 30 seconds, the scheduler's loop) and task (once per schedule, on a worker). Anything with a side effect, a cost, or "current state" semantics belongs in the task body, not at top level. The next lesson digs into the next layer down: when a task fires, where does it run, and what does that decision cost you?