diff --git a/datafusion/datasource-parquet/src/access_plan.rs b/datafusion/datasource-parquet/src/access_plan.rs
index edbea39948f09..159df9b209701 100644
--- a/datafusion/datasource-parquet/src/access_plan.rs
+++ b/datafusion/datasource-parquet/src/access_plan.rs
@@ -611,6 +611,31 @@ impl PreparedAccessPlan {
         Ok(self)
     }
 
+    /// Truncate `row_group_indexes` to keep at most the first `count`
+    /// entries.
+    ///
+    /// A `count` at or beyond the current length leaves the plan
+    /// untouched (`Vec::truncate` is a no-op in that case).
+    ///
+    /// Used by the cumulative-RG-prune step in the opener: after
+    /// `reorder_by_statistics` and optional `reverse`, the opener
+    /// accumulates `num_rows` from the front of `row_group_indexes`
+    /// until the cumulative count reaches `K` (the TopK fetch), then
+    /// calls this method with `count = number_of_row_groups_needed`.
+    ///
+    /// Bails out unchanged (no truncation) when `row_selection` is
+    /// `Some`. Page-level state is keyed by the surviving row groups
+    /// and would have to be remapped; the no-WHERE TopK gate ensures
+    /// `row_selection` is always `None` in the path that calls this
+    /// method, so the early-return is just a safety net.
+    pub(crate) fn truncate_row_groups(mut self, count: usize) -> Self {
+        if self.row_selection.is_some() {
+            debug!("Skipping cumulative RG truncate: row_selection present");
+            return self;
+        }
+        self.row_group_indexes.truncate(count);
+        self
+    }
+
     /// Reverse the access plan for reverse scanning
     pub(crate) fn reverse(mut self, file_metadata: &ParquetMetaData) -> Result<Self> {
         // Get the row group indexes before reversing
@@ -1026,4 +1051,53 @@ mod test {
 
         assert_eq!(result.row_group_indexes, vec![0, 1]);
     }
+
+    // ----------------------------------------------------------------
+    // `truncate_row_groups` tests
+    // ----------------------------------------------------------------
+
+    /// Happy path: truncate keeps only the first `count` indexes,
+    /// preserving order. Mirrors the opener's cumulative-RG-prune
+    /// step, where the iteration order has already been finalised
+    /// by `reorder_by_statistics` and optional `reverse`.
+    #[test]
+    fn truncate_row_groups_keeps_first_n() {
+        let plan = PreparedAccessPlan::new(vec![3, 1, 4, 1, 5, 9, 2, 6], None).unwrap();
+
+        let result = plan.truncate_row_groups(3);
+
+        assert_eq!(result.row_group_indexes, vec![3, 1, 4]);
+        assert!(result.row_selection.is_none());
+    }
+
+    /// No-op when the requested keep-count is >= the current length
+    /// (e.g. LIMIT larger than the file's total row groups). We
+    /// cover both equal and greater-than separately.
+    #[test]
+    fn truncate_row_groups_no_op_when_count_exceeds_len() {
+        let same_len = PreparedAccessPlan::new(vec![0, 1, 2], None)
+            .unwrap()
+            .truncate_row_groups(3);
+        assert_eq!(same_len.row_group_indexes, vec![0, 1, 2]);
+
+        let larger = PreparedAccessPlan::new(vec![0, 1, 2], None)
+            .unwrap()
+            .truncate_row_groups(10);
+        assert_eq!(larger.row_group_indexes, vec![0, 1, 2]);
+    }
+
+    /// `row_selection` is `Some` → bail out unchanged. Page-level
+    /// state would need to be remapped to the surviving row groups;
+    /// the no-WHERE TopK gate ensures this is just a safety net.
+    #[test]
+    fn truncate_row_groups_skips_when_row_selection_present() {
+        let selection = RowSelection::from(vec![RowSelector::select(100)]);
+        let plan =
+            PreparedAccessPlan::new(vec![0, 1, 2], Some(selection.clone())).unwrap();
+
+        let result = plan.truncate_row_groups(1);
+
+        assert_eq!(result.row_group_indexes, vec![0, 1, 2]);
+        assert_eq!(result.row_selection, Some(selection));
+    }
 }
