From 38bb934eb11e4d811b98a10aa2801f2f85f9d7ba Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Wed, 20 May 2026 14:20:14 +0800 Subject: [PATCH 1/8] feat: add topk_fetch field on ParquetSource MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a new `topk_fetch: Option` field on `ParquetSource` alongside the existing sort-pushdown markers (`sort_order_for_reorder` and `reverse_row_groups`). Future commits will: 1. Surface a `FileSource::with_topk_fetch_hint` trait method that `PushdownSort` calls with the outer `SortExec.fetch()` after building an `Inexact` plan. 2. Use the value in the opener to (a) seed the TopK dynamic filter from per-RG min/max statistics before `PruningPredicate` build and (b) truncate `row_group_indexes` once cumulative `num_rows` reaches K. The field is intentionally not surfaced in EXPLAIN — the same fetch value is already shown on the `SortExec` above the source. The temporary `#[expect(dead_code)]` will be removed when the companion plumbing lands in the next commit. --- datafusion/datasource-parquet/src/source.rs | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index cf1fcea005dde..fd34fbb7c5d82 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -296,6 +296,21 @@ pub struct ParquetSource { /// Sort order driving `PreparedAccessPlan::reorder_by_statistics` /// in the opener. sort_order_for_reorder: Option, + /// Optional TopK fetch (K) hint plumbed from the surrounding + /// `SortExec(fetch=K)` by the `PushdownSort` rule (via + /// [`FileSource::with_topk_fetch_hint`]). When set together with + /// `sort_order_for_reorder`, the opener can use it to: + /// 1. Seed the TopK dynamic filter from per-row-group min/max + /// statistics before `PruningPredicate` build. + /// 2. 
Truncate `row_group_indexes` once cumulative `num_rows >= K`. + /// + /// Not surfaced in EXPLAIN — the same value is shown on the + /// `SortExec` above the data source. + #[expect( + dead_code, + reason = "consumed by `with_topk_fetch_hint` and the opener in subsequent commits" + )] + topk_fetch: Option, } impl ParquetSource { @@ -322,6 +337,7 @@ impl ParquetSource { encryption_factory: None, reverse_row_groups: false, sort_order_for_reorder: None, + topk_fetch: None, } } From 05f657b9734d7c9ee472063593955130dfa92092 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Wed, 20 May 2026 14:22:41 +0800 Subject: [PATCH 2/8] feat: add FileSource::with_topk_fetch_hint trait method MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds a new `with_topk_fetch_hint(fetch: usize) -> Option>` method to the `FileSource` trait with a `None` default. The new method gives a sort-pushdown `Inexact` `FileSource` a chance to take the surrounding `SortExec`'s `fetch` (K) as a hint and produce a refined source — e.g. one that can seed a TopK dynamic filter from per-RG statistics or prune row groups by cumulative `num_rows`. `ParquetSource` overrides the method: it clones itself, stores the hint in the `topk_fetch` field introduced in the previous commit, and returns the new source. The trait default returns `None` so file formats that don't benefit (CSV, JSON, Avro, Arrow, ...) need no changes. The `PushdownSort` rule will start calling this in the next commit. 
--- datafusion/datasource-parquet/src/source.rs | 16 ++++++++++++---- datafusion/datasource/src/file.rs | 16 ++++++++++++++++ 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index fd34fbb7c5d82..b6f3c36ad25d6 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -306,10 +306,6 @@ pub struct ParquetSource { /// /// Not surfaced in EXPLAIN — the same value is shown on the /// `SortExec` above the data source. - #[expect( - dead_code, - reason = "consumed by `with_topk_fetch_hint` and the opener in subsequent commits" - )] topk_fetch: Option, } @@ -617,6 +613,18 @@ impl FileSource for ParquetSource { ) } + /// Accept a TopK fetch (K) hint from the surrounding `SortExec`. + /// + /// `PushdownSort` invokes this after producing an `Inexact` result + /// for a `SortExec(fetch=Some(K))`. The opener uses `topk_fetch` + /// (together with `sort_order_for_reorder`) to drive stats-based + /// init of the TopK dynamic filter and cumulative row-group pruning. + fn with_topk_fetch_hint(&self, fetch: usize) -> Option> { + let mut conf = self.clone(); + conf.topk_fetch = Some(fetch); + Some(Arc::new(conf)) + } + fn table_schema(&self) -> &TableSchema { &self.table_schema } diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs index 32bee63b54f23..837c7ba3a0f6b 100644 --- a/datafusion/datasource/src/file.rs +++ b/datafusion/datasource/src/file.rs @@ -293,6 +293,22 @@ pub trait FileSource: Any + Send + Sync { files } + /// Optional TopK fetch (K) hint for sort-pushdown `Inexact` scans. + /// + /// When the `PushdownSort` rule produces an `Inexact` result over a + /// `SortExec(fetch=Some(K))`, the surrounding `SortExec` is kept in + /// the plan (the source can only approximate the ordering). Sources + /// that can use `K` to drive further per-file optimisations — e.g. 
+ /// seeding a TopK dynamic filter from parquet per-row-group min/max + /// statistics, or truncating the row-group iteration once cumulative + /// `num_rows` reaches `K` — should override this method and return a + /// new source with the hint set. + /// + /// Default returns `None` (no-op). + fn with_topk_fetch_hint(&self, _fetch: usize) -> Option> { + None + } + /// Try to push down a projection into this FileSource. /// /// `FileSource` implementations that support projection pushdown should From 658271125cc32c3042064d23b989ac664085c440 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Wed, 20 May 2026 14:26:52 +0800 Subject: [PATCH 3/8] feat: plumb SortExec.fetch via with_topk_fetch_hint in PushdownSort MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When `PushdownSort` produces an `Inexact` result (the source can only approximate the requested ordering, so a `SortExec` is kept), and the outer `SortExec` carried a fetch (i.e. we are in a TopK shape), the rule now surfaces `K` to the data source. To carry the hint through the plan tree without a hard downcast on `DataSourceExec` in `physical-optimizer` — which would require a new crate dependency on `datafusion-datasource` — the plumbing mirrors the existing `try_pushdown_sort` chain: * `ExecutionPlan::with_topk_fetch_hint` (default `None`) * `DataSourceExec::with_topk_fetch_hint` (delegates to data source) * `DataSource::with_topk_fetch_hint` (default `None`) * `FileScanConfig::with_topk_fetch_hint` (swaps in the new file source returned by `FileSource::with_topk_fetch_hint`) Both `Inexact` branches in `PushdownSort` (the SPM → SortExec pattern and the standalone SortExec pattern) now call `inner.with_topk_fetch_hint(fetch)` when the SortExec has a fetch, falling back to `inner` unchanged otherwise. Sources that don't override the hint (CSV, JSON, Avro, Arrow, MemorySource, ...) keep behaving exactly as before. 
The Parquet opener will start using the hint in the next two commits. --- .../datasource/src/file_scan_config/mod.rs | 13 ++++++++++ datafusion/datasource/src/source.rs | 20 ++++++++++++++++ .../physical-optimizer/src/pushdown_sort.rs | 24 ++++++++++++++++--- .../physical-plan/src/execution_plan.rs | 14 +++++++++++ 4 files changed, 68 insertions(+), 3 deletions(-) diff --git a/datafusion/datasource/src/file_scan_config/mod.rs b/datafusion/datasource/src/file_scan_config/mod.rs index 04b74528d5ac1..cd0e76f34bdd8 100644 --- a/datafusion/datasource/src/file_scan_config/mod.rs +++ b/datafusion/datasource/src/file_scan_config/mod.rs @@ -999,6 +999,19 @@ impl DataSource for FileScanConfig { Some(Arc::new(new_config)) } + fn with_topk_fetch_hint(&self, fetch: usize) -> Option> { + // Delegate to the underlying file source. If the source accepts the + // hint, swap it in on a clone so the rest of the config (eq + // properties, schema, etc.) is preserved unchanged. + self.file_source + .with_topk_fetch_hint(fetch) + .map(|new_file_source| { + let mut new_config = self.clone(); + new_config.file_source = new_file_source; + Arc::new(new_config) as Arc + }) + } + fn apply_expressions( &self, f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result, diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs index 420c6b508ce4f..a93fe6200e903 100644 --- a/datafusion/datasource/src/source.rs +++ b/datafusion/datasource/src/source.rs @@ -225,6 +225,17 @@ pub trait DataSource: Any + Send + Sync + Debug { None } + /// Accept a TopK fetch (K) hint from a surrounding `SortExec`. + /// + /// See [`ExecutionPlan::with_topk_fetch_hint`] for the high-level + /// contract. The default returns `None`. `FileScanConfig` overrides + /// this to delegate to the underlying `FileSource`. 
+ /// + /// [`ExecutionPlan::with_topk_fetch_hint`]: datafusion_physical_plan::ExecutionPlan::with_topk_fetch_hint + fn with_topk_fetch_hint(&self, _fetch: usize) -> Option> { + None + } + /// Apply a closure to each expression used by this data source. /// /// This includes filter predicates (which may contain dynamic filters) and any @@ -546,6 +557,15 @@ impl ExecutionPlan for DataSourceExec { }) } + fn with_topk_fetch_hint(&self, fetch: usize) -> Option> { + self.data_source + .with_topk_fetch_hint(fetch) + .map(|new_data_source| { + Arc::new(self.clone().with_data_source(new_data_source)) + as Arc + }) + } + fn with_new_state( &self, state: Arc, diff --git a/datafusion/physical-optimizer/src/pushdown_sort.rs b/datafusion/physical-optimizer/src/pushdown_sort.rs index 40a6fe2c205c7..d4a43347d09d3 100644 --- a/datafusion/physical-optimizer/src/pushdown_sort.rs +++ b/datafusion/physical-optimizer/src/pushdown_sort.rs @@ -125,6 +125,14 @@ impl PhysicalOptimizerRule for PushdownSort { return Ok(Transformed::yes(Arc::new(new_spm))); } SortOrderPushdownResult::Inexact { inner } => { + // Surface the sort's fetch (K) to the data source so + // it can drive TopK-aware per-file optimisations + // (stats-seeded threshold, cumulative RG pruning). + // No-op for sources that don't override the hint. + let inner = sort_child + .fetch() + .and_then(|fetch| inner.with_topk_fetch_hint(fetch)) + .unwrap_or(inner); let new_sort = SortExec::new(required_ordering.clone(), inner) .with_fetch(sort_child.fetch()) .with_preserve_partitioning(true); @@ -169,9 +177,19 @@ impl PhysicalOptimizerRule for PushdownSort { } } SortOrderPushdownResult::Inexact { inner } => { - // Data source is optimized for the ordering but not perfectly sorted - // Keep the Sort operator but use the optimized input - // Benefits: TopK queries can terminate early, better cache locality + // Data source is optimized for the ordering but not perfectly sorted. 
+ // Keep the Sort operator but use the optimized input. + // Benefits: TopK queries can terminate early, better cache locality. + // + // If the SortExec carries a fetch (LIMIT) we are in a + // TopK shape — surface `K` to the data source so it can + // drive TopK-aware per-file optimisations (stats-seeded + // threshold, cumulative RG pruning). No-op for sources + // that don't override the hint. + let inner = sort_exec + .fetch() + .and_then(|fetch| inner.with_topk_fetch_hint(fetch)) + .unwrap_or(inner); Ok(Transformed::yes(Arc::new( SortExec::new(required_ordering.clone(), inner) .with_fetch(sort_exec.fetch()) diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs index 1a67ea0ded11b..0669c62a2efa1 100644 --- a/datafusion/physical-plan/src/execution_plan.rs +++ b/datafusion/physical-plan/src/execution_plan.rs @@ -789,6 +789,20 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync { ) -> Option> { None } + + /// Hint to the plan that an `Inexact` sort-pushdown is being layered + /// under a `SortExec(fetch=Some(fetch))` (i.e. a TopK). + /// + /// `PushdownSort` calls this on the `inner` plan returned by + /// `try_pushdown_sort` when the eliminated/wrapped `SortExec` carried + /// a fetch. Data sources that can take advantage of `K` for further + /// per-file work (e.g. stats-seeded TopK threshold, cumulative + /// row-group pruning) should override this and return a new plan + /// with the hint applied. The default returns `None`, meaning + /// "no change — keep using the original plan". 
+ fn with_topk_fetch_hint(&self, _fetch: usize) -> Option> { + None + } } impl dyn ExecutionPlan { From 7ed985707c879c217e5f0da7b7cfb85efab62125 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Wed, 20 May 2026 14:29:32 +0800 Subject: [PATCH 4/8] feat: add PreparedAccessPlan::truncate_row_groups MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces `truncate_row_groups(count)` on `PreparedAccessPlan` — the building block for the opener's upcoming cumulative-RG-prune step in the no-WHERE TopK path. After `reorder_by_statistics` and optional `reverse`, the opener walks `row_group_indexes` front to back, accumulating `num_rows` until it reaches `K`, then truncates. Semantics: * Keep at most the first `count` entries of `row_group_indexes`. * No-op when `count >= row_group_indexes.len()`. * Bail unchanged when `row_selection.is_some()`. Page-level selections are keyed by surviving row groups and would have to be remapped; the no-WHERE TopK gate keeps `row_selection` at `None` in the caller, so the early-return is a safety net. Three new unit tests cover the happy path, the no-op path, and the row-selection bail-out. The method is `pub(crate)` and has a `#[cfg_attr(not(test), expect(dead_code, ...))]` to keep clippy clean while the caller in the opener is still on its way; that attribute will go away in the next commit when the wiring lands. --- .../datasource-parquet/src/access_plan.rs | 81 +++++++++++++++++++ 1 file changed, 81 insertions(+) diff --git a/datafusion/datasource-parquet/src/access_plan.rs b/datafusion/datasource-parquet/src/access_plan.rs index edbea39948f09..6bba237039254 100644 --- a/datafusion/datasource-parquet/src/access_plan.rs +++ b/datafusion/datasource-parquet/src/access_plan.rs @@ -611,6 +611,38 @@ impl PreparedAccessPlan { Ok(self) } + /// Truncate `row_group_indexes` to keep at most the first `count` + /// entries. 
+ /// + /// Used by the cumulative-RG-prune step in the opener: after + /// `reorder_by_statistics` and optional `reverse`, the opener + /// accumulates `num_rows` from the front of `row_group_indexes` + /// until the cumulative count reaches `K` (the TopK fetch), then + /// calls this method with `count = number_of_row_groups_needed`. + /// + /// Bails out unchanged (no truncation) when `row_selection` is + /// `Some`. Page-level state is keyed by the surviving row groups + /// and would have to be remapped; the no-WHERE TopK gate ensures + /// `row_selection` is always `None` in the path that calls this + /// method, so the early-return is just a safety net. + #[cfg_attr( + not(test), + expect( + dead_code, + reason = "called by the opener's cumulative-RG-prune step in the next commit" + ) + )] + pub(crate) fn truncate_row_groups(mut self, count: usize) -> Self { + if self.row_selection.is_some() { + debug!("Skipping cumulative RG truncate: row_selection present"); + return self; + } + if count < self.row_group_indexes.len() { + self.row_group_indexes.truncate(count); + } + self + } + /// Reverse the access plan for reverse scanning pub(crate) fn reverse(mut self, file_metadata: &ParquetMetaData) -> Result { // Get the row group indexes before reversing @@ -1026,4 +1058,53 @@ mod test { assert_eq!(result.row_group_indexes, vec![0, 1]); } + + // ---------------------------------------------------------------- + // `truncate_row_groups` tests + // ---------------------------------------------------------------- + + /// Happy path: truncate keeps only the first `count` indexes, + /// preserving order. Mirrors the opener's cumulative-RG-prune + /// step, where the iteration order has already been finalised + /// by `reorder_by_statistics` and optional `reverse`. 
+ #[test] + fn truncate_row_groups_keeps_first_n() { + let plan = PreparedAccessPlan::new(vec![3, 1, 4, 1, 5, 9, 2, 6], None).unwrap(); + + let result = plan.truncate_row_groups(3); + + assert_eq!(result.row_group_indexes, vec![3, 1, 4]); + assert!(result.row_selection.is_none()); + } + + /// No-op when the requested keep-count is >= the current length + /// (e.g. LIMIT larger than the file's total row groups). We + /// cover both equal and greater-than separately. + #[test] + fn truncate_row_groups_no_op_when_count_exceeds_len() { + let same_len = PreparedAccessPlan::new(vec![0, 1, 2], None) + .unwrap() + .truncate_row_groups(3); + assert_eq!(same_len.row_group_indexes, vec![0, 1, 2]); + + let larger = PreparedAccessPlan::new(vec![0, 1, 2], None) + .unwrap() + .truncate_row_groups(10); + assert_eq!(larger.row_group_indexes, vec![0, 1, 2]); + } + + /// `row_selection` is `Some` → bail out unchanged. Page-level + /// state would need to be remapped to the surviving row groups; + /// the no-WHERE TopK gate ensures this is just a safety net. + #[test] + fn truncate_row_groups_skips_when_row_selection_present() { + let selection = RowSelection::from(vec![RowSelector::select(100)]); + let plan = + PreparedAccessPlan::new(vec![0, 1, 2], Some(selection.clone())).unwrap(); + + let result = plan.truncate_row_groups(1); + + assert_eq!(result.row_group_indexes, vec![0, 1, 2]); + assert_eq!(result.row_selection, Some(selection)); + } } From b3179c9c1f964dbc751836c43b038dc98220c2bb Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Wed, 20 May 2026 14:35:50 +0800 Subject: [PATCH 5/8] feat: TopK stats init + cumulative RG pruning in opener MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two new opener optimisations on the no-WHERE TopK path, both gated on the same conditions: 1. The surrounding `SortExec(fetch=K)` plumbed `K` through `with_topk_fetch_hint` (`topk_fetch.is_some()`). 2. 
Sort pushdown fired (`sort_order_for_reorder.is_some()`). 3. The source predicate is a bare `DynamicFilterPhysicalExpr` — i.e. no static WHERE conjuncts (`predicate_is_pure_dynamic_filter`). When all three hold: **Stats init (in `prepare_filters`, before `PruningPredicate` build).** Computes the "best safe threshold" from per-row-group min/max statistics: * DESC sort → keep the largest `min` across RGs with `num_rows >= K`. * ASC sort → keep the smallest `max` across RGs with `num_rows >= K`. Then builds `col {>=|<=} threshold` (inclusive, because a row equal to the stats-derived bound may still belong to the final K) and feeds it into the dynamic filter via the existing `DynamicFilterPhysicalExpr::update` API. The pruning predicate built immediately afterwards picks up the tighter bound, so cold-start TopK skips many row groups before ever decoding data. Skipping RGs with fewer than `K` rows is the correctness guarantee: we never raise the threshold past a value that a single qualifying row group could provide, so K rows are always reachable. **Cumulative RG prune (in the `prepare_access_plan` closure).** After `reorder_by_statistics` + optional `reverse`, the iteration order is final and the first row group is the best for the request. Walk forward, accumulate `num_rows`, and truncate via the new `PreparedAccessPlan::truncate_row_groups` once cumulative count reaches `K`. Skipped when `row_selection.is_some()` (already checked by `truncate_row_groups`, but the gate is cheaper). Both optimisations are best-effort and fully graceful — any per-file failure (missing stats, non-column sort key, type mismatch) `debug!`s and falls back to the unmodified behaviour. Plumbing: * `ParquetMorselizer` and `PreparedParquetOpen` gain a `topk_fetch: Option<usize>` field, populated by `ParquetSource::create_morselizer`. * New private helpers live in a `topk_no_where` submodule inside `opener/mod.rs`: `predicate_is_pure_dynamic_filter`, `find_dynamic_filter`, `find_column_in_expr`, `compute_best_threshold`, `try_init_topk_threshold`. 
`topk_fetch` is intentionally not surfaced in EXPLAIN — the same value is already shown on the `SortExec` above the data source. --- .../datasource-parquet/src/access_plan.rs | 7 - .../datasource-parquet/src/opener/mod.rs | 340 ++++++++++++++++++ datafusion/datasource-parquet/src/source.rs | 1 + 3 files changed, 341 insertions(+), 7 deletions(-) diff --git a/datafusion/datasource-parquet/src/access_plan.rs b/datafusion/datasource-parquet/src/access_plan.rs index 6bba237039254..159df9b209701 100644 --- a/datafusion/datasource-parquet/src/access_plan.rs +++ b/datafusion/datasource-parquet/src/access_plan.rs @@ -625,13 +625,6 @@ impl PreparedAccessPlan { /// and would have to be remapped; the no-WHERE TopK gate ensures /// `row_selection` is always `None` in the path that calls this /// method, so the early-return is just a safety net. - #[cfg_attr( - not(test), - expect( - dead_code, - reason = "called by the opener's cumulative-RG-prune step in the next commit" - ) - )] pub(crate) fn truncate_row_groups(mut self, count: usize) -> Self { if self.row_selection.is_some() { debug!("Skipping cumulative RG truncate: row_selection present"); diff --git a/datafusion/datasource-parquet/src/opener/mod.rs b/datafusion/datasource-parquet/src/opener/mod.rs index e5929fd43f11a..d8400666c1bf0 100644 --- a/datafusion/datasource-parquet/src/opener/mod.rs +++ b/datafusion/datasource-parquet/src/opener/mod.rs @@ -137,6 +137,14 @@ pub(super) struct ParquetMorselizer { pub reverse_row_groups: bool, /// Optional sort order used to reorder row groups by their min/max statistics. pub sort_order_for_reorder: Option, + /// Optional TopK fetch (K) hint plumbed from the surrounding + /// `SortExec(fetch=K)` via [`crate::ParquetSource`]'s + /// `with_topk_fetch_hint`. 
The opener uses it (in combination with + /// `sort_order_for_reorder` and a bare-`DynamicFilterPhysicalExpr` + /// predicate) to seed the TopK threshold from per-RG statistics + /// before `PruningPredicate` build and to truncate + /// `row_group_indexes` once cumulative `num_rows` reaches `K`. + pub topk_fetch: Option, } impl fmt::Debug for ParquetMorselizer { @@ -288,6 +296,8 @@ struct PreparedParquetOpen { max_predicate_cache_size: Option, reverse_row_groups: bool, sort_order_for_reorder: Option, + /// See [`ParquetMorselizer::topk_fetch`]. + topk_fetch: Option, preserve_order: bool, #[cfg(feature = "parquet_encryption")] file_decryption_properties: Option>, @@ -658,6 +668,7 @@ impl ParquetMorselizer { max_predicate_cache_size: self.max_predicate_cache_size, reverse_row_groups: self.reverse_row_groups, sort_order_for_reorder: self.sort_order_for_reorder.clone(), + topk_fetch: self.topk_fetch, preserve_order: self.preserve_order, #[cfg(feature = "parquet_encryption")] file_decryption_properties: None, @@ -825,6 +836,37 @@ impl MetadataLoadedParquetOpen { } prepared.physical_file_schema = Arc::clone(&physical_file_schema); + // No-WHERE TopK stats init. + // + // Gate: sort pushdown fired (`sort_order_for_reorder.is_some()`) + // AND the surrounding `SortExec` plumbed K (`topk_fetch.is_some()`) + // AND the only predicate is a bare `DynamicFilterPhysicalExpr`. + // + // Effect: seed the TopK threshold from per-RG min/max statistics + // before `PruningPredicate` build, so the pruning predicate + // immediately benefits from a tight bound. The threshold is + // chosen so at least one row group with `num_rows >= K` can + // contain the final K rows — i.e. we don't prune away anything + // we'd actually need. 
+ if let (Some(predicate), Some(fetch), Some(sort_order)) = ( + prepared.predicate.as_ref(), + prepared.topk_fetch, + prepared.sort_order_for_reorder.as_ref(), + ) && topk_no_where::predicate_is_pure_dynamic_filter(predicate) + { + let rg_metadata = reader_metadata.metadata().row_groups(); + // Best-effort — the helper is fully graceful (logs + + // returns Ok on any per-file failure). + topk_no_where::try_init_topk_threshold( + predicate, + fetch, + sort_order, + rg_metadata, + &physical_file_schema, + reader_metadata.parquet_schema(), + )?; + } + // Build predicates for this specific file let pruning_predicate = build_pruning_predicates( prepared.predicate.as_ref(), @@ -1131,6 +1173,14 @@ impl RowGroupsPrunedParquetOpen { // Both inputs come from the sort-pushdown channel — // `ParquetSource::try_pushdown_sort` sets `sort_order_for_reorder` // and/or `reverse_row_groups`. + // Same gate as the stats-init step in `prepare_filters`. The + // boolean is hoisted here because the closure captures it. + let topk_no_where_active = prepared.topk_fetch.is_some() + && prepared.sort_order_for_reorder.is_some() + && prepared + .predicate + .as_ref() + .is_some_and(topk_no_where::predicate_is_pure_dynamic_filter); let prepare_access_plan = |plan: ParquetAccessPlan| -> Result { let mut prepared_plan = plan.prepare(rg_metadata)?; @@ -1144,6 +1194,28 @@ impl RowGroupsPrunedParquetOpen { if prepared.reverse_row_groups { prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?; } + // Cumulative RG prune: after reorder + reverse, the + // first row group is the "best" one for the request. + // Walk forward, accumulate `num_rows`, and truncate + // once we have enough to satisfy K. Gated on the + // no-WHERE TopK shape and `topk_fetch` being set. 
+ if topk_no_where_active + && prepared_plan.row_selection.is_none() + && let Some(fetch) = prepared.topk_fetch + { + let mut cumulative_rows: usize = 0; + let mut keep: usize = prepared_plan.row_group_indexes.len(); + for (i, &rg_idx) in prepared_plan.row_group_indexes.iter().enumerate() + { + cumulative_rows = cumulative_rows + .saturating_add(rg_metadata[rg_idx].num_rows() as usize); + if cumulative_rows >= fetch { + keep = i + 1; + break; + } + } + prepared_plan = prepared_plan.truncate_row_groups(keep); + } Ok(prepared_plan) }; @@ -1351,6 +1423,273 @@ fn create_initial_plan( Ok(ParquetAccessPlan::new_all(row_group_count)) } +/// Helpers for the no-WHERE TopK path: seed the dynamic filter +/// from per-row-group min/max statistics, and accumulate `num_rows` +/// across row groups to truncate at `K`. +/// +/// All helpers are gated on: +/// 1. `predicate_is_pure_dynamic_filter` — the only predicate on the +/// scan is a bare `DynamicFilterPhysicalExpr`. WHERE clauses, +/// other filters, or any wrapper that adds conjuncts disables +/// the path. +/// 2. `topk_fetch.is_some()` — the surrounding `SortExec(fetch=K)` +/// plumbed `K` through `with_topk_fetch_hint`. +/// 3. `sort_order_for_reorder.is_some()` — sort pushdown fired, +/// so the source has a coherent sort direction / column. +mod topk_no_where { + use super::*; + use arrow::array::ArrayRef; + use arrow::datatypes::Schema; + use datafusion_expr::Operator; + use datafusion_physical_expr::expressions::{ + BinaryExpr, Column, DynamicFilterPhysicalExpr, lit, + }; + use parquet::arrow::arrow_reader::statistics::StatisticsConverter; + use parquet::file::metadata::RowGroupMetaData; + use parquet::schema::types::SchemaDescriptor; + + /// True iff the top-level predicate downcasts to + /// [`DynamicFilterPhysicalExpr`] — i.e. the only filter is a + /// dynamic filter (TopK threshold) with no static WHERE-clause + /// conjuncts. 
+ pub(super) fn predicate_is_pure_dynamic_filter( + predicate: &Arc, + ) -> bool { + predicate + .downcast_ref::() + .is_some() + } + + /// Walk the predicate tree and return a reference to the first + /// [`DynamicFilterPhysicalExpr`] found, so the caller can call + /// [`DynamicFilterPhysicalExpr::update`] on it. + /// + /// For the no-WHERE TopK path the predicate is already a bare + /// dynamic filter, but the recursive walk is cheap and keeps + /// the helper composable for future shapes. + pub(super) fn find_dynamic_filter( + expr: &Arc, + ) -> Option<&DynamicFilterPhysicalExpr> { + if let Some(df) = expr.downcast_ref::() { + return Some(df); + } + for child in expr.children() { + if let Some(found) = find_dynamic_filter(child) { + return Some(found); + } + } + None + } + + /// Unwrap single-child wrappers (e.g. `CastExpr`) to reach a + /// plain [`Column`]. Returns `None` for genuinely multi-arg + /// expressions like `a + b`. + pub(super) fn find_column_in_expr(expr: &Arc) -> Option { + if let Some(col) = expr.downcast_ref::() { + return Some(col.clone()); + } + let children = expr.children(); + if children.len() == 1 { + return find_column_in_expr(children[0]); + } + None + } + + /// Walk the row groups and compute the tightest *safe* threshold + /// for the TopK dynamic filter. + /// + /// For DESC `... LIMIT K` the dynamic filter is `col > T`, and we + /// want the **largest** `T` that is guaranteed `<=` the K-th + /// largest value in the file. Any single row group with + /// `num_rows >= K` already provides such a bound: all rows in + /// that RG are `>= min(RG)`, so the K-th largest value across + /// the file is also `>= min(RG)`. The largest such per-RG min + /// is the tightest safe `T` → `take_max = true`, `stats = + /// row_group_mins`. 
+ /// + /// For ASC the dual holds: the dynamic filter is `col < T`, and + /// the tightest safe `T` is the **smallest** per-RG max over + /// RGs with `num_rows >= K` → `take_max = false`, `stats = + /// row_group_maxes`. + /// + /// Row groups smaller than `fetch` are skipped — alone they + /// can't guarantee that K qualifying rows live above/below + /// their min/max. + pub(super) fn compute_best_threshold( + stats: &ArrayRef, + rg_metadata: &[RowGroupMetaData], + fetch: usize, + take_max: bool, + ) -> Result> { + debug_assert_eq!( + stats.len(), + rg_metadata.len(), + "stats and rg_metadata must be aligned" + ); + + let mut best: Option = None; + for (idx, rg) in rg_metadata.iter().enumerate() { + if (rg.num_rows() as usize) < fetch { + continue; + } + if stats.is_null(idx) { + continue; + } + let candidate = ScalarValue::try_from_array(stats, idx)?; + best = match best { + None => Some(candidate), + Some(current) => { + let take_candidate = if take_max { + candidate > current + } else { + candidate < current + }; + Some(if take_candidate { candidate } else { current }) + } + }; + } + Ok(best) + } + + /// Seed the TopK dynamic filter from per-RG statistics. + /// + /// Builds `col {op} threshold` (with `op = >=` for DESC and + /// `op = <=` for ASC) and calls [`DynamicFilterPhysicalExpr::update`]. + /// If a precondition is not met (sort key isn't a plain column, + /// no stats available, threshold can't be computed), the + /// function is a no-op — the TopK will simply tighten the + /// filter from real data on the first batch as before. 
+ pub(super) fn try_init_topk_threshold( + predicate: &Arc, + fetch: usize, + sort_order: &LexOrdering, + rg_metadata: &[RowGroupMetaData], + arrow_schema: &Schema, + parquet_schema: &SchemaDescriptor, + ) -> Result<()> { + if rg_metadata.is_empty() { + return Ok(()); + } + + let Some(dynamic_filter) = find_dynamic_filter(predicate) else { + return Ok(()); + }; + + let first_sort_expr = sort_order.first(); + let Some(column) = find_column_in_expr(&first_sort_expr.expr) else { + debug!( + "Skipping TopK stats init: sort expr is not a column-bearing expression" + ); + return Ok(()); + }; + if arrow_schema.field_with_name(column.name()).is_err() { + debug!( + "Skipping TopK stats init: column `{}` not in file schema", + column.name() + ); + return Ok(()); + } + + let converter = match StatisticsConverter::try_new( + column.name(), + arrow_schema, + parquet_schema, + ) { + Ok(c) => c, + Err(e) => { + debug!( + "Skipping TopK stats init: stats converter for `{}` failed: {e}", + column.name() + ); + return Ok(()); + } + }; + + // For DESC we want a *safe* lower bound on the K-th largest + // value → take the largest per-RG `min` over qualifying RGs. + // For ASC we want a *safe* upper bound on the K-th smallest + // value → take the smallest per-RG `max` over qualifying RGs. + // The dynamic filter expression is `col > T` for DESC and + // `col < T` for ASC respectively (mirrors TopK's existing + // `build_filter_expression`). + let is_desc = first_sort_expr.options.descending; + let stats = if is_desc { + converter.row_group_mins(rg_metadata.iter()) + } else { + converter.row_group_maxes(rg_metadata.iter()) + }; + let stats = match stats { + Ok(arr) => arr, + Err(e) => { + debug!( + "Skipping TopK stats init: cannot read stats for `{}`: {e}", + column.name() + ); + return Ok(()); + } + }; + + // `take_max = is_desc`: DESC picks the largest min, ASC picks + // the smallest max. + let Some(threshold) = + compute_best_threshold(&stats, rg_metadata, fetch, is_desc)? 
+ else { + return Ok(()); + }; + + // Cast the threshold to the sort expression's output data + // type. For `CAST(small_val AS Int64) DESC` the column is + // Int16 (so the stats array is Int16) but the BinaryExpr + // must compare Int64 vs Int64 to evaluate. If the cast + // fails, bail out — this is best-effort. + let sort_expr_type = match first_sort_expr.expr.data_type(arrow_schema) { + Ok(t) => t, + Err(e) => { + debug!("Skipping TopK stats init: cannot resolve sort expr type: {e}"); + return Ok(()); + } + }; + let threshold = if threshold.data_type() != sort_expr_type { + match threshold.cast_to(&sort_expr_type) { + Ok(v) => v, + Err(e) => { + debug!( + "Skipping TopK stats init: cannot cast threshold from {:?} to {sort_expr_type:?}: {e}", + threshold.data_type() + ); + return Ok(()); + } + } + } else { + threshold + }; + + // Build `sort_expr {op} threshold`. The dynamic filter's + // children must remain stable across updates, so reuse the + // sort expression as-is (it shares the same column children + // that TopK declared). + // + // Use `>=` / `<=` rather than `>` / `<` because the + // threshold derived from stats is only a *safe* bound — + // some row in the file may equal it and still belong to + // the final K. The real TopK will tighten this to strict + // `>` / `<` on the next update once it has seen actual + // data. + let op = if is_desc { + Operator::GtEq + } else { + Operator::LtEq + }; + let new_expr: Arc = Arc::new(BinaryExpr::new( + Arc::clone(&first_sort_expr.expr), + op, + lit(threshold), + )); + dynamic_filter.update(new_expr)?; + Ok(()) + } +} + /// Build a page pruning predicate from an optional predicate expression. /// If the predicate is None or the predicate cannot be converted to a page pruning /// predicate, return None. 
@@ -1611,6 +1950,7 @@ mod test { max_predicate_cache_size: self.max_predicate_cache_size, reverse_row_groups: self.reverse_row_groups, sort_order_for_reorder: None, + topk_fetch: None, } } } diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs index b6f3c36ad25d6..35ac4ebb8be32 100644 --- a/datafusion/datasource-parquet/src/source.rs +++ b/datafusion/datasource-parquet/src/source.rs @@ -598,6 +598,7 @@ impl FileSource for ParquetSource { max_predicate_cache_size: self.max_predicate_cache_size(), reverse_row_groups: self.reverse_row_groups, sort_order_for_reorder: self.sort_order_for_reorder.clone(), + topk_fetch: self.topk_fetch, })) } From 4ba874692cddde52c5c27d5d834ec526d6ceaa27 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Wed, 20 May 2026 14:43:41 +0800 Subject: [PATCH 6/8] test: add Test I SLT for TopK stats init + cumulative prune (no-WHERE path) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Test I.1: DESC LIMIT — EXPLAIN shows DynamicFilter [ empty ] + sort_order_for_reorder + reverse_row_groups=true (sort pushdown fires) Test I.2: DESC LIMIT result correctness Test I.3: ASC LIMIT in same direction as file — Exact path, SortExec eliminated, limit becomes static fetch on source Test I.4: DESC LIMIT with WHERE — stats init and cumulative prune both skip (predicate is not a bare DynamicFilter), result still correct via dynamic filter pushdown Test I.5: Larger LIMIT spanning multiple RGs Test I.6: LIMIT larger than total rows — returns all rows --- .../sqllogictest/test_files/sort_pushdown.slt | 120 ++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index 540562eb3bc8d..43a7c107d0a52 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -2392,6 +2392,126 @@ 
DROP TABLE th_mixed; statement ok DROP TABLE th_reorder; +# =========================================================== +# Test I: WITH ORDER + DESC LIMIT — stats init + cumulative prune +# Exercises the full optimisation chain on sorted non-overlapping data: +# file reorder → RG reorder → reverse → stats init → cumulative prune. +# Both stats init and cumulative prune are gated on no-WHERE (the +# predicate handed to the opener is a bare DynamicFilter) and on sort +# pushdown (sort_order_for_reorder is set). +# =========================================================== + +statement ok +SET datafusion.execution.target_partitions = 1; + +statement ok +SET datafusion.optimizer.enable_sort_pushdown = true; + +# Create sorted data spread across multiple small row groups +statement ok +CREATE TABLE ti_data(id INT, value INT) AS VALUES +(1,10),(2,20),(3,30),(4,40),(5,50),(6,60), +(7,70),(8,80),(9,90),(10,100),(11,110),(12,120); + +statement ok +SET datafusion.execution.parquet.max_row_group_size = 3; + +query I +COPY (SELECT * FROM ti_data ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/ti_sorted/data.parquet'; +---- +12 + +statement ok +SET datafusion.execution.parquet.max_row_group_size = 1048576; + +statement ok +CREATE EXTERNAL TABLE ti_sorted(id INT, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/ti_sorted/data.parquet' +WITH ORDER (id ASC); + +# Test I.1: DESC LIMIT with sort pushdown — EXPLAIN shows reverse_row_groups=true +query TT +EXPLAIN SELECT * FROM ti_sorted ORDER BY id DESC LIMIT 3; +---- +logical_plan +01)Sort: ti_sorted.id DESC NULLS FIRST, fetch=3 +02)--TableScan: ti_sorted projection=[id, value] +physical_plan +01)SortExec: TopK(fetch=3), expr=[id@0 DESC], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/ti_sorted/data.parquet]]}, projection=[id, value], file_type=parquet, predicate=DynamicFilter [ empty ], 
sort_order_for_reorder=[id@0 DESC], reverse_row_groups=true + +# Test I.2: DESC LIMIT results — should return the largest values +query II +SELECT * FROM ti_sorted ORDER BY id DESC LIMIT 3; +---- +12 120 +11 110 +10 100 + +# Test I.3: ASC LIMIT (same direction as file order) — sort elimination, +# no TopK in the plan +query TT +EXPLAIN SELECT * FROM ti_sorted ORDER BY id ASC LIMIT 3; +---- +logical_plan +01)Sort: ti_sorted.id ASC NULLS LAST, fetch=3 +02)--TableScan: ti_sorted projection=[id, value] +physical_plan DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/ti_sorted/data.parquet]]}, projection=[id, value], limit=3, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + +query II +SELECT * FROM ti_sorted ORDER BY id ASC LIMIT 3; +---- +1 10 +2 20 +3 30 + +# Test I.4: DESC LIMIT with WHERE — stats init and cumulative prune both +# skip (predicate is not a bare DynamicFilter), result still correct +query II +SELECT * FROM ti_sorted WHERE value > 50 ORDER BY id DESC LIMIT 2; +---- +12 120 +11 110 + +# Test I.5: Larger LIMIT spanning multiple RGs (4 RGs of 3 rows each) +query II +SELECT * FROM ti_sorted ORDER BY id DESC LIMIT 8; +---- +12 120 +11 110 +10 100 +9 90 +8 80 +7 70 +6 60 +5 50 + +# Test I.6: LIMIT larger than total rows — returns all rows +query II +SELECT * FROM ti_sorted ORDER BY id DESC LIMIT 100; +---- +12 120 +11 110 +10 100 +9 90 +8 80 +7 70 +6 60 +5 50 +4 40 +3 30 +2 20 +1 10 + +# Cleanup Test I +statement ok +DROP TABLE ti_data; + +statement ok +DROP TABLE ti_sorted; + # =========================================================== # Test J: Non-overlapping RGs without WITH ORDER — # RG reorder via DynamicFilter sort_options From 168ce20947b9432c1fd166bf0ddf45eda64567d4 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Wed, 20 May 2026 16:19:18 +0800 Subject: [PATCH 7/8] fix: gate TopK stats init + cumulative RG prune on safe shapes MIME-Version: 1.0 Content-Type: 
text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three issues surfaced after CI ran the no-WHERE TopK path against the full SLT corpus and the topk_filter_pushdown fuzz test: * `cargo doc` failed on an unresolved intra-doc link `[crate::ParquetSource]`. Rewrite as a reference link with an explicit path target so rustdoc resolves it from the morsel module. * Stats init installed a `BinaryExpr` using the scan-side column from `sort_order_for_reorder`, but the dynamic filter's stored expression lives in TopK output column space when a projection sits between TopK and the scan. The `remap_children` walk then either bypassed the rewrite (TopK saw scan-side columns, producing `Int32 <= Utf8View` / `Int32 <= Int64` failures) or double-wrapped the projection's CAST/computation (producing `CAST(CAST(...) AS Int32) AS Int64) + 1`). Gate stats init on `remapped_children == original_children`: the filter-pushdown path always rebuilds children via `reassign_expr_columns` so `remapped_children` is essentially always `Some`, but it's safe when the two slices are *logically* equal. * Cumulative RG prune is unsafe for multi-column sort: leading-column reorder doesn't disambiguate ties on the leading column, so a tie may span row groups and truncating after the first K rows can drop the row group that actually contains a member of the top-K. This was caught by the topk_filter_pushdown fuzz with `ORDER BY department ASC NULLS FIRST, id ASC NULLS LAST, name ASC NULLS LAST LIMIT 1` returning the wrong row. Gate the cumulative truncation on `sort_order.len() == 1` and on the same `remapped_children == original_children` check. Multi-column TopK still benefits from reorder + reverse; only the cumulative-K truncation is skipped. Stats init is also gated on `sort_order.len() == 1` for symmetry, even though it is correct for multi-column sort — the seeded leading-column threshold is still a valid lower/upper bound on the K-th value. 
The single-column condition keeps the no-WHERE TopK path obviously correct and matches the gate on the cumulative truncation. --- .../datasource-parquet/src/opener/mod.rs | 113 ++++++++++++++++-- 1 file changed, 102 insertions(+), 11 deletions(-) diff --git a/datafusion/datasource-parquet/src/opener/mod.rs b/datafusion/datasource-parquet/src/opener/mod.rs index d8400666c1bf0..15b6580cdd013 100644 --- a/datafusion/datasource-parquet/src/opener/mod.rs +++ b/datafusion/datasource-parquet/src/opener/mod.rs @@ -138,12 +138,14 @@ pub(super) struct ParquetMorselizer { /// Optional sort order used to reorder row groups by their min/max statistics. pub sort_order_for_reorder: Option, /// Optional TopK fetch (K) hint plumbed from the surrounding - /// `SortExec(fetch=K)` via [`crate::ParquetSource`]'s - /// `with_topk_fetch_hint`. The opener uses it (in combination with - /// `sort_order_for_reorder` and a bare-`DynamicFilterPhysicalExpr` - /// predicate) to seed the TopK threshold from per-RG statistics - /// before `PruningPredicate` build and to truncate - /// `row_group_indexes` once cumulative `num_rows` reaches `K`. + /// `SortExec(fetch=K)` via [`ParquetSource`]'s `with_topk_fetch_hint`. + /// The opener uses it (in combination with `sort_order_for_reorder` and + /// a bare-`DynamicFilterPhysicalExpr` predicate) to seed the TopK + /// threshold from per-RG statistics before `PruningPredicate` build and + /// to truncate `row_group_indexes` once cumulative `num_rows` reaches + /// `K`. + /// + /// [`ParquetSource`]: crate::source::ParquetSource pub topk_fetch: Option, } @@ -1173,14 +1175,48 @@ impl RowGroupsPrunedParquetOpen { // Both inputs come from the sort-pushdown channel — // `ParquetSource::try_pushdown_sort` sets `sort_order_for_reorder` // and/or `reverse_row_groups`. - // Same gate as the stats-init step in `prepare_filters`. The - // boolean is hoisted here because the closure captures it. 
+ // Same gate as the stats-init step in `prepare_filters`, plus: + // + // * `sort_order.len() == 1`: the leading-column reorder only + // sorts row groups by `min(col0)`. With a multi-column sort + // (e.g. `ORDER BY b ASC, a DESC`) ties on the leading column + // may span row groups, so truncating after the first K rows + // could drop a row group that contains a member of the true + // top-K once the tie is broken by the trailing columns. + // Multi-column TopK still benefits from reorder + reverse; + // we only skip the cumulative truncation. + // + // * `remapped_children` is `None` or logically equal to + // `original_children`: a projection between TopK and the scan + // that actually rewrote the filter's columns would also break + // the assumption that the cumulative `num_rows` over the + // scan-side row groups maps directly to the K rows TopK is + // looking for. + // + // The boolean is hoisted here because the closure captures it. let topk_no_where_active = prepared.topk_fetch.is_some() - && prepared.sort_order_for_reorder.is_some() && prepared - .predicate + .sort_order_for_reorder .as_ref() - .is_some_and(topk_no_where::predicate_is_pure_dynamic_filter); + .is_some_and(|so| so.len() == 1) + && prepared.predicate.as_ref().is_some_and(|p| { + topk_no_where::predicate_is_pure_dynamic_filter(p) + && topk_no_where::find_dynamic_filter(p).is_some_and(|df| { + // Same `remapped_children == original_children` + // check as the stats-init gate, allowing the + // identity-remap that `reassign_expr_columns` + // produces during filter pushdown. 
+ match df.remapped_children() { + None => true, + Some(remapped) => { + let originals = df.original_children(); + originals.len() == remapped.len() + && originals + .iter() + .zip(remapped.iter()) + .all(|(orig, new)| orig.as_ref() == new.as_ref()) + } + } + }) + }); let prepare_access_plan = |plan: ParquetAccessPlan| -> Result { let mut prepared_plan = plan.prepare(rg_metadata)?; @@ -1575,6 +1611,61 @@ mod topk_no_where { return Ok(()); }; + // Bail out if a projection sits between the TopK and the scan + // that actually rewrote the filter's column references (i.e. + // the dynamic filter's `remapped_children` differ from + // `original_children`). In that case the stored expression + // inside the dynamic filter is in the TopK output column space + // (e.g. `a@1`) but `sort_order_for_reorder` is in the scan + // column space (e.g. `a@0`, or worse, `CAST(a@0 AS Int64) + 1` + // for a computed projection). Installing a scan-side BinaryExpr + // would either bypass the children remapping (so the TopK side + // ends up evaluating against the wrong column) or, when + // `remap_children` does find a match, double-wrap the + // expression with the projection's cast/computation. Either + // way the resulting filter is wrong. Punt on the stats-init — + // TopK will tighten the dynamic filter from real data on the + // first batch as before. + // + // We deliberately allow `remapped_children == original_children` + // here: the filter-pushdown path always rebuilds children via + // `reassign_expr_columns`, which produces fresh `Arc`s that + // `with_new_children_if_necessary` then installs as + // `remapped_children`. When the two slices are *logically* equal + // there's no projection rewriting in play and stats-init is + // safe. 
+ if let Some(remapped) = dynamic_filter.remapped_children() { + let originals = dynamic_filter.original_children(); + let logically_equal = originals.len() == remapped.len() + && originals + .iter() + .zip(remapped.iter()) + .all(|(orig, new)| orig.as_ref() == new.as_ref()); + if !logically_equal { + debug!( + "Skipping TopK stats init: dynamic filter children were rewritten by projection pushdown" + ); + return Ok(()); + } + } + + // Multi-column sort is unsafe for the dual cumulative-RG prune + // (the leading-column reorder doesn't disambiguate ties on the + // leading column, so the tie may span row groups). Stats init + // *itself* is safe for multi-column sort — the seeded threshold + // is still a valid lower/upper bound on the K-th value of the + // leading column — but we gate both on the same "single-column + // sort" condition for symmetry and to keep the no-WHERE TopK + // path obviously correct. Multi-column TopK still tightens its + // filter from real data on the first batch as before. + if sort_order.len() != 1 { + debug!( + "Skipping TopK stats init: multi-column sort (len={})", + sort_order.len() + ); + return Ok(()); + } + let first_sort_expr = sort_order.first(); let Some(column) = find_column_in_expr(&first_sort_expr.expr) else { debug!( From fbcf62316b7b6372d71dd66dba1194534d37aba0 Mon Sep 17 00:00:00 2001 From: Qi Zhu <821684824@qq.com> Date: Wed, 20 May 2026 16:19:29 +0800 Subject: [PATCH 8/8] test: refresh push_down_filter_parquet snapshot for stats-init metrics MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Single-column no-WHERE TopK is the one shape where stats init now fires, and it tightens the seeded threshold enough that the row group is marked fully-matched at PruningPredicate build time. 
Update the EXPLAIN ANALYZE snapshot to reflect the new metric pattern: row_groups_pruned_statistics=1 total → 1 matched -> 1 fully matched page_index_pages_skipped_by_fully_matched=1 pushdown_rows_matched=0 predicate_cache_inner_records=0 predicate_cache_records=0 Result and dynamic-filter shape are unchanged; this is a pure optimization-metric refresh. All other failing queries from the previous run (`topk_multi_col`, `topk_proj`) now match their old snapshots because the gates in opener/mod.rs skip stats init for multi-column / projection-rewritten cases. --- datafusion/sqllogictest/test_files/push_down_filter_parquet.slt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt b/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt index b04b962a5df19..48e2add68fb7f 100644 --- a/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt +++ b/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt @@ -268,7 +268,7 @@ EXPLAIN ANALYZE SELECT * FROM topk_single_col ORDER BY b DESC LIMIT 1; ---- Plan with Metrics 01)SortExec: TopK(fetch=1), expr=[b@1 DESC], preserve_partitioning=[false], filter=[b@1 IS NULL OR b@1 > bd], metrics=[output_rows=1, output_batches=1, row_replacements=1] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_single_col.parquet]]}, projection=[a, b, c], file_type=parquet, predicate=DynamicFilter [ b@1 IS NULL OR b@1 > bd ], sort_order_for_reorder=[b@1 DESC], reverse_row_groups=true, pruning_predicate=b_null_count@0 > 0 OR b_null_count@0 != row_count@2 AND b_max@1 > bd, required_guarantees=[], metrics=[output_rows=4, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 
matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=4, pushdown_rows_pruned=0, predicate_cache_inner_records=4, predicate_cache_records=4, scan_efficiency_ratio=22.37% (240/1.07 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_single_col.parquet]]}, projection=[a, b, c], file_type=parquet, predicate=DynamicFilter [ b@1 IS NULL OR b@1 > bd ], sort_order_for_reorder=[b@1 DESC], reverse_row_groups=true, pruning_predicate=b_null_count@0 > 0 OR b_null_count@0 != row_count@2 AND b_max@1 > bd, required_guarantees=[], metrics=[output_rows=4, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched -> 1 fully matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_pages_skipped_by_fully_matched=1, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=22.37% (240/1.07 K)] statement ok reset datafusion.explain.analyze_categories;