Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datafusion/execution/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ parquet_encryption = [
"parquet/encryption",
]
arrow_buffer_pool = [
"arrow/pool",
"arrow-buffer/pool",
]
sql = []
Expand Down
122 changes: 97 additions & 25 deletions datafusion/execution/src/memory_pool/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,31 +18,92 @@
//! Adapter for integrating DataFusion's [`MemoryPool`] with Arrow's memory tracking APIs.

use crate::memory_pool::{MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation};
use arrow::array::Array;
use arrow::record_batch::RecordBatch;
use std::fmt::Debug;
use std::sync::Arc;

/// An adapter that implements Arrow's [`arrow_buffer::MemoryPool`] trait
/// by wrapping a DataFusion [`MemoryPool`].
///
/// This allows DataFusion's memory management system to be used with Arrow's
/// memory allocation APIs. Each reservation made through this pool will be
/// tracked using the provided [`MemoryConsumer`], enabling DataFusion to
/// monitor and limit memory usage across Arrow operations.
/// All reservations made through this pool grow a single shared
/// [`MemoryReservation`]. This keeps `FairSpillPool`'s bookkeeping correct:
///
/// This is useful when you want Arrow operations (such as array builders
/// or compute kernels) to participate in DataFusion's memory management
/// and respect the same memory limits as DataFusion operators.
/// - [`Self::new`] creates its own consumer registration — use this when the
/// pool stands alone (e.g. tests, standalone Arrow computation).
/// - [`Self::from_reservation`] re-uses an existing reservation — use this
/// inside an operator so that claimed Arrow-buffer bytes are accounted under
/// the *same* pool consumer as the operator's main reservation. That way
/// neither `num_spill` nor `unspillable` changes, and the fair-share
/// formula is unaffected.
#[derive(Debug)]
pub struct ArrowMemoryPool {
inner: Arc<dyn MemoryPool>,
consumer: MemoryConsumer,
shared: Arc<MemoryReservation>,
}

impl ArrowMemoryPool {
/// Creates a new [`ArrowMemoryPool`] that wraps the given DataFusion [`MemoryPool`]
/// and tracks allocations under the specified [`MemoryConsumer`].
/// Creates a new [`ArrowMemoryPool`] with its own consumer registration.
pub fn new(inner: Arc<dyn MemoryPool>, consumer: MemoryConsumer) -> Self {
Self { inner, consumer }
let shared = Arc::new(consumer.register(&inner));
Self { inner, shared }
}

/// Creates a pool backed by a sibling of `reservation`.
///
/// Calls [`MemoryReservation::new_empty`] to create a zero-size reservation
/// that shares the same [`Arc`] registration (and therefore the same pool
/// consumer) as `reservation`. This means:
///
/// - No new consumer is registered: `FairSpillPool::num_spill` and
/// `unspillable` are both unaffected.
/// - Claimed bytes are counted toward the same `spillable`/`unspillable`
/// bucket as the operator's main reservation.
/// - The per-reservation `size` field checked by `FairSpillPool`'s
/// fair-share formula is **independent** of the main reservation's size,
/// so `update_memory_reservation()` cannot undercut live claim handles.
pub fn from_reservation(
inner: Arc<dyn MemoryPool>,
reservation: &MemoryReservation,
) -> Self {
let shared = Arc::new(reservation.new_empty());
Self { inner, shared }
}
}

/// Tracks one buffer's share of the [`ArrowMemoryPool`] shared reservation.
///
/// On resize it adjusts the shared [`MemoryReservation`] by the delta.
/// On drop it releases the buffer's bytes from the shared reservation.
///
/// `MemoryReservation` uses atomic interior mutability, so no external lock is
/// needed: `grow` / `shrink` are `&self` methods.
#[derive(Debug)]
struct SharedClaimHandle {
shared: Arc<MemoryReservation>,
size: usize,
}

impl arrow_buffer::MemoryReservation for SharedClaimHandle {
fn size(&self) -> usize {
self.size
}

fn resize(&mut self, new_size: usize) {
match new_size.cmp(&self.size) {
std::cmp::Ordering::Greater => self.shared.grow(new_size - self.size),
std::cmp::Ordering::Less => self.shared.shrink(self.size - new_size),
std::cmp::Ordering::Equal => {}
}
self.size = new_size;
}
}

impl Drop for SharedClaimHandle {
fn drop(&mut self) {
if self.size > 0 {
self.shared.shrink(self.size);
}
}
}