diff --git a/datafusion/datasource-parquet/src/opener/mod.rs b/datafusion/datasource-parquet/src/opener/mod.rs
index e5929fd43f11a..15b6580cdd013 100644
--- a/datafusion/datasource-parquet/src/opener/mod.rs
+++ b/datafusion/datasource-parquet/src/opener/mod.rs
@@ -137,6 +137,16 @@ pub(super) struct ParquetMorselizer {
     pub reverse_row_groups: bool,
     /// Optional sort order used to reorder row groups by their min/max statistics.
     pub sort_order_for_reorder: Option<LexOrdering>,
+    /// Optional TopK fetch (K) hint plumbed from the surrounding
+    /// `SortExec(fetch=K)` via [`ParquetSource`]'s `with_topk_fetch_hint`.
+    /// The opener uses it (in combination with `sort_order_for_reorder` and
+    /// a bare-`DynamicFilterPhysicalExpr` predicate) to seed the TopK
+    /// threshold from per-RG statistics before `PruningPredicate` build and
+    /// to truncate `row_group_indexes` once cumulative `num_rows` reaches
+    /// `K`.
+    ///
+    /// [`ParquetSource`]: crate::source::ParquetSource
+    pub topk_fetch: Option<usize>,
 }
 
 impl fmt::Debug for ParquetMorselizer {
@@ -288,6 +298,8 @@ struct PreparedParquetOpen {
     max_predicate_cache_size: Option<usize>,
     reverse_row_groups: bool,
     sort_order_for_reorder: Option<LexOrdering>,
+    /// See [`ParquetMorselizer::topk_fetch`].
+    topk_fetch: Option<usize>,
     preserve_order: bool,
     #[cfg(feature = "parquet_encryption")]
     file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
@@ -658,6 +670,7 @@ impl ParquetMorselizer {
             max_predicate_cache_size: self.max_predicate_cache_size,
             reverse_row_groups: self.reverse_row_groups,
             sort_order_for_reorder: self.sort_order_for_reorder.clone(),
+            topk_fetch: self.topk_fetch,
             preserve_order: self.preserve_order,
             #[cfg(feature = "parquet_encryption")]
             file_decryption_properties: None,
@@ -825,6 +838,39 @@ impl MetadataLoadedParquetOpen {
         }
         prepared.physical_file_schema = Arc::clone(&physical_file_schema);
 
+        // No-WHERE TopK stats init.
+        //
+        // Gate: sort pushdown fired (`sort_order_for_reorder.is_some()`)
+        // AND the surrounding `SortExec` plumbed K (`topk_fetch.is_some()`)
+        // AND the only predicate is a bare `DynamicFilterPhysicalExpr`.
+        //
+        // Effect: seed the TopK threshold from per-RG min/max statistics
+        // before `PruningPredicate` build, so the pruning predicate
+        // immediately benefits from a tight bound. The threshold is
+        // chosen so at least one row group with `num_rows >= K` can
+        // contain the final K rows — i.e. we don't prune away anything
+        // we'd actually need.
+        if let (Some(predicate), Some(fetch), Some(sort_order)) = (
+            prepared.predicate.as_ref(),
+            prepared.topk_fetch,
+            prepared.sort_order_for_reorder.as_ref(),
+        ) && topk_no_where::predicate_is_pure_dynamic_filter(predicate)
+        {
+            let rg_metadata = reader_metadata.metadata().row_groups();
+            // Best-effort: a failed seed must never fail the scan, so
+            // log and carry on — TopK still tightens the filter itself.
+            if let Err(e) = topk_no_where::try_init_topk_threshold(
+                predicate,
+                fetch,
+                sort_order,
+                rg_metadata,
+                &physical_file_schema,
+                reader_metadata.parquet_schema(),
+            ) {
+                debug!("Skipping TopK stats init: {e}");
+            }
+        }
+
         // Build predicates for this specific file
         let pruning_predicate = build_pruning_predicates(
             prepared.predicate.as_ref(),
@@ -1131,6 +1177,48 @@ impl RowGroupsPrunedParquetOpen {
         // Both inputs come from the sort-pushdown channel —
         // `ParquetSource::try_pushdown_sort` sets `sort_order_for_reorder`
         // and/or `reverse_row_groups`.
+        // Same gate as the stats-init step in `prepare_filters`, plus:
+        //
+        // * `sort_order.len() == 1`: the leading-column reorder only
+        //   sorts row groups by `min(col0)`. With a multi-column sort
+        //   (e.g. `ORDER BY b ASC, a DESC`) ties on the leading column
+        //   may span row groups, so truncating after the first K rows
+        //   could drop a row group that contains a member of the true
+        //   top-K once the tie is broken by the trailing columns.
+        //   Multi-column TopK still benefits from reorder + reverse;
+        //   we only skip the cumulative truncation.
+        //
+        // * `remapped_children` unset (or an identity remap): a projection
+        //   between TopK and the scan would also break the assumption that
+        //   the cumulative `num_rows` over the scan-side row groups maps
+        //   directly to the K rows TopK is looking for.
+        //
+        // The boolean is hoisted here because the closure captures it.
+        let topk_no_where_active = prepared.topk_fetch.is_some()
+            && prepared
+                .sort_order_for_reorder
+                .as_ref()
+                .is_some_and(|so| so.len() == 1)
+            && prepared.predicate.as_ref().is_some_and(|p| {
+                topk_no_where::predicate_is_pure_dynamic_filter(p)
+                    && topk_no_where::find_dynamic_filter(p).is_some_and(|df| {
+                        // Same `remapped_children == original_children`
+                        // check as the stats-init gate, allowing the
+                        // identity-remap that `reassign_expr_columns`
+                        // produces during filter pushdown.
+                        match df.remapped_children() {
+                            None => true,
+                            Some(remapped) => {
+                                let originals = df.original_children();
+                                originals.len() == remapped.len()
+                                    && originals
+                                        .iter()
+                                        .zip(remapped.iter())
+                                        .all(|(orig, new)| orig.as_ref() == new.as_ref())
+                            }
+                        }
+                    })
+            });
         let prepare_access_plan = |plan: ParquetAccessPlan| -> Result<PreparedAccessPlan> {
             let mut prepared_plan = plan.prepare(rg_metadata)?;
 
@@ -1144,6 +1230,52 @@ impl RowGroupsPrunedParquetOpen {
             if prepared.reverse_row_groups {
                 prepared_plan = prepared_plan.reverse(file_metadata.as_ref())?;
             }
+            // Cumulative RG prune: after reorder + reverse, the
+            // first row group is the "best" one for the request.
+            // Walk forward, accumulate `num_rows`, and truncate
+            // once we have enough to satisfy K. Gated on the
+            // no-WHERE TopK shape and `topk_fetch` being set.
+            //
+            // `num_rows` also counts NULL sort keys, and NULLs sort
+            // before (or after) every value no matter which row
+            // group holds them. The walk is therefore only sound
+            // when the sort column is known to be NULL-free in every
+            // row group; missing null counts disable it.
+            //
+            // NOTE(review): the walk also presumes the reordered row
+            // groups have non-overlapping value ranges. Nothing here
+            // checks that — confirm `reorder_by_statistics` does.
+            if topk_no_where_active
+                && prepared_plan.row_selection.is_none()
+                && let Some(fetch) = prepared.topk_fetch
+                && let Some(sort_column) =
+                    prepared.sort_order_for_reorder.as_ref().and_then(|so| {
+                        topk_no_where::find_column_in_expr(&so.first().expr)
+                    })
+                && rg_metadata.iter().all(|rg| {
+                    rg.columns()
+                        .iter()
+                        .find(|c| c.column_path().string() == sort_column.name())
+                        .and_then(|c| c.statistics())
+                        .and_then(|s| s.null_count_opt())
+                        == Some(0)
+                })
+            {
+                let mut cumulative_rows: usize = 0;
+                let keep = prepared_plan
+                    .row_group_indexes
+                    .iter()
+                    .position(|&rg_idx| {
+                        // A negative (corrupt) `num_rows` adds nothing
+                        // rather than wrapping to a huge count.
+                        let rows =
+                            usize::try_from(rg_metadata[rg_idx].num_rows()).unwrap_or(0);
+                        cumulative_rows = cumulative_rows.saturating_add(rows);
+                        cumulative_rows >= fetch
+                    })
+                    .map_or(prepared_plan.row_group_indexes.len(), |i| i + 1);
+                prepared_plan = prepared_plan.truncate_row_groups(keep);
+            }
             Ok(prepared_plan)
         };
 
@@ -1351,6 +1483,435 @@ fn create_initial_plan(
     Ok(ParquetAccessPlan::new_all(row_group_count))
 }
 
+/// Helpers for the no-WHERE TopK path: seed the dynamic filter
+/// from per-row-group min/max statistics, and accumulate `num_rows`
+/// across row groups to truncate at `K`.
+///
+/// All helpers are gated on:
+/// 1. `predicate_is_pure_dynamic_filter` — the only predicate on the
+///    scan is a bare `DynamicFilterPhysicalExpr`. WHERE clauses,
+///    other filters, or any wrapper that adds conjuncts disables
+///    the path.
+/// 2. `topk_fetch.is_some()` — the surrounding `SortExec(fetch=K)`
+///    plumbed `K` through `with_topk_fetch_hint`.
+/// 3. `sort_order_for_reorder.is_some()` — sort pushdown fired,
+///    so the source has a coherent sort direction / column.
+mod topk_no_where {
+    use super::*;
+    use arrow::array::{Array, ArrayRef, UInt64Array};
+    use arrow::datatypes::Schema;
+    use datafusion_expr::Operator;
+    use datafusion_physical_expr::expressions::{
+        BinaryExpr, CastExpr, Column, DynamicFilterPhysicalExpr, is_null, lit,
+    };
+    use parquet::arrow::arrow_reader::statistics::StatisticsConverter;
+    use parquet::file::metadata::RowGroupMetaData;
+    use parquet::schema::types::SchemaDescriptor;
+
+    /// True iff the top-level predicate downcasts to
+    /// [`DynamicFilterPhysicalExpr`] — i.e. the only filter is a
+    /// dynamic filter (TopK threshold) with no static WHERE-clause
+    /// conjuncts.
+    pub(super) fn predicate_is_pure_dynamic_filter(
+        predicate: &Arc<dyn PhysicalExpr>,
+    ) -> bool {
+        predicate
+            .downcast_ref::<DynamicFilterPhysicalExpr>()
+            .is_some()
+    }
+
+    /// Walk the predicate tree and return a reference to the first
+    /// [`DynamicFilterPhysicalExpr`] found, so the caller can call
+    /// [`DynamicFilterPhysicalExpr::update`] on it.
+    ///
+    /// For the no-WHERE TopK path the predicate is already a bare
+    /// dynamic filter, but the recursive walk is cheap and keeps
+    /// the helper composable for future shapes.
+    pub(super) fn find_dynamic_filter(
+        expr: &Arc<dyn PhysicalExpr>,
+    ) -> Option<&DynamicFilterPhysicalExpr> {
+        if let Some(df) = expr.downcast_ref::<DynamicFilterPhysicalExpr>() {
+            return Some(df);
+        }
+        for child in expr.children() {
+            if let Some(found) = find_dynamic_filter(child) {
+                return Some(found);
+            }
+        }
+        None
+    }
+
+    /// Resolve the sort expression to the plain [`Column`] whose
+    /// row-group statistics can bound it: either the expression is
+    /// the column itself or a (possibly nested) [`CastExpr`] around
+    /// it.
+    ///
+    /// Any other wrapper returns `None`, even a single-child one:
+    /// the column's min/max say nothing reliable about e.g. `-a` or
+    /// `abs(a)`, and seeding a threshold from them would filter out
+    /// rows that belong to the top K.
+    pub(super) fn find_column_in_expr(expr: &Arc<dyn PhysicalExpr>) -> Option<Column> {
+        if let Some(col) = expr.downcast_ref::<Column>() {
+            return Some(col.clone());
+        }
+        expr.downcast_ref::<CastExpr>()
+            .and_then(|cast| find_column_in_expr(cast.expr()))
+    }
+
+    /// Walk the row groups and compute the tightest *safe* threshold
+    /// for the TopK dynamic filter.
+    ///
+    /// For DESC `... LIMIT K` the seeded filter is `col >= T`, and we
+    /// want the **largest** `T` that is guaranteed `<=` the K-th
+    /// largest value in the file. Any single row group with
+    /// `num_rows >= K` already provides such a bound: all rows in
+    /// that RG are `>= min(RG)`, so the K-th largest value across
+    /// the file is also `>= min(RG)`. The largest such per-RG min
+    /// is the tightest safe `T` → `take_max = true`, `stats =
+    /// row_group_mins`.
+    ///
+    /// For ASC the dual holds: the seeded filter is `col <= T`, and
+    /// the tightest safe `T` is the **smallest** per-RG max over
+    /// RGs with `num_rows >= K` → `take_max = false`, `stats =
+    /// row_group_maxes`.
+    ///
+    /// Row groups smaller than `fetch` are skipped — alone they
+    /// can't guarantee that K qualifying rows live above/below
+    /// their min/max.
+    ///
+    /// This entry point trusts `num_rows` as the number of rows the
+    /// statistic bounds, which only holds for a NULL-free column;
+    /// see [`compute_best_threshold_excluding_nulls`] otherwise.
+    pub(super) fn compute_best_threshold(
+        stats: &ArrayRef,
+        rg_metadata: &[RowGroupMetaData],
+        fetch: usize,
+        take_max: bool,
+    ) -> Result<Option<ScalarValue>> {
+        compute_best_threshold_excluding_nulls(stats, None, rg_metadata, fetch, take_max)
+    }
+
+    /// [`compute_best_threshold`] with NULL-aware row counting.
+    ///
+    /// Min/max statistics ignore NULLs while `num_rows` counts them,
+    /// so a row group only guarantees `num_rows - null_count` values
+    /// on the safe side of its statistic. When `null_counts` is
+    /// given, that difference must reach `fetch` for the row group
+    /// to qualify, and a row group with an unknown null count never
+    /// qualifies. `None` keeps the plain `num_rows` comparison.
+    ///
+    /// Returns `Ok(None)` when no row group qualifies or when the
+    /// inputs are not aligned with `rg_metadata`.
+    pub(super) fn compute_best_threshold_excluding_nulls(
+        stats: &ArrayRef,
+        null_counts: Option<&UInt64Array>,
+        rg_metadata: &[RowGroupMetaData],
+        fetch: usize,
+        take_max: bool,
+    ) -> Result<Option<ScalarValue>> {
+        let aligned = stats.len() == rg_metadata.len()
+            && null_counts.is_none_or(|nc| nc.len() == rg_metadata.len());
+        debug_assert!(aligned, "stats, null counts and rg_metadata must be aligned");
+        if !aligned {
+            // Indexing below would be out of step; give up quietly.
+            return Ok(None);
+        }
+
+        let mut best: Option<ScalarValue> = None;
+        for (idx, rg) in rg_metadata.iter().enumerate() {
+            if stats.is_null(idx) {
+                continue;
+            }
+            // A negative (corrupt) `num_rows` counts as zero rows.
+            let total_rows = usize::try_from(rg.num_rows()).unwrap_or(0);
+            let bounded_rows = match null_counts {
+                None => total_rows,
+                Some(nc) if nc.is_null(idx) => continue,
+                Some(nc) => {
+                    let nulls = usize::try_from(nc.value(idx)).unwrap_or(usize::MAX);
+                    total_rows.saturating_sub(nulls)
+                }
+            };
+            if bounded_rows < fetch {
+                continue;
+            }
+            let candidate = ScalarValue::try_from_array(stats, idx)?;
+            best = match best {
+                None => Some(candidate),
+                Some(current) => {
+                    let take_candidate = if take_max {
+                        candidate > current
+                    } else {
+                        candidate < current
+                    };
+                    Some(if take_candidate { candidate } else { current })
+                }
+            };
+        }
+        Ok(best)
+    }
+
+    /// Seed the TopK dynamic filter from per-RG statistics.
+    ///
+    /// Builds `sort_expr {op} threshold` (with `op = >=` for DESC and
+    /// `op = <=` for ASC), prefixed with `sort_expr IS NULL OR` when
+    /// NULLs sort first, and calls
+    /// [`DynamicFilterPhysicalExpr::update`]. The bound is inclusive
+    /// because a row equal to the threshold may still belong to the
+    /// final K.
+    ///
+    /// Best-effort: whenever the seed cannot be derived safely (sort
+    /// key isn't a plain or numerically cast column, statistics or
+    /// null counts are missing, the threshold can't be computed or
+    /// installed) the function logs and returns `Ok(())` without
+    /// touching the filter — TopK will simply tighten it from real
+    /// data on the first batch as before.
+    pub(super) fn try_init_topk_threshold(
+        predicate: &Arc<dyn PhysicalExpr>,
+        fetch: usize,
+        sort_order: &LexOrdering,
+        rg_metadata: &[RowGroupMetaData],
+        arrow_schema: &Schema,
+        parquet_schema: &SchemaDescriptor,
+    ) -> Result<()> {
+        if rg_metadata.is_empty() {
+            return Ok(());
+        }
+
+        let Some(dynamic_filter) = find_dynamic_filter(predicate) else {
+            return Ok(());
+        };
+
+        // Bail out if a projection sits between the TopK and the scan
+        // that actually rewrote the filter's column references (i.e.
+        // the dynamic filter's `remapped_children` differ from
+        // `original_children`). In that case the stored expression
+        // inside the dynamic filter is in the TopK output column space
+        // (e.g. `a@1`) but `sort_order_for_reorder` is in the scan
+        // column space (e.g. `a@0`, or worse, `CAST(a@0 AS Int64) + 1`
+        // for a computed projection). Installing a scan-side BinaryExpr
+        // would either bypass the children remapping (so the TopK side
+        // ends up evaluating against the wrong column) or, when
+        // `remap_children` does find a match, double-wrap the
+        // expression with the projection's cast/computation. Either
+        // way the resulting filter is wrong. Punt on the stats-init —
+        // TopK will tighten the dynamic filter from real data on the
+        // first batch as before.
+        //
+        // We deliberately allow `remapped_children == original_children`
+        // here: the filter-pushdown path always rebuilds children via
+        // `reassign_expr_columns`, which produces fresh `Arc`s that
+        // `with_new_children_if_necessary` then installs as
+        // `remapped_children`. When the two slices are *logically* equal
+        // there's no projection rewriting in play and stats-init is
+        // safe.
+        if let Some(remapped) = dynamic_filter.remapped_children() {
+            let originals = dynamic_filter.original_children();
+            let logically_equal = originals.len() == remapped.len()
+                && originals
+                    .iter()
+                    .zip(remapped.iter())
+                    .all(|(orig, new)| orig.as_ref() == new.as_ref());
+            if !logically_equal {
+                debug!(
+                    "Skipping TopK stats init: dynamic filter children were rewritten by projection pushdown"
+                );
+                return Ok(());
+            }
+        }
+
+        // Multi-column sort is unsafe for the dual cumulative-RG prune
+        // (the leading-column reorder doesn't disambiguate ties on the
+        // leading column, so the tie may span row groups). Stats init
+        // *itself* is safe for multi-column sort — the seeded threshold
+        // is still a valid lower/upper bound on the K-th value of the
+        // leading column — but we gate both on the same "single-column
+        // sort" condition for symmetry and to keep the no-WHERE TopK
+        // path obviously correct. Multi-column TopK still tightens its
+        // filter from real data on the first batch as before.
+        if sort_order.len() != 1 {
+            debug!(
+                "Skipping TopK stats init: multi-column sort (len={})",
+                sort_order.len()
+            );
+            return Ok(());
+        }
+
+        let first_sort_expr = sort_order.first();
+        let Some(column) = find_column_in_expr(&first_sort_expr.expr) else {
+            debug!(
+                "Skipping TopK stats init: sort expr is not a column-bearing expression"
+            );
+            return Ok(());
+        };
+        let Ok(column_field) = arrow_schema.field_with_name(column.name()) else {
+            debug!(
+                "Skipping TopK stats init: column `{}` not in file schema",
+                column.name()
+            );
+            return Ok(());
+        };
+
+        // The filter compares values of the sort expression's output
+        // type, e.g. Int64 for `CAST(small_val AS Int64) DESC` over an
+        // Int16 column.
+        let sort_expr_type = match first_sort_expr.expr.data_type(arrow_schema) {
+            Ok(t) => t,
+            Err(e) => {
+                debug!("Skipping TopK stats init: cannot resolve sort expr type: {e}");
+                return Ok(());
+            }
+        };
+
+        // Column statistics only bound a cast of the column when the
+        // cast preserves order. Numeric-to-numeric casts do; something
+        // like integer-to-string does not ("10" < "9"), so punt.
+        let is_bare_column = first_sort_expr.expr.downcast_ref::<Column>().is_some();
+        if !is_bare_column
+            && !(column_field.data_type().is_numeric() && sort_expr_type.is_numeric())
+        {
+            debug!(
+                "Skipping TopK stats init: cast from {:?} to {sort_expr_type:?} may not preserve order",
+                column_field.data_type()
+            );
+            return Ok(());
+        }
+
+        let converter = match StatisticsConverter::try_new(
+            column.name(),
+            arrow_schema,
+            parquet_schema,
+        ) {
+            Ok(c) => c,
+            Err(e) => {
+                debug!(
+                    "Skipping TopK stats init: stats converter for `{}` failed: {e}",
+                    column.name()
+                );
+                return Ok(());
+            }
+        };
+
+        // For DESC we want a *safe* lower bound on the K-th largest
+        // value → take the largest per-RG `min` over qualifying RGs.
+        // For ASC we want a *safe* upper bound on the K-th smallest
+        // value → take the smallest per-RG `max` over qualifying RGs.
+        let is_desc = first_sort_expr.options.descending;
+        let stats = if is_desc {
+            converter.row_group_mins(rg_metadata.iter())
+        } else {
+            converter.row_group_maxes(rg_metadata.iter())
+        };
+        let stats = match stats {
+            Ok(arr) => arr,
+            Err(e) => {
+                debug!(
+                    "Skipping TopK stats init: cannot read stats for `{}`: {e}",
+                    column.name()
+                );
+                return Ok(());
+            }
+        };
+
+        // Statistics ignore NULLs, so a row group only vouches for its
+        // non-NULL rows; the null counts let the threshold search
+        // discount the rest.
+        let null_counts = match converter.row_group_null_counts(rg_metadata.iter()) {
+            Ok(counts) => counts,
+            Err(e) => {
+                debug!(
+                    "Skipping TopK stats init: cannot read null counts for `{}`: {e}",
+                    column.name()
+                );
+                return Ok(());
+            }
+        };
+
+        // `take_max = is_desc`: DESC picks the largest min, ASC picks
+        // the smallest max.
+        let threshold = match compute_best_threshold_excluding_nulls(
+            &stats,
+            Some(&null_counts),
+            rg_metadata,
+            fetch,
+            is_desc,
+        ) {
+            Ok(Some(threshold)) => threshold,
+            Ok(None) => return Ok(()),
+            Err(e) => {
+                debug!("Skipping TopK stats init: cannot compute threshold: {e}");
+                return Ok(());
+            }
+        };
+
+        // Cast the threshold to the sort expression's output type so
+        // the comparison is between like types. If the cast fails,
+        // bail out — this is best-effort.
+        let threshold = if threshold.data_type() != sort_expr_type {
+            match threshold.cast_to(&sort_expr_type) {
+                Ok(v) => v,
+                Err(e) => {
+                    debug!(
+                        "Skipping TopK stats init: cannot cast threshold from {:?} to {sort_expr_type:?}: {e}",
+                        threshold.data_type()
+                    );
+                    return Ok(());
+                }
+            }
+        } else {
+            threshold
+        };
+
+        // Build `sort_expr {op} threshold`. The dynamic filter's
+        // children must remain stable across updates, so reuse the
+        // sort expression as-is (it shares the same column children
+        // that TopK declared).
+        //
+        // Use `>=` / `<=` rather than `>` / `<` because the
+        // threshold derived from stats is only a *safe* bound —
+        // some row in the file may equal it and still belong to
+        // the final K. The real TopK will tighten this to strict
+        // `>` / `<` on the next update once it has seen actual
+        // data.
+        let op = if is_desc {
+            Operator::GtEq
+        } else {
+            Operator::LtEq
+        };
+        let bound: Arc<dyn PhysicalExpr> = Arc::new(BinaryExpr::new(
+            Arc::clone(&first_sort_expr.expr),
+            op,
+            lit(threshold),
+        ));
+
+        // With NULLS FIRST every NULL sort key outranks all values and
+        // belongs to the top K, so the filter must let NULLs through —
+        // the same `IS NULL OR` shape TopK's own filter has.
+        let new_expr = if first_sort_expr.options.nulls_first {
+            let null_check = match is_null(Arc::clone(&first_sort_expr.expr)) {
+                Ok(expr) => expr,
+                Err(e) => {
+                    debug!("Skipping TopK stats init: cannot build NULL check: {e}");
+                    return Ok(());
+                }
+            };
+            Arc::new(BinaryExpr::new(null_check, Operator::Or, bound))
+                as Arc<dyn PhysicalExpr>
+        } else {
+            bound
+        };
+
+        // NOTE(review): `update` replaces whatever expression is
+        // installed. If TopK may already have tightened the filter by
+        // the time a later file is opened, this seed could loosen it —
+        // confirm, and seed only a still-empty filter if so.
+        if let Err(e) = dynamic_filter.update(new_expr) {
+            debug!("Skipping TopK stats init: cannot update dynamic filter: {e}");
+        }
+        Ok(())
+    }
+}
+
 /// Build a page pruning predicate from an optional predicate expression.
 /// If the predicate is None or the predicate cannot be converted to a page pruning
 /// predicate, return None.
@@ -1611,6 +2041,7 @@ mod test {
                 max_predicate_cache_size: self.max_predicate_cache_size,
                 reverse_row_groups: self.reverse_row_groups,
                 sort_order_for_reorder: None,
+                topk_fetch: None,
             }
         }
     }
diff --git a/datafusion/datasource-parquet/src/source.rs b/datafusion/datasource-parquet/src/source.rs
index cf1fcea005dde..35ac4ebb8be32 100644
--- a/datafusion/datasource-parquet/src/source.rs
+++ b/datafusion/datasource-parquet/src/source.rs
@@ -296,6 +296,17 @@ pub struct ParquetSource {
     /// Sort order driving `PreparedAccessPlan::reorder_by_statistics`
     /// in the opener.
     sort_order_for_reorder: Option<LexOrdering>,
+    /// Optional TopK fetch (K) hint plumbed from the surrounding
+    /// `SortExec(fetch=K)` by the `PushdownSort` rule (via
+    /// [`FileSource::with_topk_fetch_hint`]). When set together with
+    /// `sort_order_for_reorder`, the opener can use it to:
+    /// 1. Seed the TopK dynamic filter from per-row-group min/max
+    ///    statistics before `PruningPredicate` build.
+    /// 2. Truncate `row_group_indexes` once cumulative `num_rows >= K`.
+    ///
+    /// Not surfaced in EXPLAIN — the same value is shown on the
+    /// `SortExec` above the data source.
+    topk_fetch: Option<usize>,
 }
 
 impl ParquetSource {
@@ -322,6 +333,7 @@ impl ParquetSource {
             encryption_factory: None,
             reverse_row_groups: false,
             sort_order_for_reorder: None,
+            topk_fetch: None,
         }
     }
 
@@ -586,6 +598,7 @@ impl FileSource for ParquetSource {
             max_predicate_cache_size: self.max_predicate_cache_size(),
             reverse_row_groups: self.reverse_row_groups,
             sort_order_for_reorder: self.sort_order_for_reorder.clone(),
+            topk_fetch: self.topk_fetch,
         }))
     }
 
@@ -601,6 +614,18 @@ impl FileSource for ParquetSource {
         )
     }
 
