diff --git a/datafusion/execution/Cargo.toml b/datafusion/execution/Cargo.toml index 06c84d8acb493..b35ee3c531603 100644 --- a/datafusion/execution/Cargo.toml +++ b/datafusion/execution/Cargo.toml @@ -47,6 +47,7 @@ parquet_encryption = [ "parquet/encryption", ] arrow_buffer_pool = [ + "arrow/pool", "arrow-buffer/pool", ] sql = [] diff --git a/datafusion/execution/src/memory_pool/arrow.rs b/datafusion/execution/src/memory_pool/arrow.rs index 929e3b7bd27e5..b4b737347129f 100644 --- a/datafusion/execution/src/memory_pool/arrow.rs +++ b/datafusion/execution/src/memory_pool/arrow.rs @@ -18,31 +18,92 @@ //! Adapter for integrating DataFusion's [`MemoryPool`] with Arrow's memory tracking APIs. use crate::memory_pool::{MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation}; +use arrow::array::Array; +use arrow::record_batch::RecordBatch; use std::fmt::Debug; use std::sync::Arc; /// An adapter that implements Arrow's [`arrow_buffer::MemoryPool`] trait /// by wrapping a DataFusion [`MemoryPool`]. /// -/// This allows DataFusion's memory management system to be used with Arrow's -/// memory allocation APIs. Each reservation made through this pool will be -/// tracked using the provided [`MemoryConsumer`], enabling DataFusion to -/// monitor and limit memory usage across Arrow operations. +/// All reservations made through this pool grow a single shared +/// [`MemoryReservation`]. This keeps `FairSpillPool`'s bookkeeping correct: /// -/// This is useful when you want Arrow operations (such as array builders -/// or compute kernels) to participate in DataFusion's memory management -/// and respect the same memory limits as DataFusion operators. +/// - [`Self::new`] creates its own consumer registration — use this when the +/// pool stands alone (e.g. tests, standalone Arrow computation). +/// - [`Self::from_reservation`] re-uses an existing reservation — use this +/// inside an operator so that claimed Arrow-buffer bytes are accounted under +/// the *same* pool consumer as the operator's main reservation. That way +/// neither `num_spill` nor `unspillable` changes, and the fair-share +/// formula is unaffected. #[derive(Debug)] pub struct ArrowMemoryPool { inner: Arc, - consumer: MemoryConsumer, + shared: Arc, } impl ArrowMemoryPool { - /// Creates a new [`ArrowMemoryPool`] that wraps the given DataFusion [`MemoryPool`] - /// and tracks allocations under the specified [`MemoryConsumer`]. + /// Creates a new [`ArrowMemoryPool`] with its own consumer registration. pub fn new(inner: Arc, consumer: MemoryConsumer) -> Self { - Self { inner, consumer } + let shared = Arc::new(consumer.register(&inner)); + Self { inner, shared } + } + + /// Creates a pool backed by a sibling of `reservation`. + /// + /// Calls [`MemoryReservation::new_empty`] to create a zero-size reservation + /// that shares the same [`Arc`] registration (and therefore the same pool + /// consumer) as `reservation`. This means: + /// + /// - No new consumer is registered: `FairSpillPool::num_spill` and + /// `unspillable` are both unaffected. + /// - Claimed bytes are counted toward the same `spillable`/`unspillable` + /// bucket as the operator's main reservation. + /// - The per-reservation `size` field checked by `FairSpillPool`'s + /// fair-share formula is **independent** of the main reservation's size, + /// so `update_memory_reservation()` cannot undercut live claim handles. + pub fn from_reservation( + inner: Arc, + reservation: &MemoryReservation, + ) -> Self { + let shared = Arc::new(reservation.new_empty()); + Self { inner, shared } + } +} + +/// Tracks one buffer's share of the [`ArrowMemoryPool`] shared reservation. +/// +/// On resize it adjusts the shared [`MemoryReservation`] by the delta. +/// On drop it releases the buffer's bytes from the shared reservation. +/// +/// `MemoryReservation` uses atomic interior mutability, so no external lock is +/// needed: `grow` / `shrink` are `&self` methods. +#[derive(Debug)] +struct SharedClaimHandle { + shared: Arc, + size: usize, +} + +impl arrow_buffer::MemoryReservation for SharedClaimHandle { + fn size(&self) -> usize { + self.size + } + + fn resize(&mut self, new_size: usize) { + match new_size.cmp(&self.size) { + std::cmp::Ordering::Greater => self.shared.grow(new_size - self.size), + std::cmp::Ordering::Less => self.shared.shrink(self.size - new_size), + std::cmp::Ordering::Equal => {} + } + self.size = new_size; + } +} + +impl Drop for SharedClaimHandle { + fn drop(&mut self) { + if self.size > 0 { + self.shared.shrink(self.size); + } } } @@ -58,11 +119,11 @@ impl arrow_buffer::MemoryReservation for MemoryReservation { impl arrow_buffer::MemoryPool for ArrowMemoryPool { fn reserve(&self, size: usize) -> Box { - let consumer = self.consumer.clone_with_new_id(); - let reservation = consumer.register(&self.inner); - reservation.grow(size); - - Box::new(reservation) + self.shared.grow(size); + Box::new(SharedClaimHandle { + shared: Arc::clone(&self.shared), + size, + }) } fn available(&self) -> isize { @@ -84,21 +145,32 @@ impl arrow_buffer::MemoryPool for ArrowMemoryPool { } } +/// Claims all Arrow buffers in `array` against `pool` (idempotent, recursive). +/// +/// Uses [`arrow_data::ArrayData::claim`], which covers data buffers, null buffers, +/// and all child arrays. Claiming the same physical buffer twice is a no-op. +pub fn claim_array(array: &dyn Array, pool: &dyn arrow_buffer::MemoryPool) { + array.to_data().claim(pool); +} + +/// Claims all Arrow buffers in every column of `batch` against `pool`. +/// +/// See [`claim_array`] for semantics. This is the primary entry point for +/// registering output [`RecordBatch`] memory with a [`MemoryPool`] at output +/// boundaries, replacing manual `get_array_memory_size()` bookkeeping. +pub fn claim_batch(batch: &RecordBatch, pool: &dyn arrow_buffer::MemoryPool) { + for col in batch.columns() { + claim_array(col.as_ref(), pool); + } +} + #[cfg(test)] mod tests { use super::*; use crate::memory_pool::{GreedyMemoryPool, UnboundedMemoryPool}; - use arrow::array::{Array, Int32Array}; + use arrow::array::Int32Array; use arrow_buffer::MemoryPool; - // Until https://github.com/apache/arrow-rs/pull/8918 lands, we need to iterate all - // buffers in the array. Change once the PR is released. - fn claim_array(array: &dyn Array, pool: &dyn MemoryPool) { - for buffer in array.to_data().buffers() { - buffer.claim(pool); - } - } - #[test] pub fn can_claim_array() { let pool = Arc::new(UnboundedMemoryPool::default()); diff --git a/datafusion/physical-plan/Cargo.toml b/datafusion/physical-plan/Cargo.toml index 0fc75043bf333..ee9a1f9e140c1 100644 --- a/datafusion/physical-plan/Cargo.toml +++ b/datafusion/physical-plan/Cargo.toml @@ -38,6 +38,7 @@ all-features = true workspace = true [features] +arrow_buffer_pool = ["datafusion-execution/arrow_buffer_pool"] force_hash_collisions = [] test_utils = ["arrow/test_utils"] tokio_coop = [] diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index c3f73976c721a..9c29d1a8dfbe8 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -44,6 +44,7 @@ use datafusion_common::{ internal_err, resources_datafusion_err, }; use datafusion_execution::TaskContext; +use datafusion_execution::memory_pool::arrow::{ArrowMemoryPool, claim_batch}; use datafusion_execution::memory_pool::proxy::VecAllocExt; use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion_expr::{EmitTo, GroupsAccumulator}; @@ -447,6 +448,15 @@ pub(crate) struct GroupedHashAggregateStream { /// The memory reservation for this grouping reservation: MemoryReservation, + /// Arrow memory pool backed by a sibling of `reservation` (same pool consumer). + /// + /// `claim_batch` in `emit()` grows the sibling reservation by the output + /// batch's buffer sizes. Because `from_reservation` uses `new_empty()` to + /// share the same registration, neither `FairSpillPool::num_spill` nor + /// `unspillable` changes, and `update_memory_reservation()` cannot + /// accidentally shrink the sibling below zero. + arrow_pool: Arc, + /// The behavior to trigger when out of memory occurs oom_mode: OutOfMemoryMode, @@ -601,6 +611,10 @@ impl GroupedHashAggregateStream { // to ensure fair application of back pressure amongst the memory consumers. .with_can_spill(oom_mode != OutOfMemoryMode::ReportError) .register(context.memory_pool()); + let arrow_pool = Arc::new(ArrowMemoryPool::from_reservation( + Arc::clone(context.memory_pool()), + &reservation, + )); timer.done(); let exec_state = ExecutionState::ReadingInput; @@ -682,6 +696,7 @@ impl GroupedHashAggregateStream { filter_expressions, group_by: agg_group_by, reservation, + arrow_pool, oom_mode, group_values, current_group_indices: Default::default(), @@ -1139,7 +1154,9 @@ impl GroupedHashAggregateStream { let _ = self.update_memory_reservation(); let batch = RecordBatch::try_new(schema, output)?; debug_assert!(batch.num_rows() > 0); - + if !spilling { + claim_batch(&batch, self.arrow_pool.as_ref()); + } Ok(Some(batch)) }