Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
125 changes: 124 additions & 1 deletion datafusion/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -840,6 +840,73 @@ impl ConfigField for Option<CdcOptions> {
}
}

/// Upper bound, in bytes, that a single Parquet row group should aim for.
///
/// This is a newtype over `usize`. Values are expected to be created through
/// [`MaxRowGroupBytes::try_new`] or by parsing a string, both of which refuse
/// zero. Doing the check here means a bad value is reported when the config
/// is set instead of later, when the writer properties are built (arrow-rs
/// panics on a zero byte limit).
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct MaxRowGroupBytes(usize);

impl MaxRowGroupBytes {
/// Creates a `MaxRowGroupBytes`, rejecting zero.
pub fn try_new(value: usize) -> Result<Self> {
if value == 0 {
return Err(DataFusionError::Configuration(
"max_row_group_bytes must be greater than 0".to_string(),
));
}
Ok(Self(value))
}

/// Returns the configured byte limit.
pub fn get(&self) -> usize {
self.0
}
}

impl FromStr for MaxRowGroupBytes {
    type Err = DataFusionError;

    /// Parses `s` as a `usize` and validates it with [`MaxRowGroupBytes::try_new`].
    ///
    /// A string that is not a valid `usize` yields a
    /// `DataFusionError::Configuration` that quotes the offending input; a
    /// parsed zero is rejected by `try_new`.
    fn from_str(s: &str) -> Result<Self, Self::Err> {
        match s.parse::<usize>() {
            Ok(bytes) => Self::try_new(bytes),
            Err(_) => Err(DataFusionError::Configuration(format!(
                "Invalid max_row_group_bytes: '{s}'. Expected a positive integer."
            ))),
        }
    }
}

impl Display for MaxRowGroupBytes {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{}", self.0)
}
}

/// Hand-written `ConfigField` for `Option<MaxRowGroupBytes>`.
///
/// The blanket `Option<F>` impl is not used here: `MaxRowGroupBytes`
/// intentionally has no `Default`, so that impl does not apply. This impl
/// parses and validates the new value first and only then stores it, so a
/// rejected value leaves the previous setting untouched.
impl ConfigField for Option<MaxRowGroupBytes> {
    /// Reports the current value (or its absence) to the visitor.
    fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) {
        if let Some(limit) = self {
            v.some(key, limit, description);
        } else {
            v.none(key, description);
        }
    }

    /// Parses `value` and stores it; on a parse/validation error `self` is
    /// left unchanged and the error is returned.
    fn set(&mut self, _key: &str, value: &str) -> Result<()> {
        let parsed: MaxRowGroupBytes = value.parse()?;
        *self = Some(parsed);
        Ok(())
    }

    /// Clears the option back to `None`.
    fn reset(&mut self, _key: &str) -> Result<()> {
        *self = None;
        Ok(())
    }
}

