diff --git a/rust/lance-core/src/error.rs b/rust/lance-core/src/error.rs index 3dcde1fc5b2..1d19b1f1202 100644 --- a/rust/lance-core/src/error.rs +++ b/rust/lance-core/src/error.rs @@ -169,6 +169,16 @@ pub enum Error { #[snafu(implicit)] location: Location, }, + #[snafu(display( + "Disk spill quota exceeded: cap={cap_bytes} bytes, used={used_bytes} bytes, requested={requested_bytes} bytes, {location}" + ))] + DiskCapExceeded { + cap_bytes: u64, + used_bytes: u64, + requested_bytes: u64, + #[snafu(implicit)] + location: Location, + }, #[snafu(display("LanceError(Index): {message}, {location}"))] Index { message: String, @@ -266,6 +276,16 @@ impl Error { IOSnafu.into_error(source) } + #[track_caller] + pub fn disk_cap_exceeded(cap_bytes: u64, used_bytes: u64, requested_bytes: u64) -> Self { + DiskCapExceededSnafu { + cap_bytes, + used_bytes, + requested_bytes, + } + .build() + } + #[track_caller] pub fn dataset_already_exists(uri: impl Into) -> Self { DatasetAlreadyExistsSnafu { uri: uri.into() }.build() @@ -512,7 +532,16 @@ impl From<&ArrowError> for Error { impl From for Error { #[track_caller] fn from(e: std::io::Error) -> Self { - Self::io_source(box_error(e)) + if e.get_ref().is_some() { + match e.into_inner().expect("io error source checked above") { + source => match source.downcast::() { + Ok(lance_err) => *lance_err, + Err(source) => Self::io_source(source), + }, + } + } else { + Self::io_source(box_error(e)) + } } } diff --git a/rust/lance-io/src/lib.rs b/rust/lance-io/src/lib.rs index c327a91c1ba..2ef686fd551 100644 --- a/rust/lance-io/src/lib.rs +++ b/rust/lance-io/src/lib.rs @@ -18,6 +18,7 @@ pub mod object_reader; pub mod object_store; pub mod object_writer; pub mod scheduler; +pub mod spill; pub mod stream; #[cfg(test)] pub mod testing; diff --git a/rust/lance-io/src/object_store.rs b/rust/lance-io/src/object_store.rs index 0c44095f117..3f0d2cccaae 100644 --- a/rust/lance-io/src/object_store.rs +++ b/rust/lance-io/src/object_store.rs @@ -46,7 +46,7 @@ pub(crate) mod test_utils; pub mod throttle; mod tracing; use crate::object_reader::SmallReader; -use crate::object_writer::{LocalWriter, WriteResult}; +use crate::object_writer::{DiskQuota, LocalWriter, WriteResult}; use crate::traits::{WriteExt, Writer}; use crate::utils::tracking_store::{IOTracker, IoStats}; use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader}; @@ -146,6 +146,8 @@ pub struct ObjectStore { download_retry_count: usize, /// IO tracker for monitoring read/write operations io_tracker: IOTracker, + /// Optional local write budget used by spill stores. + disk_quota: Option, /// The datastore prefix that uniquely identifies this object store. It encodes information /// which usually cannot be found in the URL such as Azure account name. The prefix plus the /// path uniquely identifies any object inside the store. @@ -479,6 +481,7 @@ impl ObjectStore { io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT, io_tracker, + disk_quota: None, store_prefix, }; let path = Path::parse(path.path())?; @@ -536,6 +539,13 @@ impl ObjectStore { .unwrap() } + /// Local object store with a shared local write quota. + pub fn local_with_disk_quota(disk_quota: DiskQuota) -> Self { + let mut store = Self::local(); + store.disk_quota = Some(disk_quota); + store + } + /// Create a in-memory object store directly for testing. pub fn memory() -> Self { let provider = MemoryStoreProvider; @@ -756,11 +766,36 @@ impl ObjectStore { .map_err(|e| Error::io(format!("spawn_blocking failed: {}", e)))??; let (std_file, temp_path) = named_temp.into_parts(); let file = tokio::fs::File::from_std(std_file); - Ok(Box::new(LocalWriter::new( + Ok(Box::new(LocalWriter::new_with_disk_quota( + file, + path.clone(), + temp_path, + Arc::new(self.io_tracker.clone()), + self.disk_quota.clone(), + ))) + } + "file+uring" => { + let local_path = super::local::to_local_path(path); + let local_path = std::path::PathBuf::from(&local_path); + if let Some(parent) = local_path.parent() { + tokio::fs::create_dir_all(parent).await?; + } + let parent = local_path + .parent() + .expect("file path must have parent") + .to_owned(); + let named_temp = + tokio::task::spawn_blocking(move || tempfile::NamedTempFile::new_in(parent)) + .await + .map_err(|e| Error::io(format!("spawn_blocking failed: {}", e)))??; + let (std_file, temp_path) = named_temp.into_parts(); + let file = tokio::fs::File::from_std(std_file); + Ok(Box::new(LocalWriter::new_with_disk_quota( file, path.clone(), temp_path, Arc::new(self.io_tracker.clone()), + self.disk_quota.clone(), ))) } _ => Ok(Box::new(ObjectWriter::new(self, path).await?)), @@ -1109,6 +1144,7 @@ impl ObjectStore { io_parallelism, download_retry_count, io_tracker, + disk_quota: None, store_prefix, } } diff --git a/rust/lance-io/src/object_store/providers/aws.rs b/rust/lance-io/src/object_store/providers/aws.rs index a1549120c50..e3760811298 100644 --- a/rust/lance-io/src/object_store/providers/aws.rs +++ b/rust/lance-io/src/object_store/providers/aws.rs @@ -177,6 +177,7 @@ impl ObjectStoreProvider for AwsStoreProvider { io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, download_retry_count, io_tracker: Default::default(), + disk_quota: None, store_prefix: self .calculate_object_store_prefix(&base_path, params.storage_options())?, }) diff --git a/rust/lance-io/src/object_store/providers/azure.rs b/rust/lance-io/src/object_store/providers/azure.rs index e61f3f3b364..1741c1ac838 100644 --- a/rust/lance-io/src/object_store/providers/azure.rs +++ b/rust/lance-io/src/object_store/providers/azure.rs @@ -283,6 +283,7 @@ impl ObjectStoreProvider for AzureBlobStoreProvider { io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, download_retry_count, io_tracker: Default::default(), + disk_quota: None, store_prefix: self .calculate_object_store_prefix(&base_path, params.storage_options())?, }) diff --git a/rust/lance-io/src/object_store/providers/gcp.rs b/rust/lance-io/src/object_store/providers/gcp.rs index f7f0a7672ff..b043419be71 100644 --- a/rust/lance-io/src/object_store/providers/gcp.rs +++ b/rust/lance-io/src/object_store/providers/gcp.rs @@ -136,6 +136,7 @@ impl ObjectStoreProvider for GcsStoreProvider { io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, download_retry_count, io_tracker: Default::default(), + disk_quota: None, store_prefix: self .calculate_object_store_prefix(&base_path, params.storage_options())?, }) diff --git a/rust/lance-io/src/object_store/providers/goosefs.rs b/rust/lance-io/src/object_store/providers/goosefs.rs index d6173571551..c2ae83eda80 100644 --- a/rust/lance-io/src/object_store/providers/goosefs.rs +++ b/rust/lance-io/src/object_store/providers/goosefs.rs @@ -158,6 +158,7 @@ impl ObjectStoreProvider for GooseFsStoreProvider { io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, download_retry_count: storage_options.download_retry_count(), io_tracker: Default::default(), + disk_quota: None, store_prefix: self .calculate_object_store_prefix(&base_path, params.storage_options())?, }) diff --git a/rust/lance-io/src/object_store/providers/huggingface.rs b/rust/lance-io/src/object_store/providers/huggingface.rs index 4522a6c156e..7bcde0e62be 100644 --- a/rust/lance-io/src/object_store/providers/huggingface.rs +++ b/rust/lance-io/src/object_store/providers/huggingface.rs @@ -216,6 +216,7 @@ impl ObjectStoreProvider for HuggingfaceStoreProvider { io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, download_retry_count, io_tracker: Default::default(), + disk_quota: None, store_prefix: self .calculate_object_store_prefix(&base_path, params.storage_options())?, }) diff --git a/rust/lance-io/src/object_store/providers/local.rs b/rust/lance-io/src/object_store/providers/local.rs index ac60fc919e9..06f9765ae0e 100644 --- a/rust/lance-io/src/object_store/providers/local.rs +++ b/rust/lance-io/src/object_store/providers/local.rs @@ -31,6 +31,7 @@ impl ObjectStoreProvider for FileStoreProvider { io_parallelism: DEFAULT_LOCAL_IO_PARALLELISM, download_retry_count, io_tracker: Default::default(), + disk_quota: None, store_prefix: self .calculate_object_store_prefix(&base_path, params.storage_options())?, }) diff --git a/rust/lance-io/src/object_store/providers/memory.rs b/rust/lance-io/src/object_store/providers/memory.rs index 4c68cea6260..bac12a6c6b9 100644 --- a/rust/lance-io/src/object_store/providers/memory.rs +++ b/rust/lance-io/src/object_store/providers/memory.rs @@ -31,6 +31,7 @@ impl ObjectStoreProvider for MemoryStoreProvider { io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, download_retry_count, io_tracker: Default::default(), + disk_quota: None, store_prefix: self .calculate_object_store_prefix(&base_path, params.storage_options())?, }) diff --git a/rust/lance-io/src/object_store/providers/oss.rs b/rust/lance-io/src/object_store/providers/oss.rs index b84afb8ed1f..54b72e1ace2 100644 --- a/rust/lance-io/src/object_store/providers/oss.rs +++ b/rust/lance-io/src/object_store/providers/oss.rs @@ -144,6 +144,7 @@ impl ObjectStoreProvider for OssStoreProvider { io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, download_retry_count: storage_options.download_retry_count(), io_tracker: Default::default(), + disk_quota: None, store_prefix: self.calculate_object_store_prefix(&url, params.storage_options())?, }) } diff --git a/rust/lance-io/src/object_store/providers/tencent.rs b/rust/lance-io/src/object_store/providers/tencent.rs index 5fa885ea5a9..36156bdbf86 100644 --- a/rust/lance-io/src/object_store/providers/tencent.rs +++ b/rust/lance-io/src/object_store/providers/tencent.rs @@ -100,6 +100,7 @@ impl ObjectStoreProvider for TencentStoreProvider { io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, download_retry_count: storage_options.download_retry_count(), io_tracker: Default::default(), + disk_quota: None, store_prefix: self.calculate_object_store_prefix(&url, params.storage_options())?, }) } diff --git a/rust/lance-io/src/object_store/providers/tos.rs b/rust/lance-io/src/object_store/providers/tos.rs index 923186484c6..6e4ef1a0dc4 100644 --- a/rust/lance-io/src/object_store/providers/tos.rs +++ b/rust/lance-io/src/object_store/providers/tos.rs @@ -150,6 +150,7 @@ impl ObjectStoreProvider for TosStoreProvider { io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM, download_retry_count: storage_options.download_retry_count(), io_tracker: Default::default(), + disk_quota: None, store_prefix: self.calculate_object_store_prefix(&url, params.storage_options())?, }) } diff --git a/rust/lance-io/src/object_writer.rs b/rust/lance-io/src/object_writer.rs index 4b9bb901446..84c658c5d1f 100644 --- a/rust/lance-io/src/object_writer.rs +++ b/rust/lance-io/src/object_writer.rs @@ -3,6 +3,7 @@ use std::io; use std::pin::Pin; +use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, OnceLock}; use std::task::Poll; @@ -518,11 +519,73 @@ pub struct LocalWriter { state: LocalWriteState, } +/// Shared byte budget for local spill writes. +#[derive(Debug, Clone)] +pub struct DiskQuota { + cap_bytes: u64, + used_bytes: Arc, +} + +impl DiskQuota { + pub fn new(cap_bytes: u64) -> Self { + Self { + cap_bytes, + used_bytes: Arc::new(AtomicU64::new(0)), + } + } + + pub fn cap_bytes(&self) -> u64 { + self.cap_bytes + } + + pub fn used_bytes(&self) -> u64 { + self.used_bytes.load(Ordering::Acquire) + } + + pub fn reserve(&self, requested_bytes: u64) -> Result<()> { + let mut used = self.used_bytes(); + loop { + let Some(next_used) = used.checked_add(requested_bytes) else { + return Err(Error::disk_cap_exceeded( + self.cap_bytes, + used, + requested_bytes, + )); + }; + if next_used > self.cap_bytes { + return Err(Error::disk_cap_exceeded( + self.cap_bytes, + used, + requested_bytes, + )); + } + match self.used_bytes.compare_exchange_weak( + used, + next_used, + Ordering::AcqRel, + Ordering::Acquire, + ) { + Ok(_) => return Ok(()), + Err(actual) => used = actual, + } + } + } + + pub fn release(&self, bytes: u64) { + let _ = self + .used_bytes + .fetch_update(Ordering::AcqRel, Ordering::Acquire, |used| { + Some(used.saturating_sub(bytes)) + }); + } +} + #[derive(Default)] enum LocalWriteState { Writing(WritingState), Finishing { size: usize, + disk_quota: Option, future: BoxFuture<'static, Result>, }, Done(WriteResult), @@ -536,6 +599,7 @@ struct WritingState { /// Temp path that auto-deletes on drop. Set to `None` after `persist()`. temp_path: tempfile::TempPath, io_tracker: Arc, + disk_quota: Option, } impl LocalWriter { @@ -544,6 +608,16 @@ impl LocalWriter { path: Path, temp_path: tempfile::TempPath, io_tracker: Arc, + ) -> Self { + Self::new_with_disk_quota(file, path, temp_path, io_tracker, None) + } + + pub fn new_with_disk_quota( + file: tokio::fs::File, + path: Path, + temp_path: tempfile::TempPath, + io_tracker: Arc, + disk_quota: Option, ) -> Self { Self { path, @@ -552,6 +626,7 @@ impl LocalWriter { cursor: 0, temp_path, io_tracker, + disk_quota, }), } } @@ -606,10 +681,42 @@ impl AsyncWrite for LocalWriter { buf: &[u8], ) -> Poll> { if let LocalWriteState::Writing(state) = &mut self.state { - let poll = Pin::new(&mut state.writer).poll_write(cx, buf); + if buf.is_empty() { + return Poll::Ready(Ok(0)); + } + + let requested = if let Some(disk_quota) = state.disk_quota.as_ref() { + let available = disk_quota + .cap_bytes() + .saturating_sub(disk_quota.used_bytes()) + as usize; + let requested = available.min(buf.len()); + if requested == 0 { + let err = Error::disk_cap_exceeded( + disk_quota.cap_bytes(), + disk_quota.used_bytes(), + buf.len() as u64, + ); + return Poll::Ready(Err(io::Error::other(err))); + } + if let Err(err) = disk_quota.reserve(requested as u64) { + return Poll::Ready(Err(io::Error::other(err))); + } + requested + } else { + buf.len() + }; + + let poll = Pin::new(&mut state.writer).poll_write(cx, &buf[..requested]); if let Poll::Ready(Ok(n)) = &poll { state.cursor += *n; } + if let Some(disk_quota) = state.disk_quota.as_ref() { + match &poll { + Poll::Ready(Ok(n)) => disk_quota.release((requested - *n) as u64), + Poll::Ready(Err(_)) | Poll::Pending => disk_quota.release(requested as u64), + } + } poll } else { Poll::Ready(Err(Self::already_closed_err(&self.path))) @@ -648,6 +755,7 @@ impl AsyncWrite for LocalWriter { let size = state.cursor; mut_self.state = LocalWriteState::Finishing { size, + disk_quota: state.disk_quota, future: Box::pin(Self::persist( state.temp_path, mut_self.path.clone(), @@ -672,6 +780,26 @@ impl AsyncWrite for LocalWriter { } } +impl Drop for LocalWriter { + fn drop(&mut self) { + match &self.state { + LocalWriteState::Writing(state) => { + if let Some(disk_quota) = state.disk_quota.as_ref() { + disk_quota.release(state.cursor as u64); + } + } + LocalWriteState::Finishing { + size, disk_quota, .. + } => { + if let Some(disk_quota) = disk_quota.as_ref() { + disk_quota.release(*size as u64); + } + } + LocalWriteState::Done(_) | LocalWriteState::Poisoned => {} + } + } +} + #[async_trait] impl Writer for LocalWriter { async fn tell(&mut self) -> Result { diff --git a/rust/lance-io/src/scheduler.rs b/rust/lance-io/src/scheduler.rs index efe4b9b0c24..76208c14d66 100644 --- a/rust/lance-io/src/scheduler.rs +++ b/rust/lance-io/src/scheduler.rs @@ -730,6 +730,19 @@ impl ScanScheduler { self.open_file_with_priority(path, 0, file_size_bytes).await } + /// Open a scheduler over an already-open reader. + pub fn open_reader(self: &Arc, reader: Arc) -> FileScheduler { + FileScheduler { + block_size: reader.block_size() as u64, + max_iop_size: self.object_store.max_iop_size(), + reader, + root: self.clone(), + base_priority: 0, + bypass_backpressure: false, + extra_stats: None, + } + } + fn do_submit_request( &self, reader: Arc, diff --git a/rust/lance-io/src/spill.rs b/rust/lance-io/src/spill.rs new file mode 100644 index 00000000000..1635babd8de --- /dev/null +++ b/rust/lance-io/src/spill.rs @@ -0,0 +1,175 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright The Lance Authors + +//! Temporary scratch storage for memory-budgeted builders. + +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; + +use async_trait::async_trait; +use lance_core::Result; +use lance_core::utils::tempfile::TempDir; +use object_store::path::Path; + +use crate::local::to_local_path; +use crate::object_store::ObjectStore; +use crate::object_writer::DiskQuota; +use crate::traits::{Reader, Writer}; + +/// Session-scoped scratch store for reclaimable intermediate files. +pub trait SpillStore: Send + Sync + std::fmt::Debug + 'static { + /// Create one empty scratch file. The file is deleted when the returned + /// handle is dropped. + fn create_spill_file(&self) -> Result>; +} + +/// A single reclaimable spill file. +#[async_trait] +pub trait SpillFile: Send + Sync + std::fmt::Debug { + /// Open a writer for this spill file. + async fn writer(&self) -> Result>; + + /// Open a reader for this spill file. + async fn reader(&self) -> Result>; + + /// Return the object-store path for diagnostics and tests. + fn path(&self) -> &Path; +} + +/// Local-disk implementation of [`SpillStore`]. +#[derive(Debug)] +pub struct LocalSpillStore { + temp_dir: Arc, + object_store: Arc, + disk_quota: DiskQuota, + next_file_id: AtomicU64, +} + +impl LocalSpillStore { + /// Create a local spill store capped at `cap_bytes` live scratch bytes. + pub fn try_new(cap_bytes: u64) -> Result { + let temp_dir = Arc::new(TempDir::try_new()?); + let disk_quota = DiskQuota::new(cap_bytes); + Ok(Self { + temp_dir, + object_store: Arc::new(ObjectStore::local_with_disk_quota(disk_quota.clone())), + disk_quota, + next_file_id: AtomicU64::new(0), + }) + } + + /// Create a local spill store in the system temporary directory. + pub fn new(cap_bytes: u64) -> Self { + Self::try_new(cap_bytes).expect("failed to create local spill store") + } + + /// Return the configured cap in bytes. + pub fn cap_bytes(&self) -> u64 { + self.disk_quota.cap_bytes() + } + + /// Return the currently reserved spill bytes. + pub fn used_bytes(&self) -> u64 { + self.disk_quota.used_bytes() + } +} + +impl SpillStore for LocalSpillStore { + fn create_spill_file(&self) -> Result> { + let file_id = self.next_file_id.fetch_add(1, Ordering::Relaxed); + let path = self + .temp_dir + .std_path() + .join(format!("spill-{file_id}.bin")); + let path = Path::from_absolute_path(path)?; + Ok(Box::new(LocalSpillFile { + _temp_dir: self.temp_dir.clone(), + object_store: self.object_store.clone(), + disk_quota: self.disk_quota.clone(), + path, + })) + } +} + +#[derive(Debug)] +struct LocalSpillFile { + _temp_dir: Arc, + object_store: Arc, + disk_quota: DiskQuota, + path: Path, +} + +#[async_trait] +impl SpillFile for LocalSpillFile { + async fn writer(&self) -> Result> { + self.object_store.create(&self.path).await + } + + async fn reader(&self) -> Result> { + self.object_store.open(&self.path).await + } + + fn path(&self) -> &Path { + &self.path + } +} + +impl Drop for LocalSpillFile { + fn drop(&mut self) { + let local_path = to_local_path(&self.path); + if let Ok(metadata) = std::fs::metadata(&local_path) { + self.disk_quota.release(metadata.len()); + } + if let Err(err) = std::fs::remove_file(&local_path) + && err.kind() != std::io::ErrorKind::NotFound + { + tracing::warn!(path = %self.path, error = %err, "failed to remove spill file"); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use bytes::Bytes; + use lance_core::Error; + use tokio::io::AsyncWriteExt; + + #[tokio::test] + async fn local_spill_file_cleans_up_and_releases_quota() { + let store = LocalSpillStore::try_new(1024).unwrap(); + let spill = store.create_spill_file().unwrap(); + let path = spill.path().clone(); + + let mut writer = spill.writer().await.unwrap(); + writer.write_all(b"hello spill").await.unwrap(); + Writer::shutdown(&mut writer).await.unwrap(); + + assert_eq!(store.used_bytes(), 11); + assert!(std::path::Path::new(&to_local_path(&path)).exists()); + + let reader = spill.reader().await.unwrap(); + assert_eq!( + reader.get_all().await.unwrap(), + Bytes::from_static(b"hello spill") + ); + + drop(spill); + + assert_eq!(store.used_bytes(), 0); + assert!(!std::path::Path::new(&to_local_path(&path)).exists()); + } + + #[tokio::test] + async fn local_spill_file_returns_typed_error_on_cap_exhaustion() { + let store = LocalSpillStore::try_new(4).unwrap(); + let spill = store.create_spill_file().unwrap(); + let mut writer = spill.writer().await.unwrap(); + + writer.write_all(b"abcd").await.unwrap(); + let err = writer.write_all(b"e").await.unwrap_err(); + let err = Error::from(err); + + assert!(matches!(err, Error::DiskCapExceeded { .. })); + } +} diff --git a/rust/lance/src/session.rs b/rust/lance/src/session.rs index 8d5e9717570..2b6c1d520f6 100644 --- a/rust/lance/src/session.rs +++ b/rust/lance/src/session.rs @@ -9,6 +9,7 @@ use lance_core::deepsize::DeepSizeOf; use lance_core::{Error, Result}; use lance_index::IndexType; use lance_io::object_store::ObjectStoreRegistry; +use lance_io::spill::{LocalSpillStore, SpillStore}; use crate::dataset::{DEFAULT_INDEX_CACHE_SIZE, DEFAULT_METADATA_CACHE_SIZE}; use crate::session::caches::GlobalMetadataCache; @@ -53,6 +54,8 @@ pub struct Session { pub(crate) index_extensions: HashMap<(IndexType, String), Arc>, store_registry: Arc, + + spill_store: Arc, } impl DeepSizeOf for Session { @@ -83,6 +86,7 @@ impl std::fmt::Debug for Session { "index_extensions", &self.index_extensions.keys().collect::>(), ) + .field("spill_store", &self.spill_store) .finish() } } @@ -107,6 +111,7 @@ impl Session { metadata_cache: GlobalMetadataCache(LanceCache::with_capacity(metadata_cache_size)), index_extensions: HashMap::new(), store_registry, + spill_store: Arc::new(LocalSpillStore::new(u64::MAX)), } } @@ -124,9 +129,16 @@ impl Session { metadata_cache: GlobalMetadataCache(LanceCache::with_capacity(metadata_cache_size)), index_extensions: HashMap::new(), store_registry, + spill_store: Arc::new(LocalSpillStore::new(u64::MAX)), } } + /// Set the spill store used for temporary index-build scratch. + pub fn with_spill_store(mut self, spill_store: Arc) -> Self { + self.spill_store = spill_store; + self + } + /// Register a new index extension. /// /// A name can only be registered once per type of index extension. @@ -195,6 +207,11 @@ impl Session { self.store_registry.clone() } + /// Get the session spill store. + pub fn spill_store(&self) -> Arc { + self.spill_store.clone() + } + /// Get a reference to the raw metadata cache (for use in index reconstruction). pub fn file_metadata_cache(&self) -> &LanceCache { &self.metadata_cache.0