Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 94 additions & 0 deletions datafusion/optimizer/src/propagate_empty_relation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,34 @@ impl OptimizerRule for PropagateEmptyRelation {
schema: Arc::clone(&join.schema),
}),
)),
JoinType::LeftMark if left_empty => Ok(Transformed::yes(
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::clone(&join.schema),
}),
)),
// Left mark join with an empty right side preserves all left rows
// and marks every row as not matched.
JoinType::LeftMark if right_empty => {
Ok(Transformed::yes(build_false_mark_projection(
Arc::clone(&join.left),
&join.schema,
)?))
}
JoinType::RightMark if right_empty => Ok(Transformed::yes(
LogicalPlan::EmptyRelation(EmptyRelation {
produce_one_row: false,
schema: Arc::clone(&join.schema),
}),
)),
// Right mark join with an empty left side preserves all right rows
// and marks every row as not matched.
JoinType::RightMark if left_empty => {
Ok(Transformed::yes(build_false_mark_projection(
Arc::clone(&join.right),
&join.schema,
)?))
}
_ => Ok(Transformed::no(plan)),
}
}
Expand Down Expand Up @@ -321,6 +349,32 @@ fn build_null_padded_projection(
)?))
}

/// Builds a Projection that replaces an eliminated mark join with the surviving
/// side's columns plus `FALSE` for the synthetic mark column.
fn build_false_mark_projection(
surviving_plan: Arc<LogicalPlan>,
join_schema: &DFSchemaRef,
) -> Result<LogicalPlan> {
let mark_index = join_schema.fields().len().saturating_sub(1);
let exprs = join_schema
.iter()
.enumerate()
.map(|(i, (qualifier, field))| {
if i == mark_index {
lit(false).alias_qualified(qualifier.cloned(), field.name())
} else {
Expr::Column(Column::new(qualifier.cloned(), field.name()))
}
})
.collect::<Vec<_>>();

Ok(LogicalPlan::Projection(Projection::try_new_with_schema(
exprs,
surviving_plan,
Arc::clone(join_schema),
)?))
}

/// Returns `true` if any grouping set in the list of GROUP BY expressions is
/// the empty set `()`.
///
Expand Down Expand Up @@ -656,6 +710,46 @@ mod tests {
assert_anti_join_empty_join_table_is_base_table(false)
}

#[test]
fn test_left_mark_join_right_empty_false_mark() -> Result<()> {
let left =
LogicalPlanBuilder::from(test_table_scan_with_name("left")?).build()?;
let right_empty = LogicalPlanBuilder::from(test_table_scan_with_name("right")?)
.filter(lit(false))?
.build()?;

let plan = LogicalPlanBuilder::from(left)
.join_using(
right_empty,
JoinType::LeftMark,
vec![Column::from_name("a".to_string())],
)?
.build()?;

let expected = "Projection: left.a, left.b, left.c, Boolean(false) AS mark\n TableScan: left";
assert_together_optimized_plan(plan, expected, true)
}

#[test]
fn test_right_mark_join_left_empty_false_mark() -> Result<()> {
let left_empty = LogicalPlanBuilder::from(test_table_scan_with_name("left")?)
.filter(lit(false))?
.build()?;
let right =
LogicalPlanBuilder::from(test_table_scan_with_name("right")?).build()?;

let plan = LogicalPlanBuilder::from(left_empty)
.join_using(
right,
JoinType::RightMark,
vec![Column::from_name("a".to_string())],
)?
.build()?;

let expected = "Projection: right.a, right.b, right.c, Boolean(false) AS mark\n TableScan: right";
assert_together_optimized_plan(plan, expected, true)
}

#[test]
fn test_join_empty_propagation_rules_noop() -> Result<()> {
// these cases should not result in an empty relation
Expand Down
33 changes: 33 additions & 0 deletions datafusion/sqllogictest/test_files/subquery.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2348,3 +2348,36 @@ DROP TABLE sq_count_customer;

statement ok
DROP TABLE sq_count_orders;

# Mark joins must preserve their synthetic mark column when the subquery input
# is optimized to EmptyRelation.
statement ok
CREATE TABLE mark_join_empty_t1(id INT, flag BOOLEAN);

statement ok
CREATE TABLE mark_join_empty_t2(id INT);

statement ok
INSERT INTO mark_join_empty_t1 VALUES (1, TRUE);

statement ok
INSERT INTO mark_join_empty_t2 VALUES (1);

query IB
SELECT *
FROM mark_join_empty_t1
WHERE EXISTS (
SELECT 1
FROM mark_join_empty_t2
WHERE mark_join_empty_t2.id = mark_join_empty_t1.id
AND 1 = 0
)
OR flag = TRUE;
----
1 true

statement ok
DROP TABLE mark_join_empty_t1;

statement ok
DROP TABLE mark_join_empty_t2;
Loading