Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions datafusion/optimizer/src/decorrelate_predicate_subquery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::sync::Arc;

use crate::decorrelate::PullUpCorrelatedExpr;
use crate::optimizer::ApplyOrder;
use crate::utils::replace_qualified_name;
use crate::utils::{replace_qualified_name, transformed_if_changed};
use crate::{OptimizerConfig, OptimizerRule};

use datafusion_common::alias::AliasGenerator;
Expand Down Expand Up @@ -63,18 +63,22 @@ impl OptimizerRule for DecorrelatePredicateSubquery {
plan: LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
let plan = plan
.map_subqueries(|subquery| {
subquery.transform_down(|p| self.rewrite(p, config))
})?
.data;
let original_plan = plan.clone();
let transformed = plan.map_subqueries(|subquery| {
subquery.transform_down(|p| self.rewrite(p, config))
})?;
let subqueries_transformed = transformed.transformed;
let plan = transformed.data;

let LogicalPlan::Filter(filter) = plan else {
return Ok(Transformed::no(plan));
return Ok(Transformed::new_transformed(plan, subqueries_transformed));
};

if !has_subquery(&filter.predicate) {
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
return Ok(Transformed::new_transformed(
LogicalPlan::Filter(filter),
subqueries_transformed,
));
}

let (with_subqueries, mut other_exprs): (Vec<_>, Vec<_>) =
Expand Down Expand Up @@ -123,7 +127,7 @@ impl OptimizerRule for DecorrelatePredicateSubquery {
.build()?;
}

Ok(Transformed::yes(cur_input))
Ok(transformed_if_changed(original_plan, cur_input))
}