+    /// Accept a TopK fetch (K) hint from the surrounding `SortExec`.
+    ///
+    /// `PushdownSort` invokes this after producing an `Inexact` result
+    /// for a `SortExec(fetch=Some(K))`. The opener uses `topk_fetch`
+    /// (together with `sort_order_for_reorder`) to drive stats-based
+    /// init of the TopK dynamic filter and cumulative row-group pruning.
+    fn with_topk_fetch_hint(&self, fetch: usize) -> Option<Arc<dyn FileSource>> {
+        let mut conf = self.clone();
+        conf.topk_fetch = Some(fetch);
+        Some(Arc::new(conf))
+    }
+
     fn table_schema(&self) -> &TableSchema {
         &self.table_schema
     }
diff --git a/datafusion/datasource/src/file.rs b/datafusion/datasource/src/file.rs
index 32bee63b54f23..837c7ba3a0f6b 100644
--- a/datafusion/datasource/src/file.rs
+++ b/datafusion/datasource/src/file.rs
@@ -293,6 +293,22 @@ pub trait FileSource: Any + Send + Sync {
         files
     }
 
+    /// Optional TopK fetch (K) hint for sort-pushdown `Inexact` scans.
+    ///
+    /// When the `PushdownSort` rule produces an `Inexact` result over a
+    /// `SortExec(fetch=Some(K))`, the surrounding `SortExec` is kept in
+    /// the plan (the source can only approximate the ordering). Sources
+    /// that can use `K` to drive further per-file optimisations — e.g.
+    /// seeding a TopK dynamic filter from parquet per-row-group min/max
+    /// statistics, or truncating the row-group iteration once cumulative
+    /// `num_rows` reaches `K` — should override this method and return a
+    /// new source with the hint set.
+    ///
+    /// Default returns `None` (no-op).
+    fn with_topk_fetch_hint(&self, _fetch: usize) -> Option<Arc<dyn FileSource>> {
+        None
+    }
+
     /// Try to push down a projection into this FileSource.
     ///
     /// `FileSource` implementations that support projection pushdown should
