From d0dfe9923db72526c7abfb20864018c42cb21dab Mon Sep 17 00:00:00 2001 From: EeshanBembi Date: Thu, 23 Apr 2026 12:27:23 +0530 Subject: [PATCH 1/2] perf: fast-path inline strings in ByteViewGroupValueBuilder::vectorized_append MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the input StringView/BinaryView array has no data buffers (all values ≤12 bytes, stored inline), skip the value() → make_view() round-trip in do_append_val_inner and instead copy the u128 views directly. Arrow guarantees valid arrays have zero-padded inline views, so the direct copy is semantically identical and lets the compiler vectorize the loop. Also pre-reserve views capacity in the slow path (non-inline strings) to avoid repeated Vec reallocation. Closes #21568 --- .../group_values/multi_group_by/bytes_view.rs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs index e94e4547e1a75..b812508e9b7b6 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs @@ -166,8 +166,18 @@ impl ByteViewGroupValueBuilder { Nulls::None => { self.nulls.append_n(rows.len(), false); - for &row in rows { - self.do_append_val_inner(arr, row); + if arr.data_buffers().is_empty() { + // Fast path: all strings are inline (≤12 bytes). + // The input array's u128 views are already in the correct format; + // copy them directly instead of going through value() → make_view(). + self.views.extend(rows.iter().map(|&row| arr.views()[row])); + } else { + // Slow path: some strings are non-inline (>12 bytes). + // Pre-reserve views capacity to avoid repeated reallocation. + self.views.reserve(rows.len()); + for &row in rows { + self.do_append_val_inner(arr, row); + } } } From 3701c099d57b1ba6bc8e9528fb19ed71cfb64a8f Mon Sep 17 00:00:00 2001 From: EeshanBembi Date: Thu, 23 Apr 2026 15:03:33 +0530 Subject: [PATCH 2/2] perf: optimize slow path and fix benchmark in ByteViewGroupValueBuilder MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Three follow-up changes addressing PR review and CI feedback on #21794: 1. Remove `required-features = ["test_utils"]` from the `aggregate_vectorized` bench target. The gate caused the bench to be silently skipped when CI ran `cargo bench --features=parquet --bench aggregate_vectorized`. The bench compiles fine in a workspace build where `arrow/test_utils` is already enabled via feature unification through `datafusion-physical-expr`. 2. Replace `arrow::util::test_util::seedable_rng()` with `StdRng::seed_from_u64(42)` from the already-present `rand` crate. This was the only import from `test_util` (as opposed to `bench_util`) and removes the last explicit dependency on the `test_utils` feature in the benchmark. 3. Optimize the slow path in `vectorized_append_inner` (non-inline strings, `!data_buffers().is_empty()`), addressing Dandandan's review comment. Instead of calling `do_append_val_inner` which goes through `array.value(row)` (buffer lookup + slice construction) and then `make_view` (re-reads the first 4 bytes to build the prefix), the new path reads `arr.views()[row]` directly and copies `src.prefix` from the source `ByteView`. Benchmarks show 41–53% improvement for 64-byte strings and 6–16% improvement for random-length strings (null_density=0.0). --- datafusion/physical-plan/Cargo.toml | 1 - .../benches/aggregate_vectorized.rs | 15 +++--- .../group_values/multi_group_by/bytes_view.rs | 48 ++++++++++++++++--- 3 files changed, 50 insertions(+), 14 deletions(-) diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 0fc75043bf333..4b2b31febef2a 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -123,7 +123,6 @@ required-features = ["test_utils"] [[bench]] harness = false name = "aggregate_vectorized" -required-features = ["test_utils"] [[bench]] harness = false diff --git a/datafusion/physical-plan/benches/aggregate_vectorized.rs b/datafusion/physical-plan/benches/aggregate_vectorized.rs index 48ca76d80d2d3..488647d5f8315 100644 --- a/datafusion/physical-plan/benches/aggregate_vectorized.rs +++ b/datafusion/physical-plan/benches/aggregate_vectorized.rs @@ -21,7 +21,6 @@ use arrow::util::bench_util::{ create_primitive_array, create_string_view_array_with_len, create_string_view_array_with_max_len, }; -use arrow::util::test_util::seedable_rng; use arrow_schema::DataType; use criterion::measurement::WallTime; use criterion::{ @@ -30,7 +29,9 @@ use criterion::{ use datafusion_physical_plan::aggregates::group_values::multi_group_by::GroupColumn; use datafusion_physical_plan::aggregates::group_values::multi_group_by::bytes_view::ByteViewGroupValueBuilder; use datafusion_physical_plan::aggregates::group_values::multi_group_by::primitive::PrimitiveGroupValueBuilder; +use rand::SeedableRng; use rand::distr::{Bernoulli, Distribution}; +use rand::rngs::StdRng; use std::hint::black_box; use std::sync::Arc; @@ -128,7 +129,7 @@ fn bytes_bench( input, "0.75 true", { - let mut rng = seedable_rng(); + let mut rng = StdRng::seed_from_u64(42); let d = Bernoulli::new(0.75).unwrap(); (0..size).map(|_| d.sample(&mut rng)).collect::>() }, @@ -141,7 +142,7 @@ fn bytes_bench( input, "0.5 true", { - let mut rng = seedable_rng(); + let mut rng = StdRng::seed_from_u64(42); let d = Bernoulli::new(0.5).unwrap(); (0..size).map(|_| d.sample(&mut rng)).collect::>() }, @@ -154,7 +155,7 @@ fn bytes_bench( input, "0.25 true", { - let mut rng = seedable_rng(); + let mut rng = StdRng::seed_from_u64(42); let d = Bernoulli::new(0.25).unwrap(); (0..size).map(|_| d.sample(&mut rng)).collect::>() }, @@ -236,7 +237,7 @@ fn bench_single_primitive( &input, "0.75 true", { - let mut rng = seedable_rng(); + let mut rng = StdRng::seed_from_u64(42); let d = Bernoulli::new(0.75).unwrap(); (0..size).map(|_| d.sample(&mut rng)).collect::>() }, @@ -249,7 +250,7 @@ fn bench_single_primitive( &input, "0.5 true", { - let mut rng = seedable_rng(); + let mut rng = StdRng::seed_from_u64(42); let d = Bernoulli::new(0.5).unwrap(); (0..size).map(|_| d.sample(&mut rng)).collect::>() }, @@ -262,7 +263,7 @@ fn bench_single_primitive( &input, "0.25 true", { - let mut rng = seedable_rng(); + let mut rng = StdRng::seed_from_u64(42); let d = Bernoulli::new(0.25).unwrap(); (0..size).map(|_| d.sample(&mut rng)).collect::>() }, diff --git a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs index b812508e9b7b6..abc3aba88ad48 100644 --- a/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs +++ b/datafusion/physical-plan/src/aggregates/group_values/multi_group_by/bytes_view.rs @@ -145,7 +145,11 @@ impl ByteViewGroupValueBuilder { } } - fn vectorized_append_inner(&mut self, array: &ArrayRef, rows: &[usize]) { + fn vectorized_append_inner( + &mut self, + array: &ArrayRef, + rows: &[usize], + ) -> Result<()> { let arr = array.as_byte_view::(); let null_count = array.null_count(); let num_rows = array.len(); @@ -173,10 +177,42 @@ impl ByteViewGroupValueBuilder { self.views.extend(rows.iter().map(|&row| arr.views()[row])); } else { // Slow path: some strings are non-inline (>12 bytes). - // Pre-reserve views capacity to avoid repeated reallocation. - self.views.reserve(rows.len()); + // Read views directly to avoid array.value(row) overhead and + // reuse the source view's prefix instead of recomputing it via make_view. + self.views.try_reserve(rows.len()).map_err(|e| { + datafusion_common::exec_datafusion_err!( + "failed to reserve {0} views: {e}", + rows.len() + ) + })?; for &row in rows { - self.do_append_val_inner(arr, row); + let view = arr.views()[row]; + let len = view as u32; + if len <= 12 { + // This row happens to be inline; copy view directly. + self.views.push(view); + } else { + let src = ByteView::from(view); + // ensure_in_progress_big_enough must be called before computing + // new_buffer_index / new_offset — it may flush in_progress to completed. + self.ensure_in_progress_big_enough(len as usize); + let new_buffer_index = self.completed.len() as u32; + let new_offset = self.in_progress.len() as u32; + let src_buf = &arr.data_buffers()[src.buffer_index as usize]; + self.in_progress.extend_from_slice( + &src_buf[src.offset as usize + ..(src.offset + src.length) as usize], + ); + // Reuse prefix from the source view — avoids re-reading first 4 bytes. + let new_view = ByteView { + length: src.length, + prefix: src.prefix, + buffer_index: new_buffer_index, + offset: new_offset, + } + .as_u128(); + self.views.push(new_view); + } } } } @@ -187,6 +223,7 @@ impl ByteViewGroupValueBuilder { self.views.resize(new_len, 0); } } + Ok(()) } fn do_append_val_inner(&mut self, array: &GenericByteViewArray, row: usize) @@ -558,8 +595,7 @@ impl GroupColumn for ByteViewGroupValueBuilder { } fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]) -> Result<()> { - self.vectorized_append_inner(array, rows); - Ok(()) + self.vectorized_append_inner(array, rows) } fn len(&self) -> usize {