Expand All @@ -58,11 +119,11 @@ impl arrow_buffer::MemoryReservation for MemoryReservation {

impl arrow_buffer::MemoryPool for ArrowMemoryPool {
fn reserve(&self, size: usize) -> Box<dyn arrow_buffer::MemoryReservation> {
let consumer = self.consumer.clone_with_new_id();
let reservation = consumer.register(&self.inner);
reservation.grow(size);

Box::new(reservation)
self.shared.grow(size);
Box::new(SharedClaimHandle {
shared: Arc::clone(&self.shared),
size,
})
}

fn available(&self) -> isize {
Expand All @@ -84,21 +145,32 @@ impl arrow_buffer::MemoryPool for ArrowMemoryPool {
}
}

/// Claims all Arrow buffers in `array` against `pool` (idempotent, recursive).
///
/// Uses [`arrow_data::ArrayData::claim`], which covers data buffers, null buffers,
/// and all child arrays. Claiming the same physical buffer twice is a no-op.
pub fn claim_array(array: &dyn Array, pool: &dyn arrow_buffer::MemoryPool) {
array.to_data().claim(pool);
}

/// Claims all Arrow buffers in every column of `batch` against `pool`.
///
/// See [`claim_array`] for semantics. This is the primary entry point for
/// registering output [`RecordBatch`] memory with a [`MemoryPool`] at output
/// boundaries, replacing manual `get_array_memory_size()` bookkeeping.
pub fn claim_batch(batch: &RecordBatch, pool: &dyn arrow_buffer::MemoryPool) {
for col in batch.columns() {
claim_array(col.as_ref(), pool);
}
}

#[cfg(test)]
mod tests {
use super::*;
use crate::memory_pool::{GreedyMemoryPool, UnboundedMemoryPool};
use arrow::array::{Array, Int32Array};
use arrow::array::Int32Array;
use arrow_buffer::MemoryPool;

// Until https://github.com/apache/arrow-rs/pull/8918 lands, we need to iterate all
// buffers in the array. Change once the PR is released.
fn claim_array(array: &dyn Array, pool: &dyn MemoryPool) {
for buffer in array.to_data().buffers() {
buffer.claim(pool);
}
}

#[test]
pub fn can_claim_array() {
let pool = Arc::new(UnboundedMemoryPool::default());
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ all-features = true
workspace = true

[features]
arrow_buffer_pool = ["datafusion-execution/arrow_buffer_pool"]
force_hash_collisions = []
test_utils = ["arrow/test_utils"]
tokio_coop = []
Expand Down
19 changes: 18 additions & 1 deletion datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ use datafusion_common::{
internal_err, resources_datafusion_err,
};
use datafusion_execution::TaskContext;
use datafusion_execution::memory_pool::arrow::{ArrowMemoryPool, claim_batch};
use datafusion_execution::memory_pool::proxy::VecAllocExt;
use datafusion_execution::memory_pool::{MemoryConsumer, MemoryReservation};
use datafusion_expr::{EmitTo, GroupsAccumulator};
Expand Down Expand Up @@ -447,6 +448,15 @@ pub(crate) struct GroupedHashAggregateStream {
/// The memory reservation for this grouping
reservation: MemoryReservation,

/// Arrow memory pool backed by a sibling of `reservation` (same pool consumer).
///
/// `claim_batch` in `emit()` grows the sibling reservation by the output
/// batch's buffer sizes. Because `from_reservation` uses `new_empty()` to
/// share the same registration, neither `FairSpillPool::num_spill` nor
/// `unspillable` changes, and `update_memory_reservation()` cannot
/// accidentally shrink the sibling below zero.
arrow_pool: Arc<ArrowMemoryPool>,

/// The behavior to trigger when out of memory occurs
oom_mode: OutOfMemoryMode,

Expand Down Expand Up @@ -601,6 +611,10 @@ impl GroupedHashAggregateStream {
// to ensure fair application of back pressure amongst the memory consumers.
.with_can_spill(oom_mode != OutOfMemoryMode::ReportError)
.register(context.memory_pool());
let arrow_pool = Arc::new(ArrowMemoryPool::from_reservation(
Arc::clone(context.memory_pool()),
&reservation,
));
timer.done();

let exec_state = ExecutionState::ReadingInput;
Expand Down Expand Up @@ -682,6 +696,7 @@ impl GroupedHashAggregateStream {
filter_expressions,
group_by: agg_group_by,
reservation,
arrow_pool,
oom_mode,
group_values,
current_group_indices: Default::default(),
Expand Down Expand Up @@ -1139,7 +1154,9 @@ impl GroupedHashAggregateStream {
let _ = self.update_memory_reservation();
let batch = RecordBatch::try_new(schema, output)?;
debug_assert!(batch.num_rows() > 0);

if !spilling {
claim_batch(&batch, self.arrow_pool.as_ref());
}
Ok(Some(batch))
}

Expand Down
Loading