
Runtime row-group early stop via TopK dynamic filter #22407

@zhuqi-lucas

Is your feature request related to a problem or challenge?

DataFusion's TopK dynamic-filter pruning works at two granularities today:

  • File-level (via EarlyStoppingStream): once TopK's threshold tightens, un-opened files are pruned by their stats.
  • Row-level (inside an open row group): once the sort column is decoded, rows below the threshold are dropped via RowSelection.

There's a gap in the middle: once a file's row groups are picked at scan startup (via the upfront PruningPredicate), the per-RG decision is fixed. As TopK tightens at runtime, subsequent RGs in the already-opened file keep being decoded even when their stats already prove they can't beat the threshold.

This is the dominant cost for two query shapes:

  1. Single large parquet file (tens of GB, hundreds of RGs) — file-level pruning can't help because there's only one file.
  2. Inexact TopK: the Exact sort pushdown can't fire (overlapping ranges or a filter on a non-sort column), so SortExec stays; the threshold tightens at runtime, but only inside an RG, not between RGs.

This issue tracks the gap: between consecutive row groups inside an open file, re-check the live DynamicFilterPhysicalExpr threshold against the next RG's stats and skip if prunable.

Architecture today — three layers of TopK pruning

Query: ORDER BY ts DESC LIMIT 10

  SortExec(TopK fetch=10)            ← TopK heap, threshold tightens at runtime
       ▲
   DynamicFilterPhysicalExpr (ts > threshold)
   gen=1: lit(true)   gen=2: ts > 100   gen=3: ts > 500   ...

  ParquetSource / opener

  ┌──────────────────────────────────────────────────────────────┐
  │ Layer 3   EarlyStoppingStream + FilePruner                   │
  │ — runs between batches (so it can also fire mid-file)        │
  │ — uses FILE-level stats (partition values, file min/max)     │
  │ — file.max(ts) < threshold → abort the whole current file    │
  │ — file granularity: either the whole file aborts or it keeps │
  └──────────────────────────────────────────────────────────────┘

  ┌──────────────────────────────────────────────────────────────┐
  │ Layer 1   ★ STATIC ★   RG selection at file open             │
  │ — runs once, when the file is opened                         │
  │ — PruningPredicate(WHERE + DynamicFilter)                    │
  │ — DynamicFilter is lit(true) at this point (heap is empty)   │
  │ — RG set is FROZEN for the rest of this file's scan          │
  │ — even if threshold tightens later, no RG ever gets dropped  │
  │   from the access plan                                       │
  └──────────────────────────────────────────────────────────────┘

  ┌──────────────────────────────────────────────────────────────┐
  │ Layer 2   inside an open RG, per row                         │
  │ — sort column fully decoded                                  │
  │ — DynamicFilter evaluated row-by-row                         │
  │ — RowSelection skips OTHER columns for failed rows           │
  │ — sort column itself is fully decoded even if all rows fail  │
  └──────────────────────────────────────────────────────────────┘

Where the gap is (with a concrete trace)

t=0  File A opens
     ├─ Layer 1 runs PruningPredicate with DynamicFilter = lit(true)
     └─ selects [RG1..RG7]  FROZEN

t=1  Decode RG1 (min=600, max=900)
     └─ heap fills, threshold = 500  (DynamicFilter generation bumps)

t=2  About to decode RG2 (stats: min=700, max=800)
     ├─ Layer 3 (file-level): File A.max still > 500, can't abort whole file
     ├─ Layer 1 already locked RG2 in
     └─ → decode RG2 in full

t=3  About to decode RG7 (stats: min=10, max=50, completely below threshold)
     ├─ Layer 3 (file-level): still keeps the file alive
     ├─ Layer 1 still says RG7 is selected
     └─ → decode RG7 in full — **wasted I/O + decompression + decode**

The gap is purely structural — the information needed to skip RG7 (RG7.max < threshold) is already available; it's just that no layer is currently re-evaluating per-RG stats against the live threshold.
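The missing check is small: compare one RG's bounds against the live threshold. Below is a minimal sketch that assumes plain i64 min/max stats and models the threshold as an Option<i64>, with None standing for the initial lit(true). RgStats, Direction and rg_unwinnable are hypothetical names for illustration, not DataFusion APIs.

```rust
/// Hypothetical min/max statistics of the sort column for one row group
/// (a stand-in for what RowGroupMetaData exposes; not a DataFusion type).
#[derive(Debug, Clone, Copy)]
pub struct RgStats {
    pub min: i64,
    pub max: i64,
}

/// Sort direction of the TopK query.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Direction {
    Asc,
    Desc,
}

/// True when no row in the row group can beat the current TopK threshold,
/// so the whole row group can be skipped before any of it is read.
/// `threshold` is None while the dynamic filter is still lit(true).
/// Comparisons are strict, matching the conditions in the text; a row
/// group whose bound equals the threshold is conservatively kept.
pub fn rg_unwinnable(stats: RgStats, threshold: Option<i64>, dir: Direction) -> bool {
    match (threshold, dir) {
        // No threshold yet: nothing is provably unwinnable.
        (None, _) => false,
        // DESC keeps values above the threshold: the RG max must reach it.
        (Some(t), Direction::Desc) => stats.max < t,
        // ASC keeps values below the threshold: the RG min must reach it.
        (Some(t), Direction::Asc) => stats.min > t,
    }
}
```

Applied to the trace with threshold 500 and DESC order, RG2 (min 700, max 800) is kept and RG7 (min 10, max 50) is skippable.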

What this issue saves

For an RG that the tightened threshold proves unwinnable (RG.max < threshold for DESC, or RG.min > threshold for ASC):

| Stage | Today | After this issue |
|---|---|---|
| Sort column I/O (compressed pages) | full RG read | 0 |
| Sort column decompression | full RG | 0 |
| Sort column Arrow decode | full RG Arrow array | 0 |
| Build RowSelection from per-row eval | full RG row eval | 0 |
| Other columns decode (post-RowSelection) | already 0 (existing optimisation) | 0 |

So the sort column's I/O + decompression + decode is the actual saving (everything downstream of that is already covered by today's Layer 2 RowSelection).

Important nuance — only fires when the whole RG is unwinnable

If the next RG has a mix of qualifying and non-qualifying rows (its min/max range straddles threshold), Layer 1's stats check cannot prune it — the partial-pass-through case still has to go through Layer 2 (decode sort column, build RowSelection, etc.). That's identical to today's behaviour and identical to how the upfront PruningPredicate works at file open. The win compounds with TopK convergence: as the threshold tightens, more long-tail RGs become 100% unwinnable and get skipped wholesale.
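A straddling RG still pays for Layer 2. The sketch below shows that per-row step, assuming the sort column has already been decoded into an i64 slice. Run is a hypothetical enum shaped loosely like the parquet crate's RowSelector (select or skip a count of rows); this is not the arrow-rs implementation.

```rust
/// Hypothetical run-length selection, shaped like the parquet crate's
/// RowSelector (select or skip a run of consecutive rows).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Run {
    Select(usize),
    Skip(usize),
}

/// Layer 2 sketch for one row group whose sort column has already been
/// decoded into `values`: evaluate the DESC dynamic filter `v > threshold`
/// row by row and coalesce the results into select/skip runs.
pub fn build_selection(values: &[i64], threshold: i64) -> Vec<Run> {
    let mut runs: Vec<Run> = Vec::new();
    for &v in values {
        let keep = v > threshold;
        // Extend the previous run if it carries the same decision.
        let extended = match runs.last_mut() {
            Some(Run::Select(n)) if keep => {
                *n += 1;
                true
            }
            Some(Run::Skip(n)) if !keep => {
                *n += 1;
                true
            }
            _ => false,
        };
        if !extended {
            runs.push(if keep { Run::Select(1) } else { Run::Skip(1) });
        }
    }
    runs
}
```

When every value is below the threshold, the result is a single Skip that covers the whole row group. The column was decoded only to discard all of it, and that is exactly the work the runtime stats check avoids.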

Describe the solution

The push decoder model in DataFusion (merged in #22289) already supports
multiple decoders per file via pending_decoders in
PushDecoderStreamState. We extend this with two pieces.

1. Force per-row-group decoder splitting when the predicate is dynamic

ParquetAccessPlan::split_runs coalesces consecutive RGs with the same
fully_matched classification into a single decoder run. For
ORDER BY ... LIMIT queries the initial DynamicFilter is lit(true),
which has no analyzable bounds, so the static fully-matched analysis
marks nothing — and split_runs collapses every RG into one run.
Without inter-run transition points, runtime pruning has nowhere to
hook in.

Add a force_per_row_group flag to split_runs, set by
is_dynamic_physical_expr(predicate) (helper from
datafusion-physical-expr-common). When the predicate is dynamic each
row group becomes its own run; static-predicate scans keep the existing
coalescing behaviour, so non-TopK queries pay no extra decoder
construction cost. Decoder construction itself is cheap (no I/O, just a
shared Arc of the ParquetMetaData), so the worst case for a dynamic
query is N decoder structs allocated, of which N−1 may be dropped
without ever running.
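The following is a simplified model of the coalescing rule and the new flag. It assumes every RG in the plan is selected and represents runs as plain lists of RG indices. The signature is hypothetical and does not match the real ParquetAccessPlan::split_runs.

```rust
/// Simplified model of the coalescing described for
/// ParquetAccessPlan::split_runs (hypothetical signature): every row group
/// in `fully_matched` is assumed selected, and consecutive row groups with
/// the same fully_matched flag share one decoder run unless
/// `force_per_row_group` is set. Runs are returned as lists of RG indices.
pub fn split_runs(fully_matched: &[bool], force_per_row_group: bool) -> Vec<Vec<usize>> {
    let mut runs: Vec<Vec<usize>> = Vec::new();
    for (i, &fm) in fully_matched.iter().enumerate() {
        // Coalesce only when allowed and the classification did not change.
        let extend = !force_per_row_group && i > 0 && fully_matched[i - 1] == fm;
        if extend {
            // `extend` implies i > 0, so a previous run always exists.
            runs.last_mut().expect("previous run exists").push(i);
        } else {
            runs.push(vec![i]);
        }
    }
    runs
}
```

Under the initial lit(true) filter, nothing is fully matched. As a result, split_runs(&[false; 4], false) yields the single run [0, 1, 2, 3], which leaves no boundary at which to prune. Passing true for the flag yields four single-RG runs instead.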

2. RowGroupPruner re-evaluates the dynamic predicate at run boundaries

Mirrors FilePruner's pattern at row-group granularity:

  • Tracks snapshot_generation(&predicate) so the cached
    PruningPredicate is rebuilt only when the dynamic filter has
    actually moved (cost: one rebuild per threshold update, not per
    RG check).
  • Between decoder runs in PushDecoderStreamState::transition,
    evaluates the cached predicate against the next run's
    RowGroupMetaData via the existing RowGroupPruningStatistics
    adapter from row_group_filter.rs — same evaluator the static
    path uses at scan-startup, no duplication.
  • If every RG in the next run is provably unwinnable, drop the run
    without invoking its decoder. Zero I/O, zero decode.
  • Conservative on errors: predicate construction failures and stats
    evaluation errors log + increment the existing
    predicate_evaluation_errors metric, then return "don't prune".
    A flaky pruning path can never silently drop data.
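Here is a minimal model of the pruner's bookkeeping. It assumes the dynamic filter can be summarised as a (generation, threshold) snapshot and that each pending run can be summarised as the sort-column max of its RGs. FilterSnapshot, RowGroupPruner::should_prune_run and the counter fields are hypothetical stand-ins for snapshot_generation, the cached PruningPredicate and the metrics named above. The model covers the three behaviours in the bullets:

- It rebuilds only when the generation changes.
- It drops a run only if every RG in it is unwinnable.
- It treats missing stats as "don't prune".

```rust
/// Hypothetical snapshot of the dynamic filter: a generation counter that
/// bumps each time TopK tightens, plus the current DESC threshold
/// (None while the filter is still lit(true)).
#[derive(Debug, Clone, Copy)]
pub struct FilterSnapshot {
    pub generation: u64,
    pub threshold: Option<i64>,
}

/// Hypothetical pruner state; the counters stand in for the metrics named
/// in the text (predicate_evaluation_errors, row_groups_pruned_dynamic_filter).
#[derive(Debug, Default)]
pub struct RowGroupPruner {
    /// (generation, threshold) the cached predicate was built from.
    cached: Option<(u64, Option<i64>)>,
    pub rebuilds: usize,
    pub evaluation_errors: usize,
    pub pruned_row_groups: usize,
}

impl RowGroupPruner {
    /// Return the cached predicate, rebuilding it only if the generation moved.
    fn predicate(&mut self, snap: FilterSnapshot) -> Option<i64> {
        match self.cached {
            Some((cached_gen, threshold)) if cached_gen == snap.generation => threshold,
            _ => {
                self.rebuilds += 1;
                self.cached = Some((snap.generation, snap.threshold));
                snap.threshold
            }
        }
    }

    /// Decide whether a pending run can be dropped. `run_max` holds the sort
    /// column max of each RG in the run; None models missing statistics.
    pub fn should_prune_run(&mut self, snap: FilterSnapshot, run_max: &[Option<i64>]) -> bool {
        let Some(threshold) = self.predicate(snap) else {
            return false; // filter is still lit(true): nothing to prune
        };
        if run_max.is_empty() {
            return false;
        }
        for stat in run_max {
            match *stat {
                // Missing stats: record the error and keep the run.
                None => {
                    self.evaluation_errors += 1;
                    return false;
                }
                // This RG is unwinnable; keep checking the rest of the run.
                Some(max) if max < threshold => {}
                // At least one RG might still beat the threshold.
                Some(_) => return false,
            }
        }
        self.pruned_row_groups += run_max.len();
        true
    }
}
```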

Observability

  • New metric row_groups_pruned_dynamic_filter (a Count on
    ParquetFileMetrics) surfaces the runtime saving.
  • New dynamic_rg_pruning=eligible marker on ParquetSource's
    EXPLAIN output (in fmt_extra) signals plan-time eligibility.
    The value is eligible rather than true because the static plan
    can't predict the runtime outcome; that is reported by the metric.

Algorithm

The check uses raw RG min/max stats — same source as the upfront
PruningPredicate. It can only prove "no row in this RG could beat
threshold", never the converse, so:

  • Skip is always safe (no under-return).
  • May be conservative on RGs that have a few qualifying rows alongside
    many non-qualifying ones — that's identical to existing stats
    pruning's worst case.

The dynamic filter itself is already proven correct upstream; this
issue just applies it more often.
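The safety argument is straightforward: if RG.max < threshold, every row satisfies v <= max < threshold, so no row passes `v > threshold`. The small check below classifies the stats-only DESC decision against the actual rows. It assumes i64 values, and Outcome and classify are hypothetical helpers. An exhaustive sweep over a tiny domain never produces an unsafe skip. The sweep also surfaces the two kinds of "kept" RG: the mixed case the text describes, and a boundary case caused by the strict comparison.

```rust
/// Outcome of the stats-only decision for one row group
/// (DESC order, dynamic filter `v > threshold`).
#[derive(Debug, PartialEq, Eq)]
pub enum Outcome {
    /// Skipped, and truly no row qualifies.
    SkippedCorrectly,
    /// Kept, and at least one row qualifies (the mixed case).
    KeptWithQualifying,
    /// Kept although no row qualifies (e.g. max == threshold).
    KeptConservatively,
    /// Skipped although a row qualifies: must never happen.
    UnsafeSkip,
}

/// Compare the decision made from RG stats alone with the actual rows.
pub fn classify(rows: &[i64], threshold: i64) -> Outcome {
    // The only statistic the DESC check needs is the row group max.
    let max = rows.iter().copied().max();
    let skipped = matches!(max, Some(m) if m < threshold);
    let qualifying = rows.iter().any(|&v| v > threshold);
    match (skipped, qualifying) {
        (true, false) => Outcome::SkippedCorrectly,
        (true, true) => Outcome::UnsafeSkip,
        (false, true) => Outcome::KeptWithQualifying,
        (false, false) => Outcome::KeptConservatively,
    }
}
```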

Scope

In-scope (single PR target):

  • RowGroupPruner in datafusion-datasource-parquet that evaluates a
    predicate against RowGroupMetaData (mirrors FilePruner).
  • force_per_row_group flag on
    ParquetAccessPlan::split_runs, gated on
    is_dynamic_physical_expr(predicate).
  • PendingDecoderRun carries the row-group indices each pending
    decoder will scan, so the pruner knows what to evaluate.
  • EXPLAIN marker + row_groups_pruned_dynamic_filter metric.
  • Unit tests on RowGroupPruner + integration test asserting the
    metric fires end-to-end + SLT asserting both EXPLAIN surfaces.
  • Works with or without WHERE — the dynamic threshold check
    composes with whatever existing predicate the scan carries.
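The transition step ties the in-scope pieces together. This is a minimal sketch that assumes a simplified PendingDecoderRun holding only its RG indices, with the real decoder state elided, and a queue modelled as a VecDeque. The unwinnable closure stands in for the RowGroupPruner evaluation. next_run and its parameters are hypothetical, not the PushDecoderStreamState::transition signature.

```rust
use std::collections::VecDeque;

/// Simplified pending run: only the RG indices its decoder would scan
/// (the real decoder state is elided in this sketch).
#[derive(Debug)]
pub struct PendingDecoderRun {
    pub row_groups: Vec<usize>,
}

/// Pop pending runs until one survives the pruning check. Dropped runs
/// never have their decoder invoked, so they cost no I/O or decode.
/// `unwinnable(rg)` is re-evaluated at every boundary, so it can observe
/// a threshold that tightened while the previous run was decoding.
pub fn next_run<F>(
    pending: &mut VecDeque<PendingDecoderRun>,
    mut unwinnable: F,
    pruned_metric: &mut usize,
) -> Option<PendingDecoderRun>
where
    F: FnMut(usize) -> bool,
{
    while let Some(run) = pending.pop_front() {
        if !run.row_groups.is_empty() && run.row_groups.iter().all(|&rg| unwinnable(rg)) {
            *pruned_metric += run.row_groups.len();
            continue; // every RG in this run is unwinnable: drop it
        }
        return Some(run);
    }
    None
}
```

In the trace's terms, with per-RG runs and a threshold of 500, runs whose RG max exceeds 500 are returned for decoding, and a run like RG7 is counted in the metric and discarded.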

Explicitly out-of-scope (separate follow-up):

  • RG-granular work queue — distributing row-group descriptors
    instead of PartitionedFile to address cross-file load
    balancing and cross-partition early stop. The two are
    complementary, not blocking: this issue gets the per-file
    early-stop win without touching the morsel scheduler.

Why now

  • Push decoder migration is merged (Extract parquet push decoder module #22289).
    PushDecoderStreamState already supports per-file multi-decoder
    execution via pending_decoders; we just need to use it. No
    arrow-rs changes are needed: earlier drafts of this issue assumed we
    would have to add peek_next_row_group / skip_next_row_group to
    arrow-rs, but the existing push decoder + per-RG decoder splitting
    is sufficient.
  • is_dynamic_physical_expr (in datafusion-physical-expr-common)
    gives a clean static check for "this predicate may change at
    runtime", letting us narrowly target the per-RG-decoder cost to
    queries that actually benefit.
  • RowGroupPruningStatistics already exists in row_group_filter.rs
    — same evaluator the static RowGroupAccessPlanFilter::prune_by_statistics
    path uses, no duplication.
  • TopK threshold init (feat: TopK stats init + cumulative RG pruning for pure-TopK parquet scans (no-WHERE) #22385) lands the heap pre-fill, making the
    dynamic filter useful from scan-start instead of from the first
    batch. The two changes are multiplicative: pre-fill tightens
    the threshold earlier, more RGs become unwinnable, this PR skips
    them.

Benchmarks

benchmarks/sort_pushdown_inexact (3 files × 20 RGs, scrambled order
within file, disjoint per-RG ranges):

| Query | main | this PR | Δ |
|---|---|---|---|
| Q1 ORDER BY l_orderkey DESC LIMIT 100 | 6.99 ms | 3.80 ms | −46% |
| Q2 ORDER BY l_orderkey DESC LIMIT 1000 | 3.29 ms | 1.33 ms | −60% |
| Q3 SELECT * ... DESC LIMIT 100 | 11.17 ms | 9.91 ms | −11% |
| Q4 SELECT * ... DESC LIMIT 1000 | 9.28 ms | 7.95 ms | −14% |

Narrow-projection queries gain the most — their per-RG cost is
dominated by metadata + sort-column read, which this PR eliminates for
unwinnable RGs. Wide-projection queries gain less because the kept
RG's all-column decode dominates total time, but still see meaningful
savings.
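The Δ column is the relative change (this PR − main) / main, in percent. Recomputing it from the benchmark timings reproduces the rounded figures. The helper name delta_pct and the RESULTS constant are illustrative only.

```rust
/// Relative change of `this_pr_ms` against `main_ms`, in percent
/// (negative means this PR is faster).
pub fn delta_pct(main_ms: f64, this_pr_ms: f64) -> f64 {
    (this_pr_ms - main_ms) / main_ms * 100.0
}

/// (query, main ms, this PR ms) from the benchmark results.
pub const RESULTS: [(&str, f64, f64); 4] = [
    ("Q1", 6.99, 3.80),
    ("Q2", 3.29, 1.33),
    ("Q3", 11.17, 9.91),
    ("Q4", 9.28, 7.95),
];
```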

Related work
