Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 20 additions & 7 deletions parquet/benches/arrow_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,14 +159,14 @@ fn create_string_dictionary_bench_batch(
true_density,
)?)
}
// commenting out until implementation of RunEndEncoded is complete. See https://github.com/apache/arrow-rs/pull/9936#discussion_r3242936421
#[allow(dead_code)]
fn create_ree_bench_batch(
value_dt: DataType,
size: usize,
null_density: f32,
null_pct: Option<u8>,
true_density: f32,
) -> Result<RecordBatch> {
const DEFAULT_NULL_PCT: u8 = 10;
let null_density = null_pct.unwrap_or(DEFAULT_NULL_PCT) as f32 / 100.0;
let fields = vec![Field::new(
"_1",
DataType::RunEndEncoded(
Expand Down Expand Up @@ -458,11 +458,24 @@ fn create_batches() -> Vec<(&'static str, RecordBatch)> {
let batch = create_string_bench_batch_non_null(BATCH_SIZE, 0.25, 0.75).unwrap();
batches.push(("string_non_null", batch));

//let batch = create_ree_bench_batch(DataType::Utf8, BATCH_SIZE, 0.25, 0.75).unwrap();
//batches.push(("string_ree", batch));
let batch = create_ree_bench_batch(DataType::Utf8, BATCH_SIZE, None, 0.75).unwrap();
batches.push(("string_ree", batch));

let batch = create_ree_bench_batch(DataType::Int32, BATCH_SIZE, None, 0.75).unwrap();
batches.push(("int32_ree", batch));

let batch = create_ree_bench_batch(DataType::Boolean, BATCH_SIZE, None, 0.75).unwrap();
batches.push(("bool_ree", batch));

let batch =
create_ree_bench_batch(DataType::FixedSizeBinary(16), BATCH_SIZE, None, 0.75).unwrap();
batches.push(("fixed_size_binary_ree", batch));

let batch = create_ree_bench_batch(DataType::Utf8, BATCH_SIZE, Some(95), 0.75).unwrap();
batches.push(("string_ree_95pct_null", batch));

//let batch = create_ree_bench_batch(DataType::Int32, BATCH_SIZE, 0.25, 0.75).unwrap();
//batches.push(("int32_ree", batch));
let batch = create_ree_bench_batch(DataType::Int32, BATCH_SIZE, Some(95), 0.75).unwrap();
batches.push(("int32_ree_95pct_null", batch));

let batch = create_float_bench_batch_with_nans(BATCH_SIZE, 0.5).unwrap();
batches.push(("float_with_nans", batch));
Expand Down
34 changes: 33 additions & 1 deletion parquet/src/arrow/arrow_writer/levels.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,36 @@ use crate::column::chunker::CdcChunk;
use crate::column::writer::LevelDataRef;
use crate::errors::{ParquetError, Result};
use arrow_array::cast::AsArray;
use arrow_array::{Array, ArrayRef, OffsetSizeTrait};
use arrow_array::types::RunEndIndexType;
use arrow_array::{Array, ArrayRef, Int32Array, OffsetSizeTrait, RunArray, downcast_run_array};
use arrow_buffer::bit_iterator::BitIndexIterator;
use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow_schema::{DataType, Field};
use std::ops::Range;
use std::sync::Arc;

/// Expands a [`DataType::RunEndEncoded`] array into a flat (logical) array of its values type.
///
/// use `arrow_select::take` to materialize the full-length flat array.
/// This is intentionally simple (O(n)); efficiency can/should be improved
fn expand_ree_array(array: &ArrayRef) -> Result<ArrayRef> {
downcast_run_array!(
array => expand_typed_ree(array),
_ => unreachable!("expand_ree_array called on non-REE array"),
)
}

fn expand_typed_ree<R: RunEndIndexType>(run_array: &RunArray<R>) -> Result<ArrayRef> {
let run_ends = run_array.run_ends();
let values = run_array.values();
let len = run_array.len();
let indices: Int32Array = (0..len)
.map(|i| run_ends.get_physical_index(i) as i32)
.collect();
arrow_select::take::take(values.as_ref(), &indices, None)
.map_err(|e| arrow_err!("Failed to expand REE array: {}", e))
}

/// Performs a depth-first scan of the children of `array`, constructing [`ArrayLevels`]
/// for each leaf column encountered
pub(crate) fn calculate_array_levels(array: &ArrayRef, field: &Field) -> Result<Vec<ArrayLevels>> {
Expand Down Expand Up @@ -183,6 +206,15 @@ impl LevelInfoBuilder {
let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone());
Ok(Self::Primitive(levels))
}
DataType::RunEndEncoded(_, value_field) => {
let flat = expand_ree_array(array)?;
let flat_field = Field::new(
field.name(),
value_field.data_type().clone(),
field.is_nullable(),
);
Self::try_new(&flat_field, parent_ctx, &flat)
}
DataType::Struct(children) => {
let array = array.as_struct();
let def_level = match is_nullable {
Expand Down
237 changes: 237 additions & 0 deletions parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1285,6 +1285,9 @@ impl ArrowColumnWriterFactory {
ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?),
_ => out.push(col(leaves.next().unwrap())?),
},
ArrowDataType::RunEndEncoded(_, value_field) => {
self.get_arrow_column_writer(value_field.data_type(), props, leaves, out)?
}
_ => {
return Err(ParquetError::NYI(format!(
"Attempting to write an Arrow type {data_type} to parquet that is not yet implemented"
Expand Down Expand Up @@ -5193,4 +5196,238 @@ mod tests {
let cc = file_meta.row_group(0).column(0);
assert!(cc.column_index_range().is_none());
}

/// Writes a single-column RecordBatch to an in-memory Parquet buffer.
fn write_column_to_bytes(array: ArrayRef) -> Bytes {
let schema = Arc::new(Schema::new(vec![Field::new(
"col",
array.data_type().clone(),
true,
)]));
let buf = get_bytes_after_close(
schema.clone(),
&RecordBatch::try_new(schema, vec![array]).unwrap(),
);
Bytes::from(buf)
}

/// Reads column 0 from a single-row-group Parquet buffer, projecting it with the given schema.
/// Passing a flat schema when the buffer was written from a REE array lets callers decode
/// the physical values without the run-end encoding wrapper.
fn read_column_with_schema(bytes: Bytes, schema: SchemaRef) -> ArrayRef {
let opts = crate::arrow::arrow_reader::ArrowReaderOptions::new().with_schema(schema);
ParquetRecordBatchReaderBuilder::try_new_with_options(bytes, opts)
.unwrap()
.build()
.unwrap()
.next()
.unwrap()
.unwrap()
.column(0)
.clone()
}

fn ree_write_read_roundtrip(ree: ArrayRef, flat: ArrayRef) {
let flat_schema = Arc::new(Schema::new(vec![Field::new(
"col",
flat.data_type().clone(),
true,
)]));
let ree_bytes = write_column_to_bytes(ree);
let flat_bytes = write_column_to_bytes(flat.clone());
assert_eq!(
ree_bytes, flat_bytes,
"REE and flat bytes should be identical"
);

let decoded_ree = read_column_with_schema(ree_bytes, flat_schema.clone());
let decoded_flat = read_column_with_schema(flat_bytes, flat_schema);

assert_eq!(decoded_ree.as_ref(), flat.as_ref());
assert_eq!(decoded_ree.as_ref(), decoded_flat.as_ref());
}

#[test]
fn ree_string() {
let ree: ArrayRef = Arc::new(
[Some("a"), Some("a"), None, Some("b"), Some("b")]
.into_iter()
.collect::<Int32RunArray>(),
);
let flat: ArrayRef = Arc::new(StringArray::from(vec![
Some("a"),
Some("a"),
None,
Some("b"),
Some("b"),
]));
ree_write_read_roundtrip(ree, flat);
}

#[test]
fn ree_int32() {
let mut b = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
for v in [Some(1), Some(1), None, Some(2), Some(2)] {
b.append_option(v);
}
let ree: ArrayRef = Arc::new(b.finish());
let flat: ArrayRef = Arc::new(Int32Array::from(vec![
Some(1),
Some(1),
None,
Some(2),
Some(2),
]));
ree_write_read_roundtrip(ree, flat);
}

#[test]
fn ree_bool() {
// run_ends [3, 5, 7] → [T,T,T, null,null, F,F]
let ree: ArrayRef = Arc::new(
RunArray::try_new(
&Int32Array::from(vec![3, 5, 7]),
&BooleanArray::from(vec![Some(true), None, Some(false)]),
)
.unwrap(),
);
let flat: ArrayRef = Arc::new(BooleanArray::from(vec![
Some(true),
Some(true),
Some(true),
None,
None,
Some(false),
Some(false),
]));
ree_write_read_roundtrip(ree, flat);
}

#[test]
fn ree_fixed_size_binary() {
let mk = |vals: &[Option<&[u8]>]| -> FixedSizeBinaryArray {
let mut b = FixedSizeBinaryBuilder::new(2);
for v in vals {
match v {
Some(x) => b.append_value(x).unwrap(),
None => b.append_null(),
}
}
b.finish()
};
// run_ends [2, 4, 6] → [aa,aa, null,null, bb,bb]
let ree: ArrayRef = Arc::new(
RunArray::try_new(
&Int32Array::from(vec![2, 4, 6]),
&mk(&[Some(b"aa"), None, Some(b"bb")]),
)
.unwrap(),
);
let flat: ArrayRef = Arc::new(mk(&[
Some(b"aa"),
Some(b"aa"),
None,
None,
Some(b"bb"),
Some(b"bb"),
]));
ree_write_read_roundtrip(ree, flat);
}

#[test]
fn ree_single_run() {
let ree: ArrayRef = Arc::new(["x", "x", "x"].into_iter().collect::<Int32RunArray>());
let flat: ArrayRef = Arc::new(StringArray::from(vec!["x", "x", "x"]));
ree_write_read_roundtrip(ree, flat);
}

#[test]
fn ree_float32() {
// run_ends [2, 4, 5] → [1.0, 1.0, null, null, 2.5]
let ree: ArrayRef = Arc::new(
RunArray::try_new(
&Int32Array::from(vec![2, 4, 5]),
&Float32Array::from(vec![Some(1.0_f32), None, Some(2.5_f32)]),
)
.unwrap(),
);
let flat: ArrayRef = Arc::new(Float32Array::from(vec![
Some(1.0_f32),
Some(1.0_f32),
None,
None,
Some(2.5_f32),
]));
ree_write_read_roundtrip(ree, flat);
}

#[test]
fn ree_sliced() {
// A sliced (non-zero offset) REE array: verify that get_physical_index
// correctly accounts for the logical offset when expanding.
// Full array: run_ends [3, 5, 7] → [a,a,a, b,b, c,c]
// After slice(2, 5) the logical view is [a, b, b, c, c].
let full: ArrayRef = Arc::new(
RunArray::try_new(
&Int32Array::from(vec![3, 5, 7]),
&StringArray::from(vec!["a", "b", "c"]),
)
.unwrap(),
);
let sliced = full.slice(2, 5);
let flat: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "b", "c", "c"]));
ree_write_read_roundtrip(sliced, flat);
}

#[test]
fn ree_struct_with_ree_child() {
// Struct with a REE string field and a REE int field — confirms
// recursion visits every child and each collapses to the right leaf type.
let run_ends = Int32Array::from(vec![2i32, 3, 5]);

let col_a: ArrayRef = Arc::new(
RunArray::try_new(
&run_ends,
&StringArray::from(vec![Some("foo"), None, Some("bar")]),
)
.unwrap(),
);
let col_b: ArrayRef = Arc::new(
RunArray::try_new(&run_ends, &Int32Array::from(vec![Some(1), None, Some(2)])).unwrap(),
);

let struct_array: ArrayRef = Arc::new(StructArray::new(
Fields::from(vec![
Field::new("a", col_a.data_type().clone(), true),
Field::new("b", col_b.data_type().clone(), true),
]),
vec![col_a, col_b],
None,
));

let schema = Arc::new(Schema::new(vec![Field::new(
"row",
struct_array.data_type().clone(),
true,
)]));
let batch = RecordBatch::try_new(schema.clone(), vec![struct_array]).unwrap();

let mut buf = Vec::new();
let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap();
writer.write(&batch).unwrap();
let metadata = writer.close().unwrap();

let parquet_schema = metadata.file_metadata().schema_descr();
assert_eq!(parquet_schema.num_columns(), 2);
assert_eq!(
parquet_schema.column(0).physical_type(),
crate::basic::Type::BYTE_ARRAY
);
assert_eq!(parquet_schema.column(0).path().string(), "row.a");
assert_eq!(
parquet_schema.column(1).physical_type(),
crate::basic::Type::INT32
);
assert_eq!(parquet_schema.column(1).path().string(), "row.b");
}
}
Loading
Loading