Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
113 changes: 113 additions & 0 deletions datafusion/execution/src/memory_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,68 @@ impl MemoryReservation {
pub fn take(&mut self) -> MemoryReservation {
self.split(self.size.load(atomic::Ordering::Relaxed))
}

/// Attempts to grow the reservation by `capacity` bytes and returns
/// a [`ReservationGuard`] that will automatically shrink the reservation
/// when dropped.
///
/// This is useful for tracking transient allocations that are freed
/// when a scope exits (including error paths via `?`).
///
/// Call [`ReservationGuard::release`] to prevent the automatic shrink
/// when ownership of the allocated memory is transferred elsewhere.
pub fn try_grow_guard(&self, capacity: usize) -> Result<ReservationGuard<'_>> {
self.try_grow(capacity)?;
Ok(ReservationGuard {
reservation: self,
size: capacity,
})
}

/// Grows the reservation by `capacity` bytes (infallible) and returns
/// a [`ReservationGuard`] that will automatically shrink on drop.
///
/// Use only for named exceptions where the allocation is required for
/// correctness/progress and cannot be deferred (e.g., join probe-side
/// index arrays that must be allocated to produce results).
pub fn grow_guard(&self, capacity: usize) -> ReservationGuard<'_> {
self.grow(capacity);
ReservationGuard {
reservation: self,
size: capacity,
}
}
}

/// RAII guard that automatically shrinks a [`MemoryReservation`] on drop.
///
/// Created by [`MemoryReservation::try_grow_guard`]. When the guard is
/// dropped, it shrinks the reservation by the guarded size. Call
/// [`Self::release`] to transfer ownership and prevent the automatic shrink.
pub struct ReservationGuard<'a> {
reservation: &'a MemoryReservation,
size: usize,
}

impl ReservationGuard<'_> {
/// Prevents the automatic shrink on drop, effectively transferring
/// ownership of the reserved bytes to a longer-lived reservation.
pub fn release(mut self) {
self.size = 0;
}

/// Returns the guarded size in bytes.
pub fn size(&self) -> usize {
self.size
}
}

impl Drop for ReservationGuard<'_> {
fn drop(&mut self) {
if self.size > 0 {
self.reservation.shrink(self.size);
}
}
}

impl Drop for MemoryReservation {
Expand Down Expand Up @@ -670,4 +732,55 @@ mod tests {
assert_eq!(r1.size(), 0);
assert_eq!(pool.reserved(), 80);
}

#[test]
fn test_try_grow_guard_auto_shrinks() {
let pool = Arc::new(GreedyMemoryPool::new(1000)) as _;
let r1 = MemoryConsumer::new("test").register(&pool);

{
let _guard = r1.try_grow_guard(100).unwrap();
assert_eq!(r1.size(), 100);
assert_eq!(pool.reserved(), 100);
}
assert_eq!(r1.size(), 0);
assert_eq!(pool.reserved(), 0);
}

#[test]
fn test_try_grow_guard_release_prevents_shrink() {
let pool = Arc::new(GreedyMemoryPool::new(1000)) as _;
let r1 = MemoryConsumer::new("test").register(&pool);

{
let guard = r1.try_grow_guard(100).unwrap();
guard.release();
}
assert_eq!(r1.size(), 100);
assert_eq!(pool.reserved(), 100);
}

#[test]
fn test_grow_guard_auto_shrinks() {
let pool = Arc::new(GreedyMemoryPool::new(1000)) as _;
let r1 = MemoryConsumer::new("test").register(&pool);

{
let _guard = r1.grow_guard(200);
assert_eq!(r1.size(), 200);
}
assert_eq!(r1.size(), 0);
assert_eq!(pool.reserved(), 0);
}

#[test]
fn test_try_grow_guard_error_path() {
let pool = Arc::new(GreedyMemoryPool::new(50)) as _;
let r1 = MemoryConsumer::new("test").register(&pool);

let result = r1.try_grow_guard(100);
assert!(result.is_err());
assert_eq!(r1.size(), 0);
assert_eq!(pool.reserved(), 0);
}
}
28 changes: 21 additions & 7 deletions datafusion/physical-plan/benches/spill_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use criterion::{
use datafusion_common::config::SpillCompression;
use datafusion_common::human_readable_size;
use datafusion_common::instant::Instant;
use datafusion_execution::memory_pool::MemoryConsumer;
use datafusion_execution::runtime_env::RuntimeEnv;
use datafusion_physical_plan::SpillManager;
use datafusion_physical_plan::common::collect;
Expand Down Expand Up @@ -90,7 +91,14 @@ fn bench_spill_io(c: &mut Criterion) {
Field::new("c2", DataType::Date32, true),
Field::new("c3", DataType::Decimal128(11, 2), true),
]));
let spill_manager = SpillManager::new(env, metrics, schema);
let spill_manager = SpillManager::new(
Arc::clone(&env),
metrics,
Arc::clone(&schema),
MemoryConsumer::new("bench")
.with_can_spill(true)
.register(&env.memory_pool),
);

