diff --git a/rust/lance-file/src/reader.rs b/rust/lance-file/src/reader.rs index c454f73819e..fd9275e58d9 100644 --- a/rust/lance-file/src/reader.rs +++ b/rust/lance-file/src/reader.rs @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright The Lance Authors use std::{ + borrow::Cow, collections::{BTreeMap, BTreeSet}, io::Cursor, ops::Range, @@ -31,7 +32,7 @@ use prost::{Message, Name}; use lance_core::{ Error, Result, - cache::LanceCache, + cache::{CacheKey, LanceCache}, datatypes::{Field, Schema}, }; use lance_encoding::format::pb as pbenc; @@ -127,6 +128,42 @@ impl CachedFileMetadata { } } +fn column_metadata_deep_size(column_metadatas: &[pbfile::ColumnMetadata]) -> usize { + column_metadatas + .iter() + .map(|cm| cm.encoded_len() * 4) + .sum::() + + std::mem::size_of_val(column_metadatas) +} + +fn column_info_deep_size(column_info: &ColumnInfo) -> usize { + let pages_size: usize = column_info + .page_infos + .iter() + .map(|pi| { + let enc_size = match &pi.encoding { + lance_encoding::decoder::PageEncoding::Legacy(e) => e.encoded_len() * 4, + lance_encoding::decoder::PageEncoding::Structural(e) => e.encoded_len() * 4, + }; + enc_size + + std::mem::size_of_val(pi.buffer_offsets_and_sizes.as_ref()) + + std::mem::size_of::() * 2 // num_rows + priority + }) + .sum(); + pages_size + + std::mem::size_of_val(column_info.buffer_offsets_and_sizes.as_ref()) + + column_info.encoding.encoded_len() * 4 + + std::mem::size_of::() // index + + std::mem::size_of::() * 2 // Arc overhead +} + +fn column_infos_deep_size(column_infos: &[Arc]) -> usize { + column_infos + .iter() + .map(|column_info| column_info_deep_size(column_info)) + .sum() +} + impl DeepSizeOf for CachedFileMetadata { fn deep_size_of_children(&self, context: &mut Context) -> usize { let schema_size = self.file_schema.deep_size_of_children(context); @@ -142,42 +179,12 @@ impl DeepSizeOf for CachedFileMetadata { // as a proxy for in-memory size. The decoded representation is typically // several times larger than the wire format due to heap-allocated // repeated/string/bytes fields, so we apply a 4x multiplier. - let column_metadatas_size: usize = self - .column_metadatas - .iter() - .map(|cm| cm.encoded_len() * 4) - .sum::() - + std::mem::size_of_val(self.column_metadatas.as_slice()); + let column_metadatas_size = column_metadata_deep_size(self.column_metadatas.as_slice()); // column_infos is Vec>. Each ColumnInfo contains // page_infos (with protobuf PageEncoding), buffer offsets, and a // column-level ColumnEncoding protobuf. - let column_infos_size: usize = self - .column_infos - .iter() - .map(|ci| { - let pages_size: usize = ci - .page_infos - .iter() - .map(|pi| { - let enc_size = match &pi.encoding { - lance_encoding::decoder::PageEncoding::Legacy(e) => e.encoded_len() * 4, - lance_encoding::decoder::PageEncoding::Structural(e) => { - e.encoded_len() * 4 - } - }; - enc_size - + std::mem::size_of_val(pi.buffer_offsets_and_sizes.as_ref()) - + std::mem::size_of::() * 2 // num_rows + priority - }) - .sum(); - pages_size - + std::mem::size_of_val(ci.buffer_offsets_and_sizes.as_ref()) - + ci.encoding.encoded_len() * 4 - + std::mem::size_of::() // index - + std::mem::size_of::() * 2 // Arc overhead - }) - .sum(); + let column_infos_size = column_infos_deep_size(self.column_infos.as_slice()); // Global buffer bytes retained for zero-IO reads (copied out of the tail). let retained_buffers_size: usize = self @@ -194,6 +201,87 @@ impl DeepSizeOf for CachedFileMetadata { } } +/// Lightweight file metadata used to locate per-column metadata on demand. +/// +/// This contains the file-level schema, row count, global buffer descriptors, +/// and column metadata offset table. Unlike [`CachedFileMetadata`], it does not +/// hold decoded metadata for every column. +#[derive(Debug)] +pub struct FileMetadataIndex { + file_schema: Arc, + num_rows: u64, + file_buffers: Vec, + column_metadata_offsets: Arc<[(u64, u64)]>, + num_columns: u32, + version: LanceFileVersion, + file_size_bytes: u64, + retained_global_buffers: BTreeMap, +} + +impl FileMetadataIndex { + /// Returns the total size of the file in bytes. + pub fn file_size(&self) -> u64 { + self.file_size_bytes + } + + /// Returns the number of physical columns in the file. + pub fn num_columns(&self) -> u32 { + self.num_columns + } +} + +impl DeepSizeOf for FileMetadataIndex { + fn deep_size_of_children(&self, context: &mut Context) -> usize { + let schema_size = self.file_schema.deep_size_of_children(context); + let buffers_size: usize = self + .file_buffers + .iter() + .map(|buffer| buffer.deep_size_of_children(context)) + .sum(); + let offsets_size = std::mem::size_of_val(self.column_metadata_offsets.as_ref()); + let retained_buffers_size: usize = self + .retained_global_buffers + .values() + .map(|buf| buf.len()) + .sum(); + schema_size + buffers_size + offsets_size + retained_buffers_size + } +} + +#[derive(Debug)] +struct CachedColumnMetadata { + column_metadata: pbfile::ColumnMetadata, + column_info: Arc, +} + +impl DeepSizeOf for CachedColumnMetadata { + fn deep_size_of_children(&self, _context: &mut Context) -> usize { + column_metadata_deep_size(std::slice::from_ref(&self.column_metadata)) + + column_info_deep_size(self.column_info.as_ref()) + } +} + +#[derive(Debug, Clone)] +struct ColumnMetadataCacheKey { + column_index: u32, + file_size_bytes: u64, +} + +impl CacheKey for ColumnMetadataCacheKey { + type ValueType = CachedColumnMetadata; + + fn key(&self) -> Cow<'_, str> { + Cow::Owned(format!( + "column_metadata/{}/{}", + self.file_size_bytes, self.column_index + )) + } + + fn type_name() -> &'static str { + "ColumnMetadata" + } +} + impl CachedFileMetadata { pub fn version(&self) -> LanceFileVersion { match (self.major_version, self.minor_version) { @@ -430,16 +518,43 @@ impl Default for FileReaderOptions { } #[derive(Debug, Clone)] -pub struct FileReader { +struct PreparedProjection { + column_infos: Vec>, + decoder_projection: ReaderProjection, +} + +#[derive(Debug, Clone)] +enum FileMetadataProvider { + Full(Arc), + Indexed(Arc), +} + +#[derive(Debug, Clone)] +struct FileReadCore { scheduler: Arc, - // The default projection to be applied to all reads base_projection: ReaderProjection, - num_rows: u64, - metadata: Arc, + metadata_provider: FileMetadataProvider, decoder_plugins: Arc, cache: Arc, options: FileReaderOptions, } + +/// A data reader for Lance files. +/// +/// This reader can use either full file metadata or indexed per-column metadata +/// internally. It intentionally does not expose APIs that require synchronous +/// access to full file metadata. +#[derive(Debug, Clone)] +pub struct FileDataReader { + core: FileReadCore, +} + +/// A Lance file reader backed by fully decoded file metadata. +#[derive(Debug, Clone)] +pub struct FileReader { + core: FileReadCore, + metadata: Arc, +} #[derive(Debug)] struct Footer { #[allow(dead_code)] @@ -460,13 +575,8 @@ const FOOTER_LEN: usize = 40; impl FileReader { pub fn with_scheduler(&self, scheduler: Arc) -> Self { Self { - scheduler, - base_projection: self.base_projection.clone(), - cache: self.cache.clone(), - decoder_plugins: self.decoder_plugins.clone(), + core: self.core.with_scheduler(scheduler), metadata: self.metadata.clone(), - options: self.options.clone(), - num_rows: self.num_rows, } } @@ -481,23 +591,23 @@ impl FileReader { &self, stats: Arc, ) -> Self { - match self.scheduler.with_io_stats(stats) { + match self.core.scheduler.with_io_stats(stats) { Some(scheduler) => self.with_scheduler(scheduler), None => self.clone(), } } pub fn num_rows(&self) -> u64 { - self.num_rows + self.core.num_rows() } pub fn metadata(&self) -> &Arc { &self.metadata } - pub fn file_statistics(&self) -> FileStatistics { - let column_metadatas = &self.metadata().column_metadatas; - + fn statistics_from_column_metadata( + column_metadatas: &[pbfile::ColumnMetadata], + ) -> FileStatistics { let column_stats = column_metadatas .iter() .map(|col_metadata| { @@ -519,22 +629,12 @@ impl FileReader { } } - pub async fn read_global_buffer(&self, index: u32) -> Result { - let buffer_desc = self.metadata.file_buffers.get(index as usize).ok_or_else(||Error::invalid_input(format!("request for global buffer at index {} but there were only {} global buffers in the file", index, self.metadata.file_buffers.len())))?; - - // If the buffer's bytes were captured by the tail read at open, serve them - // from memory with no additional I/O. Larger buffers (outside the window) - // are not retained and fall back to a dedicated read. - if let Some(bytes) = self.metadata.retained_global_buffers.get(&index) { - return Ok(bytes.clone()); - } + pub fn file_statistics(&self) -> FileStatistics { + Self::statistics_from_column_metadata(&self.metadata().column_metadatas) + } - self.scheduler - .submit_single( - buffer_desc.position..buffer_desc.position + buffer_desc.size, - 0, - ) - .await + pub async fn read_global_buffer(&self, index: u32) -> Result { + self.core.read_global_buffer(index).await } async fn read_tail(scheduler: &FileScheduler) -> Result<(Bytes, u64)> { @@ -548,6 +648,47 @@ impl FileReader { Ok((tail_bytes, file_size)) } + async fn read_range_from_tail_or_scheduler( + tail_bytes: &Bytes, + tail_offset: u64, + scheduler: &FileScheduler, + range: Range, + ) -> Result { + let tail_end = tail_offset + tail_bytes.len() as u64; + if range.start >= tail_offset && range.end <= tail_end { + let rel_start = (range.start - tail_offset) as usize; + let rel_end = (range.end - tail_offset) as usize; + Ok(tail_bytes.slice(rel_start..rel_end)) + } else { + scheduler.submit_single(range, 0).await + } + } + + fn retained_global_buffers_from_tail( + gbo_table: &[BufferDescriptor], + tail_bytes: &Bytes, + tail_offset: u64, + ) -> BTreeMap { + let tail_end = tail_offset + tail_bytes.len() as u64; + gbo_table + .iter() + .enumerate() + .skip(1) + .filter_map(|(index, buffer)| { + let start = buffer.position; + let end = buffer.position + buffer.size; + if start >= tail_offset && end <= tail_end { + let rel_start = (start - tail_offset) as usize; + let rel_end = (end - tail_offset) as usize; + let bytes = Bytes::copy_from_slice(&tail_bytes[rel_start..rel_end]); + Some((index as u32, bytes)) + } else { + None + } + }) + .collect() + } + // Checks to make sure the footer is written correctly and returns the // position of the file descriptor (which comes from the footer) fn decode_footer(footer_bytes: &Bytes) -> Result