Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 30 additions & 1 deletion rust/lance-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,16 @@ pub enum Error {
#[snafu(implicit)]
location: Location,
},
/// Reports that a disk spill quota was exceeded.
///
/// The display message prints the three byte counts below followed by the
/// captured source location.
#[snafu(display(
"Disk spill quota exceeded: cap={cap_bytes} bytes, used={used_bytes} bytes, requested={requested_bytes} bytes, {location}"
))]
DiskCapExceeded {
/// The quota cap, in bytes (printed as `cap=`).
cap_bytes: u64,
/// Bytes already counted as used, in bytes (printed as `used=`).
used_bytes: u64,
/// Size of the request being reported, in bytes (printed as `requested=`).
requested_bytes: u64,
// Filled in implicitly by snafu rather than passed by the caller.
#[snafu(implicit)]
location: Location,
},
#[snafu(display("LanceError(Index): {message}, {location}"))]
Index {
message: String,
Expand Down Expand Up @@ -266,6 +276,16 @@ impl Error {
IOSnafu.into_error(source)
}

/// Builds a `DiskCapExceeded` error from the three byte counts.
///
/// * `cap_bytes` - the quota cap, in bytes.
/// * `used_bytes` - bytes already counted as used.
/// * `requested_bytes` - size of the request being reported.
///
/// The `location` field is not passed in; it is an implicit snafu field.
/// NOTE(review): with `#[track_caller]` this presumably records the caller's
/// location rather than this function's — confirm against snafu's `Location`.
#[track_caller]
pub fn disk_cap_exceeded(cap_bytes: u64, used_bytes: u64, requested_bytes: u64) -> Self {
    let selector = DiskCapExceededSnafu {
        cap_bytes,
        used_bytes,
        requested_bytes,
    };
    selector.build()
}

#[track_caller]
pub fn dataset_already_exists(uri: impl Into<String>) -> Self {
DatasetAlreadyExistsSnafu { uri: uri.into() }.build()
Expand Down Expand Up @@ -512,7 +532,16 @@ impl From<&ArrowError> for Error {
impl From<std::io::Error> for Error {
    /// Converts a [`std::io::Error`] into this crate's [`Error`].
    ///
    /// * If the I/O error has no custom payload, the I/O error itself is boxed
    ///   and used as the source of an IO error.
    /// * If the payload is itself an [`Error`], that original error is returned
    ///   unchanged instead of being wrapped a second time.
    /// * Any other payload becomes the source of an IO error.
    #[track_caller]
    fn from(e: std::io::Error) -> Self {
        // No custom payload: keep the whole `io::Error` as the source.
        if e.get_ref().is_none() {
            return Self::io_source(box_error(e));
        }

        // `into_inner` consumes `e`, so presence has to be checked via
        // `get_ref` first; the `expect` can only fire if that invariant breaks.
        let source = e.into_inner().expect("io error source checked above");
        match source.downcast::<Self>() {
            // The payload was one of our own errors: hand it back as-is.
            Ok(lance_err) => *lance_err,
            // NOTE(review): only the payload is forwarded here; the outer
            // `io::Error` (and its `ErrorKind`) was consumed by `into_inner`.
            Err(source) => Self::io_source(source),
        }
    }
}

Expand Down
1 change: 1 addition & 0 deletions rust/lance-io/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ pub mod object_reader;
pub mod object_store;
pub mod object_writer;
pub mod scheduler;
pub mod spill;
pub mod stream;
#[cfg(test)]
pub mod testing;
Expand Down
40 changes: 38 additions & 2 deletions rust/lance-io/src/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub(crate) mod test_utils;
pub mod throttle;
mod tracing;
use crate::object_reader::SmallReader;
use crate::object_writer::{LocalWriter, WriteResult};
use crate::object_writer::{DiskQuota, LocalWriter, WriteResult};
use crate::traits::{WriteExt, Writer};
use crate::utils::tracking_store::{IOTracker, IoStats};
use crate::{object_reader::CloudObjectReader, object_writer::ObjectWriter, traits::Reader};
Expand Down Expand Up @@ -146,6 +146,8 @@ pub struct ObjectStore {
download_retry_count: usize,
/// IO tracker for monitoring read/write operations
io_tracker: IOTracker,
/// Optional local write budget used by spill stores.
disk_quota: Option<DiskQuota>,
/// The datastore prefix that uniquely identifies this object store. It encodes information
/// which usually cannot be found in the URL such as Azure account name. The prefix plus the
/// path uniquely identifies any object inside the store.
Expand Down Expand Up @@ -479,6 +481,7 @@ impl ObjectStore {
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
download_retry_count: DEFAULT_DOWNLOAD_RETRY_COUNT,
io_tracker,
disk_quota: None,
store_prefix,
};
let path = Path::parse(path.path())?;
Expand Down Expand Up @@ -536,6 +539,13 @@ impl ObjectStore {
.unwrap()
}

/// Creates a local object store that carries the given write quota.
///
/// Starts from [`Self::local`] and stores `disk_quota` in the store's
/// `disk_quota` field; every other setting is whatever `local()` produces.
pub fn local_with_disk_quota(disk_quota: DiskQuota) -> Self {
    let mut quota_store = Self::local();
    quota_store.disk_quota = Some(disk_quota);
    quota_store
}

/// Create an in-memory object store directly for testing.
pub fn memory() -> Self {
let provider = MemoryStoreProvider;
Expand Down Expand Up @@ -756,11 +766,36 @@ impl ObjectStore {
.map_err(|e| Error::io(format!("spawn_blocking failed: {}", e)))??;
let (std_file, temp_path) = named_temp.into_parts();
let file = tokio::fs::File::from_std(std_file);
Ok(Box::new(LocalWriter::new(
Ok(Box::new(LocalWriter::new_with_disk_quota(
file,
path.clone(),
temp_path,
Arc::new(self.io_tracker.clone()),
self.disk_quota.clone(),
)))
}
"file+uring" => {
let local_path = super::local::to_local_path(path);
let local_path = std::path::PathBuf::from(&local_path);
if let Some(parent) = local_path.parent() {
tokio::fs::create_dir_all(parent).await?;
}
let parent = local_path
.parent()
.expect("file path must have parent")
.to_owned();
let named_temp =
tokio::task::spawn_blocking(move || tempfile::NamedTempFile::new_in(parent))
.await
.map_err(|e| Error::io(format!("spawn_blocking failed: {}", e)))??;
let (std_file, temp_path) = named_temp.into_parts();
let file = tokio::fs::File::from_std(std_file);
Ok(Box::new(LocalWriter::new_with_disk_quota(
file,
path.clone(),
temp_path,
Arc::new(self.io_tracker.clone()),
self.disk_quota.clone(),
)))
}
_ => Ok(Box::new(ObjectWriter::new(self, path).await?)),
Expand Down Expand Up @@ -1109,6 +1144,7 @@ impl ObjectStore {
io_parallelism,
download_retry_count,
io_tracker,
disk_quota: None,
store_prefix,
}
}
Expand Down
1 change: 1 addition & 0 deletions rust/lance-io/src/object_store/providers/aws.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ impl ObjectStoreProvider for AwsStoreProvider {
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
download_retry_count,
io_tracker: Default::default(),
disk_quota: None,
store_prefix: self
.calculate_object_store_prefix(&base_path, params.storage_options())?,
})
Expand Down
1 change: 1 addition & 0 deletions rust/lance-io/src/object_store/providers/azure.rs
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,7 @@ impl ObjectStoreProvider for AzureBlobStoreProvider {
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
download_retry_count,
io_tracker: Default::default(),
disk_quota: None,
store_prefix: self
.calculate_object_store_prefix(&base_path, params.storage_options())?,
})
Expand Down
1 change: 1 addition & 0 deletions rust/lance-io/src/object_store/providers/gcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ impl ObjectStoreProvider for GcsStoreProvider {
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
download_retry_count,
io_tracker: Default::default(),
disk_quota: None,
store_prefix: self
.calculate_object_store_prefix(&base_path, params.storage_options())?,
})
Expand Down
1 change: 1 addition & 0 deletions rust/lance-io/src/object_store/providers/goosefs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ impl ObjectStoreProvider for GooseFsStoreProvider {
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
download_retry_count: storage_options.download_retry_count(),
io_tracker: Default::default(),
disk_quota: None,
store_prefix: self
.calculate_object_store_prefix(&base_path, params.storage_options())?,
})
Expand Down
1 change: 1 addition & 0 deletions rust/lance-io/src/object_store/providers/huggingface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -216,6 +216,7 @@ impl ObjectStoreProvider for HuggingfaceStoreProvider {
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
download_retry_count,
io_tracker: Default::default(),
disk_quota: None,
store_prefix: self
.calculate_object_store_prefix(&base_path, params.storage_options())?,
})
Expand Down
1 change: 1 addition & 0 deletions rust/lance-io/src/object_store/providers/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ impl ObjectStoreProvider for FileStoreProvider {
io_parallelism: DEFAULT_LOCAL_IO_PARALLELISM,
download_retry_count,
io_tracker: Default::default(),
disk_quota: None,
store_prefix: self
.calculate_object_store_prefix(&base_path, params.storage_options())?,
})
Expand Down
1 change: 1 addition & 0 deletions rust/lance-io/src/object_store/providers/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ impl ObjectStoreProvider for MemoryStoreProvider {
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
download_retry_count,
io_tracker: Default::default(),
disk_quota: None,
store_prefix: self
.calculate_object_store_prefix(&base_path, params.storage_options())?,
})
Expand Down
1 change: 1 addition & 0 deletions rust/lance-io/src/object_store/providers/oss.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ impl ObjectStoreProvider for OssStoreProvider {
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
download_retry_count: storage_options.download_retry_count(),
io_tracker: Default::default(),
disk_quota: None,
store_prefix: self.calculate_object_store_prefix(&url, params.storage_options())?,
})
}
Expand Down
1 change: 1 addition & 0 deletions rust/lance-io/src/object_store/providers/tencent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ impl ObjectStoreProvider for TencentStoreProvider {
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
download_retry_count: storage_options.download_retry_count(),
io_tracker: Default::default(),
disk_quota: None,
store_prefix: self.calculate_object_store_prefix(&url, params.storage_options())?,
})
}
Expand Down
1 change: 1 addition & 0 deletions rust/lance-io/src/object_store/providers/tos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ impl ObjectStoreProvider for TosStoreProvider {
io_parallelism: DEFAULT_CLOUD_IO_PARALLELISM,
download_retry_count: storage_options.download_retry_count(),
io_tracker: Default::default(),
disk_quota: None,
store_prefix: self.calculate_object_store_prefix(&url, params.storage_options())?,
})
}
Expand Down
Loading
Loading