let mut group = c.benchmark_group("spill_io");
let rt = Runtime::new().unwrap();
Expand All @@ -116,7 +124,7 @@ fn bench_spill_io(c: &mut Criterion) {
|spill_file| {
rt.block_on(async {
let stream = spill_manager
.read_spill_as_stream(spill_file, None)
.read_spill_as_stream(spill_file, None, None)
.unwrap();
let _ = collect(stream).await.unwrap();
})
Expand Down Expand Up @@ -504,9 +512,15 @@ fn benchmark_spill_batches_for_all_codec(

for &compression in compressions {
let metrics = SpillMetrics::new(&ExecutionPlanMetricsSet::new(), 0);
let spill_manager =
SpillManager::new(Arc::clone(&env), metrics.clone(), Arc::clone(&schema))
.with_compression_type(compression);
let spill_manager = SpillManager::new(
Arc::clone(&env),
metrics.clone(),
Arc::clone(&schema),
MemoryConsumer::new("bench")
.with_can_spill(true)
.register(&env.memory_pool),
)
.with_compression_type(compression);

let bench_id = BenchmarkId::new(batch_label, compression.to_string());
group.bench_with_input(bench_id, &spill_manager, |b, spill_manager| {
Expand All @@ -522,7 +536,7 @@ fn benchmark_spill_batches_for_all_codec(
.unwrap()
.unwrap();
let stream = spill_manager
.read_spill_as_stream(spill_file, None)
.read_spill_as_stream(spill_file, None, None)
.unwrap();
let _ = collect(stream).await.unwrap();
})
Expand Down Expand Up @@ -557,7 +571,7 @@ fn benchmark_spill_batches_for_all_codec(
let start = Instant::now();
rt.block_on(async {
let stream = spill_manager
.read_spill_as_stream(spill_file, None)
.read_spill_as_stream(spill_file, None, None)
.unwrap();
let _ = collect(stream).await.unwrap();
});
Expand Down
8 changes: 8 additions & 0 deletions datafusion/physical-plan/src/aggregates/group_values/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ pub trait GroupValues: Send {
/// Emits the group values
fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>>;

/// Returns an estimate of the memory that will be allocated by [`Self::emit`]
/// for the decode/output buffers.
///
/// This is used by the aggregation operator to pre-reserve memory before
/// calling `emit()`, ensuring the memory pool is aware of transient
/// decode buffer allocations.
fn estimated_emit_size(&self, emit_to: &EmitTo) -> usize;

/// Clear the contents and shrink the capacity to the size of the batch (free up memory usage)
fn clear_shrink(&mut self, num_rows: usize);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1088,6 +1088,19 @@ impl<const STREAMING: bool> GroupValues for GroupValuesColumn<STREAMING> {
self.group_values[0].len()
}

fn estimated_emit_size(&self, emit_to: &EmitTo) -> usize {
let total = self.len();
let emit_count = match emit_to {
EmitTo::All => total,
EmitTo::First(n) => (*n).min(total),
};
if total == 0 {
return 0;
}
let group_values_size: usize = self.group_values.iter().map(|v| v.size()).sum();
group_values_size * emit_count / total
}

fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
let mut output = match emit_to {
EmitTo::All => {
Expand Down
15 changes: 15 additions & 0 deletions datafusion/physical-plan/src/aggregates/group_values/row.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,21 @@ impl GroupValues for GroupValuesRows {
.unwrap_or(0)
}

fn estimated_emit_size(&self, emit_to: &EmitTo) -> usize {
let total_rows = self.len();
if total_rows == 0 {
return 0;
}
let rows_size = self.group_values.as_ref().map(|v| v.size()).unwrap_or(0);
match emit_to {
EmitTo::All => rows_size,
EmitTo::First(n) => {
let n = (*n).min(total_rows);
rows_size * n / total_rows
}
}
}

fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
let mut group_values = self
.group_values
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,14 @@ impl GroupValues for GroupValuesBoolean {
+ self.null_group.is_some() as usize
}

fn estimated_emit_size(&self, emit_to: &EmitTo) -> usize {
let emit_count = match emit_to {
EmitTo::All => self.len(),
EmitTo::First(n) => (*n).min(self.len()),
};
emit_count.div_ceil(8) * 2
}

fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
let len = self.len();
let mut builder = BooleanBufferBuilder::new(len);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,21 @@ impl<O: OffsetSizeTrait> GroupValues for GroupValuesBytes<O> {
self.num_groups
}

fn estimated_emit_size(&self, emit_to: &EmitTo) -> usize {
let total = self.len();
let emit_count = match emit_to {
EmitTo::All => total,
EmitTo::First(n) => (*n).min(total),
};
if total == 0 {
return 0;
}
// Offsets + data bytes (proportional) + null bitmap
(emit_count + 1) * std::mem::size_of::<O>()
+ self.size() * emit_count / total
+ emit_count.div_ceil(8)
}

fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
// Reset the map to default, and convert it into a single array
let map_contents = self.map.take().into_state();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,17 @@ impl GroupValues for GroupValuesBytesView {
self.num_groups
}

fn estimated_emit_size(&self, emit_to: &EmitTo) -> usize {
let emit_count = match emit_to {
EmitTo::All => self.len(),
EmitTo::First(n) => (*n).min(self.len()),
};
// Views (16 bytes each) + out-of-line data estimate + null bitmap
emit_count * 16
+ self.size() * emit_count / self.len().max(1)
+ emit_count.div_ceil(8)
}

fn emit(&mut self, emit_to: EmitTo) -> datafusion_common::Result<Vec<ArrayRef>> {
// Reset the map to default, and convert it into a single array
let map_contents = self.map.take().into_state();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,14 @@ where
self.values.len()
}

fn estimated_emit_size(&self, emit_to: &EmitTo) -> usize {
let emit_count = match emit_to {
EmitTo::All => self.len(),
EmitTo::First(n) => (*n).min(self.len()),
};
emit_count * std::mem::size_of::<T::Native>() + emit_count.div_ceil(8)
}

fn emit(&mut self, emit_to: EmitTo) -> Result<Vec<ArrayRef>> {
fn build_primitive<T: ArrowPrimitiveType>(
values: Vec<T::Native>,
Expand Down
Loading
Loading