Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion datafusion/physical-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,6 @@ required-features = ["test_utils"]
[[bench]]
harness = false
name = "aggregate_vectorized"
required-features = ["test_utils"]

[[bench]]
harness = false
Expand Down
15 changes: 8 additions & 7 deletions datafusion/physical-plan/benches/aggregate_vectorized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use arrow::util::bench_util::{
create_primitive_array, create_string_view_array_with_len,
create_string_view_array_with_max_len,
};
use arrow::util::test_util::seedable_rng;
use arrow_schema::DataType;
use criterion::measurement::WallTime;
use criterion::{
Expand All @@ -30,7 +29,9 @@ use criterion::{
use datafusion_physical_plan::aggregates::group_values::multi_group_by::GroupColumn;
use datafusion_physical_plan::aggregates::group_values::multi_group_by::bytes_view::ByteViewGroupValueBuilder;
use datafusion_physical_plan::aggregates::group_values::multi_group_by::primitive::PrimitiveGroupValueBuilder;
use rand::SeedableRng;
use rand::distr::{Bernoulli, Distribution};
use rand::rngs::StdRng;
use std::hint::black_box;
use std::sync::Arc;

Expand Down Expand Up @@ -128,7 +129,7 @@ fn bytes_bench(
input,
"0.75 true",
{
let mut rng = seedable_rng();
let mut rng = StdRng::seed_from_u64(42);
let d = Bernoulli::new(0.75).unwrap();
(0..size).map(|_| d.sample(&mut rng)).collect::<Vec<_>>()
},
Expand All @@ -141,7 +142,7 @@ fn bytes_bench(
input,
"0.5 true",
{
let mut rng = seedable_rng();
let mut rng = StdRng::seed_from_u64(42);
let d = Bernoulli::new(0.5).unwrap();
(0..size).map(|_| d.sample(&mut rng)).collect::<Vec<_>>()
},
Expand All @@ -154,7 +155,7 @@ fn bytes_bench(
input,
"0.25 true",
{
let mut rng = seedable_rng();
let mut rng = StdRng::seed_from_u64(42);
let d = Bernoulli::new(0.25).unwrap();
(0..size).map(|_| d.sample(&mut rng)).collect::<Vec<_>>()
},
Expand Down Expand Up @@ -236,7 +237,7 @@ fn bench_single_primitive<const NULLABLE: bool>(
&input,
"0.75 true",
{
let mut rng = seedable_rng();
let mut rng = StdRng::seed_from_u64(42);
let d = Bernoulli::new(0.75).unwrap();
(0..size).map(|_| d.sample(&mut rng)).collect::<Vec<_>>()
},
Expand All @@ -249,7 +250,7 @@ fn bench_single_primitive<const NULLABLE: bool>(
&input,
"0.5 true",
{
let mut rng = seedable_rng();
let mut rng = StdRng::seed_from_u64(42);
let d = Bernoulli::new(0.5).unwrap();
(0..size).map(|_| d.sample(&mut rng)).collect::<Vec<_>>()
},
Expand All @@ -262,7 +263,7 @@ fn bench_single_primitive<const NULLABLE: bool>(
&input,
"0.25 true",
{
let mut rng = seedable_rng();
let mut rng = StdRng::seed_from_u64(42);
let d = Bernoulli::new(0.25).unwrap();
(0..size).map(|_| d.sample(&mut rng)).collect::<Vec<_>>()
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,11 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
}
}

fn vectorized_append_inner(&mut self, array: &ArrayRef, rows: &[usize]) {
fn vectorized_append_inner(
&mut self,
array: &ArrayRef,
rows: &[usize],
) -> Result<()> {
let arr = array.as_byte_view::<B>();
let null_count = array.null_count();
let num_rows = array.len();
Expand All @@ -166,8 +170,50 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {

Nulls::None => {
self.nulls.append_n(rows.len(), false);
for &row in rows {
self.do_append_val_inner(arr, row);
if arr.data_buffers().is_empty() {
// Fast path: all strings are inline (≤12 bytes).
// The input array's u128 views are already in the correct format;
// copy them directly instead of going through value() → make_view().
self.views.extend(rows.iter().map(|&row| arr.views()[row]));
} else {
// Slow path: some strings are non-inline (>12 bytes).
// Read views directly to avoid array.value(row) overhead and
// reuse the source view's prefix instead of recomputing it via make_view.
self.views.try_reserve(rows.len()).map_err(|e| {
datafusion_common::exec_datafusion_err!(
"failed to reserve {0} views: {e}",
rows.len()
)
})?;
for &row in rows {
let view = arr.views()[row];
let len = view as u32;
if len <= 12 {
// This row happens to be inline; copy view directly.
self.views.push(view);
} else {
let src = ByteView::from(view);
// ensure_in_progress_big_enough must be called before computing
// new_buffer_index / new_offset — it may flush in_progress to completed.
self.ensure_in_progress_big_enough(len as usize);
let new_buffer_index = self.completed.len() as u32;
let new_offset = self.in_progress.len() as u32;
let src_buf = &arr.data_buffers()[src.buffer_index as usize];
self.in_progress.extend_from_slice(
&src_buf[src.offset as usize
..(src.offset + src.length) as usize],
);
// Reuse prefix from the source view — avoids re-reading first 4 bytes.
let new_view = ByteView {
length: src.length,
prefix: src.prefix,
buffer_index: new_buffer_index,
offset: new_offset,
}
.as_u128();
self.views.push(new_view);
}
}
}
}

Expand All @@ -177,6 +223,7 @@ impl<B: ByteViewType> ByteViewGroupValueBuilder<B> {
self.views.resize(new_len, 0);
}
}
Ok(())
}

fn do_append_val_inner(&mut self, array: &GenericByteViewArray<B>, row: usize)
Expand Down Expand Up @@ -548,8 +595,7 @@ impl<B: ByteViewType> GroupColumn for ByteViewGroupValueBuilder<B> {
}

fn vectorized_append(&mut self, array: &ArrayRef, rows: &[usize]) -> Result<()> {
self.vectorized_append_inner(array, rows);
Ok(())
self.vectorized_append_inner(array, rows)
}

fn len(&self) -> usize {
Expand Down
Loading