diff --git a/parquet/benches/arrow_writer.rs b/parquet/benches/arrow_writer.rs index 80d3e7144b19..6b1866353b08 100644 --- a/parquet/benches/arrow_writer.rs +++ b/parquet/benches/arrow_writer.rs @@ -159,14 +159,14 @@ fn create_string_dictionary_bench_batch( true_density, )?) } -// commenting out until implementation of RunEndEncoded is complete. See https://github.com/apache/arrow-rs/pull/9936#discussion_r3242936421 -#[allow(dead_code)] fn create_ree_bench_batch( value_dt: DataType, size: usize, - null_density: f32, + null_pct: Option, true_density: f32, ) -> Result { + const DEFAULT_NULL_PCT: u8 = 10; + let null_density = null_pct.unwrap_or(DEFAULT_NULL_PCT) as f32 / 100.0; let fields = vec![Field::new( "_1", DataType::RunEndEncoded( @@ -458,11 +458,24 @@ fn create_batches() -> Vec<(&'static str, RecordBatch)> { let batch = create_string_bench_batch_non_null(BATCH_SIZE, 0.25, 0.75).unwrap(); batches.push(("string_non_null", batch)); - //let batch = create_ree_bench_batch(DataType::Utf8, BATCH_SIZE, 0.25, 0.75).unwrap(); - //batches.push(("string_ree", batch)); + let batch = create_ree_bench_batch(DataType::Utf8, BATCH_SIZE, None, 0.75).unwrap(); + batches.push(("string_ree", batch)); + + let batch = create_ree_bench_batch(DataType::Int32, BATCH_SIZE, None, 0.75).unwrap(); + batches.push(("int32_ree", batch)); + + let batch = create_ree_bench_batch(DataType::Boolean, BATCH_SIZE, None, 0.75).unwrap(); + batches.push(("bool_ree", batch)); + + let batch = + create_ree_bench_batch(DataType::FixedSizeBinary(16), BATCH_SIZE, None, 0.75).unwrap(); + batches.push(("fixed_size_binary_ree", batch)); + + let batch = create_ree_bench_batch(DataType::Utf8, BATCH_SIZE, Some(95), 0.75).unwrap(); + batches.push(("string_ree_95pct_null", batch)); - //let batch = create_ree_bench_batch(DataType::Int32, BATCH_SIZE, 0.25, 0.75).unwrap(); - //batches.push(("int32_ree", batch)); + let batch = create_ree_bench_batch(DataType::Int32, BATCH_SIZE, Some(95), 0.75).unwrap(); + batches.push(("int32_ree_95pct_null", batch)); let batch = create_float_bench_batch_with_nans(BATCH_SIZE, 0.5).unwrap(); batches.push(("float_with_nans", batch)); diff --git a/parquet/src/arrow/arrow_writer/levels.rs b/parquet/src/arrow/arrow_writer/levels.rs index 10f90f707c08..385b021e25e4 100644 --- a/parquet/src/arrow/arrow_writer/levels.rs +++ b/parquet/src/arrow/arrow_writer/levels.rs @@ -44,13 +44,36 @@ use crate::column::chunker::CdcChunk; use crate::column::writer::LevelDataRef; use crate::errors::{ParquetError, Result}; use arrow_array::cast::AsArray; -use arrow_array::{Array, ArrayRef, OffsetSizeTrait}; +use arrow_array::types::RunEndIndexType; +use arrow_array::{Array, ArrayRef, Int32Array, OffsetSizeTrait, RunArray, downcast_run_array}; use arrow_buffer::bit_iterator::BitIndexIterator; use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer}; use arrow_schema::{DataType, Field}; use std::ops::Range; use std::sync::Arc; +/// Expands a [`DataType::RunEndEncoded`] array into a flat (logical) array of its values type. +/// +/// use `arrow_select::take` to materialize the full-length flat array. +/// This is intentionally simple (O(n)); efficiency can/should be improved +fn expand_ree_array(array: &ArrayRef) -> Result { + downcast_run_array!( + array => expand_typed_ree(array), + _ => unreachable!("expand_ree_array called on non-REE array"), + ) +} + +fn expand_typed_ree(run_array: &RunArray) -> Result { + let run_ends = run_array.run_ends(); + let values = run_array.values(); + let len = run_array.len(); + let indices: Int32Array = (0..len) + .map(|i| run_ends.get_physical_index(i) as i32) + .collect(); + arrow_select::take::take(values.as_ref(), &indices, None) + .map_err(|e| arrow_err!("Failed to expand REE array: {}", e)) +} + /// Performs a depth-first scan of the children of `array`, constructing [`ArrayLevels`] /// for each leaf column encountered pub(crate) fn calculate_array_levels(array: &ArrayRef, field: &Field) -> Result> { @@ -183,6 +206,15 @@ impl LevelInfoBuilder { let levels = ArrayLevels::new(parent_ctx, is_nullable, array.clone()); Ok(Self::Primitive(levels)) } + DataType::RunEndEncoded(_, value_field) => { + let flat = expand_ree_array(array)?; + let flat_field = Field::new( + field.name(), + value_field.data_type().clone(), + field.is_nullable(), + ); + Self::try_new(&flat_field, parent_ctx, &flat) + } DataType::Struct(children) => { let array = array.as_struct(); let def_level = match is_nullable { diff --git a/parquet/src/arrow/arrow_writer/mod.rs b/parquet/src/arrow/arrow_writer/mod.rs index 79542caed9b7..f2ee5fb45c7c 100644 --- a/parquet/src/arrow/arrow_writer/mod.rs +++ b/parquet/src/arrow/arrow_writer/mod.rs @@ -1285,6 +1285,9 @@ impl ArrowColumnWriterFactory { ArrowDataType::FixedSizeBinary(_) => out.push(bytes(leaves.next().unwrap())?), _ => out.push(col(leaves.next().unwrap())?), }, + ArrowDataType::RunEndEncoded(_, value_field) => { + self.get_arrow_column_writer(value_field.data_type(), props, leaves, out)? + } _ => { return Err(ParquetError::NYI(format!( "Attempting to write an Arrow type {data_type} to parquet that is not yet implemented" @@ -5193,4 +5196,238 @@ mod tests { let cc = file_meta.row_group(0).column(0); assert!(cc.column_index_range().is_none()); } + + /// Writes a single-column RecordBatch to an in-memory Parquet buffer. + fn write_column_to_bytes(array: ArrayRef) -> Bytes { + let schema = Arc::new(Schema::new(vec![Field::new( + "col", + array.data_type().clone(), + true, + )])); + let buf = get_bytes_after_close( + schema.clone(), + &RecordBatch::try_new(schema, vec![array]).unwrap(), + ); + Bytes::from(buf) + } + + /// Reads column 0 from a single-row-group Parquet buffer, projecting it with the given schema. + /// Passing a flat schema when the buffer was written from a REE array lets callers decode + /// the physical values without the run-end encoding wrapper. + fn read_column_with_schema(bytes: Bytes, schema: SchemaRef) -> ArrayRef { + let opts = crate::arrow::arrow_reader::ArrowReaderOptions::new().with_schema(schema); + ParquetRecordBatchReaderBuilder::try_new_with_options(bytes, opts) + .unwrap() + .build() + .unwrap() + .next() + .unwrap() + .unwrap() + .column(0) + .clone() + } + + fn ree_write_read_roundtrip(ree: ArrayRef, flat: ArrayRef) { + let flat_schema = Arc::new(Schema::new(vec![Field::new( + "col", + flat.data_type().clone(), + true, + )])); + let ree_bytes = write_column_to_bytes(ree); + let flat_bytes = write_column_to_bytes(flat.clone()); + assert_eq!( + ree_bytes, flat_bytes, + "REE and flat bytes should be identical" + ); + + let decoded_ree = read_column_with_schema(ree_bytes, flat_schema.clone()); + let decoded_flat = read_column_with_schema(flat_bytes, flat_schema); + + assert_eq!(decoded_ree.as_ref(), flat.as_ref()); + assert_eq!(decoded_ree.as_ref(), decoded_flat.as_ref()); + } + + #[test] + fn ree_string() { + let ree: ArrayRef = Arc::new( + [Some("a"), Some("a"), None, Some("b"), Some("b")] + .into_iter() + .collect::(), + ); + let flat: ArrayRef = Arc::new(StringArray::from(vec![ + Some("a"), + Some("a"), + None, + Some("b"), + Some("b"), + ])); + ree_write_read_roundtrip(ree, flat); + } + + #[test] + fn ree_int32() { + let mut b = PrimitiveRunBuilder::::new(); + for v in [Some(1), Some(1), None, Some(2), Some(2)] { + b.append_option(v); + } + let ree: ArrayRef = Arc::new(b.finish()); + let flat: ArrayRef = Arc::new(Int32Array::from(vec![ + Some(1), + Some(1), + None, + Some(2), + Some(2), + ])); + ree_write_read_roundtrip(ree, flat); + } + + #[test] + fn ree_bool() { + // run_ends [3, 5, 7] → [T,T,T, null,null, F,F] + let ree: ArrayRef = Arc::new( + RunArray::try_new( + &Int32Array::from(vec![3, 5, 7]), + &BooleanArray::from(vec![Some(true), None, Some(false)]), + ) + .unwrap(), + ); + let flat: ArrayRef = Arc::new(BooleanArray::from(vec![ + Some(true), + Some(true), + Some(true), + None, + None, + Some(false), + Some(false), + ])); + ree_write_read_roundtrip(ree, flat); + } + + #[test] + fn ree_fixed_size_binary() { + let mk = |vals: &[Option<&[u8]>]| -> FixedSizeBinaryArray { + let mut b = FixedSizeBinaryBuilder::new(2); + for v in vals { + match v { + Some(x) => b.append_value(x).unwrap(), + None => b.append_null(), + } + } + b.finish() + }; + // run_ends [2, 4, 6] → [aa,aa, null,null, bb,bb] + let ree: ArrayRef = Arc::new( + RunArray::try_new( + &Int32Array::from(vec![2, 4, 6]), + &mk(&[Some(b"aa"), None, Some(b"bb")]), + ) + .unwrap(), + ); + let flat: ArrayRef = Arc::new(mk(&[ + Some(b"aa"), + Some(b"aa"), + None, + None, + Some(b"bb"), + Some(b"bb"), + ])); + ree_write_read_roundtrip(ree, flat); + } + + #[test] + fn ree_single_run() { + let ree: ArrayRef = Arc::new(["x", "x", "x"].into_iter().collect::()); + let flat: ArrayRef = Arc::new(StringArray::from(vec!["x", "x", "x"])); + ree_write_read_roundtrip(ree, flat); + } + + #[test] + fn ree_float32() { + // run_ends [2, 4, 5] → [1.0, 1.0, null, null, 2.5] + let ree: ArrayRef = Arc::new( + RunArray::try_new( + &Int32Array::from(vec![2, 4, 5]), + &Float32Array::from(vec![Some(1.0_f32), None, Some(2.5_f32)]), + ) + .unwrap(), + ); + let flat: ArrayRef = Arc::new(Float32Array::from(vec![ + Some(1.0_f32), + Some(1.0_f32), + None, + None, + Some(2.5_f32), + ])); + ree_write_read_roundtrip(ree, flat); + } + + #[test] + fn ree_sliced() { + // A sliced (non-zero offset) REE array: verify that get_physical_index + // correctly accounts for the logical offset when expanding. + // Full array: run_ends [3, 5, 7] → [a,a,a, b,b, c,c] + // After slice(2, 5) the logical view is [a, b, b, c, c]. + let full: ArrayRef = Arc::new( + RunArray::try_new( + &Int32Array::from(vec![3, 5, 7]), + &StringArray::from(vec!["a", "b", "c"]), + ) + .unwrap(), + ); + let sliced = full.slice(2, 5); + let flat: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "b", "c", "c"])); + ree_write_read_roundtrip(sliced, flat); + } + + #[test] + fn ree_struct_with_ree_child() { + // Struct with a REE string field and a REE int field — confirms + // recursion visits every child and each collapses to the right leaf type. + let run_ends = Int32Array::from(vec![2i32, 3, 5]); + + let col_a: ArrayRef = Arc::new( + RunArray::try_new( + &run_ends, + &StringArray::from(vec![Some("foo"), None, Some("bar")]), + ) + .unwrap(), + ); + let col_b: ArrayRef = Arc::new( + RunArray::try_new(&run_ends, &Int32Array::from(vec![Some(1), None, Some(2)])).unwrap(), + ); + + let struct_array: ArrayRef = Arc::new(StructArray::new( + Fields::from(vec![ + Field::new("a", col_a.data_type().clone(), true), + Field::new("b", col_b.data_type().clone(), true), + ]), + vec![col_a, col_b], + None, + )); + + let schema = Arc::new(Schema::new(vec![Field::new( + "row", + struct_array.data_type().clone(), + true, + )])); + let batch = RecordBatch::try_new(schema.clone(), vec![struct_array]).unwrap(); + + let mut buf = Vec::new(); + let mut writer = ArrowWriter::try_new(&mut buf, schema, None).unwrap(); + writer.write(&batch).unwrap(); + let metadata = writer.close().unwrap(); + + let parquet_schema = metadata.file_metadata().schema_descr(); + assert_eq!(parquet_schema.num_columns(), 2); + assert_eq!( + parquet_schema.column(0).physical_type(), + crate::basic::Type::BYTE_ARRAY + ); + assert_eq!(parquet_schema.column(0).path().string(), "row.a"); + assert_eq!( + parquet_schema.column(1).physical_type(), + crate::basic::Type::INT32 + ); + assert_eq!(parquet_schema.column(1).path().string(), "row.b"); + } } diff --git a/parquet/src/arrow/schema/mod.rs b/parquet/src/arrow/schema/mod.rs index 7fe6fbc9d93d..7878e7c49bd3 100644 --- a/parquet/src/arrow/schema/mod.rs +++ b/parquet/src/arrow/schema/mod.rs @@ -311,6 +311,15 @@ pub fn encode_arrow_schema(schema: &Schema) -> String { BASE64_STANDARD.encode(&len_prefix_schema) } +fn flatten_ree_field(field: &Field) -> Field { + match field.data_type() { + DataType::RunEndEncoded(_, value_field) => field + .clone() + .with_data_type(value_field.data_type().clone()), + _ => field.clone(), + } +} + /// Mutates writer metadata by storing the encoded Arrow schema hint in /// [`ARROW_SCHEMA_META_KEY`]. /// @@ -318,6 +327,22 @@ pub fn encode_arrow_schema(schema: &Schema) -> String { /// /// [`ARROW_SCHEMA_META_KEY`]: crate::arrow::ARROW_SCHEMA_META_KEY pub fn add_encoded_arrow_schema_to_metadata(schema: &Schema, props: &mut WriterProperties) { + let has_ree = schema + .fields() + .iter() + .any(|f| matches!(f.data_type(), DataType::RunEndEncoded(_, _))); + let flat_schema; + let schema = if has_ree { + let flat_fields: Vec = schema + .fields() + .iter() + .map(|f| flatten_ree_field(f)) + .collect(); + flat_schema = Schema::new_with_metadata(flat_fields, schema.metadata().clone()); + &flat_schema + } else { + schema + }; let encoded = encode_arrow_schema(schema); let schema_kv = KeyValue { @@ -833,9 +858,12 @@ fn arrow_to_parquet_type(field: &Field, coerce_types: bool) -> Result { let dict_field = field.clone().with_data_type(value.as_ref().clone()); arrow_to_parquet_type(&dict_field, coerce_types) } - DataType::RunEndEncoded(_, _) => Err(arrow_err!( - "Converting RunEndEncodedType to parquet not supported", - )), + DataType::RunEndEncoded(_, value_field) => { + let ree_value_field = field + .clone() + .with_data_type(value_field.data_type().clone()); + arrow_to_parquet_type(&ree_value_field, coerce_types) + } } }