From 7bdfacad9f102ba477b7f09f2f1d4c9aa234e277 Mon Sep 17 00:00:00 2001 From: ochescu Date: Sat, 4 Apr 2026 15:14:57 +0300 Subject: [PATCH 1/4] [HSTACK] Fix MAP schema evolution: allow additive value-struct changes MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DataFusion correctly handles Struct→Struct schema evolution (missing fields filled with nulls, extra fields ignored) but falls back to Arrow's generic cast for Map→Map. Since Arrow's generic cast cannot add missing struct fields, reading Delta tables where the MAP value struct has gained optional fields (additive evolution) fails with: Cannot cast column 'identityMap' from '...(physical data type)' to '...(logical data type)' Fix: 1. schema_rewriter.rs — validate_compat match: add Map arm that recursively calls validate_struct_compatibility on the internal key_value struct, so additive value-struct changes pass the compatibility check. 2. nested_struct.rs — cast_column match: add Map arm via cast_map_column() which casts the key_value StructArray through cast_column (preserving struct evolution semantics) and rebuilds the MapArray with the same offsets and validity bitmap. Tests added: - test_cast_map_with_evolved_value_struct: end-to-end cast of a MAP where the physical value struct (only "id") is evolved to the logical schema (id + primary + authenticatedState). - test_validate_map_value_struct_compatibility: confirms the compatibility check passes for additive value-struct changes. Real-world trigger: AEP identityMap is Map<String, List<Struct<...>>>. Older Parquet files have fewer fields in the Struct; newer files have more. DataFusion previously failed to read the older files. 
--- datafusion/common/src/nested_struct.rs | 174 +++++++++++++++++- .../src/schema_rewriter.rs | 10 + 2 files changed, 182 insertions(+), 2 deletions(-) diff --git a/datafusion/common/src/nested_struct.rs b/datafusion/common/src/nested_struct.rs index cd5e48b13b059..60d93075afe6f 100644 --- a/datafusion/common/src/nested_struct.rs +++ b/datafusion/common/src/nested_struct.rs @@ -17,9 +17,9 @@ use crate::error::{_plan_err, Result}; use arrow::{ - array::{Array, ArrayRef, StructArray, new_null_array}, + array::{Array, ArrayRef, MapArray, StructArray, new_null_array}, compute::{CastOptions, cast_with_options}, - datatypes::{DataType::Struct, Field, FieldRef}, + datatypes::{DataType, DataType::Struct, Field, FieldRef}, }; use std::sync::Arc; @@ -157,6 +157,14 @@ pub fn cast_column( Struct(target_fields) => { cast_struct_column(source_col, target_fields, cast_options) } + // Handle MAP schema evolution: the key type never changes, but the value + // struct may gain optional fields over time (additive schema evolution). + // We cast the internal key_value StructArray through cast_column so that + // the same struct-evolution logic (null-fill missing fields, drop extras) + // applies recursively to the value struct. + DataType::Map(target_kv_field, sorted) => { + cast_map_column(source_col, target_kv_field, *sorted, cast_options) + } _ => Ok(cast_with_options( source_col, target_field.data_type(), @@ -165,6 +173,45 @@ pub fn cast_column( } } +/// Cast a MAP column where the value struct has evolved (additive schema change). +/// +/// Arrow stores MAP as `List>`. We cast the inner +/// key_value StructArray through [`cast_column`] so that missing nullable fields in +/// the source value struct are filled with nulls and extra source fields are ignored — +/// the same behaviour as struct-to-struct evolution. 
+fn cast_map_column( + source_col: &ArrayRef, + target_kv_field: &FieldRef, + sorted: bool, + cast_options: &CastOptions, +) -> Result { + let Some(source_map) = source_col.as_any().downcast_ref::() else { + return _plan_err!( + "Cannot cast column of type {} to map type. Source must be a map.", + source_col.data_type() + ); + }; + + // Cast the key_value StructArray (key + value fields) using cast_column so + // that struct field evolution is handled recursively. + let entries: ArrayRef = Arc::new(source_map.entries().clone()); + let cast_entries = cast_column(&entries, target_kv_field, cast_options)?; + let Some(cast_struct) = cast_entries.as_any().downcast_ref::() else { + return _plan_err!("cast_map_column: cast entries must produce a StructArray"); + }; + let cast_struct = cast_struct.clone(); + + // Rebuild the MapArray preserving the original offsets and validity bitmap. + let map = MapArray::new( + Arc::clone(target_kv_field), + source_map.offsets().clone(), + cast_struct, + source_map.nulls().cloned(), + sorted, + ); + Ok(Arc::new(map)) +} + /// Validates compatibility between source and target struct fields for casting operations. /// /// This function implements comprehensive struct compatibility checking by examining: @@ -708,4 +755,127 @@ mod tests { assert_eq!(a_col.value(0), 1); assert_eq!(a_col.value(1), 2); } + + /// Simulates the real AEP scenario: identityMap is Map>> + /// in older Parquet files, but the Delta log (logical schema) has evolved to + /// Map>>. + /// The physical files must still be readable — missing value-struct fields + /// should be filled with nulls. 
+ #[test] + fn test_cast_map_with_evolved_value_struct() { + use arrow::array::{ListBuilder, StringBuilder, StructBuilder}; + use arrow::datatypes::Fields; + + // Physical schema: identityMap value struct has only "id" + let phys_value_fields: Fields = vec![ + Arc::new(Field::new("id", DataType::Utf8, true)), + ] + .into(); + + // Build physical MapArray: {"Email": [{id: "abc@example.com"}]} + let mut map_builder = { + let key_builder = StringBuilder::new(); + let value_builder = ListBuilder::new(StructBuilder::new( + phys_value_fields.clone(), + vec![Box::new(StringBuilder::new())], + )); + MapBuilder::new(None, key_builder, value_builder) + }; + // Row 0: {"Email": [{id: "abc"}]} + map_builder.keys().append_value("Email"); + { + let list_builder = map_builder.values(); + let struct_builder = list_builder.values(); + struct_builder.field_builder::(0).unwrap().append_value("abc"); + struct_builder.append(true); + list_builder.append(true); + } + map_builder.append(true).unwrap(); + // Row 1: null map + map_builder.append(false).unwrap(); + + let phys_map: ArrayRef = Arc::new(map_builder.finish()); + + // Logical (target) schema: value struct has "id", "primary", "authenticatedState" + let log_value_fields: Fields = vec![ + Arc::new(Field::new("id", DataType::Utf8, true)), + Arc::new(Field::new("primary", DataType::Boolean, true)), + Arc::new(Field::new("authenticatedState", DataType::Utf8, true)), + ] + .into(); + let log_kv_field = Arc::new(Field::new( + "key_value", + Struct(vec![ + Arc::new(Field::new("key", DataType::Utf8, true)), // nullable to match MapBuilder + Arc::new(Field::new( + "value", + DataType::List(Arc::new(Field::new( + "item", + Struct(log_value_fields), + true, + ))), + true, + )), + ].into()), + false, + )); + let target_field = Field::new( + "identityMap", + DataType::Map(Arc::clone(&log_kv_field), false), + true, + ); + + // The cast should succeed: missing "primary" and "authenticatedState" filled with nulls + let result = 
cast_column(&phys_map, &target_field, &DEFAULT_CAST_OPTIONS) + .expect("cast_map_column should handle value-struct schema evolution"); + + let result_map = result.as_any().downcast_ref::().unwrap(); + assert!(!result_map.is_null(0)); + assert!(result_map.is_null(1)); + + let entries = result_map.value(0); + let entries_struct = entries.as_any().downcast_ref::().unwrap(); + // The key_value struct should have "key" and "value" fields in the logical schema + assert!(entries_struct.column_by_name("key").is_some()); + assert!(entries_struct.column_by_name("value").is_some()); + } + + /// Validates that the schema_rewriter correctly allows Map→Map schema evolution + /// (the compatibility check must not reject additive value-struct changes). + #[test] + fn test_validate_map_value_struct_compatibility() { + + let phys_kv_fields: Vec> = vec![ + Arc::new(Field::new("key", DataType::Utf8, false)), + Arc::new(Field::new( + "value", + Struct( + vec![Arc::new(Field::new("id", DataType::Utf8, true))].into(), + ), + true, + )), + ]; + let log_kv_fields: Vec> = vec![ + Arc::new(Field::new("key", DataType::Utf8, false)), + Arc::new(Field::new( + "value", + Struct( + vec![ + Arc::new(Field::new("id", DataType::Utf8, true)), + Arc::new(Field::new("primary", DataType::Boolean, true)), + Arc::new(Field::new("authenticatedState", DataType::Utf8, true)), + ] + .into(), + ), + true, + )), + ]; + + // Physical key_value fields (fewer value-struct fields) must be compatible + // with logical key_value fields (more value-struct fields). 
+ let phys_refs: Vec> = phys_kv_fields.into_iter().collect(); + let log_refs: Vec> = log_kv_fields.into_iter().collect(); + validate_struct_compatibility(&phys_refs, &log_refs) + .expect("Map value-struct evolution (additive) should be compatible"); + } } diff --git a/datafusion/physical-expr-adapter/src/schema_rewriter.rs b/datafusion/physical-expr-adapter/src/schema_rewriter.rs index b2bed36f0e740..4cc8b4dc71f9e 100644 --- a/datafusion/physical-expr-adapter/src/schema_rewriter.rs +++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs @@ -452,6 +452,16 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> { (DataType::Struct(physical_fields), DataType::Struct(logical_fields)) => { validate_struct_compatibility(physical_fields, logical_fields)?; } + // Arrow stores MAP as List>. + // Validate the inner key_value struct compatibility so that additive + // schema evolution in the value struct (new optional fields) is allowed. + (DataType::Map(physical_kv, _), DataType::Map(logical_kv, _)) => { + if let (DataType::Struct(physical_fields), DataType::Struct(logical_fields)) = + (physical_kv.data_type(), logical_kv.data_type()) + { + validate_struct_compatibility(physical_fields, logical_fields)?; + } + } _ => { let is_compatible = can_cast_types(physical_field.data_type(), logical_field.data_type()); From 304c40ad8a062cb126fea34fa27166b40cfe9557 Mon Sep 17 00:00:00 2001 From: ochescu Date: Mon, 6 Apr 2026 12:27:06 +0300 Subject: [PATCH 2/4] [HSTACK] Fix MAP schema evolution: allow additive value-struct changes Follow repo conventions: move test imports to mod-level, use field() helpers. 
--- datafusion/common/src/nested_struct.rs | 79 ++++++++----------- .../src/schema_rewriter.rs | 6 +- 2 files changed, 37 insertions(+), 48 deletions(-) diff --git a/datafusion/common/src/nested_struct.rs b/datafusion/common/src/nested_struct.rs index 60d93075afe6f..1347a8fef9c49 100644 --- a/datafusion/common/src/nested_struct.rs +++ b/datafusion/common/src/nested_struct.rs @@ -307,11 +307,11 @@ mod tests { use crate::format::DEFAULT_CAST_OPTIONS; use arrow::{ array::{ - BinaryArray, Int32Array, Int32Builder, Int64Array, ListArray, MapArray, - MapBuilder, StringArray, StringBuilder, + BinaryArray, Int32Array, Int32Builder, Int64Array, ListArray, ListBuilder, + MapArray, MapBuilder, StringArray, StringBuilder, StructBuilder, }, buffer::NullBuffer, - datatypes::{DataType, Field, FieldRef, Int32Type}, + datatypes::{DataType, Field, FieldRef, Fields, Int32Type}, }; /// Macro to extract and downcast a column from a StructArray macro_rules! get_column_as { @@ -763,14 +763,9 @@ mod tests { /// should be filled with nulls. 
#[test] fn test_cast_map_with_evolved_value_struct() { - use arrow::array::{ListBuilder, StringBuilder, StructBuilder}; - use arrow::datatypes::Fields; - // Physical schema: identityMap value struct has only "id" - let phys_value_fields: Fields = vec![ - Arc::new(Field::new("id", DataType::Utf8, true)), - ] - .into(); + let phys_value_fields: Fields = + vec![Arc::new(field("id", DataType::Utf8))].into(); // Build physical MapArray: {"Email": [{id: "abc@example.com"}]} let mut map_builder = { @@ -786,7 +781,10 @@ mod tests { { let list_builder = map_builder.values(); let struct_builder = list_builder.values(); - struct_builder.field_builder::(0).unwrap().append_value("abc"); + struct_builder + .field_builder::(0) + .unwrap() + .append_value("abc"); struct_builder.append(true); list_builder.append(true); } @@ -798,31 +796,27 @@ mod tests { // Logical (target) schema: value struct has "id", "primary", "authenticatedState" let log_value_fields: Fields = vec![ - Arc::new(Field::new("id", DataType::Utf8, true)), - Arc::new(Field::new("primary", DataType::Boolean, true)), - Arc::new(Field::new("authenticatedState", DataType::Utf8, true)), + Arc::new(field("id", DataType::Utf8)), + Arc::new(field("primary", DataType::Boolean)), + Arc::new(field("authenticatedState", DataType::Utf8)), ] .into(); - let log_kv_field = Arc::new(Field::new( + let log_kv_field = Arc::new(non_null_field( "key_value", - Struct(vec![ - Arc::new(Field::new("key", DataType::Utf8, true)), // nullable to match MapBuilder - Arc::new(Field::new( - "value", - DataType::List(Arc::new(Field::new( - "item", - Struct(log_value_fields), - true, - ))), - true, - )), - ].into()), - false, + Struct( + vec![ + Arc::new(field("key", DataType::Utf8)), // nullable to match MapBuilder + Arc::new(field( + "value", + DataType::List(Arc::new(field("item", Struct(log_value_fields)))), + )), + ] + .into(), + ), )); - let target_field = Field::new( + let target_field = field( "identityMap", 
DataType::Map(Arc::clone(&log_kv_field), false), - true, ); // The cast should succeed: missing "primary" and "authenticatedState" filled with nulls @@ -844,38 +838,31 @@ mod tests { /// (the compatibility check must not reject additive value-struct changes). #[test] fn test_validate_map_value_struct_compatibility() { - let phys_kv_fields: Vec> = vec![ - Arc::new(Field::new("key", DataType::Utf8, false)), - Arc::new(Field::new( + Arc::new(non_null_field("key", DataType::Utf8)), + Arc::new(field( "value", - Struct( - vec![Arc::new(Field::new("id", DataType::Utf8, true))].into(), - ), - true, + Struct(vec![Arc::new(field("id", DataType::Utf8))].into()), )), ]; let log_kv_fields: Vec> = vec![ - Arc::new(Field::new("key", DataType::Utf8, false)), - Arc::new(Field::new( + Arc::new(non_null_field("key", DataType::Utf8)), + Arc::new(field( "value", Struct( vec![ - Arc::new(Field::new("id", DataType::Utf8, true)), - Arc::new(Field::new("primary", DataType::Boolean, true)), - Arc::new(Field::new("authenticatedState", DataType::Utf8, true)), + Arc::new(field("id", DataType::Utf8)), + Arc::new(field("primary", DataType::Boolean)), + Arc::new(field("authenticatedState", DataType::Utf8)), ] .into(), ), - true, )), ]; // Physical key_value fields (fewer value-struct fields) must be compatible // with logical key_value fields (more value-struct fields). 
- let phys_refs: Vec> = phys_kv_fields.into_iter().collect(); - let log_refs: Vec> = log_kv_fields.into_iter().collect(); - validate_struct_compatibility(&phys_refs, &log_refs) + validate_struct_compatibility(&phys_kv_fields, &log_kv_fields) .expect("Map value-struct evolution (additive) should be compatible"); } } diff --git a/datafusion/physical-expr-adapter/src/schema_rewriter.rs b/datafusion/physical-expr-adapter/src/schema_rewriter.rs index 4cc8b4dc71f9e..b74e8ba68cb89 100644 --- a/datafusion/physical-expr-adapter/src/schema_rewriter.rs +++ b/datafusion/physical-expr-adapter/src/schema_rewriter.rs @@ -456,8 +456,10 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> { // Validate the inner key_value struct compatibility so that additive // schema evolution in the value struct (new optional fields) is allowed. (DataType::Map(physical_kv, _), DataType::Map(logical_kv, _)) => { - if let (DataType::Struct(physical_fields), DataType::Struct(logical_fields)) = - (physical_kv.data_type(), logical_kv.data_type()) + if let ( + DataType::Struct(physical_fields), + DataType::Struct(logical_fields), + ) = (physical_kv.data_type(), logical_kv.data_type()) { validate_struct_compatibility(physical_fields, logical_fields)?; } From acc8123d47d70f60b55307c957180cc2235a07dd Mon Sep 17 00:00:00 2001 From: ochescu Date: Mon, 6 Apr 2026 12:30:22 +0300 Subject: [PATCH 3/4] [HSTACK] Fix MAP schema evolution: add negative incompatibility test --- datafusion/common/src/nested_struct.rs | 33 ++++++++++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/datafusion/common/src/nested_struct.rs b/datafusion/common/src/nested_struct.rs index 1347a8fef9c49..41d3aaad54fc5 100644 --- a/datafusion/common/src/nested_struct.rs +++ b/datafusion/common/src/nested_struct.rs @@ -865,4 +865,37 @@ mod tests { validate_struct_compatibility(&phys_kv_fields, &log_kv_fields) .expect("Map value-struct evolution (additive) should be compatible"); } + + /// Validates that incompatible MAP 
value-struct changes are rejected. + /// A type change on an existing field (Binary → Int32) must fail — this + /// exercises the new (Map, Map) code path and confirms errors surface there, + /// not silently through the fallback cast. + #[test] + fn test_validate_map_value_struct_incompatibility() { + // Physical: Map(value: Struct({id: Binary})) + // Logical: Map(value: Struct({id: Int32})) — type change, not additive + let phys_kv_fields: Vec> = vec![ + Arc::new(non_null_field("key", DataType::Utf8)), + Arc::new(field( + "value", + Struct(vec![Arc::new(field("id", DataType::Binary))].into()), + )), + ]; + let log_kv_fields: Vec> = vec![ + Arc::new(non_null_field("key", DataType::Utf8)), + Arc::new(field( + "value", + Struct(vec![Arc::new(field("id", DataType::Int32))].into()), + )), + ]; + + // Binary → Int32 is incompatible: must fail via the Map code path. + let result = validate_struct_compatibility(&phys_kv_fields, &log_kv_fields); + assert!(result.is_err()); + let error_msg = result.unwrap_err().to_string(); + assert!( + error_msg.contains("id"), + "error should name the incompatible field" + ); + } } From 14fb369ffe8bcfd58691b57c359ae82a2821801b Mon Sep 17 00:00:00 2001 From: ochescu Date: Mon, 6 Apr 2026 12:33:07 +0300 Subject: [PATCH 4/4] [HSTACK] Fix MAP schema evolution: remove domain-specific references from comments --- datafusion/common/src/nested_struct.rs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/datafusion/common/src/nested_struct.rs b/datafusion/common/src/nested_struct.rs index 41d3aaad54fc5..3b2cdb308d719 100644 --- a/datafusion/common/src/nested_struct.rs +++ b/datafusion/common/src/nested_struct.rs @@ -756,14 +756,14 @@ mod tests { assert_eq!(a_col.value(1), 2); } - /// Simulates the real AEP scenario: identityMap is Map>> - /// in older Parquet files, but the Delta log (logical schema) has evolved to - /// Map>>. 
- /// The physical files must still be readable — missing value-struct fields - /// should be filled with nulls. + /// Verifies that a MAP column whose value struct has gained optional fields + /// can still be read from older Parquet files (additive schema evolution). + /// Physical: Map>> + /// Logical: Map>> + /// Missing value-struct fields must be filled with nulls. #[test] fn test_cast_map_with_evolved_value_struct() { - // Physical schema: identityMap value struct has only "id" + // Physical schema: value struct has only "id" let phys_value_fields: Fields = vec![Arc::new(field("id", DataType::Utf8))].into(); @@ -814,10 +814,8 @@ mod tests { .into(), ), )); - let target_field = field( - "identityMap", - DataType::Map(Arc::clone(&log_kv_field), false), - ); + let target_field = + field("my_map", DataType::Map(Arc::clone(&log_kv_field), false)); // The cast should succeed: missing "primary" and "authenticatedState" filled with nulls let result = cast_column(&phys_map, &target_field, &DEFAULT_CAST_OPTIONS)