Skip to content

Commit bc74c71

Browse files
kszucs and etseidl authored
feat(parquet): add content defined chunking for arrow writer (#9450)
# Which issue does this PR close? - Closes #NNN. # Rationale for this change Rust implementation of apache/arrow#45360 Traditional Parquet writing splits data pages at fixed sizes, so a single inserted or deleted row causes all subsequent pages to shift — resulting in nearly every byte being re-uploaded to content-addressable storage (CAS) systems. CDC determines page boundaries via a rolling gearhash over column values, so unchanged data produces identical pages across different writes, enabling storage cost reductions and faster upload times. See more details in https://huggingface.co/blog/parquet-cdc The original C++ implementation apache/arrow#45360 Evaluation tool https://github.com/huggingface/dataset-dedupe-estimator where I already integrated this PR to verify that deduplication effectiveness is on par with parquet-cpp (lower is better): <img width="984" height="411" alt="image" src="https://github.com/user-attachments/assets/e6e80931-ac76-4bdd-bf9c-ba7e06559411" /> # What changes are included in this PR? - **Content-defined chunker** at `parquet/src/column/chunker/` - **Arrow writer integration** in `ArrowColumnWriter` - **Writer properties** via `CdcOptions` struct (`min_chunk_size`, `max_chunk_size`, `norm_level`) - **ColumnDescriptor**: added `repeated_ancestor_def_level` field for nested field values iteration # Are these changes tested? Yes — unit tests are located in `cdc.rs` and ported from the C++ implementation. # Are there any user-facing changes? New **experimental** API, disabled by default — no behavior change for existing code: ```rust // Simple toggle (256 KiB min, 1 MiB max, norm_level 0) let props = WriterProperties::builder() .set_content_defined_chunking(true) .build(); // Explicit CDC parameters let props = WriterProperties::builder() .set_cdc_options(CdcOptions { min_chunk_size: 128 * 1024, max_chunk_size: 512 * 1024, norm_level: 1 }) .build(); ``` --------- Co-authored-by: Ed Seidl <etseidl@users.noreply.github.com>
1 parent 39dda22 commit bc74c71

12 files changed

Lines changed: 3447 additions & 39 deletions

File tree

parquet/benches/arrow_writer.rs

Lines changed: 14 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
extern crate criterion;
2020

2121
use criterion::{Bencher, Criterion, Throughput};
22-
use parquet::arrow::arrow_writer::{ArrowRowGroupWriterFactory, compute_leaves};
22+
use parquet::arrow::ArrowWriter;
2323
use parquet::basic::{Compression, ZstdLevel};
2424

2525
extern crate arrow;
@@ -33,10 +33,8 @@ use arrow::datatypes::*;
3333
use arrow::util::bench_util::{create_f16_array, create_f32_array, create_f64_array};
3434
use arrow::{record_batch::RecordBatch, util::data_gen::*};
3535
use arrow_array::RecordBatchOptions;
36-
use parquet::arrow::ArrowSchemaConverter;
3736
use parquet::errors::Result;
38-
use parquet::file::properties::{WriterProperties, WriterVersion};
39-
use parquet::file::writer::SerializedFileWriter;
37+
use parquet::file::properties::{CdcOptions, WriterProperties, WriterVersion};
4038

4139
fn create_primitive_bench_batch(
4240
size: usize,
@@ -342,39 +340,21 @@ fn write_batch_with_option(
342340
batch: &RecordBatch,
343341
props: Option<WriterProperties>,
344342
) -> Result<()> {
345-
let mut file = Empty::default();
346-
let props = Arc::new(props.unwrap_or_default());
347-
let parquet_schema = ArrowSchemaConverter::new()
348-
.with_coerce_types(props.coerce_types())
349-
.convert(batch.schema_ref())?;
350-
let writer = SerializedFileWriter::new(&mut file, parquet_schema.root_schema_ptr(), props)?;
351-
let row_group_writer_factory = ArrowRowGroupWriterFactory::new(&writer, batch.schema());
343+
let props = props.unwrap_or_default();
352344

353345
bench.iter(|| {
354-
let mut row_group = row_group_writer_factory.create_column_writers(0).unwrap();
355-
356-
let mut writers = row_group.iter_mut();
357-
for (field, column) in batch
358-
.schema()
359-
.fields()
360-
.iter()
361-
.zip(black_box(batch).columns())
362-
{
363-
for leaf in compute_leaves(field.as_ref(), column).unwrap() {
364-
writers.next().unwrap().write(&leaf).unwrap()
365-
}
366-
}
367-
368-
for writer in row_group.into_iter() {
369-
black_box(writer.close()).unwrap();
370-
}
346+
let mut file = Empty::default();
347+
let mut writer =
348+
ArrowWriter::try_new(&mut file, batch.schema(), Some(props.clone())).unwrap();
349+
writer.write(black_box(batch)).unwrap();
350+
black_box(writer.close()).unwrap();
371351
});
372352

373353
Ok(())
374354
}
375355

376356
fn create_batches() -> Vec<(&'static str, RecordBatch)> {
377-
const BATCH_SIZE: usize = 4096;
357+
const BATCH_SIZE: usize = 1024 * 1024;
378358

379359
let mut batches = vec![];
380360

@@ -440,6 +420,11 @@ fn create_writer_props() -> Vec<(&'static str, WriterProperties)> {
440420
.build();
441421
props.push(("zstd_parquet_2", prop));
442422

423+
let prop = WriterProperties::builder()
424+
.set_content_defined_chunking(Some(CdcOptions::default()))
425+
.build();
426+
props.push(("cdc", prop));
427+
443428
props
444429
}
445430

parquet/src/arrow/arrow_writer/levels.rs

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
//!
4141
//! \[1\] [parquet-format#nested-encoding](https://github.com/apache/parquet-format#nested-encoding)
4242
43+
use crate::column::chunker::CdcChunk;
4344
use crate::errors::{ParquetError, Result};
4445
use arrow_array::cast::AsArray;
4546
use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
@@ -801,11 +802,58 @@ impl ArrayLevels {
801802
pub fn non_null_indices(&self) -> &[usize] {
802803
&self.non_null_indices
803804
}
805+
806+
/// Create a sliced view of this `ArrayLevels` for a CDC chunk.
807+
///
808+
/// Note: `def_levels`, `rep_levels`, and `non_null_indices` are copied (not zero-copy),
809+
/// while `array` is sliced without copying.
810+
pub(crate) fn slice_for_chunk(&self, chunk: &CdcChunk) -> Self {
811+
let level_offset = chunk.level_offset;
812+
let num_levels = chunk.num_levels;
813+
let value_offset = chunk.value_offset;
814+
let num_values = chunk.num_values;
815+
let def_levels = self
816+
.def_levels
817+
.as_ref()
818+
.map(|levels| levels[level_offset..level_offset + num_levels].to_vec());
819+
let rep_levels = self
820+
.rep_levels
821+
.as_ref()
822+
.map(|levels| levels[level_offset..level_offset + num_levels].to_vec());
823+
824+
// Filter non_null_indices to [value_offset, value_offset + num_values)
825+
// and shift by -value_offset. Use binary search since the slice is sorted.
826+
let value_end = value_offset + num_values;
827+
let start = self
828+
.non_null_indices
829+
.partition_point(|&idx| idx < value_offset);
830+
let end = self
831+
.non_null_indices
832+
.partition_point(|&idx| idx < value_end);
833+
let non_null_indices: Vec<usize> = self.non_null_indices[start..end]
834+
.iter()
835+
.map(|&idx| idx - value_offset)
836+
.collect();
837+
838+
let array = self.array.slice(value_offset, num_values);
839+
let logical_nulls = array.logical_nulls();
840+
841+
Self {
842+
def_levels,
843+
rep_levels,
844+
non_null_indices,
845+
max_def_level: self.max_def_level,
846+
max_rep_level: self.max_rep_level,
847+
array,
848+
logical_nulls,
849+
}
850+
}
804851
}
805852

806853
#[cfg(test)]
807854
mod tests {
808855
use super::*;
856+
use crate::column::chunker::CdcChunk;
809857

810858
use arrow_array::builder::*;
811859
use arrow_array::types::Int32Type;
@@ -2096,4 +2144,152 @@ mod tests {
20962144
let v = Arc::new(array) as ArrayRef;
20972145
LevelInfoBuilder::try_new(field, Default::default(), &v).unwrap()
20982146
}
2147+
2148+
#[test]
fn test_slice_for_chunk_flat() {
    // Case 1: required field (max_def_level = 0) — no def/rep levels stored.
    // The array holds 6 values, all non-null, so non_null_indices covers every slot.
    // The chunk selects value_offset=2, num_values=3, i.e. the sub-array [3, 4, 5].
    // With no levels to slice, num_levels=0 and level_offset are irrelevant.
    // non_null_indices [0..6) restricted to [2, 5) and rebased by -2 → [0, 1, 2].
    let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5, 6]));
    let nulls = values.logical_nulls();
    let levels = ArrayLevels {
        def_levels: None,
        rep_levels: None,
        non_null_indices: (0..6).collect(),
        max_def_level: 0,
        max_rep_level: 0,
        array: values,
        logical_nulls: nulls,
    };
    let chunk = CdcChunk {
        level_offset: 0,
        num_levels: 0,
        value_offset: 2,
        num_values: 3,
    };
    let sliced = levels.slice_for_chunk(&chunk);
    assert!(sliced.def_levels.is_none());
    assert!(sliced.rep_levels.is_none());
    assert_eq!(sliced.non_null_indices, vec![0, 1, 2]);
    assert_eq!(sliced.array.len(), 3);

    // Case 2: optional field (max_def_level = 1) — def levels present, no rep levels.
    // Array: [Some(1), None, Some(3), None, Some(5), Some(6)]
    // def_levels: [1, 0, 1, 0, 1, 1] (1 = non-null, 0 = null)
    // non_null_indices: [0, 2, 4, 5] (array positions of the four non-null values)
    //
    // The chunk selects level_offset=1, num_levels=3, value_offset=1, num_values=3:
    // - def_levels[1..4] = [0, 1, 0] → null, non-null, null
    // - sub-array slice(1, 3) = [None, Some(3), None]
    // - only index 2 of non_null_indices lies in [1, 4); rebased by -1 → [1]
    //   (the position of Some(3) within the sliced sub-array)
    let values: ArrayRef = Arc::new(Int32Array::from(vec![
        Some(1),
        None,
        Some(3),
        None,
        Some(5),
        Some(6),
    ]));
    let nulls = values.logical_nulls();
    let levels = ArrayLevels {
        def_levels: Some(vec![1, 0, 1, 0, 1, 1]),
        rep_levels: None,
        non_null_indices: vec![0, 2, 4, 5],
        max_def_level: 1,
        max_rep_level: 0,
        array: values,
        logical_nulls: nulls,
    };
    let chunk = CdcChunk {
        level_offset: 1,
        num_levels: 3,
        value_offset: 1,
        num_values: 3,
    };
    let sliced = levels.slice_for_chunk(&chunk);
    assert_eq!(sliced.def_levels, Some(vec![0, 1, 0]));
    assert!(sliced.rep_levels.is_none());
    assert_eq!(sliced.non_null_indices, vec![1]);
    assert_eq!(sliced.array.len(), 3);
}
}
2216+
2217+
#[test]
fn test_slice_for_chunk_nested() {
    // List array [[1,2],[3],[4,5]] flattens to values [1..=5] with
    // def = [2,2,2,2,2] and rep = [0,1,0,0,1].
    // Slice levels 2..5 (def = [2,2,2], rep = [0,0,1]) together with values 2..5.
    let values: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]));
    let nulls = values.logical_nulls();
    let levels = ArrayLevels {
        def_levels: Some(vec![2; 5]),
        rep_levels: Some(vec![0, 1, 0, 0, 1]),
        non_null_indices: (0..5).collect(),
        max_def_level: 2,
        max_rep_level: 1,
        array: values,
        logical_nulls: nulls,
    };
    let sliced = levels.slice_for_chunk(&CdcChunk {
        level_offset: 2,
        num_levels: 3,
        value_offset: 2,
        num_values: 3,
    });
    assert_eq!(sliced.def_levels, Some(vec![2, 2, 2]));
    assert_eq!(sliced.rep_levels, Some(vec![0, 0, 1]));
    // [0..5) restricted to [2, 5) → [2, 3, 4], rebased by -2 → [0, 1, 2]
    assert_eq!(sliced.non_null_indices, vec![0, 1, 2]);
    assert_eq!(sliced.array.len(), 3);
}
2244+
2245+
#[test]
fn test_slice_for_chunk_non_null_indices_boundary() {
    // [1, null, 3] → non_null_indices = [0, 2]. Verify the filter window is
    // inclusive at the lower bound and exclusive at the upper bound.
    let values: ArrayRef = Arc::new(Int32Array::from(vec![Some(1), None, Some(3)]));
    let nulls = values.logical_nulls();
    let levels = ArrayLevels {
        def_levels: Some(vec![1, 0, 1]),
        rep_levels: None,
        non_null_indices: vec![0, 2],
        max_def_level: 1,
        max_rep_level: 0,
        array: values,
        logical_nulls: nulls,
    };

    // Helper: slice with the given window and return the rebased indices.
    let indices = |level_offset, num_levels, value_offset, num_values| {
        levels
            .slice_for_chunk(&CdcChunk {
                level_offset,
                num_levels,
                value_offset,
                num_values,
            })
            .non_null_indices
    };

    // idx 0 included at the inclusive lower bound of [0, 1)
    assert_eq!(indices(0, 1, 0, 1), vec![0]);
    // idx 2 inside [1, 3), rebased by -1 → 1
    assert_eq!(indices(1, 2, 1, 2), vec![1]);
    // idx 2 excluded at the exclusive upper bound of [1, 2)
    assert_eq!(indices(1, 1, 1, 1), Vec::<usize>::new());
}
20992295
}

0 commit comments

Comments
 (0)