diff --git a/datafusion/common/src/nested_struct.rs b/datafusion/common/src/nested_struct.rs index cd5e48b13b059..3b2cdb308d719 100644 --- a/datafusion/common/src/nested_struct.rs +++ b/datafusion/common/src/nested_struct.rs @@ -17,9 +17,9 @@ use crate::error::{_plan_err, Result}; use arrow::{ - array::{Array, ArrayRef, StructArray, new_null_array}, + array::{Array, ArrayRef, MapArray, StructArray, new_null_array}, compute::{CastOptions, cast_with_options}, - datatypes::{DataType::Struct, Field, FieldRef}, + datatypes::{DataType, DataType::Struct, Field, FieldRef}, }; use std::sync::Arc; @@ -157,6 +157,14 @@ pub fn cast_column( Struct(target_fields) => { cast_struct_column(source_col, target_fields, cast_options) } + // Handle MAP schema evolution: the key type never changes, but the value + // struct may gain optional fields over time (additive schema evolution). + // We cast the internal key_value StructArray through cast_column so that + // the same struct-evolution logic (null-fill missing fields, drop extras) + // applies recursively to the value struct. + DataType::Map(target_kv_field, sorted) => { + cast_map_column(source_col, target_kv_field, *sorted, cast_options) + } _ => Ok(cast_with_options( source_col, target_field.data_type(), @@ -165,6 +173,45 @@ } } +/// Cast a MAP column where the value struct has evolved (additive schema change). +/// +/// Arrow stores MAP as `List<Struct<key, value>>`. We cast the inner +/// key_value StructArray through [`cast_column`] so that missing nullable fields in +/// the source value struct are filled with nulls and extra source fields are ignored — +/// the same behaviour as struct-to-struct evolution. +fn cast_map_column( + source_col: &ArrayRef, + target_kv_field: &FieldRef, + sorted: bool, + cast_options: &CastOptions, +) -> Result<ArrayRef> { + let Some(source_map) = source_col.as_any().downcast_ref::<MapArray>() else { + return _plan_err!( + "Cannot cast column of type {} to map type. 
Source must be a map.", + source_col.data_type() + ); + }; + + // Cast the key_value StructArray (key + value fields) using cast_column so + // that struct field evolution is handled recursively. + let entries: ArrayRef = Arc::new(source_map.entries().clone()); + let cast_entries = cast_column(&entries, target_kv_field, cast_options)?; + let Some(cast_struct) = cast_entries.as_any().downcast_ref::<StructArray>() else { + return _plan_err!("cast_map_column: cast entries must produce a StructArray"); + }; + let cast_struct = cast_struct.clone(); + + // Rebuild the MapArray preserving the original offsets and validity bitmap. + let map = MapArray::new( + Arc::clone(target_kv_field), + source_map.offsets().clone(), + cast_struct, + source_map.nulls().cloned(), + sorted, + ); + Ok(Arc::new(map)) +} + /// Validates compatibility between source and target struct fields for casting operations. /// /// This function implements comprehensive struct compatibility checking by examining: @@ -260,11 +307,11 @@ mod tests { use crate::format::DEFAULT_CAST_OPTIONS; use arrow::{ array::{ - BinaryArray, Int32Array, Int32Builder, Int64Array, ListArray, MapArray, - MapBuilder, StringArray, StringBuilder, + BinaryArray, Int32Array, Int32Builder, Int64Array, ListArray, ListBuilder, + MapArray, MapBuilder, StringArray, StringBuilder, StructBuilder, }, buffer::NullBuffer, - datatypes::{DataType, Field, FieldRef, Int32Type}, + datatypes::{DataType, Field, FieldRef, Fields, Int32Type}, }; /// Macro to extract and downcast a column from a StructArray macro_rules! get_column_as { @@ -708,4 +755,145 @@ mod tests { assert_eq!(a_col.value(0), 1); assert_eq!(a_col.value(1), 2); } + + /// Verifies that a MAP column whose value struct has gained optional fields + /// can still be read from older Parquet files (additive schema evolution). + /// Physical: Map<Utf8, List<Struct<id: Utf8>>> + /// Logical: Map<Utf8, List<Struct<id: Utf8, primary: Boolean, authenticatedState: Utf8>>> + /// Missing value-struct fields must be filled with nulls. 
+ #[test] + fn test_cast_map_with_evolved_value_struct() { + // Physical schema: value struct has only "id" + let phys_value_fields: Fields = + vec![Arc::new(field("id", DataType::Utf8))].into(); + + // Build physical MapArray: {"Email": [{id: "abc@example.com"}]} + let mut map_builder = { + let key_builder = StringBuilder::new(); + let value_builder = ListBuilder::new(StructBuilder::new( + phys_value_fields.clone(), + vec![Box::new(StringBuilder::new())], + )); + MapBuilder::new(None, key_builder, value_builder) + }; + // Row 0: {"Email": [{id: "abc"}]} + map_builder.keys().append_value("Email"); + { + let list_builder = map_builder.values(); + let struct_builder = list_builder.values(); + struct_builder + .field_builder::<StringBuilder>(0) + .unwrap() + .append_value("abc"); + struct_builder.append(true); + list_builder.append(true); + } + map_builder.append(true).unwrap(); + // Row 1: null map + map_builder.append(false).unwrap(); + + let phys_map: ArrayRef = Arc::new(map_builder.finish()); + + // Logical (target) schema: value struct has "id", "primary", "authenticatedState" + let log_value_fields: Fields = vec![ + Arc::new(field("id", DataType::Utf8)), + Arc::new(field("primary", DataType::Boolean)), + Arc::new(field("authenticatedState", DataType::Utf8)), + ] + .into(); + let log_kv_field = Arc::new(non_null_field( + "key_value", + Struct( + vec![ + Arc::new(field("key", DataType::Utf8)), // nullable to match MapBuilder + Arc::new(field( + "value", + DataType::List(Arc::new(field("item", Struct(log_value_fields)))), + )), + ] + .into(), + ), + )); + let target_field = + field("my_map", DataType::Map(Arc::clone(&log_kv_field), false)); + + // The cast should succeed: missing "primary" and "authenticatedState" filled with nulls + let result = cast_column(&phys_map, &target_field, &DEFAULT_CAST_OPTIONS) + .expect("cast_map_column should handle value-struct schema evolution"); + + let result_map = result.as_any().downcast_ref::<MapArray>().unwrap(); + assert!(!result_map.is_null(0)); + 
assert!(result_map.is_null(1)); + + let entries = result_map.value(0); + let entries_struct = entries.as_any().downcast_ref::<StructArray>().unwrap(); + // The key_value struct should have "key" and "value" fields in the logical schema + assert!(entries_struct.column_by_name("key").is_some()); + assert!(entries_struct.column_by_name("value").is_some()); + } + + /// Validates that the schema_rewriter correctly allows Map→Map schema evolution + /// (the compatibility check must not reject additive value-struct changes). + #[test] + fn test_validate_map_value_struct_compatibility() { + let phys_kv_fields: Vec<Arc<Field>> = vec![ + Arc::new(non_null_field("key", DataType::Utf8)), + Arc::new(field( + "value", + Struct(vec![Arc::new(field("id", DataType::Utf8))].into()), + )), + ]; + let log_kv_fields: Vec<Arc<Field>> = vec![ + Arc::new(non_null_field("key", DataType::Utf8)), + Arc::new(field( + "value", + Struct( + vec![ + Arc::new(field("id", DataType::Utf8)), + Arc::new(field("primary", DataType::Boolean)), + Arc::new(field("authenticatedState", DataType::Utf8)), + ] + .into(), + ), + )), + ]; + + // Physical key_value fields (fewer value-struct fields) must be compatible + // with logical key_value fields (more value-struct fields). + validate_struct_compatibility(&phys_kv_fields, &log_kv_fields) + .expect("Map value-struct evolution (additive) should be compatible"); + } + + /// Validates that incompatible MAP value-struct changes are rejected. + /// A type change on an existing field (Binary → Int32) must fail — this + /// exercises the new (Map, Map) code path and confirms errors surface there, + /// not silently through the fallback cast. 
+ #[test] + fn test_validate_map_value_struct_incompatibility() { + // Physical: Map(value: Struct({id: Binary})) + // Logical: Map(value: Struct({id: Int32})) — type change, not additive + let phys_kv_fields: Vec<Arc<Field>> = vec![ + Arc::new(non_null_field("key", DataType::Utf8)), + Arc::new(field( + "value", + Struct(vec![Arc::new(field("id", DataType::Binary))].into()), + )), + ]; + let log_kv_fields: Vec<Arc<Field>> = vec![ + Arc::new(non_null_field("key", DataType::Utf8)), + Arc::new(field( + "value", + Struct(vec![Arc::new(field("id", DataType::Int32))].into()), + )), + ]; + + // Binary → Int32 is incompatible: must fail via the Map code path. + let result = validate_struct_compatibility(&phys_kv_fields, &log_kv_fields); + assert!(result.is_err()); + let error_msg = result.unwrap_err().to_string(); + assert!( + error_msg.contains("id"), + "error should name the incompatible field" + ); + } } diff --git a/datafusion/physical-expr-adapter/src/schema_rewriter.rs b/datafusion/physical-expr-adapter/src/schema_rewriter.rs index b2bed36f0e740..b74e8ba68cb89 100644 --- a/datafusion/physical-expr-adapter/src/schema_rewriter.rs +++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs @@ -452,6 +452,18 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> { (DataType::Struct(physical_fields), DataType::Struct(logical_fields)) => { validate_struct_compatibility(physical_fields, logical_fields)?; } + // Arrow stores MAP as List<Struct<key, value>>. + // Validate the inner key_value struct compatibility so that additive + // schema evolution in the value struct (new optional fields) is allowed. + (DataType::Map(physical_kv, _), DataType::Map(logical_kv, _)) => { + if let ( + DataType::Struct(physical_fields), + DataType::Struct(logical_fields), + ) = (physical_kv.data_type(), logical_kv.data_type()) + { + validate_struct_compatibility(physical_fields, logical_fields)?; + } + } _ => { let is_compatible = can_cast_types(physical_field.data_type(), logical_field.data_type());