Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
198 changes: 193 additions & 5 deletions datafusion/common/src/nested_struct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@

use crate::error::{_plan_err, Result};
use arrow::{
array::{Array, ArrayRef, StructArray, new_null_array},
array::{Array, ArrayRef, MapArray, StructArray, new_null_array},
compute::{CastOptions, cast_with_options},
datatypes::{DataType::Struct, Field, FieldRef},
datatypes::{DataType, DataType::Struct, Field, FieldRef},
};
use std::sync::Arc;

Expand Down Expand Up @@ -157,6 +157,14 @@ pub fn cast_column(
Struct(target_fields) => {
cast_struct_column(source_col, target_fields, cast_options)
}
// Handle MAP schema evolution: the key type never changes, but the value
// struct may gain optional fields over time (additive schema evolution).
// We cast the internal key_value StructArray through cast_column so that
// the same struct-evolution logic (null-fill missing fields, drop extras)
// applies recursively to the value struct.
DataType::Map(target_kv_field, sorted) => {
cast_map_column(source_col, target_kv_field, *sorted, cast_options)
}
_ => Ok(cast_with_options(
source_col,
target_field.data_type(),
Expand All @@ -165,6 +173,45 @@ pub fn cast_column(
}
}

/// Cast a MAP column where the value struct has evolved (additive schema change).
///
/// Arrow stores MAP as `List<key_value: Struct<key, value>>`. We cast the inner
/// key_value StructArray through [`cast_column`] so that missing nullable fields in
/// the source value struct are filled with nulls and extra source fields are ignored —
/// the same behaviour as struct-to-struct evolution.
///
/// # Errors
/// Returns a plan error when `source_col` is not a `MapArray`, when casting the
/// entries fails, or when the rebuilt map's invariants are violated (e.g. the cast
/// entries do not match `target_kv_field`).
fn cast_map_column(
    source_col: &ArrayRef,
    target_kv_field: &FieldRef,
    sorted: bool,
    cast_options: &CastOptions,
) -> Result<ArrayRef> {
    // Only a MapArray source can be evolved into a map; anything else is a plan error.
    let Some(source_map) = source_col.as_any().downcast_ref::<MapArray>() else {
        return _plan_err!(
            "Cannot cast column of type {} to map type. Source must be a map.",
            source_col.data_type()
        );
    };

    // Cast the key_value StructArray (key + value fields) using cast_column so
    // that struct field evolution is handled recursively.
    let entries: ArrayRef = Arc::new(source_map.entries().clone());
    let cast_entries = cast_column(&entries, target_kv_field, cast_options)?;
    let Some(cast_struct) = cast_entries.as_any().downcast_ref::<StructArray>() else {
        return _plan_err!("cast_map_column: cast entries must produce a StructArray");
    };

    // Rebuild the MapArray preserving the original offsets and validity bitmap.
    // Use try_new (not new) so that any invariant violation — a mismatch between
    // the declared entry field and the cast entries, or nulls in the entries
    // array — surfaces as an error instead of panicking inside arrow.
    let map = MapArray::try_new(
        Arc::clone(target_kv_field),
        source_map.offsets().clone(),
        cast_struct.clone(),
        source_map.nulls().cloned(),
        sorted,
    )?;
    Ok(Arc::new(map))
}

/// Validates compatibility between source and target struct fields for casting operations.
///
/// This function implements comprehensive struct compatibility checking by examining:
Expand Down Expand Up @@ -260,11 +307,11 @@ mod tests {
use crate::format::DEFAULT_CAST_OPTIONS;
use arrow::{
array::{
BinaryArray, Int32Array, Int32Builder, Int64Array, ListArray, MapArray,
MapBuilder, StringArray, StringBuilder,
BinaryArray, Int32Array, Int32Builder, Int64Array, ListArray, ListBuilder,
MapArray, MapBuilder, StringArray, StringBuilder, StructBuilder,
},
buffer::NullBuffer,
datatypes::{DataType, Field, FieldRef, Int32Type},
datatypes::{DataType, Field, FieldRef, Fields, Int32Type},
};
/// Macro to extract and downcast a column from a StructArray
macro_rules! get_column_as {
Expand Down Expand Up @@ -708,4 +755,145 @@ mod tests {
assert_eq!(a_col.value(0), 1);
assert_eq!(a_col.value(1), 2);
}

/// Verifies that a MAP column whose value struct has gained optional fields
/// can still be read from older Parquet files (additive schema evolution).
/// Physical: Map<String, List<Struct<id>>>
/// Logical: Map<String, List<Struct<id, primary, authenticatedState>>>
/// Missing value-struct fields must be filled with nulls.
#[test]
fn test_cast_map_with_evolved_value_struct() {
// Physical schema: value struct has only "id"
let phys_value_fields: Fields =
vec![Arc::new(field("id", DataType::Utf8))].into();

// Build physical MapArray: {"Email": [{id: "abc@example.com"}]}
let mut map_builder = {
let key_builder = StringBuilder::new();
let value_builder = ListBuilder::new(StructBuilder::new(
phys_value_fields.clone(),
vec![Box::new(StringBuilder::new())],
));
MapBuilder::new(None, key_builder, value_builder)
};
// Row 0: {"Email": [{id: "abc"}]}
map_builder.keys().append_value("Email");
{
let list_builder = map_builder.values();
let struct_builder = list_builder.values();
struct_builder
.field_builder::<StringBuilder>(0)
.unwrap()
.append_value("abc");
struct_builder.append(true);
list_builder.append(true);
}
map_builder.append(true).unwrap();
// Row 1: null map
map_builder.append(false).unwrap();

let phys_map: ArrayRef = Arc::new(map_builder.finish());

// Logical (target) schema: value struct has "id", "primary", "authenticatedState"
let log_value_fields: Fields = vec![
Arc::new(field("id", DataType::Utf8)),
Arc::new(field("primary", DataType::Boolean)),
Arc::new(field("authenticatedState", DataType::Utf8)),
]
.into();
let log_kv_field = Arc::new(non_null_field(
"key_value",
Struct(
vec![
Arc::new(field("key", DataType::Utf8)), // nullable to match MapBuilder
Arc::new(field(
"value",
DataType::List(Arc::new(field("item", Struct(log_value_fields)))),
)),
]
.into(),
),
));
let target_field =
field("my_map", DataType::Map(Arc::clone(&log_kv_field), false));

// The cast should succeed: missing "primary" and "authenticatedState" filled with nulls
let result = cast_column(&phys_map, &target_field, &DEFAULT_CAST_OPTIONS)
.expect("cast_map_column should handle value-struct schema evolution");

let result_map = result.as_any().downcast_ref::<MapArray>().unwrap();
assert!(!result_map.is_null(0));
assert!(result_map.is_null(1));

let entries = result_map.value(0);
let entries_struct = entries.as_any().downcast_ref::<StructArray>().unwrap();
// The key_value struct should have "key" and "value" fields in the logical schema
assert!(entries_struct.column_by_name("key").is_some());
assert!(entries_struct.column_by_name("value").is_some());
}

/// Validates that the schema_rewriter correctly allows Map→Map schema evolution
/// (the compatibility check must not reject additive value-struct changes).
#[test]
fn test_validate_map_value_struct_compatibility() {
let phys_kv_fields: Vec<Arc<Field>> = vec![
Arc::new(non_null_field("key", DataType::Utf8)),
Arc::new(field(
"value",
Struct(vec![Arc::new(field("id", DataType::Utf8))].into()),
)),
];
let log_kv_fields: Vec<Arc<Field>> = vec![
Arc::new(non_null_field("key", DataType::Utf8)),
Arc::new(field(
"value",
Struct(
vec![
Arc::new(field("id", DataType::Utf8)),
Arc::new(field("primary", DataType::Boolean)),
Arc::new(field("authenticatedState", DataType::Utf8)),
]
.into(),
),
)),
];

// Physical key_value fields (fewer value-struct fields) must be compatible
// with logical key_value fields (more value-struct fields).
validate_struct_compatibility(&phys_kv_fields, &log_kv_fields)
.expect("Map value-struct evolution (additive) should be compatible");
}

/// Validates that incompatible MAP value-struct changes are rejected.
/// A type change on an existing field (Binary → Int32) must fail — this
/// exercises the new (Map, Map) code path and confirms errors surface there,
/// not silently through the fallback cast.
#[test]
fn test_validate_map_value_struct_incompatibility() {
// Physical: Map(value: Struct({id: Binary}))
// Logical: Map(value: Struct({id: Int32})) — type change, not additive
let phys_kv_fields: Vec<Arc<Field>> = vec![
Arc::new(non_null_field("key", DataType::Utf8)),
Arc::new(field(
"value",
Struct(vec![Arc::new(field("id", DataType::Binary))].into()),
)),
];
let log_kv_fields: Vec<Arc<Field>> = vec![
Arc::new(non_null_field("key", DataType::Utf8)),
Arc::new(field(
"value",
Struct(vec![Arc::new(field("id", DataType::Int32))].into()),
)),
];

// Binary → Int32 is incompatible: must fail via the Map code path.
let result = validate_struct_compatibility(&phys_kv_fields, &log_kv_fields);
assert!(result.is_err());
let error_msg = result.unwrap_err().to_string();
assert!(
error_msg.contains("id"),
"error should name the incompatible field"
);
}
}
12 changes: 12 additions & 0 deletions datafusion/physical-expr-adapter/src/schema_rewriter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,18 @@ impl<'a> DefaultPhysicalExprAdapterRewriter<'a> {
(DataType::Struct(physical_fields), DataType::Struct(logical_fields)) => {
validate_struct_compatibility(physical_fields, logical_fields)?;
}
// Arrow stores MAP as List<key_value: Struct<key, value>>.
// Validate the inner key_value struct compatibility so that additive
// schema evolution in the value struct (new optional fields) is allowed.
(DataType::Map(physical_kv, _), DataType::Map(logical_kv, _)) => {
if let (
DataType::Struct(physical_fields),
DataType::Struct(logical_fields),
) = (physical_kv.data_type(), logical_kv.data_type())
{
validate_struct_compatibility(physical_fields, logical_fields)?;
}
}
_ => {
let is_compatible =
can_cast_types(physical_field.data_type(), logical_field.data_type());
Expand Down
Loading