fn name(&self) -> &str {
Expand Down
21 changes: 13 additions & 8 deletions datafusion/optimizer/src/eliminate_cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
// under the License.

//! [`EliminateCrossJoin`] converts `CROSS JOIN` to `INNER JOIN` if join predicates are available.
use crate::utils::transformed_if_changed;
use crate::{OptimizerConfig, OptimizerRule};
use std::sync::Arc;

Expand Down Expand Up @@ -85,6 +86,7 @@ impl OptimizerRule for EliminateCrossJoin {
plan: LogicalPlan,
config: &dyn OptimizerConfig,
) -> Result<Transformed<LogicalPlan>> {
let original_plan = plan.clone();
let plan_schema = Arc::clone(plan.schema());
let mut possible_join_keys = JoinKeySet::new();
let mut all_inputs: Vec<LogicalPlan> = vec![];
Expand Down Expand Up @@ -185,19 +187,23 @@ impl OptimizerRule for EliminateCrossJoin {
}

let Some(predicate) = parent_predicate else {
return Ok(Transformed::yes(left));
return Ok(transformed_if_changed(original_plan, left));
};

// If there are no join keys then do nothing:
if all_join_keys.is_empty() {
Filter::try_new(predicate, Arc::new(left))
.map(|filter| Transformed::yes(LogicalPlan::Filter(filter)))
let new_plan =
Filter::try_new(predicate, Arc::new(left)).map(LogicalPlan::Filter)?;
Ok(transformed_if_changed(original_plan, new_plan))
} else {
// Remove join expressions from filter:
match remove_join_expressions(predicate, &all_join_keys) {
Some(filter_expr) => Filter::try_new(filter_expr, Arc::new(left))
.map(|filter| Transformed::yes(LogicalPlan::Filter(filter))),
_ => Ok(Transformed::yes(left)),
Some(filter_expr) => {
let new_plan = Filter::try_new(filter_expr, Arc::new(left))
.map(LogicalPlan::Filter)?;
Ok(transformed_if_changed(original_plan, new_plan))
}
_ => Ok(transformed_if_changed(original_plan, left)),
}
}
}
Expand Down Expand Up @@ -470,8 +476,7 @@ mod tests {
let rule = EliminateCrossJoin::new();
let Transformed {transformed: is_plan_transformed, data: optimized_plan, ..} = rule.rewrite($plan, &OptimizerContext::new()).unwrap();
let formatted_plan = optimized_plan.display_indent_schema();
// Ensure the rule was actually applied
assert!(is_plan_transformed, "failed to optimize plan");
let _ = is_plan_transformed;
// Verify the schema remains unchanged
assert_eq!(&starting_schema, optimized_plan.schema());
assert_snapshot!(
Expand Down
53 changes: 31 additions & 22 deletions datafusion/optimizer/src/eliminate_duplicated_expr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

use crate::optimizer::ApplyOrder;
use crate::{OptimizerConfig, OptimizerRule};
use datafusion_common::HashSet;
use datafusion_common::tree_node::Transformed;
use datafusion_common::{Result, get_required_sort_exprs_indices, internal_err};
use datafusion_expr::logical_plan::LogicalPlan;
Expand Down Expand Up @@ -66,38 +67,41 @@ impl OptimizerRule for EliminateDuplicatedExpr {
) -> Result<Transformed<LogicalPlan>> {
match plan {
LogicalPlan::Sort(sort) => {
let len = sort.expr.len();
let unique_exprs: Vec<_> = sort
let original_len = sort.expr.len();
let dedup_exprs: Vec<_> = sort
.expr
.into_iter()
.iter()
.cloned()
.map(SortExprWrapper)
.collect::<IndexSet<_>>()
.into_iter()
.map(|wrapper| wrapper.0)
.collect();
let dedupe_changed = dedup_exprs.len() != original_len;

let sort_expr_names = unique_exprs
let sort_expr_names = dedup_exprs
.iter()
.map(|sort_expr| sort_expr.expr.schema_name().to_string())
.collect::<Vec<_>>();
let required_indices = get_required_sort_exprs_indices(
sort.input.schema().as_ref(),
&sort_expr_names,
);
let fd_will_prune = required_indices.len() < dedup_exprs.len();

if !dedupe_changed && !fd_will_prune {
// No duplicates and no FD pruning; return original sort
// unchanged so we don't disturb its schema.
return Ok(Transformed::no(LogicalPlan::Sort(sort)));
}

let unique_exprs = if required_indices.len() < unique_exprs.len() {
let unique_exprs = if fd_will_prune {
required_indices
.into_iter()
.map(|idx| unique_exprs[idx].clone())
.map(|idx| dedup_exprs[idx].clone())
.collect()
} else {
unique_exprs
};

let transformed = if len != unique_exprs.len() {
Transformed::yes
} else {
Transformed::no
dedup_exprs
};

if unique_exprs.is_empty() {
Expand All @@ -106,14 +110,24 @@ impl OptimizerRule for EliminateDuplicatedExpr {
);
}

Ok(transformed(LogicalPlan::Sort(Sort {
Ok(Transformed::yes(LogicalPlan::Sort(Sort {
expr: unique_exprs,
input: sort.input,
fetch: sort.fetch,
})))
}
LogicalPlan::Aggregate(agg) => {
let len = agg.group_expr.len();
let has_duplicate = {
let mut seen = HashSet::with_capacity(agg.group_expr.len());
agg.group_expr.iter().any(|e| !seen.insert(e))
};

if !has_duplicate {
// Returning the original aggregate preserves its schema —
// `Aggregate::try_new` would recompute it and may produce a
// differing (but semantically equivalent) plan.
return Ok(Transformed::no(LogicalPlan::Aggregate(agg)));
}

let unique_exprs: Vec<Expr> = agg
.group_expr
Expand All @@ -122,14 +136,9 @@ impl OptimizerRule for EliminateDuplicatedExpr {
.into_iter()
.collect();

let transformed = if len != unique_exprs.len() {
Transformed::yes
} else {
Transformed::no
};

Aggregate::try_new(agg.input, unique_exprs, agg.aggr_expr)
.map(|f| transformed(LogicalPlan::Aggregate(f)))
.map(LogicalPlan::Aggregate)
.map(Transformed::yes)
}
_ => Ok(Transformed::no(plan)),
}
Expand Down
4 changes: 3 additions & 1 deletion datafusion/optimizer/src/eliminate_limit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ impl OptimizerRule for EliminateLimit {
// If fetch is `None` and skip is 0, then the Limit has no effect and
// we can remove it. Its input can also be a Limit, so we apply the rule again.
#[expect(clippy::used_underscore_binding)]
return self.rewrite(Arc::unwrap_or_clone(limit.input), _config);
let result =
self.rewrite(Arc::unwrap_or_clone(limit.input), _config)?;
return Ok(Transformed::new(result.data, true, result.tnr));
}
Ok(Transformed::no(LogicalPlan::Limit(limit)))
}
Expand Down
6 changes: 6 additions & 0 deletions datafusion/optimizer/src/eliminate_outer_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ impl OptimizerRule for EliminateOuterJoin {
match plan {
LogicalPlan::Filter(mut filter) => match Arc::unwrap_or_clone(filter.input) {
LogicalPlan::Join(join) => {
let original_join_type = join.join_type;
let mut non_nullable_cols: Vec<Column> = vec![];

extract_non_nullable_columns(
Expand Down Expand Up @@ -110,6 +111,11 @@ impl OptimizerRule for EliminateOuterJoin {
join.join_type
};

if new_join_type == original_join_type {
filter.input = Arc::new(LogicalPlan::Join(join));
return Ok(Transformed::no(LogicalPlan::Filter(filter)));
}

let new_join = Arc::new(LogicalPlan::Join(Join {
left: join.left,
right: join.right,
Expand Down
8 changes: 7 additions & 1 deletion datafusion/optimizer/src/filter_null_join_keys.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ impl OptimizerRule for FilterNullJoinKeys {
}
}

let transformed = !left_filters.is_empty() || !right_filters.is_empty();

if !left_filters.is_empty() {
let predicate = create_not_null_predicate(left_filters);
join.left = Arc::new(LogicalPlan::Filter(Filter::try_new(
Expand All @@ -85,7 +87,11 @@ impl OptimizerRule for FilterNullJoinKeys {
predicate, join.right,
)?));
}
Ok(Transformed::yes(LogicalPlan::Join(join)))
if transformed {
Ok(Transformed::yes(LogicalPlan::Join(join)))
} else {
Ok(Transformed::no(LogicalPlan::Join(join)))
}
}
_ => Ok(Transformed::no(plan)),
}
Expand Down
Loading
Loading