From 2c6eada79352b95212e3d16c81a7293f63e0b151 Mon Sep 17 00:00:00 2001 From: xuanyili Date: Fri, 6 Mar 2026 16:25:37 +0000 Subject: [PATCH 1/2] Add merge_into hook to TableProvider trait Add merge_into async method to TableProvider trait for MERGE INTO DML support. The method accepts: - source: ExecutionPlan representing the USING clause - on: Expr representing the ON join condition - clauses: Vec for WHEN MATCHED/NOT MATCHED actions Default implementation returns not_impl_err for tables that don't support MERGE INTO operations. --- datafusion/catalog/src/table.rs | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/datafusion/catalog/src/table.rs b/datafusion/catalog/src/table.rs index c6468fd5ad131..43eebcb36adca 100644 --- a/datafusion/catalog/src/table.rs +++ b/datafusion/catalog/src/table.rs @@ -28,7 +28,7 @@ use datafusion_common::{Result, internal_err}; use datafusion_expr::Expr; use datafusion_expr::statistics::StatisticsRequest; -use datafusion_expr::dml::InsertOp; +use datafusion_expr::dml::{InsertOp, MergeIntoClause}; use datafusion_expr::{ CreateExternalTable, LogicalPlan, TableProviderFilterPushDown, TableType, }; @@ -379,6 +379,23 @@ pub trait TableProvider: Any + Debug + Sync + Send { async fn truncate(&self, _state: &dyn Session) -> Result> { not_impl_err!("TRUNCATE not supported for {} table", self.table_type()) } + + /// Merge rows from a source into this table. + /// + /// The `source` is an [`ExecutionPlan`] representing the USING clause. + /// The `on` condition is the join predicate from the ON clause. + /// The `clauses` describe the WHEN MATCHED / WHEN NOT MATCHED actions. + /// + /// Returns an [`ExecutionPlan`] producing a single row with `count` (UInt64). + async fn merge_into( + &self, + _state: &dyn Session, + _source: Arc, + _on: Expr, + _clauses: Vec, + ) -> Result> { + not_impl_err!("MERGE INTO not supported for {} table", self.table_type()) + } } impl dyn TableProvider { From cb89e5391c430b69693eb57726e5d2401e0ce868 Mon Sep 17 00:00:00 2001 From: xuanyili Date: Fri, 6 Mar 2026 16:31:08 +0000 Subject: [PATCH 2/2] Add SQL and physical planner support for MERGE INTO Implement merge_to_plan and merge_clause_to_plan in SQL planner: - Parse Statement::Merge into LogicalPlan::Dml with WriteOp::MergeInto - Resolve target table and plan source (USING clause) as LogicalPlan - Build combined schema for target + source to resolve ON and WHEN expressions - Convert ON condition and WHEN clauses to DataFusion Expr - Handle UPDATE, INSERT, and DELETE actions in WHEN clauses Add physical planner dispatch for WriteOp::MergeInto: - Use source_as_provider() to recover the TableProvider from the TableSource - Extract source ExecutionPlan from children - Call TableProvider::merge_into with source plan, ON condition, and clauses - Wrap errors with MERGE INTO operation context Wire MergeInto's expressions through LogicalPlan tree-traversal so optimizers can rewrite them: add MergeIntoOp::exprs() (stable iteration order: on, then per-clause predicate + action value Exprs) and MergeIntoOp::with_new_exprs() to rebuild the op from a transformed expr vector. Branch LogicalPlan::apply_expressions, map_expressions, and with_new_exprs on WriteOp::MergeInto to use these helpers; other WriteOp variants continue to expose no expressions as before. --- datafusion/core/src/physical_planner.rs | 20 ++ datafusion/expr/src/logical_plan/dml.rs | 151 +++++++++++++- datafusion/expr/src/logical_plan/plan.rs | 14 +- datafusion/expr/src/logical_plan/tree_node.rs | 27 ++- datafusion/sql/src/statement.rs | 189 +++++++++++++++++- 5 files changed, 395 insertions(+), 6 deletions(-) diff --git a/datafusion/core/src/physical_planner.rs b/datafusion/core/src/physical_planner.rs index 190a08da12222..36d2a0c2ee168 100644 --- a/datafusion/core/src/physical_planner.rs +++ b/datafusion/core/src/physical_planner.rs @@ -924,6 +924,26 @@ impl DefaultPhysicalPlanner { ); } } + LogicalPlan::Dml(DmlStatement { + table_name, + target, + op: WriteOp::MergeInto(merge_op), + .. + }) => { + let provider = source_as_provider(target)?; + let input_exec = children.one()?; + provider + .merge_into( + session_state, + input_exec, + merge_op.on.clone(), + merge_op.clauses.clone(), + ) + .await + .map_err(|e| { + e.context(format!("MERGE INTO operation on table '{table_name}'")) + })? + } LogicalPlan::Window(Window { window_expr, .. }) => { assert_or_internal_err!( !window_expr.is_empty(), diff --git a/datafusion/expr/src/logical_plan/dml.rs b/datafusion/expr/src/logical_plan/dml.rs index 5b6403e6e2f08..7717dfaff7a33 100644 --- a/datafusion/expr/src/logical_plan/dml.rs +++ b/datafusion/expr/src/logical_plan/dml.rs @@ -23,7 +23,7 @@ use std::sync::Arc; use arrow::datatypes::{DataType, Field, Schema}; use datafusion_common::file_options::file_type::FileType; -use datafusion_common::{DFSchemaRef, TableReference}; +use datafusion_common::{DFSchemaRef, Result, TableReference, internal_err}; use crate::{Expr, LogicalPlan, TableSource}; @@ -307,6 +307,106 @@ pub struct MergeIntoOp { pub clauses: Vec, } +impl MergeIntoOp { + /// Count of top-level [`Expr`]s owned by this operation (no allocation). + /// + /// Matches the length of [`Self::exprs`] and the `exprs` vec consumed by + /// [`Self::with_new_exprs`]. + fn expr_count(&self) -> usize { + 1 + self + .clauses + .iter() + .map(|c| { + c.predicate.is_some() as usize + + match &c.action { + MergeIntoAction::Update(a) => a.len(), + MergeIntoAction::Insert { values, .. } => values.len(), + MergeIntoAction::Delete => 0, + } + }) + .sum::() + } + + /// Top-level [`Expr`]s in stable order: `on`, then per-clause predicate + /// (if any) and action value expressions. + pub fn exprs(&self) -> Vec<&Expr> { + let mut out = Vec::with_capacity(self.expr_count()); + out.push(&self.on); + for clause in &self.clauses { + if let Some(predicate) = &clause.predicate { + out.push(predicate); + } + match &clause.action { + MergeIntoAction::Update(assignments) => { + out.extend(assignments.iter().map(|(_, value)| value)); + } + MergeIntoAction::Insert { values, .. } => { + out.extend(values.iter()); + } + MergeIntoAction::Delete => {} + } + } + out + } + + /// Rebuild this `MergeIntoOp` from a flat vector of new expressions, in + /// the same order produced by [`Self::exprs`]. The clause kinds, action + /// kinds, column lists, and presence/absence of each predicate are + /// preserved from `self`. + pub fn with_new_exprs(&self, exprs: Vec) -> Result { + let expected = self.expr_count(); + if exprs.len() != expected { + return internal_err!( + "MergeIntoOp::with_new_exprs expected {expected} expressions, got {}", + exprs.len() + ); + } + let mut iter = exprs.into_iter(); + let on = iter.next().expect("non-empty by length check"); + let clauses = self + .clauses + .iter() + .map(|clause| { + let predicate = clause + .predicate + .is_some() + .then(|| iter.next().expect("non-empty by length check")); + let action = match &clause.action { + MergeIntoAction::Update(assignments) => { + let assignments = assignments + .iter() + .map(|(name, _)| { + ( + name.clone(), + iter.next().expect("non-empty by length check"), + ) + }) + .collect(); + MergeIntoAction::Update(assignments) + } + MergeIntoAction::Insert { columns, values } => { + let values = values + .iter() + .map(|_| iter.next().expect("non-empty by length check")) + .collect(); + MergeIntoAction::Insert { + columns: columns.clone(), + values, + } + } + MergeIntoAction::Delete => MergeIntoAction::Delete, + }; + MergeIntoClause { + kind: clause.kind, + predicate, + action, + } + }) + .collect(); + Ok(Self { on, clauses }) + } +} + /// A single WHEN clause within a MERGE INTO statement. #[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Hash)] pub struct MergeIntoClause { @@ -445,4 +545,53 @@ mod tests { MergeIntoClauseKind::NotMatchedBySource ); } + + #[test] + fn merge_into_op_exprs_round_trip() { + let op = MergeIntoOp { + on: col("id").eq(col("source_id")), + clauses: vec![ + MergeIntoClause { + kind: MergeIntoClauseKind::Matched, + predicate: Some(col("qty").gt(lit(0_i64))), + action: MergeIntoAction::Update(vec![ + ("qty".to_string(), col("source_qty")), + ("price".to_string(), col("source_price")), + ]), + }, + MergeIntoClause { + kind: MergeIntoClauseKind::NotMatched, + predicate: None, + action: MergeIntoAction::Insert { + columns: vec!["id".to_string(), "qty".to_string()], + values: vec![col("source_id"), col("source_qty")], + }, + }, + MergeIntoClause { + kind: MergeIntoClauseKind::NotMatchedBySource, + predicate: Some(col("active").eq(lit(true))), + action: MergeIntoAction::Delete, + }, + ], + }; + let exprs = op.exprs(); + assert_eq!(exprs.len(), 7); + + let owned: Vec = exprs.into_iter().cloned().collect(); + let rebuilt = op.with_new_exprs(owned).unwrap(); + assert_eq!(op, rebuilt); + } + + #[test] + fn merge_into_op_with_new_exprs_length_mismatch() { + let op = MergeIntoOp { + on: col("id").eq(col("source_id")), + clauses: vec![], + }; + let err = op.with_new_exprs(vec![]).unwrap_err(); + assert!( + err.to_string().contains("expected 1 expressions, got 0"), + "unexpected error: {err}" + ); + } } diff --git a/datafusion/expr/src/logical_plan/plan.rs b/datafusion/expr/src/logical_plan/plan.rs index 9ca6941a61ce6..beeace1d1461e 100644 --- a/datafusion/expr/src/logical_plan/plan.rs +++ b/datafusion/expr/src/logical_plan/plan.rs @@ -39,7 +39,7 @@ use crate::expr_rewriter::{ }; use crate::logical_plan::display::{GraphvizVisitor, IndentVisitor}; use crate::logical_plan::extension::UserDefinedLogicalNode; -use crate::logical_plan::{DmlStatement, Statement}; +use crate::logical_plan::{DmlStatement, Statement, WriteOp}; use crate::utils::{ enumerate_grouping_sets, exprlist_to_fields, find_out_reference_exprs, grouping_set_expr_count, grouping_set_to_exprlist, merge_schema, split_conjunction, @@ -810,12 +810,20 @@ impl LogicalPlan { op, .. }) => { - self.assert_no_expressions(expr)?; let input = self.only_input(inputs)?; + let op = match op { + WriteOp::MergeInto(merge_op) => { + WriteOp::MergeInto(Box::new(merge_op.with_new_exprs(expr)?)) + } + other => { + self.assert_no_expressions(expr)?; + other.clone() + } + }; Ok(LogicalPlan::Dml(DmlStatement::new( table_name.clone(), Arc::clone(target), - op.clone(), + op, Arc::new(input), ))) } diff --git a/datafusion/expr/src/logical_plan/tree_node.rs b/datafusion/expr/src/logical_plan/tree_node.rs index c10ac92eef4f5..c4c1d743b58b6 100644 --- a/datafusion/expr/src/logical_plan/tree_node.rs +++ b/datafusion/expr/src/logical_plan/tree_node.rs @@ -45,7 +45,7 @@ use crate::{ DistinctOn, DmlStatement, Execute, Explain, Expr, Extension, Filter, Join, Limit, LogicalPlan, Partitioning, Prepare, Projection, RecursiveQuery, Repartition, Sort, Statement, Subquery, SubqueryAlias, TableScan, Union, Unnest, UserDefinedLogicalNode, - Values, Window, builder::unnest_with_options, dml::CopyTo, + Values, Window, WriteOp, builder::unnest_with_options, dml::CopyTo, }; use datafusion_common::tree_node::TreeNodeRefContainer; @@ -480,6 +480,10 @@ impl LogicalPlan { } _ => Ok(TreeNodeRecursion::Continue), }, + LogicalPlan::Dml(DmlStatement { + op: WriteOp::MergeInto(merge_op), + .. + }) => merge_op.exprs().apply_ref_elements(f), // plans without expressions LogicalPlan::EmptyRelation(_) | LogicalPlan::RecursiveQuery(_) @@ -719,6 +723,27 @@ impl LogicalPlan { ) })? } + LogicalPlan::Dml(DmlStatement { + table_name, + target, + op: WriteOp::MergeInto(merge_op), + input, + output_schema, + }) => { + let owned_exprs: Vec = + merge_op.exprs().into_iter().cloned().collect(); + owned_exprs.map_elements(f)?.transform_data(|new_exprs| { + Ok(Transformed::no(LogicalPlan::Dml(DmlStatement { + table_name, + target, + op: WriteOp::MergeInto(Box::new( + merge_op.with_new_exprs(new_exprs)?, + )), + input, + output_schema, + }))) + })? + } // plans without expressions LogicalPlan::EmptyRelation(_) | LogicalPlan::RecursiveQuery(_) diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 401313f9d396c..06efcde407933 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -39,7 +39,9 @@ use datafusion_common::{ internal_err, not_impl_err, plan_datafusion_err, plan_err, schema_err, unqualified_field_not_found, }; -use datafusion_expr::dml::{CopyTo, InsertOp}; +use datafusion_expr::dml::{ + CopyTo, InsertOp, MergeIntoAction, MergeIntoClause, MergeIntoClauseKind, MergeIntoOp, +}; use datafusion_expr::expr_rewriter::normalize_col_with_schemas_and_ambiguity_check; use datafusion_expr::logical_plan::DdlStatement; use datafusion_expr::logical_plan::builder::project; @@ -1217,6 +1219,8 @@ impl SqlToRel<'_, S> { self.delete_to_plan(&table_name, selection, limit) } + Statement::Merge(merge) => self.merge_to_plan(merge), + Statement::StartTransaction { modes, begin: false, @@ -2407,6 +2411,189 @@ impl SqlToRel<'_, S> { Ok(plan) } + fn merge_to_plan(&self, merge: ast::Merge) -> Result { + let ast::Merge { + table, + source, + on, + clauses, + into: _, + merge_token: _, + optimizer_hints, + output, + } = merge; + + if !optimizer_hints.is_empty() { + plan_err!("Optimizer hints not supported")?; + } + + if output.is_some() { + return not_impl_err!("MERGE OUTPUT clause is not supported"); + } + + // 1. Resolve target table + let (target_table_name, target_alias) = match &table { + TableFactor::Table { name, alias, .. } => (name.clone(), alias.clone()), + _ => plan_err!("Cannot MERGE INTO non-table relation!")?, + }; + let target_table_ref = self.object_name_to_table_reference(target_table_name)?; + let target_table_source = self + .context_provider + .get_table_source(target_table_ref.clone())?; + let target_schema = Arc::new(DFSchema::try_from_qualified_schema( + target_table_ref.clone(), + &target_table_source.schema(), + )?); + + // 2. Plan the source (USING clause) as a LogicalPlan + let mut planner_context = PlannerContext::new(); + let source_table_with_joins = TableWithJoins { + relation: source, + joins: vec![], + }; + let source_plan = + self.plan_from_tables(vec![source_table_with_joins], &mut planner_context)?; + + // 3. Build a combined schema for resolving expressions in ON and WHEN clauses + let combined_schema = + Arc::new(target_schema.as_ref().join(source_plan.schema())?); + + // 4. Convert the ON condition from sqlparser Expr to datafusion Expr + let on_expr = self.sql_to_expr(*on, &combined_schema, &mut planner_context)?; + + // 5. Convert each WHEN clause + let df_clauses = clauses + .into_iter() + .map(|clause| { + self.merge_clause_to_plan( + clause, + &combined_schema, + &target_schema, + &target_alias, + &mut planner_context, + ) + }) + .collect::>>()?; + + // 6. Build the DmlStatement + let plan = LogicalPlan::Dml(DmlStatement::new( + target_table_ref, + target_table_source, + WriteOp::MergeInto(Box::new(MergeIntoOp { + on: on_expr, + clauses: df_clauses, + })), + Arc::new(source_plan), + )); + + Ok(plan) + } + + fn merge_clause_to_plan( + &self, + clause: ast::MergeClause, + combined_schema: &DFSchema, + target_schema: &DFSchema, + _target_alias: &Option, + planner_context: &mut PlannerContext, + ) -> Result { + let kind = match clause.clause_kind { + ast::MergeClauseKind::Matched => MergeIntoClauseKind::Matched, + ast::MergeClauseKind::NotMatched => MergeIntoClauseKind::NotMatched, + ast::MergeClauseKind::NotMatchedByTarget => { + MergeIntoClauseKind::NotMatchedByTarget + } + ast::MergeClauseKind::NotMatchedBySource => { + MergeIntoClauseKind::NotMatchedBySource + } + }; + + let predicate = clause + .predicate + .map(|p| self.sql_to_expr(p, combined_schema, planner_context)) + .transpose()?; + + let action = match clause.action { + ast::MergeAction::Update(update_expr) => { + let assignments = update_expr + .assignments + .into_iter() + .map(|assign| { + let col_name = match &assign.target { + AssignmentTarget::ColumnName(cols) => cols + .0 + .iter() + .last() + .ok_or_else(|| plan_datafusion_err!("Empty column id"))? + .as_ident() + .unwrap() + .value + .clone(), + _ => plan_err!("Tuples are not supported")?, + }; + // Validate column exists in target + target_schema.field_with_unqualified_name(&col_name)?; + let value = self.sql_to_expr( + assign.value, + combined_schema, + planner_context, + )?; + Ok((col_name, value)) + }) + .collect::>>()?; + MergeIntoAction::Update(assignments) + } + ast::MergeAction::Insert(insert_expr) => { + let columns: Vec = insert_expr + .columns + .iter() + .map(|c| { + c.0.iter() + .last() + .map(|p| p.as_ident().unwrap().value.clone()) + .ok_or_else(|| { + plan_datafusion_err!("Empty column name in MERGE INSERT") + }) + }) + .collect::>>()?; + + let values = match insert_expr.kind { + ast::MergeInsertKind::Values(values) => { + // VALUES clause has rows; for MERGE INSERT we expect exactly one row + if values.rows.len() != 1 { + return plan_err!( + "MERGE INSERT must have exactly one row of values" + ); + } + values + .rows + .into_iter() + .next() + .unwrap() + .content + .into_iter() + .map(|v| { + self.sql_to_expr(v, combined_schema, planner_context) + }) + .collect::>>()? + } + ast::MergeInsertKind::Row => { + return not_impl_err!("MERGE INSERT ROW is not supported"); + } + }; + + MergeIntoAction::Insert { columns, values } + } + ast::MergeAction::Delete { .. } => MergeIntoAction::Delete, + }; + + Ok(MergeIntoClause { + kind, + predicate, + action, + }) + } + fn insert_to_plan( &self, table_name: ObjectName,