config_namespace! {
/// Options for reading and writing parquet files
///
Expand Down Expand Up @@ -973,9 +1040,20 @@ config_namespace! {

/// (writing) Target maximum number of rows in each row group (defaults to 1M
/// rows). Writing larger row groups requires more memory to write, but
/// can get better compression and be faster to read.
/// can get better compression and be faster to read. When
/// `max_row_group_bytes` is also set, the writer flushes a row group when
/// either limit is reached, whichever comes first.
pub max_row_group_size: usize, default = 1024 * 1024

/// (writing) Target maximum size of each row group in bytes. When set,
/// the writer flushes whenever either this limit or `max_row_group_size`
/// is reached, whichever comes first. Useful for bounding writer memory
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's also update `max_row_group_size` for this "whichever comes first" semantics.

/// on wide schemas where a row-count limit can map to very different
/// byte sizes. Matches the behavior of `parquet.block.size` in
/// parquet-mr. If `None` (the default), only the row-count limit
/// applies.
pub max_row_group_bytes: Option<MaxRowGroupBytes>, default = None

/// (writing) Sets "created by" property
pub created_by: String, default = concat!("datafusion version ", env!("CARGO_PKG_VERSION")).into()

Expand Down Expand Up @@ -4095,4 +4173,49 @@ mod tests {
assert_eq!(cdc.max_chunk_size, 1024 * 1024);
assert_eq!(cdc.norm_level, 0);
}

#[test]
fn max_row_group_bytes_rejects_zero() {
    use crate::config::MaxRowGroupBytes;

    // Zero is refused by the constructor; zero and non-numeric text are
    // refused by string parsing.
    assert!(MaxRowGroupBytes::try_new(0).is_err());
    for bad in ["0", "not_a_number"] {
        assert!(bad.parse::<MaxRowGroupBytes>().is_err(), "input: {bad}");
    }

    // A positive value is accepted through both entry points.
    let direct = MaxRowGroupBytes::try_new(128).unwrap();
    assert_eq!(direct.get(), 128);
    let parsed: MaxRowGroupBytes = "128".parse().unwrap();
    assert_eq!(parsed.get(), 128);
}

#[test]
fn parquet_max_row_group_bytes_config_set_rejects_zero() {
    use crate::config::ConfigOptions;

    const KEY: &str = "datafusion.execution.parquet.max_row_group_bytes";
    // Reads the configured limit back as a plain `usize`.
    let current = |options: &ConfigOptions| {
        options
            .execution
            .parquet
            .max_row_group_bytes
            .map(|v| v.get())
    };

    let mut options = ConfigOptions::new();
    options.set(KEY, "1024").unwrap();
    assert_eq!(current(&options), Some(1024));

    // Setting zero must fail at set time and keep the earlier value.
    let rejected = options.set(KEY, "0");
    assert!(rejected.is_err());
    assert_eq!(current(&options), Some(1024));
}
}
29 changes: 28 additions & 1 deletion datafusion/common/src/file_options/parquet_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,7 @@ impl ParquetOptions {
dictionary_page_size_limit,
statistics_enabled,
max_row_group_size,
max_row_group_bytes,
created_by,
column_index_truncate_length,
statistics_truncate_length,
Expand Down Expand Up @@ -225,6 +226,7 @@ impl ParquetOptions {
.unwrap_or(DEFAULT_STATISTICS_ENABLED),
)
.set_max_row_group_row_count(Some(*max_row_group_size))
.set_max_row_group_bytes(max_row_group_bytes.as_ref().map(|v| v.get()))
.set_created_by(created_by.clone())
.set_column_index_truncate_length(*column_index_truncate_length)
.set_statistics_truncate_length(*statistics_truncate_length)
Expand Down Expand Up @@ -411,7 +413,8 @@ mod tests {
#[cfg(feature = "parquet_encryption")]
use crate::config::ConfigFileEncryptionProperties;
use crate::config::{
CdcOptions, ParquetColumnOptions, ParquetEncryptionOptions, ParquetOptions,
CdcOptions, MaxRowGroupBytes, ParquetColumnOptions, ParquetEncryptionOptions,
ParquetOptions,
};
use crate::parquet_config::DFParquetWriterVersion;
use parquet::basic::Compression;
Expand Down Expand Up @@ -456,6 +459,7 @@ mod tests {
dictionary_page_size_limit: 42,
statistics_enabled: Some("chunk".into()),
max_row_group_size: 42,
max_row_group_bytes: Some(MaxRowGroupBytes::try_new(42).unwrap()),
created_by: "wordy".into(),
column_index_truncate_length: Some(42),
statistics_truncate_length: Some(42),
Expand Down Expand Up @@ -565,6 +569,9 @@ mod tests {
max_row_group_size: props
.max_row_group_row_count()
.unwrap_or(DEFAULT_MAX_ROW_GROUP_ROW_COUNT),
max_row_group_bytes: props
.max_row_group_bytes()
.and_then(|v| MaxRowGroupBytes::try_new(v).ok()),
created_by: props.created_by().to_string(),
column_index_truncate_length: props.column_index_truncate_length(),
statistics_truncate_length: props.statistics_truncate_length(),
Expand Down Expand Up @@ -888,6 +895,26 @@ mod tests {
assert!(WriterPropertiesBuilder::try_from(&opts).is_err());
}

#[test]
fn test_max_row_group_bytes_disabled_by_default() {
    // With default options, the built writer properties carry no byte limit.
    let schema = Arc::new(Schema::empty());
    let mut opts = TableParquetOptions::default();
    opts.arrow_schema(&schema);

    let builder = WriterPropertiesBuilder::try_from(&opts).unwrap();
    let props = builder.build();
    assert_eq!(props.max_row_group_bytes(), None);
}

#[test]
fn test_max_row_group_bytes_propagated_to_writer_props() {
    // A configured byte limit must show up unchanged in the writer properties.
    const LIMIT: usize = 64 * 1024 * 1024;

    let mut opts = TableParquetOptions::default();
    let limit = MaxRowGroupBytes::try_new(LIMIT).unwrap();
    opts.global.max_row_group_bytes = Some(limit);
    let schema = Arc::new(Schema::empty());
    opts.arrow_schema(&schema);

    let builder = WriterPropertiesBuilder::try_from(&opts).unwrap();
    let props = builder.build();
    assert_eq!(props.max_row_group_bytes(), Some(LIMIT));
}

#[test]
fn test_bloom_filter_set_ndv_only() {
// the TableParquetOptions::default, with only ndv set
Expand Down
4 changes: 4 additions & 0 deletions datafusion/proto-common/proto/datafusion_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,10 @@ message ParquetOptions {
uint64 max_predicate_cache_size = 33;
}

oneof max_row_group_bytes_opt {
uint64 max_row_group_bytes = 37;
}

CdcOptions content_defined_chunking = 35;

// Optional timezone applied to INT96-coerced timestamps when `coerce_int96`
Expand Down
26 changes: 23 additions & 3 deletions datafusion/proto-common/src/from_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,8 @@ use datafusion_common::{
DataFusionError, JoinSide, ScalarValue, Statistics, TableReference,
arrow_datafusion_err,
config::{
CdcOptions, CsvOptions, JsonOptions, ParquetColumnOptions, ParquetOptions,
TableParquetOptions,
CdcOptions, CsvOptions, JsonOptions, MaxRowGroupBytes, ParquetColumnOptions,
ParquetOptions, TableParquetOptions,
},
file_options::{csv_writer::CsvWriterOptions, json_writer::JsonWriterOptions},
parsers::CompressionTypeVariant,
Expand Down Expand Up @@ -1130,6 +1130,9 @@ impl TryFrom<&protobuf::ParquetOptions> for ParquetOptions {
max_predicate_cache_size: value.max_predicate_cache_size_opt.map(|opt| match opt {
protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v) => Some(v as usize),
}).unwrap_or(None),
max_row_group_bytes: value.max_row_group_bytes_opt.and_then(|opt| match opt {
protobuf::parquet_options::MaxRowGroupBytesOpt::MaxRowGroupBytes(v) => MaxRowGroupBytes::try_new(v as usize).ok(),
}),
use_content_defined_chunking: value.content_defined_chunking.map(|cdc| {
let defaults = CdcOptions::default();
CdcOptions {
Expand Down Expand Up @@ -1329,7 +1332,9 @@ pub(crate) fn csv_writer_options_from_proto(

#[cfg(test)]
mod tests {
use datafusion_common::config::{CdcOptions, ParquetOptions, TableParquetOptions};
use datafusion_common::config::{
CdcOptions, MaxRowGroupBytes, ParquetOptions, TableParquetOptions,
};

fn parquet_options_proto_round_trip(opts: ParquetOptions) -> ParquetOptions {
let proto: crate::protobuf_common::ParquetOptions =
Expand Down Expand Up @@ -1373,6 +1378,21 @@ mod tests {
assert_eq!(recovered.coerce_int96_tz, Some("UTC".to_string()));
}

#[test]
fn test_parquet_options_max_row_group_bytes_round_trip() {
    // A configured byte limit must survive conversion to protobuf and back.
    let opts = ParquetOptions {
        max_row_group_bytes: Some(
            MaxRowGroupBytes::try_new(64 * 1024 * 1024).unwrap(),
        ),
        ..ParquetOptions::default()
    };
    // The helper takes `ParquetOptions` by value and `opts` is not needed
    // afterwards, so it is moved rather than cloned.
    let recovered = parquet_options_proto_round_trip(opts);
    assert_eq!(
        recovered.max_row_group_bytes.map(|v| v.get()),
        Some(64 * 1024 * 1024)
    );
}

#[test]
fn test_table_parquet_options_coerce_int96_tz_round_trip() {
let mut opts = TableParquetOptions::default();
Expand Down
24 changes: 24 additions & 0 deletions datafusion/proto-common/src/generated/pbjson.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6171,6 +6171,9 @@ impl serde::Serialize for ParquetOptions {
if self.max_predicate_cache_size_opt.is_some() {
len += 1;
}
if self.max_row_group_bytes_opt.is_some() {
len += 1;
}
if self.coerce_int96_tz_opt.is_some() {
len += 1;
}
Expand Down Expand Up @@ -6342,6 +6345,15 @@ impl serde::Serialize for ParquetOptions {
}
}
}
if let Some(v) = self.max_row_group_bytes_opt.as_ref() {
match v {
parquet_options::MaxRowGroupBytesOpt::MaxRowGroupBytes(v) => {
#[allow(clippy::needless_borrow)]
#[allow(clippy::needless_borrows_for_generic_args)]
struct_ser.serialize_field("maxRowGroupBytes", ToString::to_string(&v).as_str())?;
}
}
}
if let Some(v) = self.coerce_int96_tz_opt.as_ref() {
match v {
parquet_options::CoerceInt96TzOpt::CoerceInt96Tz(v) => {
Expand Down Expand Up @@ -6422,6 +6434,8 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
"coerceInt96",
"max_predicate_cache_size",
"maxPredicateCacheSize",
"max_row_group_bytes",
"maxRowGroupBytes",
"coerce_int96_tz",
"coerceInt96Tz",
];
Expand Down Expand Up @@ -6461,6 +6475,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
BloomFilterNdv,
CoerceInt96,
MaxPredicateCacheSize,
MaxRowGroupBytes,
CoerceInt96Tz,
}
impl<'de> serde::Deserialize<'de> for GeneratedField {
Expand Down Expand Up @@ -6516,6 +6531,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
"bloomFilterNdv" | "bloom_filter_ndv" => Ok(GeneratedField::BloomFilterNdv),
"coerceInt96" | "coerce_int96" => Ok(GeneratedField::CoerceInt96),
"maxPredicateCacheSize" | "max_predicate_cache_size" => Ok(GeneratedField::MaxPredicateCacheSize),
"maxRowGroupBytes" | "max_row_group_bytes" => Ok(GeneratedField::MaxRowGroupBytes),
"coerceInt96Tz" | "coerce_int96_tz" => Ok(GeneratedField::CoerceInt96Tz),
_ => Err(serde::de::Error::unknown_field(value, FIELDS)),
}
Expand Down Expand Up @@ -6569,6 +6585,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
let mut bloom_filter_ndv_opt__ = None;
let mut coerce_int96_opt__ = None;
let mut max_predicate_cache_size_opt__ = None;
let mut max_row_group_bytes_opt__ = None;
let mut coerce_int96_tz_opt__ = None;
while let Some(k) = map_.next_key()? {
match k {
Expand Down Expand Up @@ -6784,6 +6801,12 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
}
max_predicate_cache_size_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(x.0));
}
GeneratedField::MaxRowGroupBytes => {
if max_row_group_bytes_opt__.is_some() {
return Err(serde::de::Error::duplicate_field("maxRowGroupBytes"));
}
max_row_group_bytes_opt__ = map_.next_value::<::std::option::Option<::pbjson::private::NumberDeserialize<_>>>()?.map(|x| parquet_options::MaxRowGroupBytesOpt::MaxRowGroupBytes(x.0));
}
GeneratedField::CoerceInt96Tz => {
if coerce_int96_tz_opt__.is_some() {
return Err(serde::de::Error::duplicate_field("coerceInt96Tz"));
Expand Down Expand Up @@ -6826,6 +6849,7 @@ impl<'de> serde::Deserialize<'de> for ParquetOptions {
bloom_filter_ndv_opt: bloom_filter_ndv_opt__,
coerce_int96_opt: coerce_int96_opt__,
max_predicate_cache_size_opt: max_predicate_cache_size_opt__,
max_row_group_bytes_opt: max_row_group_bytes_opt__,
coerce_int96_tz_opt: coerce_int96_tz_opt__,
})
}
Expand Down
9 changes: 9 additions & 0 deletions datafusion/proto-common/src/generated/prost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -900,6 +900,10 @@ pub struct ParquetOptions {
pub max_predicate_cache_size_opt: ::core::option::Option<
parquet_options::MaxPredicateCacheSizeOpt,
>,
#[prost(oneof = "parquet_options::MaxRowGroupBytesOpt", tags = "37")]
pub max_row_group_bytes_opt: ::core::option::Option<
parquet_options::MaxRowGroupBytesOpt,
>,
/// Optional timezone applied to INT96-coerced timestamps when `coerce_int96`
/// is set. When `Some`, INT96 columns coerce to
/// `Timestamp(<coerce_int96>, Some(<tz>))` instead of the default
Expand Down Expand Up @@ -964,6 +968,11 @@ pub mod parquet_options {
#[prost(uint64, tag = "33")]
MaxPredicateCacheSize(u64),
}
#[derive(Clone, Copy, PartialEq, Eq, Hash, ::prost::Oneof)]
pub enum MaxRowGroupBytesOpt {
#[prost(uint64, tag = "37")]
MaxRowGroupBytes(u64),
}
/// Optional timezone applied to INT96-coerced timestamps when `coerce_int96`
/// is set. When `Some`, INT96 columns coerce to
/// `Timestamp(<coerce_int96>, Some(<tz>))` instead of the default
Expand Down
1 change: 1 addition & 0 deletions datafusion/proto-common/src/to_proto/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -938,6 +938,7 @@ impl TryFrom<&ParquetOptions> for protobuf::ParquetOptions {
coerce_int96_opt: value.coerce_int96.clone().map(protobuf::parquet_options::CoerceInt96Opt::CoerceInt96),
coerce_int96_tz_opt: value.coerce_int96_tz.clone().map(protobuf::parquet_options::CoerceInt96TzOpt::CoerceInt96Tz),
max_predicate_cache_size_opt: value.max_predicate_cache_size.map(|v| protobuf::parquet_options::MaxPredicateCacheSizeOpt::MaxPredicateCacheSize(v as u64)),
max_row_group_bytes_opt: value.max_row_group_bytes.map(|v| protobuf::parquet_options::MaxRowGroupBytesOpt::MaxRowGroupBytes(v.get() as u64)),
content_defined_chunking: value.use_content_defined_chunking.as_ref().map(|cdc|
protobuf::CdcOptions {
min_chunk_size: cdc.min_chunk_size as u64,
Expand Down
Loading
Loading