From d555f308b802e845b100ad27f59f0342bc0acd7e Mon Sep 17 00:00:00 2001 From: Adam Alani Date: Mon, 1 Jun 2026 10:29:48 +0200 Subject: [PATCH 1/3] feat: add map_transform higher-order function (alias transform_values) Introduce a new higher-order UDF that returns a new map by applying a lambda `(k, v) -> expr` to each entry of the input map, transforming the values while preserving the keys. Registered under the alias `transform_values` via `with_aliases` from #22593. Also fix a latent indexing bug in `LambdaExpr`: when a multi-parameter lambda referenced only a subset of its declared parameters (e.g. `(k, v) -> v * 2`), the compressed column index map could shift lambda params into outer capture slots. `LambdaExpr` now tracks `outer_columns_count` and remaps indices so outer captures and lambda params keep stable positions regardless of which params are used. --- datafusion/functions-nested/src/lib.rs | 8 + .../functions-nested/src/map_transform.rs | 462 ++++++++++++++++++ .../physical-expr/src/expressions/lambda.rs | 64 ++- .../src/higher_order_function.rs | 12 +- datafusion/physical-expr/src/planner.rs | 18 +- .../test_files/map/map_transform.slt | 76 +++ 6 files changed, 622 insertions(+), 18 deletions(-) create mode 100644 datafusion/functions-nested/src/map_transform.rs create mode 100644 datafusion/sqllogictest/test_files/map/map_transform.slt diff --git a/datafusion/functions-nested/src/lib.rs b/datafusion/functions-nested/src/lib.rs index bd473394ec9a6..4be8ba066c0d0 100644 --- a/datafusion/functions-nested/src/lib.rs +++ b/datafusion/functions-nested/src/lib.rs @@ -67,6 +67,7 @@ pub mod map; pub mod map_entries; pub mod map_extract; pub mod map_keys; +pub mod map_transform; pub mod map_values; pub mod min_max; pub mod planner; @@ -123,6 +124,7 @@ pub mod expr_fn { pub use super::map_entries::map_entries; pub use super::map_extract::map_extract; pub use super::map_keys::map_keys; + pub use super::map_transform::map_transform; pub use 
super::map_values::map_values; pub use super::min_max::array_max; pub use super::min_max::array_min; @@ -211,6 +213,12 @@ pub fn all_default_higher_order_functions() -> Vec> { array_any_match::array_any_match_higher_order_function(), array_filter::array_filter_higher_order_function(), array_transform::array_transform_higher_order_function(), + Arc::new( + map_transform::map_transform_higher_order_function() + .as_ref() + .clone() + .with_aliases(["transform_values"]), + ), ] } diff --git a/datafusion/functions-nested/src/map_transform.rs b/datafusion/functions-nested/src/map_transform.rs new file mode 100644 index 0000000000000..31454b51eb7e9 --- /dev/null +++ b/datafusion/functions-nested/src/map_transform.rs @@ -0,0 +1,462 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! [`datafusion_expr::HigherOrderUDF`] definitions for map_transform function. 
+ +use arrow::{ + array::{Array, AsArray, MapArray, StructArray}, + buffer::OffsetBuffer, + compute::take_arrays, + datatypes::{DataType, Field, FieldRef, Fields}, +}; +use datafusion_common::{ + Result, ScalarValue, exec_err, plan_err, + utils::{list_values_row_number, take_function_args}, +}; +use datafusion_expr::{ + ColumnarValue, Documentation, HigherOrderFunctionArgs, HigherOrderReturnFieldArgs, + HigherOrderSignature, HigherOrderUDFImpl, LambdaParametersProgress, ValueOrLambda, + Volatility, +}; +use datafusion_macros::user_doc; +use std::sync::Arc; + +use crate::utils::get_map_entry_field; + +make_higher_order_function_expr_and_func!( + MapTransform, + map_transform, + map lambda, + "transforms the values of a map", + map_transform_higher_order_function +); + +#[user_doc( + doc_section(label = "Map Functions"), + description = "Returns a map that applies the lambda to each entry of the map and \ + transforms the values. The keys are preserved unchanged.", + syntax_example = "map_transform(map, (k, v) -> expr)", + sql_example = r#"```sql +> select map_transform(MAP {'a': 1, 'b': 2, 'c': 3}, (k, v) -> v * 10); ++--------------------------------------------------------------+ +| map_transform(MAP {'a': 1, 'b': 2, 'c': 3}, (k, v) -> v * 10) | ++--------------------------------------------------------------+ +| {a: 10, b: 20, c: 30} | ++--------------------------------------------------------------+ +```"#, + argument( + name = "map", + description = "Map expression. Can be a constant, column, or function, and any combination of map operators." + ), + argument( + name = "lambda", + description = "Lambda accepting two parameters `(key, value)`. The return value is used as the new value for the entry." 
+ ) +)] +#[derive(Debug, PartialEq, Eq, Hash)] +pub struct MapTransform { + signature: HigherOrderSignature, + aliases: Vec, +} + +impl Default for MapTransform { + fn default() -> Self { + Self::new() + } +} + +impl MapTransform { + pub fn new() -> Self { + Self { + signature: HigherOrderSignature::exact( + vec![ValueOrLambda::Value(()), ValueOrLambda::Lambda(())], + Volatility::Immutable, + ), + aliases: vec![], + } + } +} + +impl HigherOrderUDFImpl for MapTransform { + fn name(&self) -> &str { + "map_transform" + } + + fn aliases(&self) -> &[String] { + &self.aliases + } + + fn signature(&self) -> &HigherOrderSignature { + &self.signature + } + + fn coerce_value_types(&self, arg_types: &[DataType]) -> Result> { + let [map_type] = take_function_args(self.name(), arg_types)?; + match map_type { + DataType::Map(_, _) => Ok(vec![map_type.clone()]), + other => plan_err!( + "{} expected a map as first argument, got {other}", + self.name() + ), + } + } + + fn lambda_parameters( + &self, + _step: usize, + fields: &[ValueOrLambda>], + ) -> Result { + let [ValueOrLambda::Value(map), ValueOrLambda::Lambda(_lambda)] = + take_function_args(self.name(), fields)? 
+ else { + return plan_err!("{} expects a map followed by a lambda", self.name()); + }; + + let entry_fields = get_map_entry_field(map.data_type())?; + let key_field = Arc::clone(entry_fields.first().ok_or_else(|| { + datafusion_common::DataFusionError::Internal(format!( + "{} expected map entries to contain a key field", + self.name() + )) + })?); + let value_field = Arc::clone(entry_fields.last().ok_or_else(|| { + datafusion_common::DataFusionError::Internal(format!( + "{} expected map entries to contain a value field", + self.name() + )) + })?); + + Ok(LambdaParametersProgress::Complete(vec![vec![ + key_field, + value_field, + ]])) + } + + fn return_field_from_args( + &self, + args: HigherOrderReturnFieldArgs, + ) -> Result> { + let [ValueOrLambda::Value(map), ValueOrLambda::Lambda(lambda)] = + take_function_args(self.name(), args.arg_fields)? + else { + return plan_err!("{} expects a map followed by a lambda", self.name()); + }; + + let (entries_field, ordered_keys) = match map.data_type() { + DataType::Map(field, ordered) => (Arc::clone(field), *ordered), + other => return plan_err!("expected map, got {other}"), + }; + + let entry_fields = match entries_field.data_type() { + DataType::Struct(fields) => fields.clone(), + other => { + return plan_err!("expected map entries to be a struct, got {other}"); + } + }; + + let key_field = Arc::clone(entry_fields.first().ok_or_else(|| { + datafusion_common::DataFusionError::Internal(format!( + "{} expected map entries to contain a key field", + self.name() + )) + })?); + + let original_value_field = entry_fields.last().ok_or_else(|| { + datafusion_common::DataFusionError::Internal(format!( + "{} expected map entries to contain a value field", + self.name() + )) + })?; + + let new_value_field = Arc::new(Field::new( + original_value_field.name(), + lambda.data_type().clone(), + lambda.is_nullable(), + )); + + let new_entries_struct = + DataType::Struct(Fields::from(vec![key_field, new_value_field])); + let 
new_entries_field = Arc::new(Field::new( + entries_field.name(), + new_entries_struct, + entries_field.is_nullable(), + )); + + Ok(Arc::new(Field::new( + "", + DataType::Map(new_entries_field, ordered_keys), + map.is_nullable(), + ))) + } + + fn invoke_with_args(&self, args: HigherOrderFunctionArgs) -> Result { + let [map, lambda] = take_function_args(self.name(), &args.args)?; + let (ValueOrLambda::Value(map), ValueOrLambda::Lambda(lambda)) = (map, lambda) + else { + return plan_err!("{} expects a map followed by a lambda", self.name()); + }; + + let map_array_dyn = map.to_array(args.number_rows)?; + let map_array = match map_array_dyn.data_type() { + DataType::Map(_, _) => map_array_dyn.as_map(), + other => return exec_err!("{} expected a map, got {other}", self.name()), + }; + + // Fast path: every row is null. + if map_array.null_count() == map_array.len() { + return Ok(ColumnarValue::Scalar(ScalarValue::try_new_null( + args.return_type(), + )?)); + } + + let offsets = map_array.offsets(); + let first = offsets.first().copied().unwrap_or(0) as usize; + let last = offsets.last().copied().unwrap_or(0) as usize; + let len = last - first; + + let new_entries_field = match args.return_field.data_type() { + DataType::Map(field, _) => Arc::clone(field), + other => { + return exec_err!( + "{} expected return_field to be a map, got {other}", + self.name() + ); + } + }; + let (new_key_field, new_value_field) = match new_entries_field.data_type() { + DataType::Struct(fields) if fields.len() == 2 => { + (Arc::clone(&fields[0]), Arc::clone(&fields[1])) + } + other => { + return exec_err!( + "{} expected map entries struct with two fields, got {other}", + self.name() + ); + } + }; + + let flat_keys = if first == 0 && last == map_array.keys().len() { + Arc::clone(map_array.keys()) + } else { + map_array.keys().slice(first, len) + }; + let flat_values = if first == 0 && last == map_array.values().len() { + Arc::clone(map_array.values()) + } else { + 
map_array.values().slice(first, len) + }; + + // Fast path: no entries at all and the map has no nulls — return an empty map + // mirroring the input row count. + if len == 0 && map_array.null_count() == 0 { + return Ok(ColumnarValue::Scalar(ScalarValue::new_default( + args.return_type(), + )?)); + } + + let keys_param = || Ok(Arc::clone(&flat_keys)); + let values_param = || Ok(Arc::clone(&flat_values)); + + let transformed_values = lambda + .evaluate(&[&keys_param, &values_param], |arrays| { + let indices = list_values_row_number(&map_array_dyn)?; + Ok(take_arrays(arrays, &indices, None)?) + })? + .into_array(len)?; + + let adjusted_offsets = if first == 0 { + offsets.clone() + } else { + let first_i32 = first as i32; + let adjusted = offsets.iter().map(|o| *o - first_i32).collect(); + OffsetBuffer::new(adjusted) + }; + + let new_entries = StructArray::try_new( + Fields::from(vec![new_key_field, new_value_field]), + vec![Arc::clone(&flat_keys), transformed_values], + None, + )?; + + let new_map = MapArray::try_new( + new_entries_field, + adjusted_offsets, + new_entries, + map_array.nulls().cloned(), + matches!(map_array_dyn.data_type(), DataType::Map(_, true)), + )?; + + Ok(ColumnarValue::Array(Arc::new(new_map))) + } + + fn documentation(&self) -> Option<&Documentation> { + self.doc() + } +} + +#[cfg(test)] +mod tests { + use std::{collections::HashMap, sync::Arc}; + + use arrow::{ + array::{ + Array, ArrayRef, Int32Array, MapArray, RecordBatch, StringArray, StructArray, + }, + buffer::{NullBuffer, OffsetBuffer}, + datatypes::{DataType, Field, Fields}, + }; + use datafusion_common::{DFSchema, Result}; + use datafusion_expr::{ + Expr, col, + execution_props::ExecutionProps, + expr::{HigherOrderFunction, LambdaVariable}, + lambda, lit, + }; + use datafusion_physical_expr::create_physical_expr; + + use crate::map_transform::map_transform_higher_order_function; + + fn create_str_int_map( + keys: Vec<&str>, + values: Vec>, + offsets: OffsetBuffer, + nulls: Option, + ) 
-> MapArray { + let key_field = Arc::new(Field::new("key", DataType::Utf8, false)); + let value_field = Arc::new(Field::new("value", DataType::Int32, true)); + let entries_fields = + Fields::from(vec![Arc::clone(&key_field), Arc::clone(&value_field)]); + + let keys_array: ArrayRef = Arc::new(StringArray::from(keys)); + let values_array: ArrayRef = Arc::new(Int32Array::from(values)); + + let entries = StructArray::new( + entries_fields.clone(), + vec![keys_array, values_array], + None, + ); + let entries_field = Arc::new(Field::new( + "entries", + DataType::Struct(entries_fields), + false, + )); + + MapArray::new(entries_field, offsets, entries, nulls, false) + } + + fn eval_map_transform( + map: MapArray, + lambda_body: Expr, + param_names: [&str; 2], + ) -> Result<ArrayRef> { + let schema = DFSchema::from_unqualified_fields( + vec![Field::new("m", map.data_type().clone(), map.is_nullable())].into(), + HashMap::new(), + )?; + let len = map.len(); + + create_physical_expr( + &Expr::HigherOrderFunction(HigherOrderFunction::new( + map_transform_higher_order_function(), + vec![col("m"), lambda(param_names, lambda_body)], + )), + &schema, + &ExecutionProps::new(), + )? + .evaluate(&RecordBatch::try_new( + Arc::clone(schema.inner()), + vec![Arc::new(map.clone())], + )?)? 
+ .into_array(len) + } + + fn value_var(name: &str) -> Expr { + Expr::LambdaVariable(LambdaVariable::new( + name.to_string(), + Some(Arc::new(Field::new(name, DataType::Int32, true))), + )) + } + + #[test] + fn map_transform_doubles_values() { + let map = create_str_int_map( + vec!["a", "b", "c"], + vec![Some(1), Some(2), Some(3)], + OffsetBuffer::<i32>::from_lengths(vec![3]), + None, + ); + + let result = + eval_map_transform(map, value_var("v") * lit(2i32), ["k", "v"]).unwrap(); + let result_map = result.as_any().downcast_ref::<MapArray>().unwrap(); + let result_values = result_map + .values() + .as_any() + .downcast_ref::<Int32Array>() + .unwrap(); + assert_eq!(result_values.values(), &[2, 4, 6]); + } + + #[test] + fn map_transform_uses_keys_via_case() { + let map = create_str_int_map( + vec!["a", "b"], + vec![Some(1), Some(2)], + OffsetBuffer::<i32>::from_lengths(vec![2]), + None, + ); + + let key_var = Expr::LambdaVariable(LambdaVariable::new( + "k".to_string(), + Some(Arc::new(Field::new("k", DataType::Utf8, false))), + )); + // (k, v) -> case when k = 'a' then v + 100 else v end + let lambda_body = Expr::Case(datafusion_expr::expr::Case { + expr: None, + when_then_expr: vec![( + Box::new(key_var.eq(lit("a"))), + Box::new(value_var("v") + lit(100i32)), + )], + else_expr: Some(Box::new(value_var("v"))), + }); + let result = eval_map_transform(map, lambda_body, ["k", "v"]).unwrap(); + let result_map = result.as_any().downcast_ref::<MapArray>().unwrap(); + let result_values = result_map + .values() + .as_any() + .downcast_ref::<Int32Array>() + .unwrap(); + assert_eq!(result_values.values(), &[101, 2]); + } + + #[test] + fn map_transform_preserves_null_rows() { + let map = create_str_int_map( + vec!["a", "b", "c", "d"], + vec![Some(1), Some(2), Some(3), Some(4)], + OffsetBuffer::<i32>::from_lengths(vec![2, 2]), + Some(NullBuffer::from(vec![true, false])), + ); + + let result = + eval_map_transform(map, value_var("v") + lit(10i32), ["k", "v"]).unwrap(); + let result_map = result.as_any().downcast_ref::<MapArray>().unwrap(); + 
+ assert!(result_map.is_valid(0)); + assert!(!result_map.is_valid(1)); + assert_eq!(result_map.value_length(0), 2); + } +} diff --git a/datafusion/physical-expr/src/expressions/lambda.rs b/datafusion/physical-expr/src/expressions/lambda.rs index 9275821ae9150..a48470df00697 100644 --- a/datafusion/physical-expr/src/expressions/lambda.rs +++ b/datafusion/physical-expr/src/expressions/lambda.rs @@ -43,6 +43,11 @@ pub struct LambdaExpr { body: Arc<dyn PhysicalExpr>, projected_body: Arc<dyn PhysicalExpr>, projection: Vec<usize>, + /// Number of columns in the outer input schema. Column/LambdaVariable + /// indices below this value reference outer captures; indices at or above + /// reference lambda parameters (whose position in the merged evaluation + /// batch is fixed by the higher-order function, not by the projection). + outer_columns_count: usize, } // Manually derive PartialEq and Hash to work around https://github.com/rust-lang/rust/issues/78808 [https://github.com/apache/datafusion/issues/13196] @@ -60,8 +65,19 @@ impl Hash for LambdaExpr { } impl LambdaExpr { - /// Create a new lambda expression with the given parameters and body - pub fn try_new(params: Vec<String>, body: Arc<dyn PhysicalExpr>) -> Result<Self> { + /// Create a new lambda expression with the given parameters and body. + /// + /// `outer_columns_count` is the number of columns in the outer input + /// schema this lambda was planned against. Column/LambdaVariable indices + /// strictly below `outer_columns_count` reference outer captures and get + /// compressed to the front of the evaluation batch; indices at or above + /// reference lambda parameters and keep their fixed position relative to + /// the captures (so unused lambda parameters do not shift used ones). 
+ pub fn try_new( + params: Vec<String>, + body: Arc<dyn PhysicalExpr>, + outer_columns_count: usize, + ) -> Result<Self> { if !all_unique(&params) { return plan_err!( "lambda params must be unique, got ({})", @@ -71,10 +87,14 @@ impl LambdaExpr { check_async_udf(&body)?; - Ok(Self::new(params, body)) + Ok(Self::new(params, body, outer_columns_count)) } - fn new(params: Vec<String>, body: Arc<dyn PhysicalExpr>) -> Self { + fn new( + params: Vec<String>, + body: Arc<dyn PhysicalExpr>, + outer_columns_count: usize, + ) -> Self { let mut used_column_indices = HashSet::new(); body.apply(|node| { @@ -92,10 +112,26 @@ impl LambdaExpr { projection.sort(); + // Captures (outer column refs) get compressed to the front of the + // merged batch. Lambda variables (indices >= outer_columns_count) + // keep their fixed offset from the start of the lambda parameter + // block, because the higher-order function always pushes all + // declared parameters into the merged batch in order. + let used_captures_count = projection + .iter() + .filter(|i| **i < outer_columns_count) + .count(); let column_index_map = projection .iter() .enumerate() - .map(|(projected, original)| (*original, projected)) + .map(|(captures_pos, original)| { + let projected = if *original < outer_columns_count { + captures_pos + } else { + used_captures_count + (*original - outer_columns_count) + }; + (*original, projected) + }) .collect::<HashMap<_, _>>(); let projected_body = Arc::clone(&body) @@ -129,6 +165,7 @@ impl LambdaExpr { body, projected_body, projection, + outer_columns_count, } } @@ -187,7 +224,11 @@ impl PhysicalExpr for LambdaExpr { check_async_udf(body)?; - Ok(Arc::new(Self::new(self.params.clone(), Arc::clone(body)))) + Ok(Arc::new(Self::new( + self.params.clone(), + Arc::clone(body), + self.outer_columns_count, + ))) } fn fmt_sql(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -195,14 +236,19 @@ impl PhysicalExpr for LambdaExpr { } } -/// Create a lambda expression +/// Create a lambda expression. 
+/// +/// `outer_columns_count` is the number of columns in the outer input schema +/// this lambda was planned against. See [`LambdaExpr::try_new`] for details. pub fn lambda( params: impl IntoIterator<Item = impl Into<String>>, body: Arc<dyn PhysicalExpr>, + outer_columns_count: usize, ) -> Result<Arc<dyn PhysicalExpr>> { Ok(Arc::new(LambdaExpr::try_new( params.into_iter().map(Into::into).collect(), body, + outer_columns_count, )?)) } @@ -240,13 +286,13 @@ mod tests { #[test] fn test_lambda_evaluate() { - let lambda = lambda(["a"], Arc::new(NoOp::new())).unwrap(); + let lambda = lambda(["a"], Arc::new(NoOp::new()), 0).unwrap(); let batch = RecordBatch::new_empty(Arc::new(Schema::empty())); assert!(lambda.evaluate(&batch).is_err()); } #[test] fn test_lambda_duplicate_name() { - assert!(lambda(["a", "a"], Arc::new(NoOp::new())).is_err()); + assert!(lambda(["a", "a"], Arc::new(NoOp::new()), 0).is_err()); } } diff --git a/datafusion/physical-expr/src/higher_order_function.rs b/datafusion/physical-expr/src/higher_order_function.rs index 7390eb33a0922..e2c2a47516499 100644 --- a/datafusion/physical-expr/src/higher_order_function.rs +++ b/datafusion/physical-expr/src/higher_order_function.rs @@ -628,7 +628,7 @@ mod tests { let hof = HigherOrderFunctionExpr::try_new_with_schema( fun, - vec![lambda(["a"], Arc::new(Literal::new(expected.clone()))).unwrap()], + vec![lambda(["a"], Arc::new(Literal::new(expected.clone())), 0).unwrap()], &Schema::empty(), Arc::new(ConfigOptions::new()), ) @@ -664,10 +664,12 @@ mod tests { let hof = HigherOrderFunctionExpr::try_new_with_schema( fun, vec![ - not( - lambda(["a"], Arc::new(Literal::new(ScalarValue::Int32(Some(42))))) - .unwrap(), + not(lambda( + ["a"], + Arc::new(Literal::new(ScalarValue::Int32(Some(42)))), + 0, ) + .unwrap()) .unwrap(), ], &Schema::empty(), @@ -707,7 +709,7 @@ mod tests { .unwrap(); let result = Arc::new(hof) - .with_new_children(vec![lambda(["a"], Arc::new(NoOp::new())).unwrap()]) + .with_new_children(vec![lambda(["a"], Arc::new(NoOp::new()), 0).unwrap()]) .unwrap_err(); 
assert_contains!( diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs index d0d0508a106a5..d4c4abab78872 100644 --- a/datafusion/physical-expr/src/planner.rs +++ b/datafusion/physical-expr/src/planner.rs @@ -513,10 +513,20 @@ pub fn create_physical_expr( config_options, )?)) } - Expr::Lambda(Lambda { params, body }) => expressions::lambda( - params, - create_physical_expr(body, input_dfschema, execution_props)?, - ), + Expr::Lambda(Lambda { params, body }) => { + // The lambda was planned against a schema that appends the + // lambda's parameters to the outer input schema. Subtract + // `params.len()` to recover the outer column count, which the + // physical lambda needs to distinguish outer captures from + // lambda parameters during projection. + let outer_columns_count = + input_dfschema.fields().len().saturating_sub(params.len()); + expressions::lambda( + params, + create_physical_expr(body, input_dfschema, execution_props)?, + outer_columns_count, + ) + } Expr::LambdaVariable(LambdaVariable { name, field, diff --git a/datafusion/sqllogictest/test_files/map/map_transform.slt b/datafusion/sqllogictest/test_files/map/map_transform.slt new file mode 100644 index 0000000000000..9291c8f009c12 --- /dev/null +++ b/datafusion/sqllogictest/test_files/map/map_transform.slt @@ -0,0 +1,76 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +############# +## map_transform Tests +############# + +statement ok +set datafusion.sql_parser.dialect = databricks; + +# basic: double every value, keys untouched +query ? +SELECT map_transform(map(['a','b','c'], [1,2,3]), (k, v) -> v * 2); +---- +{a: 2, b: 4, c: 6} + +# alias transform_values works +query ? +SELECT transform_values(map(['a','b','c'], [1,2,3]), (k, v) -> v * 2); +---- +{a: 2, b: 4, c: 6} + +# lambda may reference the key +query ? +SELECT map_transform(map(['a','b','c'], [1,2,3]), (k, v) -> case when k = 'b' then v * 100 else v end); +---- +{a: 1, b: 200, c: 3} + +# lambda can change the value type +query ? +SELECT map_transform(map(['a','b'], [1,2]), (k, v) -> concat(k, cast(v as varchar))); +---- +{a: a1, b: b2} + +# captures from outer columns are visible inside the lambda body +statement ok +CREATE TABLE t (m_key array, m_val array, n int) +AS VALUES + (['a', 'b'], [1, 2], 10), + (['a', 'b'], [3, 4], 100); + +query I? 
+SELECT n, map_transform(map(m_key, m_val), (k, v) -> v + n) from t order by n; +---- +10 {a: 11, b: 12} +100 {a: 103, b: 104} + +statement ok +drop table t; + +# errors +query error DataFusion error: Error during planning: The function 'map_transform' expected 2 argument\(s\) but received 0 +SELECT map_transform(); + +query error DataFusion error: Error during planning: map_transform expected a map as first argument, got Int64 +SELECT map_transform(1, (k, v) -> v); + +query error DataFusion error: Error during planning: lambda defined 3 params but UDF support only 2 +SELECT map_transform(map(['a'], [1]), (a, b, c) -> b); + +statement ok +set datafusion.sql_parser.dialect = generic; From 6734b829c088089f0167192bce8041fe91b2f647 Mon Sep 17 00:00:00 2001 From: Adam Alani Date: Mon, 1 Jun 2026 10:38:22 +0200 Subject: [PATCH 2/3] refactor: drop transform_values alias for map_transform Keep a single canonical name `map_transform` and remove the `transform_values` alias to simplify registration (no more `Arc::new(... .with_aliases(...))` dance in `lib.rs`). 
--- datafusion/functions-nested/src/lib.rs | 7 +------ datafusion/sqllogictest/test_files/map/map_transform.slt | 6 ------ 2 files changed, 1 insertion(+), 12 deletions(-) diff --git a/datafusion/functions-nested/src/lib.rs b/datafusion/functions-nested/src/lib.rs index 4be8ba066c0d0..4d183ef3fb6b3 100644 --- a/datafusion/functions-nested/src/lib.rs +++ b/datafusion/functions-nested/src/lib.rs @@ -213,12 +213,7 @@ pub fn all_default_higher_order_functions() -> Vec> { array_any_match::array_any_match_higher_order_function(), array_filter::array_filter_higher_order_function(), array_transform::array_transform_higher_order_function(), - Arc::new( - map_transform::map_transform_higher_order_function() - .as_ref() - .clone() - .with_aliases(["transform_values"]), - ), + map_transform::map_transform_higher_order_function(), ] } diff --git a/datafusion/sqllogictest/test_files/map/map_transform.slt b/datafusion/sqllogictest/test_files/map/map_transform.slt index 9291c8f009c12..5e461a86c851c 100644 --- a/datafusion/sqllogictest/test_files/map/map_transform.slt +++ b/datafusion/sqllogictest/test_files/map/map_transform.slt @@ -28,12 +28,6 @@ SELECT map_transform(map(['a','b','c'], [1,2,3]), (k, v) -> v * 2); ---- {a: 2, b: 4, c: 6} -# alias transform_values works -query ? -SELECT transform_values(map(['a','b','c'], [1,2,3]), (k, v) -> v * 2); ----- -{a: 2, b: 4, c: 6} - # lambda may reference the key query ? SELECT map_transform(map(['a','b','c'], [1,2,3]), (k, v) -> case when k = 'b' then v * 100 else v end); From 83ada3deb8afbe6563857a471c1a7cdf2c41f428 Mon Sep 17 00:00:00 2001 From: Adam Alani Date: Mon, 1 Jun 2026 11:14:02 +0200 Subject: [PATCH 3/3] docs: regenerate scalar function docs for map_transform Run `./dev/update_function_docs.sh` so the user-facing function reference picks up the new `map_transform` entry from its `#[user_doc(...)]` annotation. 
--- .../source/user-guide/sql/scalar_functions.md | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/docs/source/user-guide/sql/scalar_functions.md b/docs/source/user-guide/sql/scalar_functions.md index b615c6bfb3fb2..d3023523a3491 100644 --- a/docs/source/user-guide/sql/scalar_functions.md +++ b/docs/source/user-guide/sql/scalar_functions.md @@ -5211,6 +5211,7 @@ select struct(a as field_a, b) from t; - [map_entries](#map_entries) - [map_extract](#map_extract) - [map_keys](#map_keys) +- [map_transform](#map_transform) - [map_values](#map_values) ### `element_at` @@ -5352,6 +5353,30 @@ SELECT map_keys(map([100, 5], [42, 43])); [100, 5] ``` +### `map_transform` + +Returns a map that applies the lambda to each entry of the map and transforms the values. The keys are preserved unchanged. + +```sql +map_transform(map, (k, v) -> expr) +``` + +#### Arguments + +- **map**: Map expression. Can be a constant, column, or function, and any combination of map operators. +- **lambda**: Lambda accepting two parameters `(key, value)`. The return value is used as the new value for the entry. + +#### Example + +```sql +> select map_transform(MAP {'a': 1, 'b': 2, 'c': 3}, (k, v) -> v * 10); ++--------------------------------------------------------------+ +| map_transform(MAP {'a': 1, 'b': 2, 'c': 3}, (k, v) -> v * 10) | ++--------------------------------------------------------------+ +| {a: 10, b: 20, c: 30} | ++--------------------------------------------------------------+ +``` + ### `map_values` Returns a list of all values in the map.