Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
39 commits
Select commit Hold shift + click to select a range
4675c43
Push TopK (Sort with fetch) through outer joins
Apr 14, 2026
9aede67
lint fix
Apr 14, 2026
3899d6a
Merge branch 'main' into push-down-topk-through-join
Apr 14, 2026
19b0edc
fix build failure
Apr 14, 2026
367d8a5
Merge branch 'main' into push-down-topk-through-join
Apr 17, 2026
baf25ef
Handle edge cases
Apr 17, 2026
67f9265
Handle volatile expr early
Apr 17, 2026
0ca184e
Merge branch 'main' into push-down-topk-through-join
Apr 17, 2026
16f61cf
Merge branch 'main' into push-down-topk-through-join
SubhamSinghal Apr 17, 2026
d12aefa
Fix build failure
Apr 17, 2026
902ef77
Handle subquery alias
Apr 18, 2026
6b232e1
Merge branch 'main' into push-down-topk-through-join
Apr 18, 2026
254b224
Update comment
Apr 18, 2026
c648e71
Doc fix
Apr 18, 2026
34832c8
Merge branch 'main' into push-down-topk-through-join
SubhamSinghal Apr 19, 2026
e02b82a
Merge branch 'main' into push-down-topk-through-join
SubhamSinghal Apr 20, 2026
03f6499
Handle volatile expr in projection
Apr 21, 2026
e051707
Merge branch 'main' into push-down-topk-through-join
Apr 21, 2026
051868a
use structural equality
Apr 21, 2026
1cfeb76
Adds UT
Apr 21, 2026
c6edd07
Merge branch 'main' into push-down-topk-through-join
SubhamSinghal May 2, 2026
f72d845
Merge branch 'main' into push-down-topk-through-join
SubhamSinghal May 5, 2026
0371004
Fix UT
May 6, 2026
5d62f22
Merge branch 'main' into push-down-topk-through-join
May 9, 2026
38bbf87
Resolve comment
May 9, 2026
036fb9f
Merge branch 'main' into push-down-topk-through-join
May 9, 2026
8d4fc2c
Adds back missing UT
May 9, 2026
53662c1
Merge branch 'main' into push-down-topk-through-join
blaginin May 9, 2026
80f84bc
Fix explain slt test
May 10, 2026
f3cc067
Merge branch 'main' into push-down-topk-through-join
SubhamSinghal May 16, 2026
1efd763
Fix sort expr comparison
SubhamSinghal May 16, 2026
93997eb
Add cross, left and right mark join
SubhamSinghal May 19, 2026
d23468d
Merge main
SubhamSinghal May 19, 2026
7b4cf06
Handle volatile expr
SubhamSinghal May 20, 2026
2473ecc
Merge branch 'main' into push-down-topk-through-join
kumarUjjawal May 21, 2026
ca5b416
Adds pushdown topk bench
SubhamSinghal May 28, 2026
216be5e
Adds df bench
SubhamSinghal May 29, 2026
28c72d8
Merge push down topk with push down limit rule
SubhamSinghal Jun 1, 2026
44596bc
Fix build failure
SubhamSinghal Jun 1, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions benchmarks/queries/push_down_topk/q1.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
-- LEFT JOIN, ORDER BY column from preserved (left) side, small LIMIT.
-- Canonical case for push_down_topk_through_join: the Sort(fetch=10) is
-- duplicated below the join over the customer scan, so only the top 10
-- rows (by c_acctbal) are joined against orders.
SELECT c_custkey, c_acctbal
FROM customer LEFT JOIN orders ON c_custkey = o_custkey
ORDER BY c_acctbal
LIMIT 10
7 changes: 7 additions & 0 deletions benchmarks/queries/push_down_topk/q2.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- RIGHT JOIN, ORDER BY column from preserved (right) side.
-- Symmetric to q1: the Sort(fetch) is pushed below the join over the
-- orders scan (the right/preserved side).
SELECT o_orderkey, o_totalprice
FROM customer RIGHT JOIN orders ON c_custkey = o_custkey
ORDER BY o_totalprice
LIMIT 10
7 changes: 7 additions & 0 deletions benchmarks/queries/push_down_topk/q3.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- LEFT JOIN, multi-column ORDER BY (both columns from preserved side).
-- All sort exprs must come from the preserved side for the rule to fire;
-- this query checks that multi-column sorts are still pushed.
SELECT c_custkey, c_acctbal, c_nationkey
FROM customer LEFT JOIN orders ON c_custkey = o_custkey
ORDER BY c_acctbal, c_nationkey
LIMIT 100
7 changes: 7 additions & 0 deletions benchmarks/queries/push_down_topk/q4.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
-- CROSS JOIN, ORDER BY column from one side.
-- Cross joins preserve every row from both sides; the rule pushes the
-- Sort(fetch) below the join over the side referenced by ORDER BY.
SELECT c_custkey, c_acctbal
FROM customer CROSS JOIN nation
ORDER BY c_acctbal
LIMIT 10
9 changes: 9 additions & 0 deletions benchmarks/queries/push_down_topk/q5.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
-- Negative case: ORDER BY references the probe (non-preserved) side.
-- The rule MUST NOT fire here — orders is the right side of a LEFT JOIN
-- so it isn't preserved (rows can be NULL when there's no match), and
-- pushing a Sort with fetch onto orders would change semantics.
-- Included so the bench harness can verify the rule's selectivity.
SELECT c_custkey, o_totalprice
FROM customer LEFT JOIN orders ON c_custkey = o_custkey
ORDER BY o_totalprice
LIMIT 10
5 changes: 4 additions & 1 deletion benchmarks/src/bin/dfbench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ static ALLOC: snmalloc_rs::SnMalloc = snmalloc_rs::SnMalloc;
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;