diff --git a/datafusion/datasource/src/file_scan_config/mod.rs b/datafusion/datasource/src/file_scan_config/mod.rs
index 04b74528d5ac1..cd0e76f34bdd8 100644
--- a/datafusion/datasource/src/file_scan_config/mod.rs
+++ b/datafusion/datasource/src/file_scan_config/mod.rs
@@ -999,6 +999,19 @@ impl DataSource for FileScanConfig {
         Some(Arc::new(new_config))
     }
 
+    fn with_topk_fetch_hint(&self, fetch: usize) -> Option<Arc<dyn DataSource>> {
+        // Delegate to the underlying file source. If the source accepts the
+        // hint, swap it in on a clone so the rest of the config (eq
+        // properties, schema, etc.) is preserved unchanged.
+        self.file_source
+            .with_topk_fetch_hint(fetch)
+            .map(|new_file_source| {
+                let mut new_config = self.clone();
+                new_config.file_source = new_file_source;
+                Arc::new(new_config) as Arc<dyn DataSource>
+            })
+    }
+
     fn apply_expressions(
         &self,
         f: &mut dyn FnMut(&dyn PhysicalExpr) -> Result<TreeNodeRecursion>,
diff --git a/datafusion/datasource/src/source.rs b/datafusion/datasource/src/source.rs
index 420c6b508ce4f..a93fe6200e903 100644
--- a/datafusion/datasource/src/source.rs
+++ b/datafusion/datasource/src/source.rs
@@ -225,6 +225,17 @@ pub trait DataSource: Any + Send + Sync + Debug {
         None
     }
 
+    /// Accept a TopK fetch (K) hint from a surrounding `SortExec`.
+    ///
+    /// See [`ExecutionPlan::with_topk_fetch_hint`] for the high-level
+    /// contract. The default returns `None`. `FileScanConfig` overrides
+    /// this to delegate to the underlying `FileSource`.
+    ///
+    /// [`ExecutionPlan::with_topk_fetch_hint`]: datafusion_physical_plan::ExecutionPlan::with_topk_fetch_hint
+    fn with_topk_fetch_hint(&self, _fetch: usize) -> Option<Arc<dyn DataSource>> {
+        None
+    }
+
     /// Apply a closure to each expression used by this data source.
     ///
     /// This includes filter predicates (which may contain dynamic filters) and any
@@ -546,6 +557,15 @@ impl ExecutionPlan for DataSourceExec {
         })
     }
 
