From d89176c5732cb41f1d7090cd0ee4ac51b164078b Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Thu, 7 May 2026 15:09:00 +0200 Subject: [PATCH 1/2] Add failing test for partitioned_by_file_group protobuf roundtrip --- .../tests/cases/roundtrip_physical_plan.rs | 49 +++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs index 58b79d641b55..f28d3a1f5b4d 100644 --- a/datafusion/proto/tests/cases/roundtrip_physical_plan.rs +++ b/datafusion/proto/tests/cases/roundtrip_physical_plan.rs @@ -4067,5 +4067,54 @@ fn test_custom_node_with_dynamic_filter_dedup_roundtrip() -> Result<()> { // rewrite can reconstruct the remapped form on the other side. assert_dynamic_filters_equal(deser_custom_df, deser_filter_df); assert_dynamic_filter_update_is_visible(deser_custom_df, deser_filter_df)?; + + Ok(()) +} + +#[test] +fn roundtrip_parquet_exec_partitioned_by_file_group() -> Result<()> { + use datafusion::datasource::physical_plan::FileScanConfig; + + let file_schema = + Arc::new(Schema::new(vec![Field::new("col", DataType::Utf8, false)])); + let file_source = Arc::new(ParquetSource::new(Arc::clone(&file_schema))); + let scan_config = + FileScanConfigBuilder::new(ObjectStoreUrl::local_filesystem(), file_source) + .with_file_groups(vec![FileGroup::new(vec![PartitionedFile::new( + "/path/to/file.parquet".to_string(), + 1024, + )])]) + .with_partitioned_by_file_group(true) + .build(); + + assert!(scan_config.partitioned_by_file_group); + + let exec_plan: Arc = DataSourceExec::from_data_source(scan_config); + + let ctx = SessionContext::new(); + let codec = DefaultPhysicalExtensionCodec {}; + let proto_converter = DefaultPhysicalProtoConverter {}; + let bytes = physical_plan_to_bytes_with_proto_converter( + Arc::clone(&exec_plan), + &codec, + &proto_converter, + )?; + let result_plan = physical_plan_from_bytes_with_proto_converter( + bytes.as_ref(), + ctx.task_ctx().as_ref(), + &codec, + &proto_converter, + )?; + + let data_source_exec = result_plan + .downcast_ref::() + .expect("Expected DataSourceExec"); + let file_scan_config = data_source_exec + .data_source() + .downcast_ref::() + .expect("Expected FileScanConfig"); + + assert!(file_scan_config.partitioned_by_file_group); + Ok(()) } From 6dec7efcde7d0f507699f12b5f230a89f654947e Mon Sep 17 00:00:00 2001 From: Marc Brinkmann Date: Thu, 7 May 2026 15:54:51 +0200 Subject: [PATCH 2/2] Serialize partitioned_by_file_group in FileScanExecConf protobuf --- datafusion/proto-models/proto/datafusion.proto | 1 + .../proto-models/src/generated/pbjson.rs | 18 ++++++++++++++++++ datafusion/proto-models/src/generated/prost.rs | 2 ++ .../proto/src/physical_plan/from_proto.rs | 1 + datafusion/proto/src/physical_plan/to_proto.rs | 1 + 5 files changed, 23 insertions(+) diff --git a/datafusion/proto-models/proto/datafusion.proto b/datafusion/proto-models/proto/datafusion.proto index e627e6dd4e89..ea6d07836662 100644 --- a/datafusion/proto-models/proto/datafusion.proto +++ b/datafusion/proto-models/proto/datafusion.proto @@ -1120,6 +1120,7 @@ message FileScanExecConf { optional uint64 batch_size = 12; optional ProjectionExprs projection_exprs = 13; + optional bool partitioned_by_file_group = 14; } message ParquetScanExecNode { diff --git a/datafusion/proto-models/src/generated/pbjson.rs b/datafusion/proto-models/src/generated/pbjson.rs index 26e8424023ec..8e6997757f11 100644 --- a/datafusion/proto-models/src/generated/pbjson.rs +++ b/datafusion/proto-models/src/generated/pbjson.rs @@ -6848,6 +6848,9 @@ impl serde::Serialize for FileScanExecConf { if self.projection_exprs.is_some() { len += 1; } + if self.partitioned_by_file_group.is_some() { + len += 1; + } let mut struct_ser = serializer.serialize_struct("datafusion.FileScanExecConf", len)?; if !self.file_groups.is_empty() { struct_ser.serialize_field("fileGroups", &self.file_groups)?; @@ -6884,6 +6887,9 @@ impl serde::Serialize for FileScanExecConf { if let Some(v) = self.projection_exprs.as_ref() { struct_ser.serialize_field("projectionExprs", v)?; } + if let Some(v) = self.partitioned_by_file_group.as_ref() { + struct_ser.serialize_field("partitionedByFileGroup", v)?; + } struct_ser.end() } } @@ -6911,6 +6917,8 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { "batchSize", "projection_exprs", "projectionExprs", + "partitioned_by_file_group", + "partitionedByFileGroup", ]; #[allow(clippy::enum_variant_names)] @@ -6926,6 +6934,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { Constraints, BatchSize, ProjectionExprs, + PartitionedByFileGroup, } impl<'de> serde::Deserialize<'de> for GeneratedField { fn deserialize(deserializer: D) -> std::result::Result @@ -6958,6 +6967,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { "constraints" => Ok(GeneratedField::Constraints), "batchSize" | "batch_size" => Ok(GeneratedField::BatchSize), "projectionExprs" | "projection_exprs" => Ok(GeneratedField::ProjectionExprs), + "partitionedByFileGroup" | "partitioned_by_file_group" => Ok(GeneratedField::PartitionedByFileGroup), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -6988,6 +6998,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { let mut constraints__ = None; let mut batch_size__ = None; let mut projection_exprs__ = None; + let mut partitioned_by_file_group__ = None; while let Some(k) = map_.next_key()? { match k { GeneratedField::FileGroups => { @@ -7061,6 +7072,12 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { } projection_exprs__ = map_.next_value()?; } + GeneratedField::PartitionedByFileGroup => { + if partitioned_by_file_group__.is_some() { + return Err(serde::de::Error::duplicate_field("partitionedByFileGroup")); + } + partitioned_by_file_group__ = map_.next_value()?; + } } } Ok(FileScanExecConf { @@ -7075,6 +7092,7 @@ impl<'de> serde::Deserialize<'de> for FileScanExecConf { constraints: constraints__, batch_size: batch_size__, projection_exprs: projection_exprs__, + partitioned_by_file_group: partitioned_by_file_group__, }) } } diff --git a/datafusion/proto-models/src/generated/prost.rs b/datafusion/proto-models/src/generated/prost.rs index 0b43e2e7d6e4..d8187e65a501 100644 --- a/datafusion/proto-models/src/generated/prost.rs +++ b/datafusion/proto-models/src/generated/prost.rs @@ -1677,6 +1677,8 @@ pub struct FileScanExecConf { pub batch_size: ::core::option::Option, #[prost(message, optional, tag = "13")] pub projection_exprs: ::core::option::Option, + #[prost(bool, optional, tag = "14")] + pub partitioned_by_file_group: ::core::option::Option, } #[derive(Clone, PartialEq, ::prost::Message)] pub struct ParquetScanExecNode { diff --git a/datafusion/proto/src/physical_plan/from_proto.rs b/datafusion/proto/src/physical_plan/from_proto.rs index f5fd214ef683..5b4b95d9c659 100644 --- a/datafusion/proto/src/physical_plan/from_proto.rs +++ b/datafusion/proto/src/physical_plan/from_proto.rs @@ -711,6 +711,7 @@ pub fn parse_protobuf_file_scan_config( .with_limit(proto.limit.as_ref().map(|sl| sl.limit as usize)) .with_output_ordering(output_ordering) .with_batch_size(proto.batch_size.map(|s| s as usize)) + .with_partitioned_by_file_group(proto.partitioned_by_file_group.unwrap_or(false)) .build(); Ok(config) } diff --git a/datafusion/proto/src/physical_plan/to_proto.rs b/datafusion/proto/src/physical_plan/to_proto.rs index 5181c9740130..84de2cecbf17 100644 --- a/datafusion/proto/src/physical_plan/to_proto.rs +++ b/datafusion/proto/src/physical_plan/to_proto.rs @@ -757,6 +757,7 @@ pub fn serialize_file_scan_config( constraints: Some(conf.constraints.clone().into()), batch_size: conf.batch_size.map(|s| s as u64), projection_exprs, + partitioned_by_file_group: Some(conf.partitioned_by_file_group), }) }