use datafusion_benchmarks::{
cancellation, clickbench, dict, h2o, hj, imdb, nlj, smj, sort_tpch, tpcds, tpch,
cancellation, clickbench, dict, h2o, hj, imdb, nlj, push_down_topk, smj, sort_tpch,
tpcds, tpch,
};

#[derive(Debug, Parser)]
Expand All @@ -51,6 +52,7 @@ enum Options {
HJ(hj::RunOpt),
Imdb(imdb::RunOpt),
Nlj(nlj::RunOpt),
PushDownTopk(push_down_topk::RunOpt),
Smj(smj::RunOpt),
SortPushdown(sort_pushdown::RunOpt),
SortTpch(sort_tpch::RunOpt),
Expand All @@ -72,6 +74,7 @@ pub async fn main() -> Result<()> {
Options::HJ(opt) => opt.run().await,
Options::Imdb(opt) => Box::pin(opt.run()).await,
Options::Nlj(opt) => opt.run().await,
Options::PushDownTopk(opt) => opt.run().await,
Options::Smj(opt) => opt.run().await,
Options::SortPushdown(opt) => opt.run().await,
Options::SortTpch(opt) => opt.run().await,
Expand Down
1 change: 1 addition & 0 deletions benchmarks/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ pub mod h2o;
pub mod hj;
pub mod imdb;
pub mod nlj;
pub mod push_down_topk;
pub mod smj;
pub mod sort_pushdown;
pub mod sort_tpch;
Expand Down
264 changes: 264 additions & 0 deletions benchmarks/src/push_down_topk.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Benchmark for `push_down_topk_through_join`.
//!
//! Runs SQL files from `queries/push_down_topk/` against TPC-H
//! `customer`, `orders`, and `nation`. Intended to be run on a branch
//! with the `push_down_topk_through_join` rule registered and
//! against a baseline that does not register the rule, with results
//! compared via `compare.py`.
//!
//! # Usage
//!
//! ```text
//! # Generate TPC-H SF=1 (one-time)
//! ./bench.sh data tpch
//!
//! # Run with rule registered (this branch) and write results
//! ./bench.sh run push_down_topk -o pr.json
//!
//! # Run again on a baseline (e.g. main, or this branch with rule
//! # registration reverted) and write results
//! ./bench.sh run push_down_topk -o baseline.json
//!
//! ./compare.py baseline.json pr.json
//! ```

use clap::Args;
use futures::StreamExt;
use std::path::PathBuf;
use std::sync::Arc;

use datafusion::datasource::TableProvider;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl,
};
use datafusion::error::Result;
use datafusion::execution::SessionStateBuilder;
use datafusion::physical_plan::{
display::DisplayableExecutionPlan, displayable, execute_stream,
};
use datafusion::prelude::*;
use datafusion_common::DEFAULT_PARQUET_EXTENSION;
use datafusion_common::instant::Instant;

use crate::util::{BenchmarkRun, CommonOpt, QueryResult, print_memory_stats};

/// Default location of the benchmark's `q<N>.sql` files, used when
/// `--queries-path` is not supplied. It is a relative path, so it is
/// resolved against the process's current working directory.
const PUSH_DOWN_TOPK_QUERY_DIR: &str = "queries/push_down_topk";

// Command-line options of the `push_down_topk` benchmark.
//
// NOTE: the `///` comments on the fields below are consumed by the clap
// derive as `--help` text, so rewording them changes user-visible output.
// Maintainer notes are therefore written as plain `//` comments.
#[derive(Debug, Args)]
pub struct RunOpt {
// Options shared with the other benchmarks. This file reads `iterations`
// and `debug` from it and calls `config()` / `build_runtime()` on it.
#[command(flatten)]
common: CommonOpt,

/// Query number (1-N). If unset, runs every query in the directory.
#[arg(short, long)]
pub query: Option<usize>,

/// Path to TPC-H parquet directory (must contain `customer`, `orders`,
/// `nation` subdirectories).
#[arg(required = true, short = 'p', long = "path")]
path: PathBuf,

/// Path to JSON benchmark result, comparable via `compare.py`.
#[arg(short = 'o', long = "output")]
output_path: Option<PathBuf>,

/// Path to directory containing query SQL files.
/// Defaults to `queries/push_down_topk/` relative to current directory.
#[arg(long = "queries-path")]
queries_path: Option<PathBuf>,
}

impl RunOpt {
/// Names of the tables registered before a query runs. Each one is read
/// from the sub-directory of the same name under `--path`.
const TABLES: [&'static str; 3] = ["customer", "orders", "nation"];

/// Returns the directory that holds the benchmark's `q<N>.sql` files.
///
/// The `--queries-path` override is used when it was supplied; otherwise
/// the relative default `PUSH_DOWN_TOPK_QUERY_DIR` is returned.
fn queries_dir(&self) -> PathBuf {
    match &self.queries_path {
        Some(custom_dir) => custom_dir.clone(),
        None => PathBuf::from(PUSH_DOWN_TOPK_QUERY_DIR),
    }
}

/// Reads the SQL text of query `query_id` from the file `q<query_id>.sql`
/// inside the queries directory.
///
/// # Errors
///
/// Returns a `DataFusionError::Execution` that names the file and carries
/// the underlying I/O error when the file cannot be read.
fn load_query(&self, query_id: usize) -> Result<String> {
    let file_name = format!("q{query_id}.sql");
    let path = self.queries_dir().join(file_name);

    // Wrap the I/O error so the message tells the user which file is missing.
    let to_execution_error = |e: std::io::Error| {
        datafusion_common::DataFusionError::Execution(format!(
            "Failed to read query file {}: {e}",
            path.display()
        ))
    };
    std::fs::read_to_string(&path).map_err(to_execution_error)
}

fn available_queries(&self) -> Vec<usize> {
let dir = self.queries_dir();
let mut ids = Vec::new();
if let Ok(entries) = std::fs::read_dir(&dir) {
for entry in entries.flatten() {
let name = entry.file_name();
let name = name.to_string_lossy();
if let Some(rest) = name.strip_prefix('q')
&& let Some(num_str) = rest.strip_suffix(".sql")
&& let Ok(id) = num_str.parse::<usize>()
{
ids.push(id);
}
}
}
ids.sort();
ids
}

pub async fn run(&self) -> Result<()> {
let mut benchmark_run = BenchmarkRun::new();

let query_ids = match self.query {
Some(query_id) => vec![query_id],
None => self.available_queries(),
};

for query_id in query_ids {
benchmark_run.start_new_case(&format!("{query_id}"));

match self.benchmark_query(query_id).await {
Ok(query_results) => {
for iter in query_results {
benchmark_run.write_iter(iter.elapsed, iter.row_count);
}
}
Err(e) => {
benchmark_run.mark_failed();
eprintln!("Query {query_id} failed: {e}");
}
}
}

benchmark_run.maybe_write_json(self.output_path.as_ref())?;
benchmark_run.maybe_print_failures();
Ok(())
}

/// Benchmarks a single query and returns one `QueryResult` per iteration.
///
/// A fresh `SessionContext` is built from the common options and the
/// benchmark tables are registered on it. The query is then planned and
/// executed `self.iterations()` times on that context, with each
/// iteration timed individually and reported on stdout.
///
/// The average time is printed only when at least one iteration ran; with
/// zero iterations the average would be `0.0 / 0.0`, i.e. `NaN`.
///
/// # Errors
///
/// Fails if the query file cannot be loaded, the session cannot be
/// configured, a table cannot be registered, or any iteration fails.
async fn benchmark_query(&self, query_id: usize) -> Result<Vec<QueryResult>> {
    let sql = self.load_query(query_id)?;

    let config = self.common.config()?;
    let rt = self.common.build_runtime()?;
    let state = SessionStateBuilder::new()
        .with_config(config)
        .with_runtime_env(rt)
        .with_default_features()
        .build();
    let ctx = SessionContext::from(state);

    self.register_tables(&ctx).await?;

    let mut query_results = vec![];
    for i in 0..self.iterations() {
        let start = Instant::now();
        let row_count = self.execute_query(&ctx, &sql).await?;
        let elapsed = start.elapsed();
        let ms = elapsed.as_secs_f64() * 1000.0;

        println!(
            "Query {query_id} iteration {i} took {ms:.1} ms and returned {row_count} rows"
        );
        query_results.push(QueryResult { elapsed, row_count });
    }

    // Guard against a zero iteration count, which would print a NaN average.
    if !query_results.is_empty() {
        let total_ms: f64 = query_results
            .iter()
            .map(|result| result.elapsed.as_secs_f64() * 1000.0)
            .sum();
        let avg = total_ms / query_results.len() as f64;
        println!("Query {query_id} avg time: {avg:.2} ms");
    }

    print_memory_stats();
    Ok(query_results)
}

/// Registers every table listed in `Self::TABLES` on `ctx`, under its own
/// name.
///
/// # Errors
///
/// Stops at, and returns, the first error raised while building or
/// registering a table.
async fn register_tables(&self, ctx: &SessionContext) -> Result<()> {
    for &table_name in &Self::TABLES {
        let listing = self.get_table(ctx, table_name).await?;
        ctx.register_table(table_name, listing)?;
    }
    Ok(())
}

async fn execute_query(&self, ctx: &SessionContext, sql: &str) -> Result<usize> {
let debug = self.common.debug;
let plan = ctx.sql(sql).await?;
let (state, plan) = plan.into_parts();

if debug {
println!("=== Logical plan ===\n{plan}\n");
}

let plan = state.optimize(&plan)?;
if debug {
println!("=== Optimized logical plan ===\n{plan}\n");
}
let physical_plan = state.create_physical_plan(&plan).await?;
if debug {
println!(
"=== Physical plan ===\n{}\n",
displayable(physical_plan.as_ref()).indent(true)
);
}

let mut row_count = 0;
let mut stream = execute_stream(physical_plan.clone(), state.task_ctx())?;
while let Some(batch) = stream.next().await {
row_count += batch?.num_rows();
}

if debug {
println!(
"=== Physical plan with metrics ===\n{}\n",
DisplayableExecutionPlan::with_metrics(physical_plan.as_ref())
.indent(true)
);
}

Ok(row_count)
}

async fn get_table(
&self,
ctx: &SessionContext,
table: &str,
) -> Result<Arc<dyn TableProvider>> {
let path = self.path.to_str().unwrap();
let state = ctx.state();
let table_path = format!("{path}/{table}");
let format = Arc::new(
ParquetFormat::default()
.with_options(ctx.state().table_options().parquet.clone()),
);
let options = ListingOptions::new(format)
.with_file_extension(DEFAULT_PARQUET_EXTENSION)
.with_collect_stat(true);
let table_path = ListingTableUrl::parse(table_path)?;
let schema = options.infer_schema(&state, &table_path).await?;
let config = ListingTableConfig::new(table_path)
.with_listing_options(options)
.with_schema(schema);
Ok(Arc::new(ListingTable::try_new(config)?))
}

/// Number of times each query is executed, as configured in the common
/// benchmark options.
fn iterations(&self) -> usize {
self.common.iterations
}
}
Loading
Loading