Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1124,6 +1124,22 @@ config_namespace! {
/// into the file scan phase.
pub enable_topk_dynamic_filter_pushdown: bool, default = true

/// When set to true, uncorrelated scalar subqueries are
/// left in the logical plan and executed by `ScalarSubqueryExec` during
/// physical execution. When set to false, all scalar subqueries
/// (including uncorrelated ones) are rewritten to left joins by the
/// `ScalarSubqueryToJoin` optimizer rule.
///
/// Note disabling this option is not recommended. It restores
/// pre <https://github.com/apache/datafusion/pull/21240>
/// behavior, which silently produces incorrect results for
/// multi-row subqueries and does not support scalar subqueries in
/// ORDER BY / JOIN ON / aggregate-function arguments. This option is
/// intended as a temporary escape hatch for distributed execution
/// frameworks and is planned to be removed in a future DataFusion
/// release.
pub enable_physical_uncorrelated_scalar_subquery: bool, default = true

/// When set to true, the optimizer will attempt to push down Join dynamic filters
/// into the file scan phase.
pub enable_join_dynamic_filter_pushdown: bool, default = true
Expand Down
15 changes: 14 additions & 1 deletion datafusion/core/src/physical_planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,20 @@ impl DefaultPhysicalPlanner {
session_state: &'a SessionState,
) -> futures::future::BoxFuture<'a, Result<Arc<dyn ExecutionPlan>>> {
Box::pin(async move {
let all_subqueries = Self::collect_scalar_subqueries(logical_plan);
// When `enable_physical_uncorrelated_scalar_subquery` is disabled, the
// `ScalarSubqueryToJoin` optimizer rule rewrites all uncorrelated
// scalar subqueries to joins, so none should reach this point.
// Skip collection in that case to avoid creating a no-op
// `ScalarSubqueryExec` wrapper.
let all_subqueries = if session_state
.config_options()
.optimizer
.enable_physical_uncorrelated_scalar_subquery
{
Self::collect_scalar_subqueries(logical_plan)
} else {
Vec::new()
};
let (links, index_map) = self
.plan_scalar_subqueries(all_subqueries, session_state)
.await?;
Expand Down
150 changes: 124 additions & 26 deletions datafusion/optimizer/src/scalar_subquery_to_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.

//! [`ScalarSubqueryToJoin`] rewriting correlated scalar subquery filters to `JOIN`s
//! [`ScalarSubqueryToJoin`] rewriting scalar subquery filters to `JOIN`s

use std::collections::{BTreeSet, HashMap};
use std::sync::Arc;
Expand All @@ -36,9 +36,14 @@ use datafusion_expr::logical_plan::{JoinType, Subquery};
use datafusion_expr::utils::conjunction;
use datafusion_expr::{EmptyRelation, Expr, LogicalPlan, LogicalPlanBuilder, expr};

/// Optimizer rule that rewrites correlated scalar subquery filters to joins and
/// places an additional projection on top of the filter, to preserve the
/// original schema.
/// Optimizer rule that rewrites scalar subquery filters to joins and places an
/// additional projection on top of the filter to preserve the original schema.
///
/// When [`datafusion_common::config::OptimizerOptions::enable_physical_uncorrelated_scalar_subquery`] is
/// true (the default), only *correlated* scalar subqueries are rewritten here;
/// uncorrelated ones are left for physical execution via `ScalarSubqueryExec`.
/// When the option is false, all scalar subqueries — correlated and
/// uncorrelated — are rewritten to left joins by this rule.
#[derive(Default, Debug)]
pub struct ScalarSubqueryToJoin {}

Expand All @@ -63,10 +68,12 @@ impl ScalarSubqueryToJoin {
&self,
predicate: &Expr,
alias_gen: &Arc<AliasGenerator>,
physical_uncorrelated: bool,
) -> Result<(Vec<(Subquery, String)>, Expr)> {
let mut extract = ExtractScalarSubQuery {
sub_query_info: vec![],
alias_gen,
physical_uncorrelated,
};
predicate
.clone()
Expand All @@ -88,15 +95,23 @@ impl OptimizerRule for ScalarSubqueryToJoin {
) -> Result<Transformed<LogicalPlan>> {
match plan {
LogicalPlan::Filter(filter) => {
let physical_uncorrelated = config
.options()
.optimizer
.enable_physical_uncorrelated_scalar_subquery;
// Optimization: skip the rest of the rule and its copies if
// there are no scalar subqueries
if !contains_correlated_scalar_subquery(&filter.predicate) {
// there are no scalar subqueries this rule should rewrite
if !contains_scalar_subquery_to_rewrite(
&filter.predicate,
physical_uncorrelated,
) {
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
}

let (subqueries, mut rewrite_expr) = self.extract_subquery_exprs(
&filter.predicate,
config.alias_generator(),
physical_uncorrelated,
)?;

assert_or_internal_err!(
Expand Down Expand Up @@ -142,13 +157,15 @@ impl OptimizerRule for ScalarSubqueryToJoin {
Ok(Transformed::yes(new_plan))
}
LogicalPlan::Projection(projection) => {
let physical_uncorrelated = config
.options()
.optimizer
.enable_physical_uncorrelated_scalar_subquery;
// Optimization: skip the rest of the rule and its copies if there
// are no correlated scalar subqueries
if !projection
.expr
.iter()
.any(contains_correlated_scalar_subquery)
{
// are no scalar subqueries this rule should rewrite
if !projection.expr.iter().any(|expr| {
contains_scalar_subquery_to_rewrite(expr, physical_uncorrelated)
}) {
return Ok(Transformed::no(LogicalPlan::Projection(projection)));
}

Expand All @@ -157,8 +174,11 @@ impl OptimizerRule for ScalarSubqueryToJoin {
let mut rewrite_exprs: Vec<Expr> =
Vec::with_capacity(projection.expr.len());
for (idx, expr) in projection.expr.iter().enumerate() {
let (subqueries, rewrite_expr) =
self.extract_subquery_exprs(expr, config.alias_generator())?;
let (subqueries, rewrite_expr) = self.extract_subquery_exprs(
expr,
config.alias_generator(),
physical_uncorrelated,
)?;
for (_, alias) in &subqueries {
alias_to_index.insert(alias.clone(), idx);
}
Expand Down Expand Up @@ -230,29 +250,42 @@ impl OptimizerRule for ScalarSubqueryToJoin {
}
}

/// Returns true if the expression contains a correlated scalar subquery, false
/// otherwise. Uncorrelated scalar subqueries are handled by the physical
/// planner via `ScalarSubqueryExec` and do not need to be converted to joins.
fn contains_correlated_scalar_subquery(expr: &Expr) -> bool {
/// Returns true if the expression contains a scalar subquery that this rule
/// should rewrite to a join.
///
/// When `enable_physical_uncorrelated_scalar_subquery` is true (the default) only
/// correlated scalar subqueries are rewritten — uncorrelated ones are handled
/// by the physical planner via `ScalarSubqueryExec`. When it is false, all
/// scalar subqueries (correlated and uncorrelated) are rewritten.
fn contains_scalar_subquery_to_rewrite(expr: &Expr, physical_uncorrelated: bool) -> bool {
expr.exists(|expr| {
Ok(matches!(expr, Expr::ScalarSubquery(sq) if !sq.outer_ref_columns.is_empty()))
Ok(matches!(
expr,
Expr::ScalarSubquery(sq)
if !physical_uncorrelated || !sq.outer_ref_columns.is_empty()
))
})
.expect("Inner is always Ok")
}

struct ExtractScalarSubQuery<'a> {
sub_query_info: Vec<(Subquery, String)>,
alias_gen: &'a Arc<AliasGenerator>,
physical_uncorrelated: bool,
}

impl TreeNodeRewriter for ExtractScalarSubQuery<'_> {
type Node = Expr;

fn f_down(&mut self, expr: Expr) -> Result<Transformed<Expr>> {
match expr {
// Skip uncorrelated scalar subqueries
// Match scalar subqueries this rule should rewrite to a join. When
// `physical_uncorrelated` is true, only correlated subqueries are
// rewritten — uncorrelated ones are handled later by the physical
// planner. When false, both are rewritten.
Expr::ScalarSubquery(ref subquery)
if !subquery.outer_ref_columns.is_empty() =>
if !self.physical_uncorrelated
|| !subquery.outer_ref_columns.is_empty() =>
{
let subquery = subquery.clone();
let scalar_expr = subquery
Expand Down Expand Up @@ -304,24 +337,41 @@ impl TreeNodeRewriter for ExtractScalarSubQuery<'_> {
/// where c.balance > a.val
/// ```
///
/// When [`datafusion_common::config::OptimizerOptions::enable_physical_uncorrelated_scalar_subquery`] is
/// false, this function also handles uncorrelated scalar subqueries, rewriting
/// them as a `Left Join: Filter: Boolean(true)` instead of leaving them for
/// `ScalarSubqueryExec`.
///
/// # Arguments
///
/// * `query_info` - The subquery portion of the `where` (select avg(total) from orders)
/// * `filter_input` - The non-subquery portion (from customers)
/// * `outer_others` - Any additional parts to the `where` expression (and c.x = y)
/// * `subquery_alias` - Subquery aliases
/// * `subquery` - The scalar subquery to rewrite (correlated, or uncorrelated
/// when `enable_physical_uncorrelated_scalar_subquery` is false).
/// * `outer_input` - The outer plan that the decorrelated subquery is
/// left-joined onto — the input of the `Filter` or `Projection` node
/// that contained the subquery.
/// * `subquery_alias` - The unique alias assigned to the decorrelated
/// subquery; used both to qualify the join condition and to produce
/// column references for the caller to substitute.
///
/// Returns `Ok(None)` if the subquery cannot be decorrelated. On success,
/// returns the rewritten outer plan and a map from each count-bug-affected
/// column to its `CASE WHEN __always_true IS NULL THEN ... END` compensation
/// expression, which the caller must substitute into any expression that
/// references those columns.
fn build_join(
subquery: &Subquery,
filter_input: &LogicalPlan,
subquery_alias: &str,
) -> Result<Option<(LogicalPlan, HashMap<Column, Expr>)>> {
// `build_join` also handles uncorrelated scalar subqueries (as a left
// join with `Boolean(true)`) when the
// `enable_physical_uncorrelated_scalar_subquery` option is disabled.
let subquery_plan = subquery.subquery.as_ref();
let mut pull_up = PullUpCorrelatedExpr::new().with_need_handle_count_bug(true);
let new_plan = subquery_plan.clone().rewrite(&mut pull_up).data()?;
if !pull_up.can_pull_up {
return Ok(None);
}

let collected_count_expr_map =
pull_up.collected_count_expr_map.get(&new_plan).cloned();
let sub_query_alias = LogicalPlanBuilder::from(new_plan)
Expand Down Expand Up @@ -1177,4 +1227,52 @@ mod tests {
"
)
}

#[test]
fn uncorrelated_scalar_subquery_rewritten_when_flag_off() -> Result<()> {
use datafusion_common::config::ConfigOptions;

let sq = Arc::new(
LogicalPlanBuilder::from(scan_tpch_table("orders"))
.aggregate(Vec::<Expr>::new(), vec![max(col("orders.o_custkey"))])?
.project(vec![max(col("orders.o_custkey"))])?
.build()?,
);

let plan = LogicalPlanBuilder::from(scan_tpch_table("customer"))
.filter(col("customer.c_custkey").eq(scalar_subquery(sq)))?
.project(vec![col("customer.c_custkey")])?
.build()?;

let mut options = ConfigOptions::default();
options
.optimizer
.enable_physical_uncorrelated_scalar_subquery = false;
let context = crate::OptimizerContext::new_with_config_options(Arc::new(options));

let rule: Arc<dyn OptimizerRule + Send + Sync> =
Arc::new(ScalarSubqueryToJoin::new());
let optimizer = crate::Optimizer::with_rules(vec![rule]);
let optimized_plan = optimizer
.optimize(plan, &context, |_, _| {})
.expect("failed to optimize plan");
let formatted_plan = optimized_plan.display_indent_schema();

insta::assert_snapshot!(
formatted_plan,
@r"
Projection: customer.c_custkey [c_custkey:Int64]
Projection: customer.c_custkey, customer.c_name [c_custkey:Int64, c_name:Utf8]
Filter: customer.c_custkey = __scalar_sq_1.max(orders.o_custkey) [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]
Left Join: Filter: Boolean(true) [c_custkey:Int64, c_name:Utf8, max(orders.o_custkey):Int64;N]
TableScan: customer [c_custkey:Int64, c_name:Utf8]
SubqueryAlias: __scalar_sq_1 [max(orders.o_custkey):Int64;N]
Projection: max(orders.o_custkey) [max(orders.o_custkey):Int64;N]
Aggregate: groupBy=[[]], aggr=[[max(orders.o_custkey)]] [max(orders.o_custkey):Int64;N]
TableScan: orders [o_orderkey:Int64, o_custkey:Int64, o_orderstatus:Utf8, o_totalprice:Float64;N]
"
);

Ok(())
}
}
2 changes: 2 additions & 0 deletions datafusion/sqllogictest/test_files/information_schema.slt
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,7 @@ datafusion.optimizer.enable_distinct_aggregation_soft_limit true
datafusion.optimizer.enable_dynamic_filter_pushdown true
datafusion.optimizer.enable_join_dynamic_filter_pushdown true
datafusion.optimizer.enable_leaf_expression_pushdown true
datafusion.optimizer.enable_physical_uncorrelated_scalar_subquery true
datafusion.optimizer.enable_piecewise_merge_join false
datafusion.optimizer.enable_round_robin_repartition true
datafusion.optimizer.enable_sort_pushdown true
Expand Down Expand Up @@ -453,6 +454,7 @@ datafusion.optimizer.enable_distinct_aggregation_soft_limit true When set to tru
datafusion.optimizer.enable_dynamic_filter_pushdown true When set to true attempts to push down dynamic filters generated by operators (TopK, Join & Aggregate) into the file scan phase. For example, for a query such as `SELECT * FROM t ORDER BY timestamp DESC LIMIT 10`, the optimizer will attempt to push down the current top 10 timestamps that the TopK operator references into the file scans. This means that if we already have 10 timestamps in the year 2025 any files that only have timestamps in the year 2024 can be skipped / pruned at various stages in the scan. The config will suppress `enable_join_dynamic_filter_pushdown`, `enable_topk_dynamic_filter_pushdown` & `enable_aggregate_dynamic_filter_pushdown` So if you disable `enable_topk_dynamic_filter_pushdown`, then enable `enable_dynamic_filter_pushdown`, the `enable_topk_dynamic_filter_pushdown` will be overridden.
datafusion.optimizer.enable_join_dynamic_filter_pushdown true When set to true, the optimizer will attempt to push down Join dynamic filters into the file scan phase.
datafusion.optimizer.enable_leaf_expression_pushdown true When set to true, the optimizer will extract leaf expressions (such as `get_field`) from filter/sort/join nodes into projections closer to the leaf table scans, and push those projections down towards the leaf nodes.
datafusion.optimizer.enable_physical_uncorrelated_scalar_subquery true When set to true, uncorrelated scalar subqueries are left in the logical plan and executed by `ScalarSubqueryExec` during physical execution. When set to false, all scalar subqueries (including uncorrelated ones) are rewritten to left joins by the `ScalarSubqueryToJoin` optimizer rule. Note disabling this option is not recommended. It restores pre <https://github.com/apache/datafusion/pull/21240> behavior, which silently produces incorrect results for multi-row subqueries and does not support scalar subqueries in ORDER BY / JOIN ON / aggregate-function arguments. This option is intended as a temporary escape hatch for distributed execution frameworks and is planned to be removed in a future DataFusion release.
datafusion.optimizer.enable_piecewise_merge_join false When set to true, piecewise merge join is enabled. PiecewiseMergeJoin is currently experimental. Physical planner will opt for PiecewiseMergeJoin when there is only one range filter.
datafusion.optimizer.enable_round_robin_repartition true When set to true, the physical plan optimizer will try to add round robin repartitioning to increase parallelism to leverage more CPU cores
datafusion.optimizer.enable_sort_pushdown true Enable sort pushdown optimization. When enabled, attempts to push sort requirements down to data sources that can natively handle them (e.g., by reversing file/row group read order). Returns **inexact ordering**: Sort operator is kept for correctness, but optimized input enables early termination for TopK queries (ORDER BY ... LIMIT N), providing significant speedup. Memory: No additional overhead (only changes read order). Future: Will add option to detect perfectly sorted data and eliminate Sort completely. Default: true
Expand Down
Loading
Loading