diff --git a/datafusion/common/src/config.rs b/datafusion/common/src/config.rs index 3e3ab3429a2fb..4cada22616e46 100644 --- a/datafusion/common/src/config.rs +++ b/datafusion/common/src/config.rs @@ -840,6 +840,73 @@ impl ConfigField for Option { } } +/// Target maximum size of a Parquet row group in bytes. +/// +/// Wraps a `usize` so the "must be greater than zero" constraint (arrow-rs +/// panics on a zero byte limit) is validated when the config is set, rather +/// than when the writer properties are built. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub struct MaxRowGroupBytes(usize); + +impl MaxRowGroupBytes { + /// Creates a `MaxRowGroupBytes`, rejecting zero. + pub fn try_new(value: usize) -> Result { + if value == 0 { + return Err(DataFusionError::Configuration( + "max_row_group_bytes must be greater than 0".to_string(), + )); + } + Ok(Self(value)) + } + + /// Returns the configured byte limit. + pub fn get(&self) -> usize { + self.0 + } +} + +impl FromStr for MaxRowGroupBytes { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result { + let value = s.parse::().map_err(|_| { + DataFusionError::Configuration(format!( + "Invalid max_row_group_bytes: '{s}'. Expected a positive integer." + )) + })?; + Self::try_new(value) + } +} + +impl Display for MaxRowGroupBytes { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.0) + } +} + +/// `ConfigField` for `Option`. A custom impl (rather than the +/// blanket `Option` one) so an invalid value is rejected without leaving the +/// option in an invalid intermediate state on error. `MaxRowGroupBytes` +/// deliberately does not implement `Default`, so the blanket impl does not apply. +impl ConfigField for Option { + fn visit(&self, v: &mut V, key: &str, description: &'static str) { + match self { + Some(s) => v.some(key, s, description), + None => v.none(key, description), + } + } + + fn set(&mut self, _key: &str, value: &str) -> Result<()> { + *self = Some(MaxRowGroupBytes::from_str(value)?); + Ok(()) + } + + fn reset(&mut self, _key: &str) -> Result<()> { + *self = None; + Ok(()) + } +} + config_namespace! { /// Options for reading and writing parquet files /// @@ -973,9 +1040,20 @@ config_namespace! { /// (writing) Target maximum number of rows in each row group (defaults to 1M /// rows). Writing larger row groups requires more memory to write, but - /// can get better compression and be faster to read. + /// can get better compression and be faster to read. When + /// `max_row_group_bytes` is also set, the writer flushes a row group when + /// either limit is reached, whichever comes first. pub max_row_group_size: usize, default = 1024 * 1024 + /// (writing) Target maximum size of each row group in bytes. When set, + /// the writer flushes whenever either this limit or `max_row_group_size` + /// is reached, whichever comes first. Useful for bounding writer memory + /// on wide schemas where a row-count limit can map to very different + /// byte sizes. Matches the behavior of `parquet.block.size` in + /// parquet-mr. If `None` (the default), only the row-count limit + /// applies. + pub max_row_group_bytes: Option, default = None + /// (writing) Sets "created by" property pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into() @@ -4095,4 +4173,49 @@ mod tests { assert_eq!(cdc.max_chunk_size, 1024 * 1024); assert_eq!(cdc.norm_level, 0); } + + #[test] + fn max_row_group_bytes_rejects_zero() { + use crate::config::MaxRowGroupBytes; + use std::str::FromStr; + + assert!(MaxRowGroupBytes::try_new(0).is_err()); + assert!(MaxRowGroupBytes::from_str("0").is_err()); + assert!(MaxRowGroupBytes::from_str("not_a_number").is_err()); + assert_eq!(MaxRowGroupBytes::try_new(128).unwrap().get(), 128); + assert_eq!(MaxRowGroupBytes::from_str("128").unwrap().get(), 128); + } + + #[test] + fn parquet_max_row_group_bytes_config_set_rejects_zero() { + use crate::config::ConfigOptions; + + let mut options = ConfigOptions::new(); + options + .set("datafusion.execution.parquet.max_row_group_bytes", "1024") + .unwrap(); + assert_eq!( + options + .execution + .parquet + .max_row_group_bytes + .map(|v| v.get()), + Some(1024) + ); + + // Zero is rejected at set time, leaving the previous value unchanged. + assert!( + options + .set("datafusion.execution.parquet.max_row_group_bytes", "0") + .is_err() + ); + assert_eq!( + options + .execution + .parquet + .max_row_group_bytes + .map(|v| v.get()), + Some(1024) + ); + } } diff --git a/datafusion/common/src/file_options/parquet_writer.rs b/datafusion/common/src/file_options/parquet_writer.rs index 3f827fbfa75a0..80338d9d3b716 100644 --- a/datafusion/common/src/file_options/parquet_writer.rs +++ b/datafusion/common/src/file_options/parquet_writer.rs @@ -183,6 +183,7 @@ impl ParquetOptions { dictionary_page_size_limit, statistics_enabled, max_row_group_size, + max_row_group_bytes, created_by, column_index_truncate_length, statistics_truncate_length, @@ -225,6 +226,7 @@ impl ParquetOptions { .unwrap_or(DEFAULT_STATISTICS_ENABLED), ) .set_max_row_group_row_count(Some(*max_row_group_size)) + .set_max_row_group_bytes(max_row_group_bytes.as_ref().map(|v| v.get())) .set_created_by(created_by.clone()) .set_column_index_truncate_length(*column_index_truncate_length) .set_statistics_truncate_length(*statistics_truncate_length) @@ -411,7 +413,8 @@ mod tests { #[cfg(feature = "parquet_encryption")] use crate::config::ConfigFileEncryptionProperties; use crate::config::{ - CdcOptions, ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions, + CdcOptions, MaxRowGroupBytes, ParquetColumnOptions, ParquetEncryptionOptions, + ParquetOptions, }; use crate::parquet_config::DFParquetWriterVersion; use parquet::basic::Compression; @@ -456,6 +459,7 @@ mod tests { dictionary_page_size_limit: 42, statistics_enabled: Some("chunk".into()), max_row_group_size: 42, + max_row_group_bytes: Some(MaxRowGroupBytes::try_new(42).unwrap()), created_by: "wordy".into(), column_index_truncate_length: Some(42), statistics_truncate_length: Some(42), @@ -565,6 +569,9 @@ mod tests { max_row_group_size: props .max_row_group_row_count() .unwrap_or(DEFAULT_MAX_ROW_GROUP_ROW_COUNT), + max_row_group_bytes: props + .max_row_group_bytes() + .and_then(|v| MaxRowGroupBytes::try_new(v).ok()), created_by: props.created_by().to_string(), column_index_truncate_length: props.column_index_truncate_length(), statistics_truncate_length: props.statistics_truncate_length(), @@ -888,6 +895,26 @@ mod tests { assert!(WriterPropertiesBuilder::try_from(&opts).is_err()); } + #[test] + fn test_max_row_group_bytes_disabled_by_default() { + let mut opts = TableParquetOptions::default(); + opts.arrow_schema(&Arc::new(Schema::empty())); + + let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build(); + assert_eq!(props.max_row_group_bytes(), None); + } + + #[test] + fn test_max_row_group_bytes_propagated_to_writer_props() { + let mut opts = TableParquetOptions::default(); + opts.global.max_row_group_bytes = + Some(MaxRowGroupBytes::try_new(64 * 1024 * 1024).unwrap()); + opts.arrow_schema(&Arc::new(Schema::empty())); + + let props = WriterPropertiesBuilder::try_from(&opts).unwrap().build(); + assert_eq!(props.max_row_group_bytes(), Some(64 * 1024 * 1024)); + } + #[test] fn test_bloom_filter_set_ndv_only() { // the TableParquetOptions::default, with only ndv set diff --git a/datafusion/proto-common/proto/datafusion_common.proto b/datafusion/proto-common/proto/datafusion_common.proto index 684d9a2612408..d2a7aecf1b436 100644 --- a/datafusion/proto-common/proto/datafusion_common.proto +++ b/datafusion/proto-common/proto/datafusion_common.proto @@ -627,6 +627,10 @@ message ParquetOptions { uint64 max_predicate_cache_size = 33; } + oneof max_row_group_bytes_opt { + uint64 max_row_group_bytes = 37; + } + CdcOptions content_defined_chunking = 35; // Optional timezone applied to INT96-coerced timestamps when `coerce_int96` diff --git a/datafusion/proto-common/src/from_proto/mod.rs b/datafusion/proto-common/src/from_proto/mod.rs index 94a06bcc13bbd..a580ac30d2b81 100644 --- a/datafusion/proto-common/src/from_proto/mod.rs +++ b/datafusion/proto-common/src/from_proto/mod.rs @@ -39,8 +39,8 @@ use datafusion_common::{ DataFusionError, JoinSide, ScalarValue, Statistics, TableReference, arrow_datafusion_err, config::{ - CdcOptions, CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions, - TableParquetOptions, + CdcOptions, CsvOptions, JsonOptions, MaxRowGroupBytes, ParquetColumnOptions, + ParquetOptions, TableParquetOptions, }, file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions}, parsers::CompressionTypeVariant, @@ -1130,6 +1130,9 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions { max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt { protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize), }).unwrap_or(None), + max_row_group_bytes: value.max_row_group_bytes_opt.and_then(|opt| match opt { + protobuf::parquet_options::MaxRowGroupBytesOpt::MaxRowGroupBytes(v) => MaxRowGroupBytes::try_new(v as usize).ok(), + }), use_content_defined_chunking: value.content_defined_chunking.map(|cdc| { let defaults = CdcOptions::default(); CdcOptions { @@ -1329,7 +1332,9 @@ pub(crate) fn csv_writer_options_from_proto( #[cfg(test)] mod tests { - use datafusion_common::config::{CdcOptions, ParquetOptions, TableParquetOptions}; + use datafusion_common::config::{ + CdcOptions, MaxRowGroupBytes, ParquetOptions, TableParquetOptions, + }; fn parquet_options_proto_round_trip(opts: ParquetOptions) -> ParquetOptions { let proto: crate::protobuf_common::ParquetOptions = @@ -1373,6 +1378,21 @@ mod tests { assert_eq!(recovered.coerce_int96_tz, Some("UTC".to_string())); } + #[test] + fn test_parquet_options_max_row_group_bytes_round_trip() { + let opts = ParquetOptions { + max_row_group_bytes: Some( + MaxRowGroupBytes::try_new(64 * 1024 * 1024).unwrap(), + ), + ..ParquetOptions::default() + }; + let recovered = parquet_options_proto_round_trip(opts.clone()); + assert_eq!( + recovered.max_row_group_bytes.map(|v| v.get()), + Some(64 * 1024 * 1024) + ); + } + #[test] fn test_table_parquet_options_coerce_int96_tz_round_trip() { let mut opts = TableParquetOptions::default(); diff --git a/datafusion/proto-common/src/generated/pbjson.rs b/datafusion/proto-common/src/generated/pbjson.rs index 0568982e97a44..9f444c317a1ce 100644 --- a/datafusion/proto-common/src/generated/pbjson.rs +++ b/datafusion/proto-common/src/generated/pbjson.rs @@ -6171,6 +6171,9 @@ impl serde::Serialize for ParquetOptions { if self.max_predicate_cache_size_opt.is_some() { len += 1; } + if self.max_row_group_bytes_opt.is_some() { + len += 1; + } if self.coerce_int96_tz_opt.is_some() { len += 1; } @@ -6342,6 +6345,15 @@ impl serde::Serialize for ParquetOptions { } } } + if let Some(v) = self.max_row_group_bytes_opt.as_ref() { + match v { + parquet_options::MaxRowGroupBytesOpt::MaxRowGroupBytes(v) => { + #[allow(clippy::needless_borrow)] + #[allow(clippy::needless_borrows_for_generic_args)] + struct_ser.serialize_field("maxRowGroupBytes", ToString::to_string(&v).as_str())?; + } + } + } if let Some(v) = self.coerce_int96_tz_opt.as_ref() { match v { parquet_options::CoerceInt96TzOpt::CoerceInt96Tz(v) => { @@ -6422,6 +6434,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "coerceInt96", "max_predicate_cache_size", "maxPredicateCacheSize", + "max_row_group_bytes", + "maxRowGroupBytes", "coerce_int96_tz", "coerceInt96Tz", ]; @@ -6461,6 +6475,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { BloomFilterNdv, CoerceInt96, MaxPredicateCacheSize, + MaxRowGroupBytes, CoerceInt96Tz, } impl<'de> serde::Deserialize<'de> for GeneratedField { @@ -6516,6 +6531,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { "bloomFilterNdv" | "bloom_filter_ndv" => Ok(GeneratedField::BloomFilterNdv), "coerceInt96" | "coerce_int96" => Ok(GeneratedField::CoerceInt96), "maxPredicateCacheSize" | "max_predicate_cache_size" => Ok(GeneratedField::MaxPredicateCacheSize), + "maxRowGroupBytes" | "max_row_group_bytes" => Ok(GeneratedField::MaxRowGroupBytes), "coerceInt96Tz" | "coerce_int96_tz" => Ok(GeneratedField::CoerceInt96Tz), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } @@ -6569,6 +6585,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { let mut bloom_filter_ndv_opt__ = None; let mut coerce_int96_opt__ = None; let mut max_predicate_cache_size_opt__ = None; + let mut max_row_group_bytes_opt__ = None; let mut coerce_int96_tz_opt__ = None; while let Some(k) = map_.next_key()? { match k { @@ -6784,6 +6801,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { } max_predicate_cache_size_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(x.0)); } + GeneratedField::MaxRowGroupBytes => { + if max_row_group_bytes_opt__.is_some() { + return Err(serde::de::Error::duplicate_field("maxRowGroupBytes")); + } + max_row_group_bytes_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::MaxRowGroupBytesOpt::MaxRowGroupBytes(x.0)); + } GeneratedField::CoerceInt96Tz => { if coerce_int96_tz_opt__.is_some() { return Err(serde::de::Error::duplicate_field("coerceInt96Tz")); @@ -6826,6 +6849,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions { bloom_filter_ndv_opt: bloom_filter_ndv_opt__, coerce_int96_opt: coerce_int96_opt__, max_predicate_cache_size_opt: max_predicate_cache_size_opt__, + max_row_group_bytes_opt: max_row_group_bytes_opt__, coerce_int96_tz_opt: coerce_int96_tz_opt__, }) } diff --git a/datafusion/proto-common/src/generated/prost.rs b/datafusion/proto-common/src/generated/prost.rs index 632b16929faa6..05df0ede45e8e 100644 --- a/datafusion/proto-common/src/generated/prost.rs +++ b/datafusion/proto-common/src/generated/prost.rs @@ -900,6 +900,10 @@ pub struct ParquetOptions { pub max_predicate_cache_size_opt: ::core::option::Option< parquet_options::MaxPredicateCacheSizeOpt, >, + #[prost(oneof = "parquet_options::MaxRowGroupBytesOpt", tags = "37")] + pub max_row_group_bytes_opt: ::core::option::Option< + parquet_options::MaxRowGroupBytesOpt, + >, /// Optional timezone applied to INT96-coerced timestamps when `coerce_int96` /// is set. When `Some`, INT96 columns coerce to /// `Timestamp(, Some())` instead of the default @@ -964,6 +968,11 @@ pub mod parquet_options { #[prost(uint64, tag = "33")] MaxPredicateCacheSize(u64), } + #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] + pub enum MaxRowGroupBytesOpt { + #[prost(uint64, tag = "37")] + MaxRowGroupBytes(u64), + } /// Optional timezone applied to INT96-coerced timestamps when `coerce_int96` /// is set. When `Some`, INT96 columns coerce to /// `Timestamp(, Some())` instead of the default diff --git a/datafusion/proto-common/src/to_proto/mod.rs b/datafusion/proto-common/src/to_proto/mod.rs index 940679b836ff1..a4b7d6d24af6b 100644 --- a/datafusion/proto-common/src/to_proto/mod.rs +++ b/datafusion/proto-common/src/to_proto/mod.rs @@ -938,6 +938,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions { coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96), coerce_int96_tz_opt: value.coerce_int96_tz.clone().map(protobuf::parquet_options::CoerceInt96TzOpt::CoerceInt96Tz), max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)), + max_row_group_bytes_opt: value.max_row_group_bytes.map(|v| protobuf::parquet_options::MaxRowGroupBytesOpt::MaxRowGroupBytes(v.get() as u64)), content_defined_chunking: value.use_content_defined_chunking.as_ref().map(|cdc| protobuf::CdcOptions { min_chunk_size: cdc.min_chunk_size as u64, diff --git a/datafusion/proto-models/src/generated/datafusion_proto_common.rs b/datafusion/proto-models/src/generated/datafusion_proto_common.rs index 632b16929faa6..05df0ede45e8e 100644 --- a/datafusion/proto-models/src/generated/datafusion_proto_common.rs +++ b/datafusion/proto-models/src/generated/datafusion_proto_common.rs @@ -900,6 +900,10 @@ pub struct ParquetOptions { pub max_predicate_cache_size_opt: ::core::option::Option< parquet_options::MaxPredicateCacheSizeOpt, >, + #[prost(oneof = "parquet_options::MaxRowGroupBytesOpt", tags = "37")] + pub max_row_group_bytes_opt: ::core::option::Option< + parquet_options::MaxRowGroupBytesOpt, + >, /// Optional timezone applied to INT96-coerced timestamps when `coerce_int96` /// is set. When `Some`, INT96 columns coerce to /// `Timestamp(, Some())` instead of the default @@ -964,6 +968,11 @@ pub mod parquet_options { #[prost(uint64, tag = "33")] MaxPredicateCacheSize(u64), } + #[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)] + pub enum MaxRowGroupBytesOpt { + #[prost(uint64, tag = "37")] + MaxRowGroupBytes(u64), + } /// Optional timezone applied to INT96-coerced timestamps when `coerce_int96` /// is set. When `Some`, INT96 columns coerce to /// `Timestamp(, Some())` instead of the default diff --git a/datafusion/proto/src/logical_plan/file_formats.rs b/datafusion/proto/src/logical_plan/file_formats.rs index d5af7be485f26..5b5758503cb2a 100644 --- a/datafusion/proto/src/logical_plan/file_formats.rs +++ b/datafusion/proto/src/logical_plan/file_formats.rs @@ -382,7 +382,8 @@ mod parquet { parquet_options, }; use datafusion_common::config::{ - CdcOptions, ParquetColumnOptions, ParquetOptions, TableParquetOptions, + CdcOptions, MaxRowGroupBytes, ParquetColumnOptions, ParquetOptions, + TableParquetOptions, }; use datafusion_datasource_parquet::file_format::ParquetFormatFactory; @@ -454,6 +455,9 @@ mod parquet { max_predicate_cache_size_opt: global_options.global.max_predicate_cache_size.map(|size| { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size as u64) }), + max_row_group_bytes_opt: global_options.global.max_row_group_bytes.map(|size| { + parquet_options::MaxRowGroupBytesOpt::MaxRowGroupBytes(size.get() as u64) + }), content_defined_chunking: global_options.global.use_content_defined_chunking.as_ref().map(|cdc| { CdcOptionsProto { min_chunk_size: cdc.min_chunk_size as u64, @@ -563,6 +567,9 @@ mod parquet { max_predicate_cache_size: proto.max_predicate_cache_size_opt.as_ref().map(|opt| match opt { parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(size) => *size as usize, }), + max_row_group_bytes: proto.max_row_group_bytes_opt.as_ref().and_then(|opt| match opt { + parquet_options::MaxRowGroupBytesOpt::MaxRowGroupBytes(size) => MaxRowGroupBytes::try_new(*size as usize).ok(), + }), use_content_defined_chunking: proto.content_defined_chunking.map(|cdc| { let defaults = CdcOptions::default(); CdcOptions { diff --git a/datafusion/sqllogictest/test_files/copy.slt b/datafusion/sqllogictest/test_files/copy.slt index 402ac8e8512bf..7aa7269b58fb8 100644 --- a/datafusion/sqllogictest/test_files/copy.slt +++ b/datafusion/sqllogictest/test_files/copy.slt @@ -326,6 +326,7 @@ OPTIONS ( 'format.compression::col1' 'zstd(5)', 'format.compression::col2' snappy, 'format.max_row_group_size' 12345, +'format.max_row_group_bytes' 2048, 'format.data_pagesize_limit' 1234, 'format.write_batch_size' 1234, 'format.writer_version' 2.0, diff --git a/datafusion/sqllogictest/test_files/information_schema.slt b/datafusion/sqllogictest/test_files/information_schema.slt index 3bf101f203fbd..780223087e8b4 100644 --- a/datafusion/sqllogictest/test_files/information_schema.slt +++ b/datafusion/sqllogictest/test_files/information_schema.slt @@ -248,6 +248,7 @@ datafusion.execution.parquet.enable_page_index true datafusion.execution.parquet.encoding NULL datafusion.execution.parquet.force_filter_selections false datafusion.execution.parquet.max_predicate_cache_size NULL +datafusion.execution.parquet.max_row_group_bytes NULL datafusion.execution.parquet.max_row_group_size 1048576 datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2 datafusion.execution.parquet.maximum_parallel_row_group_writers 1 @@ -398,7 +399,8 @@ datafusion.execution.parquet.enable_page_index true (reading) If true, reads the datafusion.execution.parquet.encoding NULL (writing) Sets default encoding for any column. Valid values are: plain, plain_dictionary, rle, bit_packed, delta_binary_packed, delta_length_byte_array, delta_byte_array, rle_dictionary, and byte_stream_split. These values are not case sensitive. If NULL, uses default parquet writer setting datafusion.execution.parquet.force_filter_selections false (reading) Force the use of RowSelections for filter results, when pushdown_filters is enabled. If false, the reader will automatically choose between a RowSelection and a Bitmap based on the number and pattern of selected rows. datafusion.execution.parquet.max_predicate_cache_size NULL (reading) The maximum predicate cache size, in bytes. When `pushdown_filters` is enabled, sets the maximum memory used to cache the results of predicate evaluation between filter evaluation and output generation. Decreasing this value will reduce memory usage, but may increase IO and CPU usage. None means use the default parquet reader setting. 0 means no caching. -datafusion.execution.parquet.max_row_group_size 1048576 (writing) Target maximum number of rows in each row group (defaults to 1M rows). Writing larger row groups requires more memory to write, but can get better compression and be faster to read. +datafusion.execution.parquet.max_row_group_bytes NULL (writing) Target maximum size of each row group in bytes. When set, the writer flushes whenever either this limit or `max_row_group_size` is reached, whichever comes first. Useful for bounding writer memory on wide schemas where a row-count limit can map to very different byte sizes. Matches the behavior of `parquet.block.size` in parquet-mr. If `None` (the default), only the row-count limit applies. +datafusion.execution.parquet.max_row_group_size 1048576 (writing) Target maximum number of rows in each row group (defaults to 1M rows). Writing larger row groups requires more memory to write, but can get better compression and be faster to read. When `max_row_group_bytes` is also set, the writer flushes a row group when either limit is reached, whichever comes first. datafusion.execution.parquet.maximum_buffered_record_batches_per_stream 2 (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. datafusion.execution.parquet.maximum_parallel_row_group_writers 1 (writing) By default parallel parquet writer is tuned for minimum memory usage in a streaming execution plan. You may see a performance benefit when writing large parquet files by increasing maximum_parallel_row_group_writers and maximum_buffered_record_batches_per_stream if your system has idle cores and can tolerate additional memory usage. Boosting these values is likely worthwhile when writing out already in-memory data, such as from a cached data frame. datafusion.execution.parquet.metadata_size_hint 524288 (reading) If specified, the parquet reader will try and fetch the last `size_hint` bytes of the parquet file optimistically. If not specified, two reads are required: One read to fetch the 8-byte parquet footer and another to fetch the metadata length encoded in the footer Default setting to 512 KiB, which should be sufficient for most parquet files, it can reduce one I/O operation per parquet file. If the metadata is larger than the hint, two reads will still be performed. diff --git a/datafusion/sqllogictest/test_files/parquet_max_row_group_bytes.slt b/datafusion/sqllogictest/test_files/parquet_max_row_group_bytes.slt new file mode 100644 index 0000000000000..8de83329ae073 --- /dev/null +++ b/datafusion/sqllogictest/test_files/parquet_max_row_group_bytes.slt @@ -0,0 +1,186 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# End-to-end tests for the `max_row_group_bytes` Parquet writer option: +# write Parquet files with the option set, then read them back to confirm +# the option is wired through from config to the writer. +# See datafusion/common/src/config.rs for the option definition. + +statement ok +CREATE TABLE source_table(id INT, name VARCHAR) AS VALUES +(1, 'one'), (2, 'two'), (3, 'three'), (4, 'four'), (5, 'five'); + +# Write with max_row_group_bytes set via COPY format options. +query I +COPY source_table +TO 'test_files/scratch/parquet_max_row_group_bytes/copy_options/' +STORED AS PARQUET +OPTIONS ('format.max_row_group_bytes' 1024); +---- +5 + +statement ok +CREATE EXTERNAL TABLE readback_copy_options +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_max_row_group_bytes/copy_options/'; + +query IT +SELECT id, name FROM readback_copy_options ORDER BY id; +---- +1 one +2 two +3 three +4 four +5 five + +# The option also applies when set via the session config (not just COPY OPTIONS). +statement ok +SET datafusion.execution.parquet.max_row_group_bytes = 2048; + +query I +COPY source_table +TO 'test_files/scratch/parquet_max_row_group_bytes/session_config/' +STORED AS PARQUET; +---- +5 + +statement ok +RESET datafusion.execution.parquet.max_row_group_bytes; + +statement ok +CREATE EXTERNAL TABLE readback_session_config +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_max_row_group_bytes/session_config/'; + +query IT +SELECT id, name FROM readback_session_config ORDER BY id; +---- +1 one +2 two +3 three +4 four +5 five + +# A zero byte limit is rejected with a clear configuration error. +query error DataFusion error: Invalid or Unsupported Configuration: max_row_group_bytes must be greater than 0 +COPY source_table +TO 'test_files/scratch/parquet_max_row_group_bytes/invalid/' +STORED AS PARQUET +OPTIONS ('format.max_row_group_bytes' 0); + +# ----------------------------------------------------------------------------- +# Row-group-count verification via EXPLAIN ANALYZE. +# +# `row_groups_pruned_statistics=N total` reports the number of row groups in the +# written file, so it lets us confirm that `max_row_group_bytes` actually +# changes how the writer splits row groups, and that combining it with +# `max_row_group_size` flushes on whichever limit is reached first. +# +# NOTE: byte-based flushing is currently honored only by the single-threaded +# Parquet writer (`AsyncArrowWriter`/`ArrowWriter`), which encodes inline and +# can therefore observe the in-progress row group's encoded size. The +# multi-threaded (parallel) writer decides row-group boundaries by row count +# only and ignores `max_row_group_bytes`, so these cases force the +# single-threaded path with `allow_single_file_parallelism = false`. Extending +# the parallel writer to honor the byte limit is a follow-up change. +# ----------------------------------------------------------------------------- + +statement ok +set datafusion.execution.target_partitions = 1; + +statement ok +set datafusion.execution.minimum_parallel_output_files = 1; + +statement ok +set datafusion.execution.batch_size = 1024; + +statement ok +set datafusion.execution.parquet.allow_single_file_parallelism = false; + +# Row-count limit only: 4096 rows with max_row_group_size = 1000 -> 5 row groups +# (four full groups of 1000 plus a remainder of 96). +statement ok +COPY (SELECT value AS id FROM range(0, 4096)) +TO 'test_files/scratch/parquet_max_row_group_bytes/rg_count_size_only/' +STORED AS PARQUET +OPTIONS ('format.max_row_group_size' 1000); + +statement ok +CREATE EXTERNAL TABLE rg_count_size_only +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_max_row_group_bytes/rg_count_size_only/'; + +query TT +EXPLAIN ANALYZE SELECT * FROM rg_count_size_only WHERE id >= 0; +---- +Plan with Metrics +01)FilterExec: id@0 >= 0 +02)--DataSourceExec: row_groups_pruned_statistics=5 total + +# Both limits set: the byte limit also flushes the sub-1000-row remainders that +# the row-count limit would otherwise carry into the next batch, so the file is +# split more finely -> 8 row groups (whichever limit is reached first). +statement ok +COPY (SELECT value AS id FROM range(0, 4096)) +TO 'test_files/scratch/parquet_max_row_group_bytes/rg_count_size_and_bytes/' +STORED AS PARQUET +OPTIONS ('format.max_row_group_size' 1000, 'format.max_row_group_bytes' 1); + +statement ok +CREATE EXTERNAL TABLE rg_count_size_and_bytes +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_max_row_group_bytes/rg_count_size_and_bytes/'; + +query TT +EXPLAIN ANALYZE SELECT * FROM rg_count_size_and_bytes WHERE id >= 0; +---- +Plan with Metrics +01)FilterExec: id@0 >= 0 +02)--DataSourceExec: row_groups_pruned_statistics=8 total + +# Byte limit drives alone: the row-count limit is far larger than the data, so +# only the byte limit splits. Each 1024-row batch fills a fresh (empty) row +# group, which is never split mid-batch -> 4 row groups. +statement ok +COPY (SELECT value AS id FROM range(0, 4096)) +TO 'test_files/scratch/parquet_max_row_group_bytes/rg_count_bytes_only/' +STORED AS PARQUET +OPTIONS ('format.max_row_group_size' 100000, 'format.max_row_group_bytes' 1); + +statement ok +CREATE EXTERNAL TABLE rg_count_bytes_only +STORED AS PARQUET +LOCATION 'test_files/scratch/parquet_max_row_group_bytes/rg_count_bytes_only/'; + +query TT +EXPLAIN ANALYZE SELECT * FROM rg_count_bytes_only WHERE id >= 0; +---- +Plan with Metrics +01)FilterExec: id@0 >= 0 +02)--DataSourceExec: row_groups_pruned_statistics=4 total + +statement ok +reset datafusion.execution.parquet.allow_single_file_parallelism; + +statement ok +reset datafusion.execution.batch_size; + +statement ok +reset datafusion.execution.minimum_parallel_output_files; + +statement ok +set datafusion.execution.target_partitions = 4; diff --git a/docs/source/user-guide/configs.md b/docs/source/user-guide/configs.md index 9856a13f00306..29e6b0fb3eb8e 100644 --- a/docs/source/user-guide/configs.md +++ b/docs/source/user-guide/configs.md @@ -101,7 +101,8 @@ The following configuration settings are available: | datafusion.execution.parquet.dictionary_enabled | true | (writing) Sets if dictionary encoding is enabled. If NULL, uses default parquet writer setting | | datafusion.execution.parquet.dictionary_page_size_limit | 1048576 | (writing) Sets best effort maximum dictionary page size, in bytes | | datafusion.execution.parquet.statistics_enabled | page | (writing) Sets if statistics are enabled for any column Valid values are: "none", "chunk", and "page" These values are not case sensitive. If NULL, uses default parquet writer setting | -| datafusion.execution.parquet.max_row_group_size | 1048576 | (writing) Target maximum number of rows in each row group (defaults to 1M rows). Writing larger row groups requires more memory to write, but can get better compression and be faster to read. | +| datafusion.execution.parquet.max_row_group_size | 1048576 | (writing) Target maximum number of rows in each row group (defaults to 1M rows). Writing larger row groups requires more memory to write, but can get better compression and be faster to read. When `max_row_group_bytes` is also set, the writer flushes a row group when either limit is reached, whichever comes first. | +| datafusion.execution.parquet.max_row_group_bytes | NULL | (writing) Target maximum size of each row group in bytes. When set, the writer flushes whenever either this limit or `max_row_group_size` is reached, whichever comes first. Useful for bounding writer memory on wide schemas where a row-count limit can map to very different byte sizes. Matches the behavior of `parquet.block.size` in parquet-mr. If `None` (the default), only the row-count limit applies. | | datafusion.execution.parquet.created_by | datafusion version 53.1.0 | (writing) Sets "created by" property | | datafusion.execution.parquet.column_index_truncate_length | 64 | (writing) Sets column index truncate length | | datafusion.execution.parquet.statistics_truncate_length | 64 | (writing) Sets statistics truncate length. If NULL, uses default parquet writer setting | diff --git a/docs/source/user-guide/sql/format_options.md b/docs/source/user-guide/sql/format_options.md index 46d251c18ed74..a0ac8704f575d 100644 --- a/docs/source/user-guide/sql/format_options.md +++ b/docs/source/user-guide/sql/format_options.md @@ -142,6 +142,7 @@ The following options are available when reading or writing Parquet files. If an | BLOOM_FILTER_FPP | Yes | Sets bloom filter false positive probability (global or per column). | `'bloom_filter_fpp'` or `'bloom_filter_fpp::col'` | None | | BLOOM_FILTER_NDV | Yes | Sets bloom filter number of distinct values (global or per column). | `'bloom_filter_ndv'` or `'bloom_filter_ndv::col'` | None | | MAX_ROW_GROUP_SIZE | No | Sets the maximum number of rows per row group. Larger groups require more memory but can improve compression and scan efficiency. | `'max_row_group_size'` | 1048576 | +| MAX_ROW_GROUP_BYTES | No | Sets the maximum size of each row group in bytes. When both this and `MAX_ROW_GROUP_SIZE` are set, the row group flushes whenever either limit is reached. Mirrors `parquet.block.size` from parquet-mr. | `'max_row_group_bytes'` | None | | ENABLE_PAGE_INDEX | No | If true, reads the Parquet data page level metadata (the Page Index), if present, to reduce I/O and decoding. | `'enable_page_index'` | true | | PRUNING | No | If true, enables row group pruning based on min/max statistics. | `'pruning'` | true | | SKIP_METADATA | No | If true, skips optional embedded metadata in the file schema. | `'skip_metadata'` | true |