The Metadata Layer Deep Dive

Module: Advanced | Duration: ~25 min | Lesson: 1 of 8


TheWorldShop's orders table has grown to 8 billion rows across 400,000 Parquet files. A query planner needs to answer: "Which of these 400,000 files might contain orders from the US placed in January 2024?"

A naive approach: list all 400,000 files, read each file's footer to check its statistics. That's 400,000 network round trips. Minutes of planning overhead before reading a single byte of data.

Iceberg answers this question in seconds, not minutes. How? The answer is in the three-level metadata hierarchy. This lesson is where you understand it cold, down to the file format and byte-level details.


2. Concept Explanation

The Full Hierarchy

table metadata pointer
        │
        ▼
  metadata.json  ← human-readable JSON, ~10-50 KB
        │
        ▼
  manifest list (.avro)  ← one per snapshot, ~MB range
        │
        ├── manifest-1.avro  ← ~MB each, partition summaries + file stats
        ├── manifest-2.avro
        └── manifest-N.avro
                │
                └── part-00000.parquet  ← actual data (typically MB-GB each)
                    part-00001.parquet

Each level in the hierarchy serves a different purpose in the scan planning pipeline.

Level 1: Table Metadata (metadata.json)

The entry point. Written in JSON for human readability. Contains everything needed to understand the current and historical state of the table.

Key fields (from format/spec.md in the codebase):

{
  "format-version": 2,
  "table-uuid": "9c12d441-03fe-4693-9a96-a0705ddf69c3",
  "location": "s3://theworldshop-warehouse/orders",
  "last-sequence-number": 1234,
  "last-updated-ms": 1706227200000,
  "last-column-id": 12,
  
  "schemas": [{
    "schema-id": 0,
    "fields": [
      {"id": 1, "name": "order_id",    "type": "string", "required": true},
      {"id": 2, "name": "order_date",  "type": "date",   "required": true},
      {"id": 3, "name": "order_total", "type": "double",  "required": true}
    ]
  }],
  "current-schema-id": 0,
  
  "partition-specs": [{
    "spec-id": 0,
    "fields": [
      {"source-id": 2, "field-id": 1000, "name": "order_date_day", "transform": "day"}
    ]
  }],
  
  "current-snapshot-id": 3051729675574597004,
  "snapshots": [{
    "snapshot-id": 3051729675574597004,
    "parent-snapshot-id": 8765432109876543210,
    "timestamp-ms": 1706227200000,
    "manifest-list": "s3://theworldshop-warehouse/orders/metadata/snap-3051729.avro",
    "summary": {
      "operation": "append",
      "added-data-files": "4",
      "added-records": "1200000",
      "total-data-files": "400000",
      "total-records": "8000000000"
    }
  }],
  
  "snapshot-log": [...],
  "metadata-log": [...]
}

When a query engine starts planning, it reads this single file (~50 KB) and knows: current schema, partition spec, and where to find the manifest list.
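
That first hop can be sketched in a few lines of Python. This is a minimal illustration, not Iceberg's own reader: it uses the field names from the JSON above, the older snapshot's manifest list path (snap-older.avro) is a made-up placeholder, and the helper name current_manifest_list is hypothetical.

```python
import json

# A trimmed-down metadata.json using the field names shown above.
# The "snap-older.avro" path is a hypothetical placeholder.
METADATA_JSON = """
{
  "format-version": 2,
  "current-snapshot-id": 3051729675574597004,
  "snapshots": [
    {"snapshot-id": 8765432109876543210,
     "manifest-list": "s3://theworldshop-warehouse/orders/metadata/snap-older.avro"},
    {"snapshot-id": 3051729675574597004,
     "parent-snapshot-id": 8765432109876543210,
     "manifest-list": "s3://theworldshop-warehouse/orders/metadata/snap-3051729.avro"}
  ]
}
"""


def current_manifest_list(metadata: dict) -> str:
    """Return the manifest list path of the table's current snapshot."""
    current_id = metadata["current-snapshot-id"]
    for snapshot in metadata["snapshots"]:
        if snapshot["snapshot-id"] == current_id:
            return snapshot["manifest-list"]
    raise LookupError(f"snapshot {current_id} not found in metadata")


metadata = json.loads(METADATA_JSON)
print(current_manifest_list(metadata))
```

The point of the sketch is that nothing has been listed or scanned yet: one small JSON document is enough to locate the manifest list for the snapshot being read.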

Level 2: Manifest List (Avro)

One manifest list per snapshot. This is an Avro file: binary, compact, schema-encoded. Each row represents one manifest file. The manifest list's value comes from its partition summary columns: for each partition field, it stores the lower_bound and upper_bound of values across all files in that manifest.

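Key manifest list entry fields (simplified from ManifestFile.java; the real interface returns nullable boxed types for the counts and also carries fields such as content type and sequence numbers):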

// api/src/main/java/org/apache/iceberg/ManifestFile.java
interface ManifestFile {
    String path();              // s3://...manifest-1.avro
    long length();              // bytes
    int partitionSpecId();      // which partition spec applies
    long snapshotId();          // which snapshot added this manifest
    int addedFilesCount();
    int existingFilesCount();
    int deletedFilesCount();
    long addedRowsCount();
    
    List<PartitionFieldSummary> partitions(); // ← THE KEY FIELD
}

interface PartitionFieldSummary {
    boolean containsNull();
    ByteBuffer lowerBound();   // min partition value (encoded)
    ByteBuffer upperBound();   // max partition value (encoded)
}

This is where partition pruning happens. When a query has WHERE order_date = '2024-01-15', Iceberg:

  1. Computes days('2024-01-15') = 19737
  2. Scans the manifest list
  3. For each manifest entry, checks: lowerBound ≤ 19737 ≤ upperBound
  4. Skips any manifest where the bound check fails

If a manifest covers only data from 2023-12-01 to 2023-12-31, it's skipped entirely, without reading the manifest file.
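
The four steps above can be sketched in Python. This is an illustration under stated assumptions, not Iceberg's implementation: the ManifestEntry class, the manifest paths, and the helper names are hypothetical, and the bounds are assumed to be encoded as 4-byte little-endian integers holding the day ordinal, which is how the spec's single-value serialization represents int and date values.

```python
import struct
from dataclasses import dataclass
from datetime import date
from typing import List

EPOCH = date(1970, 1, 1)


def days_since_epoch(d: date) -> int:
    """Apply the `day` transform: whole days between the Unix epoch and d."""
    return (d - EPOCH).days


def encode_day(d: date) -> bytes:
    """Encode a day ordinal as a 4-byte little-endian int (assumed bound encoding)."""
    return struct.pack("<i", days_since_epoch(d))


def decode_day(raw: bytes) -> int:
    """Decode a 4-byte little-endian int back into a day ordinal."""
    return struct.unpack("<i", raw)[0]


@dataclass
class ManifestEntry:
    path: str           # hypothetical manifest path
    lower_bound: bytes  # encoded minimum of order_date_day in this manifest
    upper_bound: bytes  # encoded maximum of order_date_day in this manifest


def prune_manifests(entries: List[ManifestEntry], target_day: int) -> List[ManifestEntry]:
    """Keep only manifests whose [lower, upper] range can contain target_day."""
    return [
        m for m in entries
        if decode_day(m.lower_bound) <= target_day <= decode_day(m.upper_bound)
    ]


entries = [
    ManifestEntry("manifest-dec.avro", encode_day(date(2023, 12, 1)), encode_day(date(2023, 12, 31))),
    ManifestEntry("manifest-jan.avro", encode_day(date(2024, 1, 1)), encode_day(date(2024, 1, 31))),
]

target_day = days_since_epoch(date(2024, 1, 15))
survivors = prune_manifests(entries, target_day)

print(target_day)                   # 19737
print([m.path for m in survivors])  # ['manifest-jan.avro']
```

Note that the December manifest is rejected using only the two bounds stored in the manifest list row; the manifest file itself is never opened.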

Level 3: Manifest Files (Avro)

Each manifest file contains one row per data file or delete file. Critically, each row includes per-column statistics (value count, null count, min, max) for the columns whose metrics were collected when the data file was written.

Key fields per data file entry (simplified from DataFile.java and its parent ContentFile.java):

// api/src/main/java/org/apache/iceberg/DataFile.java
interface DataFile extends ContentFile<DataFile> {
    String path();               // s3://...part-00000.parquet
    FileFormat format();         // PARQUET, ORC, AVRO
    StructLike partition();      // partition values
    long recordCount();          // row count in this file
    long fileSizeInBytes();
    
    Map<Integer, Long>    columnSizes();   // column ID → bytes
    Map<Integer, Long>    valueCounts();   // column ID → value count
    Map<Integer, Long>    nullValueCounts();
    Map<Integer, ByteBuffer> lowerBounds(); // column ID → min value
    Map<Integer, ByteBuffer> upperBounds(); // column ID → max value
}

With these per-file statistics, the query planner can skip files where a filter predicate is provably false:

  • WHERE order_total > 1000: skip files where upperBounds[3] ≤ 1000
  • WHERE country = 'US': skip files where lowerBounds[6] > 'US' OR upperBounds[6] < 'US'

This is data skipping, and it's based entirely on the metadata already in manifests. No data file reads needed.
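
The two skip rules above can be written as small predicates. This is a minimal sketch assuming already-decoded bounds: the FileStats class, the file paths, and the function names are hypothetical, and column IDs 3 (order_total) and 6 (country) follow the examples in the text. A file with no recorded bounds must be kept, because skipping is only safe when the predicate is provably false.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class FileStats:
    path: str  # hypothetical data file path
    lower_bounds: Dict[int, Any] = field(default_factory=dict)  # column ID -> min value
    upper_bounds: Dict[int, Any] = field(default_factory=dict)  # column ID -> max value


def might_match_gt(stats: FileStats, column_id: int, value: Any) -> bool:
    """False only when every value in the file is provably <= value."""
    upper = stats.upper_bounds.get(column_id)
    return upper is None or upper > value


def might_match_eq(stats: FileStats, column_id: int, value: Any) -> bool:
    """False only when value provably lies outside the file's [min, max] range."""
    lower = stats.lower_bounds.get(column_id)
    upper = stats.upper_bounds.get(column_id)
    if lower is None or upper is None:
        return True  # no statistics, so the file cannot be ruled out
    return lower <= value <= upper


ORDER_TOTAL, COUNTRY = 3, 6  # column IDs used in the examples above

files: List[FileStats] = [
    FileStats("a.parquet", {3: 5.0, 6: "CA"}, {3: 900.0, 6: "US"}),    # totals too small
    FileStats("b.parquet", {3: 10.0, 6: "DE"}, {3: 4500.0, 6: "UK"}),  # 'UK' < 'US'
    FileStats("c.parquet", {3: 20.0, 6: "CA"}, {3: 2500.0, 6: "US"}),  # may match
    FileStats("d.parquet"),                                            # no stats: keep
]

# WHERE order_total > 1000 AND country = 'US'
to_read = [
    f.path for f in files
    if might_match_gt(f, ORDER_TOTAL, 1000) and might_match_eq(f, COUNTRY, "US")
]
print(to_read)  # ['c.parquet', 'd.parquet']
```

Surviving the check does not mean a file contains matching rows, only that the statistics cannot exclude it.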

Sequence Numbers

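Every commit to an Iceberg table is assigned a monotonically increasing sequence number, and each file inherits the sequence number of the snapshot that added it. This ordering, together with the snapshot lineage, enables incremental scans. You can ask "give me all files added between snapshot A and snapshot B":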

// Used for streaming incremental reads (Flink uses this)
table.newIncrementalAppendScan()
     .fromSnapshotExclusive(lastProcessedSnapshot)
     .toSnapshot(currentSnapshot)
     .planFiles(); // only files appended after lastProcessedSnapshot, up to currentSnapshot

This is the foundation of Iceberg's streaming read support.
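
To make the idea concrete, here is a rough Python sketch of how an incremental read can be derived from snapshot lineage. It is not how Iceberg implements incremental scans: the function name snapshots_between is hypothetical, and the "added-files" key is an invented shortcut (real snapshots record added files in their manifests, and a real scan also has to consider each snapshot's operation type).

```python
from typing import Dict, List, Optional


def snapshots_between(snapshots: List[dict], from_id: int, to_id: int) -> List[int]:
    """Return snapshot IDs after from_id, up to and including to_id, oldest first."""
    by_id: Dict[int, dict] = {s["snapshot-id"]: s for s in snapshots}
    chain: List[int] = []
    current: Optional[int] = to_id
    # Walk parent pointers backwards from the newest snapshot.
    while current is not None and current != from_id:
        snapshot = by_id.get(current)
        if snapshot is None:
            raise ValueError(f"snapshot {current} is missing (possibly expired)")
        chain.append(current)
        current = snapshot.get("parent-snapshot-id")
    if current is None:
        raise ValueError(f"{from_id} is not an ancestor of {to_id}")
    chain.reverse()
    return chain


# "added-files" is a hypothetical field used only for this illustration.
snapshots = [
    {"snapshot-id": 1, "added-files": ["a.parquet"]},
    {"snapshot-id": 2, "parent-snapshot-id": 1, "added-files": ["b.parquet"]},
    {"snapshot-id": 3, "parent-snapshot-id": 2, "added-files": ["c.parquet", "d.parquet"]},
    {"snapshot-id": 4, "parent-snapshot-id": 3, "added-files": ["e.parquet"]},
]

new_ids = snapshots_between(snapshots, from_id=2, to_id=4)
by_id = {s["snapshot-id"]: s for s in snapshots}
new_files = [path for sid in new_ids for path in by_id[sid]["added-files"]]

print(new_ids)    # [3, 4]
print(new_files)  # ['c.parquet', 'd.parquet', 'e.parquet']
```

A streaming reader would remember the last snapshot ID it processed and repeat this walk on each poll, so it only ever plans files it has not yet seen.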

How Scan Planning Actually Works

Putting it all together, here's the full scan planning pipeline for a query with predicate WHERE order_date = '2024-01-15' AND country = 'US':

1. Read metadata.json (1 file, ~50ms)
   → get current snapshot ID → get manifest list location

2. Read manifest list (1 Avro file, ~100ms)
   → for each manifest entry:
     → check partition bounds for order_date_day vs 19737
     → SKIP 380 of 400 manifests (wrong date range)
     → pass 20 manifests to next step

3. Read 20 manifest files (~2s)
   → for each data file entry:
     → check per-file bounds: country upperBound ≥ 'US' AND lowerBound ≤ 'US'
     → SKIP 60% of files in those manifests
     → pass ~3,000 data files to Spark executor planning

4. Spark reads ~3,000 Parquet files
   → within each file: Parquet column statistics + row group pruning further reduce I/O

Total planning overhead: ~2.5 seconds for an 8-billion-row, 400,000-file table

Compare to Hive: listing 400,000 files would take minutes.
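
The gap is easiest to see by counting metadata reads rather than timing them. The arithmetic below only uses the figures from the walkthrough above; the variable names are illustrative.

```python
# Naive planning: one footer read per data file.
naive_reads = 400_000

# Iceberg planning, using the counts from the walkthrough above.
metadata_json_reads = 1
manifest_list_reads = 1
manifest_reads = 20  # manifests that survived partition pruning
iceberg_reads = metadata_json_reads + manifest_list_reads + manifest_reads

# Share of manifests eliminated from the manifest list alone.
manifests_skipped_pct = 100 * 380 / 400

# Sum of the per-step timings listed in steps 1-3 (milliseconds).
listed_step_ms = 50 + 100 + 2000

print(f"naive reads:   {naive_reads}")
print(f"iceberg reads: {iceberg_reads}")
print(f"manifests skipped: {manifests_skipped_pct:.0f}%")
print(f"listed step time: {listed_step_ms} ms")
```

The three listed step timings sum to 2,150 ms, a little under the ~2.5 second total quoted above, and the planner touches 22 metadata files instead of 400,000 data file footers.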


3. Worked Example

Let's inspect the actual metadata of the orders table. Here's how you can dump and read it in Spark:

# In Spark, Iceberg exposes metadata as queryable tables

# 1. Inspect the manifest list for the current snapshot
spark.sql("""
    SELECT 
        path,
        length,
        partition_spec_id,
        added_snapshot_id,
        added_data_files_count,
        partition_summaries
    FROM theworldshop.orders.manifests
""").show(truncate=False)

# 2. Inspect individual data file statistics
spark.sql("""
    SELECT 
        file_path,
        record_count,
        file_size_in_bytes,
        column_sizes,
        value_counts,
        null_value_counts,
        lower_bounds,
        upper_bounds,
        partition
    FROM theworldshop.orders.files
    LIMIT 5
""").show(truncate=False)

# 3. See all snapshots and their manifest lists
spark.sql("""
    SELECT 
        snapshot_id,
        parent_id,
        operation,
        committed_at,
        manifest_list,
        summary
    FROM theworldshop.orders.snapshots
""").show(truncate=False)

# 4. Look at the raw metadata JSON on disk
import json
import os

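warehouse = "/tmp/theworldshop-warehouse"
metadata_path = f"{warehouse}/theworldshop/orders/metadata"
metadata_files = [
    os.path.join(metadata_path, f)
    for f in os.listdir(metadata_path)
    if f.endswith('.json')
]
# Pick the most recently written metadata file. Sorting by name is unreliable
# (for example, "v10" sorts before "v2").
latest_metadata = max(metadata_files, key=os.path.getmtime)

with open(latest_metadata) as f:
    meta = json.load(f)

# Look up the current schema by ID instead of assuming it is first in the list.
current_schema = next(
    s for s in meta['schemas'] if s['schema-id'] == meta['current-schema-id']
)

print(f"Format version: {meta['format-version']}")
print(f"Current snapshot: {meta['current-snapshot-id']}")
print(f"Schema fields: {[f['name'] for f in current_schema['fields']]}")
print(f"Number of snapshots: {len(meta['snapshots'])}")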

4. Your Turn

Exercise: Given an Iceberg table with these manifest entries:

Manifest   Date LowerBound   Date UpperBound   Country LowerBound   Country UpperBound   Files
M1         2024-01-01        2024-01-31        'CA'                 'US'                 500
M2         2024-01-01        2024-01-31        'DE'                 'UK'                 400
M3         2024-02-01        2024-02-28        'CA'                 'US'                 600
M4         2024-02-01        2024-02-28        'AU'                 'JP'                 350

Query: WHERE order_date BETWEEN '2024-01-10' AND '2024-01-20' AND country = 'US'

  1. Which manifests survive the partition (date) pruning step?
  2. Of those, which survive the file statistics (country) pruning step?
  3. How many data files does Spark actually need to open?

5. Real-World Application

Iceberg was originally developed at Netflix, which also runs Metacat (their metadata management platform). Netflix engineers have described cases where query planning time for their largest tables dropped from 12 minutes (Hive) to 8 seconds (Iceberg) just from the metadata hierarchy redesign.

The Puffin file format (which you'll see in Lesson 5 on performance) extends this further by storing pre-computed statistics (like theta sketches for approximate distinct counts) in separate files that the table metadata references. Query planners can then reuse those cardinality estimates instead of recomputing them.

In your career: When a stakeholder says "our Iceberg queries are slow," the first thing to check is scan planning efficiency. Are manifests being pruned? Are per-file statistics being used? The metadata layer is the first place to look, not the compute layer.

Aha: Iceberg's manifest list isn't an index "for" the data. It's an index over per-manifest partition bounds. By the time the query engine opens a manifest file, the manifest-level partition decision is already made. That's why scan planning stays in the range of seconds even when the file count grows past a million.


6. Recap + Bridge

What we learned: The Iceberg metadata hierarchy (metadata.json → manifest list → manifests → data files) keeps scan planning to a few seconds for billion-row tables. Manifest list entries contain partition bounds for cheap partition pruning. Manifest entries contain per-file column statistics for data skipping. Sequence numbers and snapshot lineage enable efficient incremental reads for streaming.

Coming up next: Lesson 2 covers the most complex aspect of Iceberg for production engineers: how row-level updates and deletes work. Copy-on-Write (COW) and Merge-on-Read (MOR) are two fundamentally different strategies, and choosing the wrong one can tank your table's performance.