+    fn with_topk_fetch_hint(&self, fetch: usize) -> Option<Arc<dyn ExecutionPlan>> {
+        self.data_source
+            .with_topk_fetch_hint(fetch)
+            .map(|new_data_source| {
+                Arc::new(self.clone().with_data_source(new_data_source))
+                    as Arc<dyn ExecutionPlan>
+            })
+    }
+
     fn with_new_state(
         &self,
         state: Arc<dyn Any + Send + Sync>,
diff --git a/datafusion/physical-optimizer/src/pushdown_sort.rs b/datafusion/physical-optimizer/src/pushdown_sort.rs
index 40a6fe2c205c7..d4a43347d09d3 100644
--- a/datafusion/physical-optimizer/src/pushdown_sort.rs
+++ b/datafusion/physical-optimizer/src/pushdown_sort.rs
@@ -125,6 +125,14 @@ impl PhysicalOptimizerRule for PushdownSort {
                         return Ok(Transformed::yes(Arc::new(new_spm)));
                     }
                     SortOrderPushdownResult::Inexact { inner } => {
+                        // Surface the sort's fetch (K) to the data source so
+                        // it can drive TopK-aware per-file optimisations
+                        // (stats-seeded threshold, cumulative RG pruning).
+                        // No-op for sources that don't override the hint.
+                        let inner = sort_child
+                            .fetch()
+                            .and_then(|fetch| inner.with_topk_fetch_hint(fetch))
+                            .unwrap_or(inner);
                         let new_sort = SortExec::new(required_ordering.clone(), inner)
                             .with_fetch(sort_child.fetch())
                             .with_preserve_partitioning(true);
@@ -169,9 +177,19 @@ impl PhysicalOptimizerRule for PushdownSort {
                     }
                 }
                 SortOrderPushdownResult::Inexact { inner } => {
-                    // Data source is optimized for the ordering but not perfectly sorted
-                    // Keep the Sort operator but use the optimized input
-                    // Benefits: TopK queries can terminate early, better cache locality
+                    // Data source is optimized for the ordering but not perfectly sorted.
+                    // Keep the Sort operator but use the optimized input.
+                    // Benefits: TopK queries can terminate early, better cache locality.
+                    //
+                    // If the SortExec carries a fetch (LIMIT) we are in a
+                    // TopK shape — surface `K` to the data source so it can
+                    // drive TopK-aware per-file optimisations (stats-seeded
+                    // threshold, cumulative RG pruning). No-op for sources
+                    // that don't override the hint.
+                    let inner = sort_exec
+                        .fetch()
+                        .and_then(|fetch| inner.with_topk_fetch_hint(fetch))
+                        .unwrap_or(inner);
                     Ok(Transformed::yes(Arc::new(
                         SortExec::new(required_ordering.clone(), inner)
                             .with_fetch(sort_exec.fetch())
diff --git a/datafusion/physical-plan/src/execution_plan.rs b/datafusion/physical-plan/src/execution_plan.rs
index 1a67ea0ded11b..0669c62a2efa1 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -789,6 +789,20 @@ pub trait ExecutionPlan: Any + Debug + DisplayAs + Send + Sync {
     ) -> Option<Arc<dyn ExecutionPlan>> {
         None
     }
+
+    /// Hint to the plan that an `Inexact` sort-pushdown is being layered
+    /// under a `SortExec(fetch=Some(fetch))` (i.e. a TopK).
+    ///
+    /// `PushdownSort` calls this on the `inner` plan returned by
+    /// `try_pushdown_sort` when the eliminated/wrapped `SortExec` carried
+    /// a fetch. Data sources that can take advantage of `K` for further
+    /// per-file work (e.g. stats-seeded TopK threshold, cumulative
+    /// row-group pruning) should override this and return a new plan
+    /// with the hint applied. The default returns `None`, meaning
+    /// "no change — keep using the original plan".
+ fn with_topk_fetch_hint(&self, _fetch: usize) -> Option> { + None + } } impl dyn ExecutionPlan { diff --git a/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt b/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt index b04b962a5df19..48e2add68fb7f 100644 --- a/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt +++ b/datafusion/sqllogictest/test_files/push_down_filter_parquet.slt @@ -268,7 +268,7 @@ EXPLAIN ANALYZE SELECT * FROM topk_single_col ORDER BY b DESC LIMIT 1; ---- Plan with Metrics 01)SortExec: TopK(fetch=1), expr=[b@1 DESC], preserve_partitioning=[false], filter=[b@1 IS NULL OR b@1 > bd], metrics=[output_rows=1, output_batches=1, row_replacements=1] -02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_single_col.parquet]]}, projection=[a, b, c], file_type=parquet, predicate=DynamicFilter [ b@1 IS NULL OR b@1 > bd ], sort_order_for_reorder=[b@1 DESC], reverse_row_groups=true, pruning_predicate=b_null_count@0 > 0 OR b_null_count@0 != row_count@2 AND b_max@1 > bd, required_guarantees=[], metrics=[output_rows=4, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=4, pushdown_rows_pruned=0, predicate_cache_inner_records=4, predicate_cache_records=4, scan_efficiency_ratio=22.37% (240/1.07 K)] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/push_down_filter_parquet/topk_single_col.parquet]]}, projection=[a, b, c], file_type=parquet, predicate=DynamicFilter [ b@1 
IS NULL OR b@1 > bd ], sort_order_for_reorder=[b@1 DESC], reverse_row_groups=true, pruning_predicate=b_null_count@0 > 0 OR b_null_count@0 != row_count@2 AND b_max@1 > bd, required_guarantees=[], metrics=[output_rows=4, output_batches=1, files_ranges_pruned_statistics=1 total → 1 matched, row_groups_pruned_statistics=1 total → 1 matched -> 1 fully matched, row_groups_pruned_bloom_filter=1 total → 1 matched, page_index_pages_pruned=0 total → 0 matched, page_index_pages_skipped_by_fully_matched=1, page_index_rows_pruned=0 total → 0 matched, limit_pruned_row_groups=0 total → 0 matched, batches_split=0, file_open_errors=0, file_scan_errors=0, files_opened=1, files_processed=1, num_predicate_creation_errors=0, predicate_evaluation_errors=0, pushdown_rows_matched=0, pushdown_rows_pruned=0, predicate_cache_inner_records=0, predicate_cache_records=0, scan_efficiency_ratio=22.37% (240/1.07 K)] statement ok reset datafusion.explain.analyze_categories; diff --git a/datafusion/sqllogictest/test_files/sort_pushdown.slt b/datafusion/sqllogictest/test_files/sort_pushdown.slt index 540562eb3bc8d..43a7c107d0a52 100644 --- a/datafusion/sqllogictest/test_files/sort_pushdown.slt +++ b/datafusion/sqllogictest/test_files/sort_pushdown.slt @@ -2392,6 +2392,126 @@ DROP TABLE th_mixed; statement ok DROP TABLE th_reorder; +# =========================================================== +# Test I: WITH ORDER + DESC LIMIT — stats init + cumulative prune +# Exercises the full optimisation chain on sorted non-overlapping data: +# file reorder → RG reorder → reverse → stats init → cumulative prune. +# Both stats init and cumulative prune are gated on no-WHERE (the +# predicate handed to the opener is a bare DynamicFilter) and on sort +# pushdown (sort_order_for_reorder is set). 
+# =========================================================== + +statement ok +SET datafusion.execution.target_partitions = 1; + +statement ok +SET datafusion.optimizer.enable_sort_pushdown = true; + +# Create sorted data spread across multiple small row groups +statement ok +CREATE TABLE ti_data(id INT, value INT) AS VALUES +(1,10),(2,20),(3,30),(4,40),(5,50),(6,60), +(7,70),(8,80),(9,90),(10,100),(11,110),(12,120); + +statement ok +SET datafusion.execution.parquet.max_row_group_size = 3; + +query I +COPY (SELECT * FROM ti_data ORDER BY id ASC) +TO 'test_files/scratch/sort_pushdown/ti_sorted/data.parquet'; +---- +12 + +statement ok +SET datafusion.execution.parquet.max_row_group_size = 1048576; + +statement ok +CREATE EXTERNAL TABLE ti_sorted(id INT, value INT) +STORED AS PARQUET +LOCATION 'test_files/scratch/sort_pushdown/ti_sorted/data.parquet' +WITH ORDER (id ASC); + +# Test I.1: DESC LIMIT with sort pushdown — EXPLAIN shows reverse_row_groups=true +query TT +EXPLAIN SELECT * FROM ti_sorted ORDER BY id DESC LIMIT 3; +---- +logical_plan +01)Sort: ti_sorted.id DESC NULLS FIRST, fetch=3 +02)--TableScan: ti_sorted projection=[id, value] +physical_plan +01)SortExec: TopK(fetch=3), expr=[id@0 DESC], preserve_partitioning=[false] +02)--DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/ti_sorted/data.parquet]]}, projection=[id, value], file_type=parquet, predicate=DynamicFilter [ empty ], sort_order_for_reorder=[id@0 DESC], reverse_row_groups=true + +# Test I.2: DESC LIMIT results — should return the largest values +query II +SELECT * FROM ti_sorted ORDER BY id DESC LIMIT 3; +---- +12 120 +11 110 +10 100 + +# Test I.3: ASC LIMIT (same direction as file order) — sort elimination, +# no TopK in the plan +query TT +EXPLAIN SELECT * FROM ti_sorted ORDER BY id ASC LIMIT 3; +---- +logical_plan +01)Sort: ti_sorted.id ASC NULLS LAST, fetch=3 +02)--TableScan: ti_sorted projection=[id, value] +physical_plan 
DataSourceExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/sqllogictest/test_files/scratch/sort_pushdown/ti_sorted/data.parquet]]}, projection=[id, value], limit=3, output_ordering=[id@0 ASC NULLS LAST], file_type=parquet + +query II +SELECT * FROM ti_sorted ORDER BY id ASC LIMIT 3; +---- +1 10 +2 20 +3 30 + +# Test I.4: DESC LIMIT with WHERE — stats init and cumulative prune both +# skip (predicate is not a bare DynamicFilter), result still correct +query II +SELECT * FROM ti_sorted WHERE value > 50 ORDER BY id DESC LIMIT 2; +---- +12 120 +11 110 + +# Test I.5: Larger LIMIT spanning multiple RGs (4 RGs of 3 rows each) +query II +SELECT * FROM ti_sorted ORDER BY id DESC LIMIT 8; +---- +12 120 +11 110 +10 100 +9 90 +8 80 +7 70 +6 60 +5 50 + +# Test I.6: LIMIT larger than total rows — returns all rows +query II +SELECT * FROM ti_sorted ORDER BY id DESC LIMIT 100; +---- +12 120 +11 110 +10 100 +9 90 +8 80 +7 70 +6 60 +5 50 +4 40 +3 30 +2 20 +1 10 + +# Cleanup Test I +statement ok +DROP TABLE ti_data; + +statement ok +DROP TABLE ti_sorted; + # =========================================================== # Test J: Non-overlapping RGs without WITH ORDER — # RG reorder via DynamicFilter sort_options