From 5d3e83870b42b6e58e911b1955ce2ccd8bc59da1 Mon Sep 17 00:00:00 2001 From: Xuanwo Date: Fri, 19 Jun 2026 23:57:40 +0800 Subject: [PATCH] feat: add RLE v2 run length widths --- protos/table.proto | 11 +- rust/lance-encoding/src/compression.rs | 176 ++++- .../src/encodings/physical/general.rs | 16 +- .../src/encodings/physical/rle.rs | 602 ++++++++++++++---- .../lance-namespace-impls/src/dir/manifest.rs | 3 +- rust/lance-table/src/feature_flags.rs | 17 +- rust/lance-table/src/format/manifest.rs | 7 +- rust/lance/src/dataset.rs | 4 + rust/lance/src/dataset/tests/dataset_io.rs | 291 ++++++++- rust/lance/src/dataset/transaction.rs | 19 + rust/lance/src/dataset/write.rs | 118 ++++ rust/lance/src/dataset/write/commit.rs | 99 ++- rust/lance/src/dataset/write/insert.rs | 30 +- rust/lance/src/io/commit.rs | 7 +- 14 files changed, 1207 insertions(+), 193 deletions(-) diff --git a/protos/table.proto b/protos/table.proto index d298809d5d8..e5c5dc82b3f 100644 --- a/protos/table.proto +++ b/protos/table.proto @@ -109,10 +109,13 @@ message Manifest { // should not attempt to read the dataset. // // Known flags: - // * 1: deletion files are present - // * 2: row ids are stable and stored as part of the fragment metadata. - // * 4: use v2 format (deprecated) - // * 8: table config is present + // * 1 << 0: deletion files are present + // * 1 << 1: row ids are stable and stored as part of the fragment metadata. + // * 1 << 2: use v2 format (deprecated) + // * 1 << 3: table config is present + // * 1 << 4: dataset uses multiple base paths + // * 1 << 5: transaction file writes are disabled + // * 1 << 6: dataset may contain RLE v2 pages uint64 reader_feature_flags = 9; // Feature flags for writers. 
diff --git a/rust/lance-encoding/src/compression.rs b/rust/lance-encoding/src/compression.rs index e5d617e44ef..4420503f806 100644 --- a/rust/lance-encoding/src/compression.rs +++ b/rust/lance-encoding/src/compression.rs @@ -51,7 +51,10 @@ use crate::{ PackedStructVariablePerValueEncoder, VariablePackedStructFieldDecoder, VariablePackedStructFieldKind, }, - rle::{RleDecompressor, RleEncoder}, + rle::{ + RleDecompressor, RleEncoder, RunLengthWidth, select_run_length_width, + select_run_length_width_and_size, + }, value::{ValueDecompressor, ValueEncoder}, }, }, @@ -140,6 +143,8 @@ pub struct DefaultCompressionStrategy { params: CompressionParams, /// The lance file version for compatibilities. version: LanceFileVersion, + /// Whether the writer may emit RLE v2 run length widths. + enable_rle_v2: bool, } fn try_bss_for_mini_block( @@ -165,6 +170,7 @@ fn try_bss_for_mini_block( fn try_rle_for_mini_block( data: &FixedWidthDataBlock, params: &CompressionFieldParams, + enable_rle_v2: bool, ) -> Option> { let bits = data.bits_per_value; if !matches!(bits, 8 | 16 | 32 | 64) { @@ -188,16 +194,22 @@ fn try_rle_for_mini_block( return None; } - // Estimate the encoded size. - // - // RLE stores (value, run_length) pairs. Run lengths are u8 and long runs are split into - // multiple entries of up to 255 values. We don't know the run length distribution here, - // so we conservatively account for splitting with an upper bound. let num_values = data.num_values; - let estimated_pairs = (run_count.saturating_add(num_values / 255)).min(num_values); - let raw_bytes = (num_values as u128) * (type_size as u128); - let rle_bytes = (estimated_pairs as u128) * ((type_size + 1) as u128); + let (run_length_width, rle_bytes) = if enable_rle_v2 { + select_run_length_width_and_size(&data.data, data.num_values, data.bits_per_value).ok()? + } else { + // Estimate the encoded size. + // + // Compatibility RLE stores (value, u8 run_length) pairs. 
Long runs are split into + // multiple entries of up to 255 values. We don't know the run length distribution here, + // so we conservatively account for splitting with an upper bound. + let estimated_pairs = (run_count.saturating_add(num_values / 255)).min(num_values); + ( + RunLengthWidth::U8, + (estimated_pairs as u128) * ((type_size + 1) as u128), + ) + }; if rle_bytes < raw_bytes { #[cfg(feature = "bitpacking")] @@ -208,7 +220,9 @@ fn try_rle_for_mini_block( return None; } } - return Some(Box::new(RleEncoder::new())); + return Some(Box::new(RleEncoder::with_run_length_width( + run_length_width, + ))); } None } @@ -217,14 +231,15 @@ fn try_rle_for_block( data: &FixedWidthDataBlock, version: LanceFileVersion, params: &CompressionFieldParams, -) -> Option<(Box, CompressiveEncoding)> { + enable_rle_v2: bool, +) -> Result, CompressiveEncoding)>> { if version < LanceFileVersion::V2_2 { - return None; + return Ok(None); } let bits = data.bits_per_value; if !matches!(bits, 8 | 16 | 32 | 64) { - return None; + return Ok(None); } let run_count = data.expect_single_stat::(Stat::RunCount); @@ -233,14 +248,19 @@ fn try_rle_for_block( .unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD); if (run_count as f64) < (data.num_values as f64) * threshold { - let compressor = Box::new(RleEncoder::new()); + let run_length_width = if enable_rle_v2 { + select_run_length_width(&data.data, data.num_values, data.bits_per_value)? 
+ } else { + RunLengthWidth::U8 + }; + let compressor = Box::new(RleEncoder::with_run_length_width(run_length_width)); let encoding = ProtobufUtils21::rle( ProtobufUtils21::flat(bits, None), - ProtobufUtils21::flat(/*bits_per_value=*/ 8, None), + ProtobufUtils21::flat(run_length_width.bits_per_value(), None), ); - return Some((compressor, encoding)); + return Ok(Some((compressor, encoding))); } - None + Ok(None) } fn try_bitpack_for_mini_block(_data: &FixedWidthDataBlock) -> Option> { @@ -384,6 +404,7 @@ impl DefaultCompressionStrategy { Self { params, version: LanceFileVersion::default(), + enable_rle_v2: false, } } @@ -393,6 +414,12 @@ impl DefaultCompressionStrategy { self } + /// Allow this strategy to emit RLE v2 run length widths. + pub fn with_rle_v2(mut self) -> Self { + self.enable_rle_v2 = true; + self + } + /// Parse compression parameters from field metadata fn parse_field_metadata(field: &Field, version: &LanceFileVersion) -> CompressionFieldParams { let mut params = CompressionFieldParams::default(); @@ -456,7 +483,7 @@ impl DefaultCompressionStrategy { } let base = try_bss_for_mini_block(data, params) - .or_else(|| try_rle_for_mini_block(data, params)) + .or_else(|| try_rle_for_mini_block(data, params, self.enable_rle_v2)) .or_else(|| try_bitpack_for_mini_block(data)) .unwrap_or_else(|| Box::new(ValueEncoder::default())); @@ -664,7 +691,7 @@ impl CompressionStrategy for DefaultCompressionStrategy { match data { DataBlock::FixedWidth(fixed_width) => { if let Some((compressor, encoding)) = - try_rle_for_block(fixed_width, self.version, &field_params) + try_rle_for_block(fixed_width, self.version, &field_params, self.enable_rle_v2)? 
{ return Ok((compressor, encoding)); } @@ -815,8 +842,11 @@ impl DecompressionStrategy for DefaultDecompressionStrategy { Ok(Box::new(ValueDecompressor::from_fsl(fsl))) } Compression::Rle(rle) => { - let bits_per_value = validate_rle_compression(rle)?; - Ok(Box::new(RleDecompressor::new(bits_per_value))) + let (bits_per_value, run_length_width) = validate_rle_compression(rle)?; + Ok(Box::new(RleDecompressor::with_run_length_width( + bits_per_value, + run_length_width, + ))) } Compression::ByteStreamSplit(bss) => { let Compression::Flat(values) = @@ -1005,15 +1035,18 @@ impl DecompressionStrategy for DefaultDecompressionStrategy { Ok(Box::new(general_decompressor)) } Compression::Rle(rle) => { - let bits_per_value = validate_rle_compression(rle)?; - Ok(Box::new(RleDecompressor::new(bits_per_value))) + let (bits_per_value, run_length_width) = validate_rle_compression(rle)?; + Ok(Box::new(RleDecompressor::with_run_length_width( + bits_per_value, + run_length_width, + ))) } _ => todo!(), } } } -/// Validates RLE compression format and extracts bits_per_value -fn validate_rle_compression(rle: &crate::format::pb21::Rle) -> Result { +/// Validates RLE compression format and extracts value and run length widths. 
+fn validate_rle_compression(rle: &crate::format::pb21::Rle) -> Result<(u64, RunLengthWidth)> { let values = rle .values .as_ref() @@ -1043,14 +1076,22 @@ fn validate_rle_compression(rle: &crate::format::pb21::Rle) -> Result { )); }; - if run_lengths.bits_per_value != 8 { + if !matches!(values.bits_per_value, 8 | 16 | 32 | 64) { return Err(Error::invalid_input(format!( - "RLE compression only supports 8-bit run lengths, got {}", - run_lengths.bits_per_value + "RLE compression only supports 8, 16, 32, or 64-bit values, got {}", + values.bits_per_value ))); } - Ok(values.bits_per_value) + let run_length_width = + RunLengthWidth::from_bits(run_lengths.bits_per_value).ok_or_else(|| { + Error::invalid_input(format!( + "RLE compression only supports 8, 16, or 32-bit run lengths, got {}", + run_lengths.bits_per_value + )) + })?; + + Ok((values.bits_per_value, run_length_width)) } #[cfg(test)] @@ -1659,6 +1700,83 @@ mod tests { assert!(debug_str.contains("RleEncoder")); } + #[test] + fn test_rle_v2_miniblock_selects_u16_run_lengths() { + let mut metadata = HashMap::new(); + metadata.insert(RLE_THRESHOLD_META_KEY.to_string(), "1.0".to_string()); + metadata.insert(BSS_META_KEY.to_string(), "off".to_string()); + let mut field = create_test_field("test_column", DataType::Int32); + field.metadata = metadata; + + let values = vec![7i32; 1000]; + let mut data = FixedWidthDataBlock { + bits_per_value: 32, + data: LanceBuffer::reinterpret_vec(values), + num_values: 1000, + block_info: BlockInfo::default(), + }; + data.compute_stat(); + let data = DataBlock::FixedWidth(data); + + let strategy = DefaultCompressionStrategy::new().with_rle_v2(); + let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap(); + let (_compressed, encoding) = compressor.compress(data).unwrap(); + let Compression::Rle(rle) = encoding.compression.as_ref().unwrap() else { + panic!("expected RLE encoding"); + }; + let Compression::Flat(run_lengths) = rle + .run_lengths + .as_ref() + .unwrap() 
+ .compression + .as_ref() + .unwrap() + else { + panic!("expected flat run lengths"); + }; + assert_eq!(run_lengths.bits_per_value, 16); + } + + #[test] + fn test_rle_v2_uses_selected_width_cost_before_bitpacking() { + let mut metadata = HashMap::new(); + metadata.insert(RLE_THRESHOLD_META_KEY.to_string(), "1.0".to_string()); + metadata.insert(BSS_META_KEY.to_string(), "off".to_string()); + let mut field = create_test_field("test_column", DataType::Int32); + field.metadata = metadata; + + let values = vec![0i32; 4096]; + let mut data = FixedWidthDataBlock { + bits_per_value: 32, + data: LanceBuffer::reinterpret_vec(values), + num_values: 4096, + block_info: BlockInfo::default(), + }; + data.compute_stat(); + let data = DataBlock::FixedWidth(data); + + let strategy = DefaultCompressionStrategy::new().with_rle_v2(); + let compressor = strategy.create_miniblock_compressor(&field, &data).unwrap(); + let debug_str = format!("{compressor:?}"); + assert!(debug_str.contains("RleEncoder")); + + let (_compressed, encoding) = compressor.compress(data).unwrap(); + let Compression::Rle(rle) = encoding.compression.as_ref().unwrap() else { + panic!("expected RLE encoding"); + }; + let Compression::Flat(run_lengths) = rle + .run_lengths + .as_ref() + .unwrap() + .compression + .as_ref() + .unwrap() + else { + panic!("expected flat run lengths"); + }; + assert_eq!(run_lengths.bits_per_value, 16); + } + #[test] fn test_field_metadata_override_params() { // Set up params with one configuration diff --git a/rust/lance-encoding/src/encodings/physical/general.rs b/rust/lance-encoding/src/encodings/physical/general.rs index 4d58f72e71a..53c61928870 100644 --- a/rust/lance-encoding/src/encodings/physical/general.rs +++ b/rust/lance-encoding/src/encodings/physical/general.rs @@ -161,7 +161,7 @@ mod tests { // Small data with RLE - should not compress due to size threshold TestCase { name: "small_rle_data", - inner_encoder: Box::new(RleEncoder), + inner_encoder: 
Box::new(RleEncoder::new()), compression: CompressionConfig { scheme: CompressionScheme::Lz4, level: None, @@ -173,7 +173,7 @@ mod tests { // Large repeated data with RLE + LZ4 TestCase { name: "large_rle_lz4", - inner_encoder: Box::new(RleEncoder), + inner_encoder: Box::new(RleEncoder::new()), compression: CompressionConfig { scheme: CompressionScheme::Lz4, level: None, @@ -185,7 +185,7 @@ mod tests { // Large repeated data with RLE + Zstd TestCase { name: "large_rle_zstd", - inner_encoder: Box::new(RleEncoder), + inner_encoder: Box::new(RleEncoder::new()), compression: CompressionConfig { scheme: CompressionScheme::Zstd, level: Some(3), @@ -403,7 +403,7 @@ mod tests { // Test that small buffers don't get compressed let small_test = TestCase { name: "small_buffer_no_compression", - inner_encoder: Box::new(RleEncoder), + inner_encoder: Box::new(RleEncoder::new()), compression: CompressionConfig { scheme: CompressionScheme::Lz4, level: None, @@ -496,7 +496,7 @@ mod tests { // RLE produces 2 buffers (values and lengths), test that both are handled correctly let data = create_repeated_i32_block(vec![1; 100]); let compressor = GeneralMiniBlockCompressor::new( - Box::new(RleEncoder), + Box::new(RleEncoder::new()), CompressionConfig { scheme: CompressionScheme::Lz4, level: None, @@ -519,7 +519,7 @@ mod tests { // Test case 1: 32-bit RLE data let test_32 = TestCase { name: "rle_32bit_with_general_wrapper", - inner_encoder: Box::new(RleEncoder), + inner_encoder: Box::new(RleEncoder::new()), compression: CompressionConfig { scheme: CompressionScheme::Lz4, level: None, @@ -532,7 +532,7 @@ mod tests { // For 32-bit RLE, the compression strategy should automatically wrap it // Let's directly test the compressor let compressor = GeneralMiniBlockCompressor::new( - Box::new(RleEncoder), + Box::new(RleEncoder::new()), CompressionConfig { scheme: CompressionScheme::Lz4, level: None, @@ -589,7 +589,7 @@ mod tests { let block_64 = DataBlock::from_array(array_64); let compressor_64 = 
GeneralMiniBlockCompressor::new( - Box::new(RleEncoder), + Box::new(RleEncoder::new()), CompressionConfig { scheme: CompressionScheme::Lz4, level: None, diff --git a/rust/lance-encoding/src/encodings/physical/rle.rs b/rust/lance-encoding/src/encodings/physical/rle.rs index 88e27bf954f..66960fb3dc4 100644 --- a/rust/lance-encoding/src/encodings/physical/rle.rs +++ b/rust/lance-encoding/src/encodings/physical/rle.rs @@ -10,7 +10,7 @@ //! RLE uses a dual-buffer format to store compressed data: //! //! - **Values Buffer**: Stores unique values in their original data type -//! - **Lengths Buffer**: Stores the repeat count for each value as u8 +//! - **Lengths Buffer**: Stores the repeat count for each value as u8, u16, or u32 //! //! ### Example //! @@ -18,13 +18,13 @@ //! //! Encoded as: //! - Values buffer: `[1, 2, 3]` (3 × 4 bytes for i32) -//! - Lengths buffer: `[3, 2, 4]` (3 × 1 byte for u8) +//! - Lengths buffer: `[3, 2, 4]` (3 × 1 byte for u8 in compatibility mode) //! //! ### Long Run Handling //! -//! When a run exceeds 255 values, it is split into multiple runs of 255 -//! followed by a final run with the remainder. For example, a run of 1000 -//! identical values becomes 4 runs: [255, 255, 255, 235]. +//! In compatibility mode, when a run exceeds 255 values, it is split into multiple +//! runs of 255 followed by a final run with the remainder. RLE v2 can use u16 or +//! u32 run lengths to reduce this splitting. //! //! ## Supported Types //! @@ -70,13 +70,195 @@ use crate::format::pb21::CompressiveEncoding; use lance_core::{Error, Result}; +/// Width used to encode RLE run lengths. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub(crate) enum RunLengthWidth { + /// Compatibility mode. Runs longer than 255 values are split. + U8, + /// RLE v2 mode for runs up to 65,535 values per entry. + U16, + /// RLE v2 mode for runs up to 4,294,967,295 values per entry. 
+ U32, +} + +impl RunLengthWidth { + pub(crate) fn from_bits(bits_per_value: u64) -> Option { + match bits_per_value { + 8 => Some(Self::U8), + 16 => Some(Self::U16), + 32 => Some(Self::U32), + _ => None, + } + } + + pub(crate) fn bits_per_value(self) -> u64 { + match self { + Self::U8 => 8, + Self::U16 => 16, + Self::U32 => 32, + } + } + + fn bytes_per_value(self) -> usize { + match self { + Self::U8 => 1, + Self::U16 => 2, + Self::U32 => 4, + } + } + + fn max_run_length(self) -> u64 { + match self { + Self::U8 => u8::MAX as u64, + Self::U16 => u16::MAX as u64, + Self::U32 => u32::MAX as u64, + } + } + + fn write_length(self, length: u64, dst: &mut Vec) { + match self { + Self::U8 => dst.push(length as u8), + Self::U16 => dst.extend_from_slice(&(length as u16).to_le_bytes()), + Self::U32 => dst.extend_from_slice(&(length as u32).to_le_bytes()), + } + } + + fn read_length(self, bytes: &[u8]) -> u64 { + match self { + Self::U8 => bytes[0] as u64, + Self::U16 => u16::from_le_bytes([bytes[0], bytes[1]]) as u64, + Self::U32 => u32::from_le_bytes([bytes[0], bytes[1], bytes[2], bytes[3]]) as u64, + } + } +} + +/// Select the lowest-cost run length width for a fixed-width data block. +pub(crate) fn select_run_length_width( + data: &LanceBuffer, + num_values: u64, + bits_per_value: u64, +) -> Result { + select_run_length_width_and_size(data, num_values, bits_per_value).map(|(width, _)| width) +} + +/// Select the lowest-cost run length width and return the encoded size estimate. 
+pub(crate) fn select_run_length_width_and_size( + data: &LanceBuffer, + num_values: u64, + bits_per_value: u64, +) -> Result<(RunLengthWidth, u128)> { + match bits_per_value { + 8 => select_run_length_width_generic::(data, num_values), + 16 => select_run_length_width_generic::(data, num_values), + 32 => select_run_length_width_generic::(data, num_values), + 64 => select_run_length_width_generic::(data, num_values), + _ => Err(Error::invalid_input_source( + format!("RLE encoding bits_per_value must be 8, 16, 32, or 64, got {bits_per_value}") + .into(), + )), + } +} + +fn select_run_length_width_generic( + data: &LanceBuffer, + num_values: u64, +) -> Result<(RunLengthWidth, u128)> +where + T: bytemuck::Pod + PartialEq + Copy + ArrowNativeType, +{ + let num_values = usize::try_from(num_values).map_err(|_| { + Error::invalid_input_source( + format!("RLE num_values does not fit in usize: {num_values}").into(), + ) + })?; + if num_values == 0 { + return Ok((RunLengthWidth::U8, 0)); + } + + let type_size = std::mem::size_of::(); + let expected_bytes = num_values.checked_mul(type_size).ok_or_else(|| { + Error::invalid_input_source( + format!("RLE input byte length overflow: {num_values} values of {type_size} bytes") + .into(), + ) + })?; + if data.len() != expected_bytes { + return Err(Error::invalid_input_source( + format!( + "RLE input data size mismatch: {} bytes for {} values of {} bytes", + data.len(), + num_values, + type_size + ) + .into(), + )); + } + + let values_ref = data.borrow_to_typed_slice::(); + let values: &[T] = values_ref.as_ref(); + let mut costs = [0_u128; 3]; + + let mut current_value = values[0]; + let mut current_length = 1_u64; + for &value in values.iter().skip(1) { + if value == current_value { + current_length += 1; + } else { + accumulate_width_costs(current_length, type_size, &mut costs); + current_value = value; + current_length = 1; + } + } + accumulate_width_costs(current_length, type_size, &mut costs); + + let widths = 
[RunLengthWidth::U8, RunLengthWidth::U16, RunLengthWidth::U32]; + let mut best_idx = 0usize; + let mut best_cost = costs[0]; + for (idx, &cost) in costs.iter().enumerate().skip(1) { + if cost < best_cost { + best_idx = idx; + best_cost = cost; + } + } + Ok((widths[best_idx], best_cost)) +} + +fn accumulate_width_costs(run_length: u64, type_size: usize, costs: &mut [u128; 3]) { + let widths = [RunLengthWidth::U8, RunLengthWidth::U16, RunLengthWidth::U32]; + // The current encoder uses miniblock-sized chunks for both miniblock and block paths. + let max_segment_values = *MAX_MINIBLOCK_VALUES; + let mut remaining = run_length; + while remaining > 0 { + let segment = remaining.min(max_segment_values); + for (idx, width) in widths.iter().enumerate() { + let entries = segment.div_ceil(width.max_run_length()); + costs[idx] += (entries as u128) * ((type_size + width.bytes_per_value()) as u128); + } + remaining -= segment; + } +} + /// RLE encoder for miniblock format -#[derive(Debug, Default)] -pub struct RleEncoder; +#[derive(Debug)] +pub struct RleEncoder { + run_length_width: RunLengthWidth, +} + +impl Default for RleEncoder { + fn default() -> Self { + Self::new() + } +} impl RleEncoder { pub fn new() -> Self { - Self + Self { + run_length_width: RunLengthWidth::U8, + } + } + + pub(crate) fn with_run_length_width(run_length_width: RunLengthWidth) -> Self { + Self { run_length_width } } fn encode_data( @@ -89,17 +271,23 @@ impl RleEncoder { return Ok((Vec::new(), Vec::new())); } + let num_values = usize::try_from(num_values).map_err(|_| { + Error::invalid_input_source( + format!("RLE num_values does not fit in usize: {num_values}").into(), + ) + })?; let bytes_per_value = (bits_per_value / 8) as usize; + let bytes_per_length = self.run_length_width.bytes_per_value(); // Pre-allocate global buffers with estimated capacity // Assume average compression ratio of ~10:1 (10 values per run) - let estimated_runs = num_values as usize / 10; + let estimated_runs = num_values / 
10; let mut all_values = Vec::with_capacity(estimated_runs * bytes_per_value); - let mut all_lengths = Vec::with_capacity(estimated_runs); + let mut all_lengths = Vec::with_capacity(estimated_runs * bytes_per_length); let mut chunks = Vec::new(); let mut offset = 0usize; - let mut values_remaining = num_values as usize; + let mut values_remaining = num_values; while values_remaining > 0 { let values_start = all_values.len(); @@ -134,7 +322,14 @@ impl RleEncoder { &mut all_values, &mut all_lengths, ), - _ => unreachable!("RLE encoding bits_per_value must be 8, 16, 32 or 64"), + _ => { + return Err(Error::invalid_input_source( + format!( + "RLE encoding bits_per_value must be 8, 16, 32, or 64, got {bits_per_value}" + ) + .into(), + )); + } }; if values_processed == 0 { @@ -175,22 +370,7 @@ impl RleEncoder { )) } - /// Encodes a chunk of data using RLE compression with dynamic boundary detection. - /// - /// This function processes values sequentially, detecting runs (sequences of identical values) - /// and encoding them as (value, length) pairs. It dynamically determines whether this chunk - /// should be the last chunk based on how many values were processed. - /// - /// # Key Features: - /// - Tracks byte usage to ensure we don't exceed MAX_MINIBLOCK_BYTES - /// - Maintains power-of-2 checkpoints for non-last chunks - /// - Splits long runs (>255) into multiple entries - /// - Dynamically determines if this is the last chunk - /// - /// # Returns: - /// - num_runs: Number of runs encoded - /// - values_processed: Number of input values processed - /// - is_last_chunk: Whether this chunk processed all remaining values + /// Encodes the largest valid mini-block prefix from `offset`. 
fn encode_chunk_rolling( &self, data: &LanceBuffer, @@ -203,7 +383,6 @@ impl RleEncoder { T: bytemuck::Pod + PartialEq + Copy + std::fmt::Debug + ArrowNativeType, { let type_size = std::mem::size_of::(); - let chunk_start = offset * type_size; let max_by_count = *MAX_MINIBLOCK_VALUES as usize; let max_values = values_remaining.min(max_by_count); @@ -217,112 +396,101 @@ impl RleEncoder { let chunk_buffer = data.slice_with_length(chunk_start, chunk_len); let typed_data_ref = chunk_buffer.borrow_to_typed_slice::(); let typed_data: &[T] = typed_data_ref.as_ref(); + let max_values = max_values.min(typed_data.len()); if typed_data.is_empty() { return (0, 0, false); } - // Record starting positions for this chunk let values_start = all_values.len(); + let all_remaining_values_fit = values_remaining <= max_by_count; + let encoded_size = self.encoded_size(&typed_data[..max_values]); + let (values_to_encode, is_last_chunk) = if all_remaining_values_fit + && encoded_size <= MAX_MINIBLOCK_BYTES as usize + { + (max_values, true) + } else if let Some(values_to_encode) = self.largest_power_of_two_prefix::(typed_data) { + (values_to_encode, false) + } else { + return (0, 0, false); + }; - let mut current_value = typed_data[0]; - let mut current_length = 1u64; - let mut bytes_used = 0usize; - let mut total_values_encoded = 0usize; // Track total encoded values + self.encode_values(&typed_data[..values_to_encode], all_values, all_lengths); - // Power-of-2 checkpoints for ensuring non-last chunks have valid sizes. - // - // We start from a slightly larger minimum checkpoint for smaller types since - // they encode more compactly and are less likely to hit MAX_MINIBLOCK_BYTES. 
- let min_checkpoint_log2 = match type_size { - 1 => 8, // 256 - 2 => 7, // 128 - _ => 6, // 64 - }; - let max_checkpoint_log2 = (values_remaining.min(*MAX_MINIBLOCK_VALUES as usize)) - .next_power_of_two() - .ilog2(); - let mut checkpoint_log2 = min_checkpoint_log2; + let num_runs = (all_values.len() - values_start) / type_size; + (num_runs, values_to_encode, is_last_chunk) + } + + fn largest_power_of_two_prefix(&self, values: &[T]) -> Option + where + T: bytemuck::Pod + PartialEq + Copy, + { + let max_prefix = values.len().min(*MAX_MINIBLOCK_VALUES as usize); + let mut prefix = 1usize << max_prefix.ilog2(); + while prefix > 1 { + if self.encoded_size(&values[..prefix]) <= MAX_MINIBLOCK_BYTES as usize { + return Some(prefix); + } + prefix >>= 1; + } + None + } + + fn encoded_size(&self, values: &[T]) -> usize + where + T: bytemuck::Pod + PartialEq + Copy, + { + if values.is_empty() { + return 0; + } - // Save state at checkpoints so we can roll back if needed - let mut last_checkpoint_state = None; + let mut current_value = values[0]; + let mut current_length = 1u64; + let mut encoded_size = 0usize; - for &value in typed_data[1..].iter() { + for &value in values.iter().skip(1) { if value == current_value { current_length += 1; } else { - // Calculate space needed (may need multiple u8s if run > 255) - let run_chunks = current_length.div_ceil(255) as usize; - let bytes_needed = run_chunks * (type_size + 1); - - // Stop if adding this run would exceed byte limit - if bytes_used + bytes_needed > MAX_MINIBLOCK_BYTES as usize { - if let Some((val_pos, len_pos, _, checkpoint_values)) = last_checkpoint_state { - // Roll back to last power-of-2 checkpoint - all_values.truncate(val_pos); - all_lengths.truncate(len_pos); - let num_runs = (val_pos - values_start) / type_size; - return (num_runs, checkpoint_values, false); - } - break; - } - - bytes_used += self.add_run(¤t_value, current_length, all_values, all_lengths); - total_values_encoded += current_length as usize; + 
encoded_size += self.run_size::(current_length); current_value = value; current_length = 1; } - - // Check if we reached a power-of-2 checkpoint. - while checkpoint_log2 <= max_checkpoint_log2 { - let checkpoint_values = 1usize << checkpoint_log2; - if checkpoint_values > values_remaining || total_values_encoded < checkpoint_values - { - break; - } - last_checkpoint_state = Some(( - all_values.len(), - all_lengths.len(), - bytes_used, - checkpoint_values, - )); - checkpoint_log2 += 1; - } } + encoded_size += self.run_size::(current_length); + encoded_size + } - // After the loop, we always have a pending run that needs to be added - // unless we've exceeded the byte limit - if current_length > 0 { - let run_chunks = current_length.div_ceil(255) as usize; - let bytes_needed = run_chunks * (type_size + 1); + fn run_size(&self, length: u64) -> usize + where + T: bytemuck::Pod, + { + let type_size = std::mem::size_of::(); + let run_chunks = length.div_ceil(self.run_length_width.max_run_length()) as usize; + run_chunks * (type_size + self.run_length_width.bytes_per_value()) + } - if bytes_used + bytes_needed <= MAX_MINIBLOCK_BYTES as usize { - let _ = self.add_run(¤t_value, current_length, all_values, all_lengths); - total_values_encoded += current_length as usize; - } + fn encode_values(&self, values: &[T], all_values: &mut Vec, all_lengths: &mut Vec) + where + T: bytemuck::Pod + PartialEq + Copy, + { + if values.is_empty() { + return; } - // Determine if we've processed all remaining values - let is_last_chunk = total_values_encoded == values_remaining; - - // Non-last chunks must have power-of-2 values for miniblock format - if !is_last_chunk { - if total_values_encoded.is_power_of_two() { - // Already at power-of-2 boundary - } else if let Some((val_pos, len_pos, _, checkpoint_values)) = last_checkpoint_state { - // Roll back to last valid checkpoint - all_values.truncate(val_pos); - all_lengths.truncate(len_pos); - let num_runs = (val_pos - values_start) / 
type_size; - return (num_runs, checkpoint_values, false); + let mut current_value = values[0]; + let mut current_length = 1u64; + + for &value in values.iter().skip(1) { + if value == current_value { + current_length += 1; } else { - // No valid checkpoint, can't create a valid chunk - return (0, 0, false); + self.add_run(¤t_value, current_length, all_values, all_lengths); + current_value = value; + current_length = 1; } } - - let num_runs = (all_values.len() - values_start) / type_size; - (num_runs, total_values_encoded, is_last_chunk) + self.add_run(¤t_value, current_length, all_values, all_lengths); } fn add_run( @@ -337,24 +505,26 @@ impl RleEncoder { { let value_bytes = bytemuck::bytes_of(value); let type_size = std::mem::size_of::(); - let num_full_chunks = (length / 255) as usize; - let remainder = (length % 255) as u8; + let max_run_length = self.run_length_width.max_run_length(); + let num_full_chunks = (length / max_run_length) as usize; + let remainder = length % max_run_length; let total_chunks = num_full_chunks + if remainder > 0 { 1 } else { 0 }; all_values.reserve(total_chunks * type_size); - all_lengths.reserve(total_chunks); + all_lengths.reserve(total_chunks * self.run_length_width.bytes_per_value()); for _ in 0..num_full_chunks { all_values.extend_from_slice(value_bytes); - all_lengths.push(255); + self.run_length_width + .write_length(max_run_length, all_lengths); } if remainder > 0 { all_values.extend_from_slice(value_bytes); - all_lengths.push(remainder); + self.run_length_width.write_length(remainder, all_lengths); } - total_chunks * (type_size + 1) + total_chunks * (type_size + self.run_length_width.bytes_per_value()) } } @@ -376,7 +546,7 @@ impl MiniBlockCompressor for RleEncoder { let encoding = ProtobufUtils21::rle( ProtobufUtils21::flat(bits_per_value, None), - ProtobufUtils21::flat(/*bits_per_value=*/ 8, None), + ProtobufUtils21::flat(self.run_length_width.bits_per_value(), None), ); Ok((compressed, encoding)) @@ -418,11 +588,25 @@ impl 
BlockCompressor for RleEncoder { #[derive(Debug)] pub struct RleDecompressor { bits_per_value: u64, + run_length_width: RunLengthWidth, } impl RleDecompressor { pub fn new(bits_per_value: u64) -> Self { - Self { bits_per_value } + Self { + bits_per_value, + run_length_width: RunLengthWidth::U8, + } + } + + pub(crate) fn with_run_length_width( + bits_per_value: u64, + run_length_width: RunLengthWidth, + ) -> Self { + Self { + bits_per_value, + run_length_width, + } } fn decode_data(&self, data: Vec, num_values: u64) -> Result { @@ -453,7 +637,15 @@ impl RleDecompressor { 16 => self.decode_generic::(values_buffer, lengths_buffer, num_values)?, 32 => self.decode_generic::(values_buffer, lengths_buffer, num_values)?, 64 => self.decode_generic::(values_buffer, lengths_buffer, num_values)?, - _ => unreachable!("RLE decoding bits_per_value must be 8, 16, 32, 64, or 128"), + _ => { + return Err(Error::invalid_input_source( + format!( + "RLE decoding bits_per_value must be 8, 16, 32, or 64, got {}", + self.bits_per_value + ) + .into(), + )); + } }; Ok(DataBlock::FixedWidth(FixedWidthDataBlock { @@ -474,6 +666,7 @@ impl RleDecompressor { T: bytemuck::Pod + Copy + std::fmt::Debug + ArrowNativeType, { let type_size = std::mem::size_of::(); + let length_size = self.run_length_width.bytes_per_value(); if values_buffer.is_empty() || lengths_buffer.is_empty() { if num_values == 0 { @@ -485,19 +678,22 @@ impl RleDecompressor { } } - if !values_buffer.len().is_multiple_of(type_size) || lengths_buffer.is_empty() { + if !values_buffer.len().is_multiple_of(type_size) + || !lengths_buffer.len().is_multiple_of(length_size) + { return Err(Error::invalid_input_source(format!( - "Invalid buffer sizes for RLE {} decoding: values {} bytes (not divisible by {}), lengths {} bytes", + "Invalid buffer sizes for RLE {} decoding: values {} bytes (not divisible by {}), lengths {} bytes (not divisible by {})", std::any::type_name::(), values_buffer.len(), type_size, - lengths_buffer.len() + 
lengths_buffer.len(), + length_size ) .into())); } let num_runs = values_buffer.len() / type_size; - let num_length_entries = lengths_buffer.len(); + let num_length_entries = lengths_buffer.len() / length_size; if num_runs != num_length_entries { return Err(Error::invalid_input_source( format!( @@ -510,39 +706,56 @@ impl RleDecompressor { let values_ref = values_buffer.borrow_to_typed_slice::(); let values: &[T] = values_ref.as_ref(); - let lengths: &[u8] = lengths_buffer.as_ref(); - - let expected_value_count = num_values as usize; - let mut decoded: Vec = Vec::with_capacity(expected_value_count); - - for (value, &length) in values.iter().zip(lengths.iter()) { - if decoded.len() == expected_value_count { - break; - } + let lengths = lengths_buffer.as_ref(); + let expected_value_count = usize::try_from(num_values).map_err(|_| { + Error::invalid_input_source( + format!("RLE num_values does not fit in usize: {num_values}").into(), + ) + })?; + let mut decoded_value_count = 0usize; + for length_bytes in lengths.chunks_exact(length_size) { + let length = self.run_length_width.read_length(length_bytes); if length == 0 { return Err(Error::invalid_input_source( "RLE decoding encountered a zero run length".into(), )); } - - let remaining = expected_value_count - decoded.len(); - let write_len = (length as usize).min(remaining); - - decoded.resize(decoded.len() + write_len, *value); + let length = usize::try_from(length).map_err(|_| { + Error::invalid_input_source( + format!("RLE run length does not fit in usize: {length}").into(), + ) + })?; + decoded_value_count = decoded_value_count.checked_add(length).ok_or_else(|| { + Error::invalid_input_source("RLE run length sum overflowed usize".into()) + })?; + if decoded_value_count > expected_value_count { + return Err(Error::invalid_input_source( + format!( + "RLE decoding overflowed expected value count: produced at least {}, expected {}", + decoded_value_count, expected_value_count + ) + .into(), + )); + } } - if 
decoded.len() != expected_value_count { + if decoded_value_count != expected_value_count { return Err(Error::invalid_input_source( format!( "RLE decoding produced {} values, expected {}", - decoded.len(), - expected_value_count + decoded_value_count, expected_value_count ) .into(), )); } + let mut decoded: Vec = Vec::with_capacity(expected_value_count); + for (value, length_bytes) in values.iter().zip(lengths.chunks_exact(length_size)) { + let length = self.run_length_width.read_length(length_bytes) as usize; + decoded.resize(decoded.len() + length, *value); + } + trace!( "RLE decoded {} {} values", num_values, @@ -641,6 +854,58 @@ mod tests { assert_eq!(lengths_buffer.len(), 6); } + #[test] + fn test_rle_v2_u16_miniblock_encoding() { + let encoder = RleEncoder::with_run_length_width(RunLengthWidth::U16); + + let data = vec![42i32; 1000]; + let array = Int32Array::from(data); + let (compressed, encoding) = + MiniBlockCompressor::compress(&encoder, DataBlock::from_array(array)).unwrap(); + + assert_eq!(compressed.data[0].len(), 4); + assert_eq!(compressed.data[1].len(), 2); + assert_eq!(compressed.data[1].as_ref(), &1000u16.to_le_bytes()); + + let rle = match encoding.compression.as_ref().unwrap() { + crate::format::pb21::compressive_encoding::Compression::Rle(rle) => rle, + other => panic!("expected RLE encoding, got {other:?}"), + }; + let run_lengths = rle.run_lengths.as_ref().unwrap(); + let flat = match run_lengths.compression.as_ref().unwrap() { + crate::format::pb21::compressive_encoding::Compression::Flat(flat) => flat, + other => panic!("expected flat run lengths, got {other:?}"), + }; + assert_eq!(flat.bits_per_value, 16); + + let decompressor = RleDecompressor::with_run_length_width(32, RunLengthWidth::U16); + let decompressed = MiniBlockDecompressor::decompress( + &decompressor, + compressed.data, + compressed.num_values, + ) + .unwrap(); + match decompressed { + DataBlock::FixedWidth(block) => { + let values = block.data.borrow_to_typed_slice::(); + 
assert_eq!(values.as_ref(), vec![42i32; 1000]); + } + _ => panic!("Expected FixedWidth block"), + } + } + + #[test] + fn test_select_run_length_width_prefers_u16_for_long_runs() { + let data = vec![7i32; 300]; + let bytes = data + .iter() + .flat_map(|value| value.to_le_bytes()) + .collect::>(); + let width = + select_run_length_width(&LanceBuffer::from(bytes), data.len() as u64, 32).unwrap(); + assert_eq!(width, RunLengthWidth::U16); + } + // ========== Round-trip Tests for Different Types ========== #[test] @@ -760,6 +1025,61 @@ mod tests { ); } + #[test] + fn test_u16_length_buffer_must_be_aligned() { + let decompressor = RleDecompressor::with_run_length_width(32, RunLengthWidth::U16); + let values = LanceBuffer::from(vec![1, 0, 0, 0]); + let lengths = LanceBuffer::from(vec![5]); + let result = MiniBlockDecompressor::decompress(&decompressor, vec![values, lengths], 5); + assert!(matches!(&result, Err(Error::InvalidInput { .. }))); + assert!( + result + .unwrap_err() + .to_string() + .contains("not divisible by 2") + ); + } + + #[test] + fn test_rle_rejects_underflow_overflow_and_zero_lengths() { + let decompressor = RleDecompressor::with_run_length_width(32, RunLengthWidth::U16); + let value = LanceBuffer::from(1i32.to_le_bytes().to_vec()); + + let underflow = MiniBlockDecompressor::decompress( + &decompressor, + vec![ + value.clone(), + LanceBuffer::from(4u16.to_le_bytes().to_vec()), + ], + 5, + ) + .unwrap_err(); + assert!(underflow.to_string().contains("produced 4 values")); + + let overflow = MiniBlockDecompressor::decompress( + &decompressor, + vec![ + value.clone(), + LanceBuffer::from(6u16.to_le_bytes().to_vec()), + ], + 5, + ) + .unwrap_err(); + assert!( + overflow + .to_string() + .contains("overflowed expected value count") + ); + + let zero = MiniBlockDecompressor::decompress( + &decompressor, + vec![value, LanceBuffer::from(0u16.to_le_bytes().to_vec())], + 5, + ) + .unwrap_err(); + assert!(zero.to_string().contains("zero run length")); + } + #[test] 
fn test_empty_data_handling() { let encoder = RleEncoder::new(); diff --git a/rust/lance-namespace-impls/src/dir/manifest.rs b/rust/lance-namespace-impls/src/dir/manifest.rs index aae924378da..b1a470ba93b 100644 --- a/rust/lance-namespace-impls/src/dir/manifest.rs +++ b/rust/lance-namespace-impls/src/dir/manifest.rs @@ -1770,7 +1770,8 @@ impl ManifestNamespace { indices: Option>, transaction: Transaction, ) -> std::result::Result<(), CommitError> { - apply_feature_flags(manifest, false, false).map_err(CommitError::from)?; + let use_rle_v2 = manifest.uses_rle_v2(); + apply_feature_flags(manifest, false, false, use_rle_v2).map_err(CommitError::from)?; let timestamp_nanos = SystemTime::now() .duration_since(UNIX_EPOCH) .map(|d| d.as_nanos()) diff --git a/rust/lance-table/src/feature_flags.rs b/rust/lance-table/src/feature_flags.rs index 096f0da79e5..832960de169 100644 --- a/rust/lance-table/src/feature_flags.rs +++ b/rust/lance-table/src/feature_flags.rs @@ -20,14 +20,17 @@ pub const FLAG_TABLE_CONFIG: u64 = 8; pub const FLAG_BASE_PATHS: u64 = 16; /// Disable writing transaction file under _transaction/, this flag is set when we only want to write inline transaction in manifest pub const FLAG_DISABLE_TRANSACTION_FILE: u64 = 32; +/// Dataset may contain RLE v2 pages. +pub const FLAG_RLE_V2: u64 = 64; /// The first bit that is unknown as a feature flag -pub const FLAG_UNKNOWN: u64 = 64; +pub const FLAG_UNKNOWN: u64 = 128; /// Set the reader and writer feature flags in the manifest based on the contents of the manifest. 
pub fn apply_feature_flags( manifest: &mut Manifest, enable_stable_row_id: bool, disable_transaction_file: bool, + enable_rle_v2: bool, ) -> Result<()> { // Reset flags manifest.reader_feature_flags = 0; @@ -74,6 +77,10 @@ pub fn apply_feature_flags( if disable_transaction_file { manifest.writer_feature_flags |= FLAG_DISABLE_TRANSACTION_FILE; } + if enable_rle_v2 { + manifest.reader_feature_flags |= FLAG_RLE_V2; + manifest.writer_feature_flags |= FLAG_RLE_V2; + } Ok(()) } @@ -103,10 +110,12 @@ mod tests { assert!(can_read_dataset(super::FLAG_TABLE_CONFIG)); assert!(can_read_dataset(super::FLAG_BASE_PATHS)); assert!(can_read_dataset(super::FLAG_DISABLE_TRANSACTION_FILE)); + assert!(can_read_dataset(super::FLAG_RLE_V2)); assert!(can_read_dataset( super::FLAG_DELETION_FILES | super::FLAG_STABLE_ROW_IDS | super::FLAG_USE_V2_FORMAT_DEPRECATED + | super::FLAG_RLE_V2 )); assert!(!can_read_dataset(super::FLAG_UNKNOWN)); } @@ -120,12 +129,14 @@ mod tests { assert!(can_write_dataset(super::FLAG_TABLE_CONFIG)); assert!(can_write_dataset(super::FLAG_BASE_PATHS)); assert!(can_write_dataset(super::FLAG_DISABLE_TRANSACTION_FILE)); + assert!(can_write_dataset(super::FLAG_RLE_V2)); assert!(can_write_dataset( super::FLAG_DELETION_FILES | super::FLAG_STABLE_ROW_IDS | super::FLAG_USE_V2_FORMAT_DEPRECATED | super::FLAG_TABLE_CONFIG | super::FLAG_BASE_PATHS + | super::FLAG_RLE_V2 )); assert!(!can_write_dataset(super::FLAG_UNKNOWN)); } @@ -151,7 +162,7 @@ mod tests { DataStorageFormat::default(), HashMap::new(), // Empty base_paths ); - apply_feature_flags(&mut normal_manifest, false, false).unwrap(); + apply_feature_flags(&mut normal_manifest, false, false, false).unwrap(); assert_eq!(normal_manifest.reader_feature_flags & FLAG_BASE_PATHS, 0); assert_eq!(normal_manifest.writer_feature_flags & FLAG_BASE_PATHS, 0); // Test 2: Dataset with base_paths (shallow clone or multi-base) should have FLAG_BASE_PATHS @@ -171,7 +182,7 @@ mod tests { DataStorageFormat::default(), base_paths, ); - 
apply_feature_flags(&mut multi_base_manifest, false, false).unwrap(); + apply_feature_flags(&mut multi_base_manifest, false, false, false).unwrap(); assert_ne!( multi_base_manifest.reader_feature_flags & FLAG_BASE_PATHS, 0 diff --git a/rust/lance-table/src/format/manifest.rs b/rust/lance-table/src/format/manifest.rs index 9845061b7e4..382292ec0f8 100644 --- a/rust/lance-table/src/format/manifest.rs +++ b/rust/lance-table/src/format/manifest.rs @@ -16,7 +16,7 @@ use std::ops::Range; use std::sync::Arc; use super::Fragment; -use crate::feature_flags::{FLAG_STABLE_ROW_IDS, has_deprecated_v2_feature_flag}; +use crate::feature_flags::{FLAG_RLE_V2, FLAG_STABLE_ROW_IDS, has_deprecated_v2_feature_flag}; use crate::format::fragment::DataFileFieldInterner; use crate::format::pb; use lance_core::cache::LanceCache; @@ -494,6 +494,11 @@ impl Manifest { self.reader_feature_flags & FLAG_STABLE_ROW_IDS != 0 } + /// Whether the dataset may contain RLE v2 pages. + pub fn uses_rle_v2(&self) -> bool { + self.reader_feature_flags & FLAG_RLE_V2 != 0 + } + /// Creates a serialized copy of the manifest, suitable for IPC or temp storage /// and can be used to create a dataset pub fn serialized(&self) -> Vec { diff --git a/rust/lance/src/dataset.rs b/rust/lance/src/dataset.rs index 3e0d77704da..a64c2665b44 100644 --- a/rust/lance/src/dataset.rs +++ b/rust/lance/src/dataset.rs @@ -3436,6 +3436,7 @@ pub(crate) struct ManifestWriteConfig { use_legacy_format: Option, // default None storage_format: Option, // default None disable_transaction_file: bool, // default false + use_rle_v2: bool, // default false } impl Default for ManifestWriteConfig { @@ -3447,6 +3448,7 @@ impl Default for ManifestWriteConfig { disable_transaction_file: false, use_legacy_format: None, storage_format: None, + use_rle_v2: false, } } } @@ -3474,10 +3476,12 @@ pub(crate) async fn write_manifest_file( // Preserve it here so this second apply_feature_flags call does not clear it // when config.use_stable_row_ids is false 
(the ManifestWriteConfig default). let use_stable_row_ids = config.use_stable_row_ids || manifest.uses_stable_row_ids(); + let use_rle_v2 = config.use_rle_v2 || manifest.uses_rle_v2(); apply_feature_flags( manifest, use_stable_row_ids, config.disable_transaction_file, + use_rle_v2, )?; } diff --git a/rust/lance/src/dataset/tests/dataset_io.rs b/rust/lance/src/dataset/tests/dataset_io.rs index c0c90fc71c9..700fdae07a1 100644 --- a/rust/lance/src/dataset/tests/dataset_io.rs +++ b/rust/lance/src/dataset/tests/dataset_io.rs @@ -15,7 +15,7 @@ use crate::session::Session; use crate::{Dataset, Error, Result}; use lance_table::format::DataStorageFormat; -use crate::dataset::write::{WriteMode, WriteParams}; +use crate::dataset::write::{CommitBuilder, InsertBuilder, WriteMode, WriteParams}; use arrow::array::as_struct_array; use arrow::compute::concat_batches; use arrow_array::RecordBatch; @@ -856,6 +856,7 @@ async fn test_write_manifest( use_legacy_format: None, storage_format: None, disable_transaction_file: false, + use_rle_v2: false, }, dataset.manifest_location.naming_scheme, None, @@ -890,6 +891,294 @@ async fn test_write_manifest( assert!(matches!(write_result, Err(Error::NotSupported { .. 
}))); } +#[tokio::test] +async fn test_rle_v2_write_sets_and_preserves_feature_flag() { + let test_uri = TempStrDir::default(); + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "i", + DataType::Int32, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from(vec![7; 1000]))], + ) + .unwrap(); + + let batches = RecordBatchIterator::new(vec![Ok(batch)].into_iter(), schema.clone()); + let mut dataset = Dataset::write( + batches, + &test_uri, + Some(WriteParams { + data_storage_version: Some(LanceFileVersion::Stable), + enable_rle_v2: true, + ..Default::default() + }), + ) + .await + .unwrap(); + + let manifest = read_manifest( + dataset.object_store.as_ref(), + &dataset + .commit_handler + .resolve_latest_location(&dataset.base, dataset.object_store.as_ref()) + .await + .unwrap() + .path, + None, + ) + .await + .unwrap(); + assert_ne!( + manifest.reader_feature_flags & feature_flags::FLAG_RLE_V2, + 0 + ); + assert_ne!( + manifest.writer_feature_flags & feature_flags::FLAG_RLE_V2, + 0 + ); + + let append_batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from(vec![9; 1000]))], + ) + .unwrap(); + let append_batches = + RecordBatchIterator::new(vec![Ok(append_batch)].into_iter(), schema.clone()); + dataset = Dataset::write( + append_batches, + &test_uri, + Some(WriteParams { + mode: WriteMode::Append, + ..Default::default() + }), + ) + .await + .unwrap(); + + assert_ne!( + dataset.manifest.reader_feature_flags & feature_flags::FLAG_RLE_V2, + 0 + ); + assert_ne!( + dataset.manifest.writer_feature_flags & feature_flags::FLAG_RLE_V2, + 0 + ); + + let actual = dataset.scan().try_into_batch().await.unwrap(); + let expected = RecordBatch::try_new( + schema, + vec![Arc::new(Int32Array::from( + [vec![7; 1000], vec![9; 1000]].concat(), + ))], + ) + .unwrap(); + assert_eq!(actual, expected); +} + +#[tokio::test] +async fn test_rle_v2_uncommitted_create_commits_feature_flag() { + let test_uri = 
TempStrDir::default(); + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "i", + DataType::Int32, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from(vec![7; 1000]))], + ) + .unwrap(); + + let transaction = InsertBuilder::new(test_uri.as_str()) + .with_params(&WriteParams { + enable_rle_v2: true, + ..Default::default() + }) + .execute_uncommitted(vec![batch]) + .await + .unwrap(); + + let dataset = CommitBuilder::new(test_uri.as_str()) + .execute(transaction) + .await + .unwrap(); + assert_ne!( + dataset.manifest.reader_feature_flags & feature_flags::FLAG_RLE_V2, + 0 + ); + assert_ne!( + dataset.manifest.writer_feature_flags & feature_flags::FLAG_RLE_V2, + 0 + ); +} + +#[tokio::test] +async fn test_rle_v2_shallow_clone_preserves_feature_flag() { + let test_uri = TempStrDir::default(); + let clone_uri = TempStrDir::default(); + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "i", + DataType::Int32, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from(vec![7; 1000]))], + ) + .unwrap(); + + let mut dataset = Dataset::write( + RecordBatchIterator::new(vec![Ok(batch)].into_iter(), schema), + &test_uri, + Some(WriteParams { + enable_rle_v2: true, + ..Default::default() + }), + ) + .await + .unwrap(); + + let clone = dataset + .shallow_clone(clone_uri.as_str(), dataset.version().version, None) + .await + .unwrap(); + assert_ne!( + clone.manifest.reader_feature_flags & feature_flags::FLAG_RLE_V2, + 0 + ); + assert_ne!( + clone.manifest.writer_feature_flags & feature_flags::FLAG_RLE_V2, + 0 + ); +} + +#[tokio::test] +async fn test_rle_v2_rejects_enabling_existing_unflagged_dataset() { + let test_uri = TempStrDir::default(); + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "i", + DataType::Int32, + false, + )])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from(vec![1; 1000]))], + ) + 
.unwrap(); + let dataset = Dataset::write( + RecordBatchIterator::new(vec![Ok(batch)].into_iter(), schema.clone()), + &test_uri, + Some(WriteParams { + data_storage_version: Some(LanceFileVersion::Stable), + ..Default::default() + }), + ) + .await + .unwrap(); + assert_eq!( + dataset.manifest.reader_feature_flags & feature_flags::FLAG_RLE_V2, + 0 + ); + + let direct_append = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from(vec![2; 1000]))], + ) + .unwrap(); + let result = Dataset::write( + RecordBatchIterator::new(vec![Ok(direct_append)].into_iter(), schema.clone()), + &test_uri, + Some(WriteParams { + mode: WriteMode::Append, + enable_rle_v2: true, + ..Default::default() + }), + ) + .await; + assert!(matches!(result, Err(Error::InvalidInput { .. }))); + assert!( + result + .unwrap_err() + .to_string() + .contains("only be enabled when creating a new dataset") + ); + + let overwrite_batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from(vec![3; 1000]))], + ) + .unwrap(); + let result = Dataset::write( + RecordBatchIterator::new(vec![Ok(overwrite_batch)].into_iter(), schema.clone()), + &test_uri, + Some(WriteParams { + mode: WriteMode::Overwrite, + enable_rle_v2: true, + ..Default::default() + }), + ) + .await; + assert!(matches!(result, Err(Error::InvalidInput { .. }))); + assert!( + result + .unwrap_err() + .to_string() + .contains("only be enabled when creating a new dataset") + ); + + let append_batch = RecordBatch::try_new( + schema.clone(), + vec![Arc::new(Int32Array::from(vec![2; 1000]))], + ) + .unwrap(); + let dataset = Arc::new(dataset); + let uncommitted_result = InsertBuilder::new(dataset.clone()) + .with_params(&WriteParams { + mode: WriteMode::Append, + enable_rle_v2: true, + ..Default::default() + }) + .execute_uncommitted(vec![append_batch]) + .await; + assert!(matches!( + uncommitted_result, + Err(Error::InvalidInput { .. 
}) + )); + assert!( + uncommitted_result + .unwrap_err() + .to_string() + .contains("only be enabled when creating a new dataset") + ); +} + +#[tokio::test] +async fn test_rle_v2_rejects_legacy_storage() { + let test_uri = TempStrDir::default(); + let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new( + "i", + DataType::Int32, + false, + )])); + let batch = + RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![1; 8]))]).unwrap(); + + let result = Dataset::write( + RecordBatchIterator::new(vec![Ok(batch)].into_iter(), schema), + &test_uri, + Some(WriteParams { + data_storage_version: Some(LanceFileVersion::Legacy), + enable_rle_v2: true, + ..Default::default() + }), + ) + .await; + assert!(matches!(result, Err(Error::InvalidInput { .. }))); + assert!(result.unwrap_err().to_string().contains("RLE v2 requires")); +} + #[rstest] #[tokio::test] async fn append_dataset( diff --git a/rust/lance/src/dataset/transaction.rs b/rust/lance/src/dataset/transaction.rs index 4555cd7ee6c..225450a4c58 100644 --- a/rust/lance/src/dataset/transaction.rs +++ b/rust/lance/src/dataset/transaction.rs @@ -53,6 +53,8 @@ use uuid::Uuid; /// Version 1 is the initial dataset version in the Lance format. const UNKNOWN_CREATED_AT_VERSION: u64 = 1; +pub(crate) const TRANSACTION_PROPERTY_REQUIRES_RLE_V2: &str = "lance.requires_rle_v2"; + /// Look up the `created_at` version for a single UPDATE-branch row ID. 
/// /// Callers must only call this for row IDs that are confirmed to be present in @@ -1696,6 +1698,13 @@ impl Transaction { .build() } + pub(crate) fn requires_rle_v2(&self) -> bool { + self.transaction_properties + .as_ref() + .and_then(|props| props.get(TRANSACTION_PROPERTY_REQUIRES_RLE_V2)) + .is_some_and(|value| value == "true") + } + fn fragments_with_ids<'a, T>( new_fragments: T, fragment_id: &'a mut u64, @@ -2364,10 +2373,20 @@ impl Transaction { .map(|m| m.uses_stable_row_ids()) .unwrap_or(false); let use_stable_row_ids = config.use_stable_row_ids || inherited; + let inherited_rle_v2 = current_manifest.map(|m| m.uses_rle_v2()).unwrap_or(false); + let use_rle_v2 = config.use_rle_v2 || inherited_rle_v2; + let storage_version = manifest.data_storage_format.lance_file_version()?; + if use_rle_v2 && storage_version < LanceFileVersion::V2_1 { + return Err(Error::invalid_input(format!( + "RLE v2 requires file version >= 2.1 (got {:?})", + storage_version + ))); + } apply_feature_flags( &mut manifest, use_stable_row_ids, config.disable_transaction_file, + use_rle_v2, )?; } manifest.set_timestamp(timestamp_to_nanos(config.timestamp)); diff --git a/rust/lance/src/dataset/write.rs b/rust/lance/src/dataset/write.rs index ff0a119158c..81e738afb5c 100644 --- a/rust/lance/src/dataset/write.rs +++ b/rust/lance/src/dataset/write.rs @@ -20,6 +20,8 @@ use lance_core::{Error, Result, datatypes::Schema}; use lance_datafusion::chunker::{break_stream, chunk_stream}; use lance_datafusion::spill::{SpillReceiver, SpillSender, create_replay_spill}; use lance_datafusion::utils::StreamingWriteSource; +use lance_encoding::compression::DefaultCompressionStrategy; +use lance_encoding::encoder::{FieldEncodingStrategy, StructuralEncodingStrategy}; use lance_file::previous::writer::{ FileWriter as PreviousFileWriter, ManifestProvider as PreviousManifestProvider, }; @@ -323,6 +325,14 @@ pub struct WriteParams { /// secondary indices need to be updated to point to new row ids. 
pub enable_stable_row_ids: bool, + /// If set to true, a new dataset may contain RLE pages with 16-bit or + /// 32-bit run lengths. + /// + /// This can only be enabled when creating a new dataset. Existing datasets + /// that already have the RLE v2 feature flag inherit it automatically. + /// Existing datasets without the flag reject this option. + pub enable_rle_v2: bool, + /// If set to true, and this is a new dataset, uses the new v2 manifest paths. /// These allow constant-time lookups for the latest manifest on object storage. /// This parameter has no effect on existing datasets. To migrate an existing @@ -407,6 +417,7 @@ impl Default for WriteParams { commit_handler: None, data_storage_version: None, enable_stable_row_ids: false, + enable_rle_v2: false, enable_v2_manifest_paths: true, session: None, auto_cleanup: None, @@ -432,6 +443,14 @@ impl WriteParams { } } + /// Allow a new dataset to contain RLE pages with 16-bit or 32-bit run lengths. + pub fn with_rle_v2(self) -> Self { + Self { + enable_rle_v2: true, + ..self + } + } + pub fn storage_version_or_default(&self) -> LanceFileVersion { self.data_storage_version.unwrap_or_default() } @@ -568,6 +587,8 @@ pub async fn do_write_fragments( storage_version: LanceFileVersion, target_bases_info: Option>, ) -> Result> { + validate_rle_v2_write(dataset, &params, storage_version)?; + let adapter = SchemaAdapter::new(data.schema()); let data = adapter.to_physical_stream(data); @@ -595,6 +616,10 @@ pub async fn do_write_fragments( .map(|ds| ds.session.store_registry()) .unwrap_or_else(|| params.store_registry()); let source_store_params = params.store_params.clone().unwrap_or_default(); + let enable_rle_v2 = params.enable_rle_v2 + || dataset + .map(|ds| ds.manifest().uses_rle_v2()) + .unwrap_or(false); let writer_generator = WriterGenerator::new( object_store.clone(), @@ -608,6 +633,7 @@ pub async fn do_write_fragments( source_store_registry, source_store_params, params.blob_pack_file_size_threshold, + enable_rle_v2,
); let mut writer: Option> = None; let mut num_rows_in_current_file = 0; @@ -1092,6 +1118,28 @@ pub async fn write_fragments_internal( Ok((fragments, schema)) } +fn validate_rle_v2_write( + dataset: Option<&Dataset>, + params: &WriteParams, + storage_version: LanceFileVersion, +) -> Result<()> { + let existing_uses_rle_v2 = dataset + .map(|ds| ds.manifest().uses_rle_v2()) + .unwrap_or(false); + if params.enable_rle_v2 && dataset.is_some() && !existing_uses_rle_v2 { + return Err(Error::invalid_input( + "RLE v2 can only be enabled when creating a new dataset", + )); + } + if (params.enable_rle_v2 || existing_uses_rle_v2) && storage_version < LanceFileVersion::V2_1 { + return Err(Error::invalid_input(format!( + "RLE v2 requires file version >= 2.1 (got {:?})", + storage_version + ))); + } + Ok(()) +} + fn legacy_blob_field_path(schema: &Schema) -> Option { schema .fields_pre_order() @@ -1261,6 +1309,7 @@ pub(super) async fn open_update_writer( add_data_dir: true, external_base_resolver, source_store_registry: dataset.session.store_registry(), + enable_rle_v2: dataset.manifest().uses_rle_v2(), ..Default::default() }, ) @@ -1277,6 +1326,7 @@ struct WriterOptions { source_store_registry: Arc, source_store_params: ObjectStoreParams, blob_pack_file_size_threshold: Option, + enable_rle_v2: bool, } async fn open_writer_with_options( @@ -1295,6 +1345,7 @@ async fn open_writer_with_options( source_store_registry, source_store_params, blob_pack_file_size_threshold, + enable_rle_v2, } = options; let data_file_key = generate_random_filename(); @@ -1328,6 +1379,18 @@ async fn open_writer_with_options( schema.clone(), FileWriterOptions { format_version: Some(storage_version), + encoding_strategy: if enable_rle_v2 { + Some(Arc::new(StructuralEncodingStrategy { + compression_strategy: Arc::new( + DefaultCompressionStrategy::new() + .with_version(storage_version) + .with_rle_v2(), + ), + version: storage_version, + }) as Arc) + } else { + None + }, ..Default::default() }, )?; @@ 
-1386,6 +1449,7 @@ struct WriterGenerator { source_store_registry: Arc, source_store_params: ObjectStoreParams, blob_pack_file_size_threshold: Option, + enable_rle_v2: bool, /// Counter for round-robin selection next_base_index: AtomicUsize, } @@ -1404,6 +1468,7 @@ impl WriterGenerator { source_store_registry: Arc, source_store_params: ObjectStoreParams, blob_pack_file_size_threshold: Option, + enable_rle_v2: bool, ) -> Self { Self { object_store, @@ -1417,6 +1482,7 @@ impl WriterGenerator { source_store_registry, source_store_params, blob_pack_file_size_threshold, + enable_rle_v2, next_base_index: AtomicUsize::new(0), } } @@ -1451,6 +1517,7 @@ impl WriterGenerator { source_store_registry: self.source_store_registry.clone(), source_store_params: self.source_store_params.clone(), blob_pack_file_size_threshold: self.blob_pack_file_size_threshold, + enable_rle_v2: self.enable_rle_v2, }, ) .await? @@ -1469,6 +1536,7 @@ impl WriterGenerator { source_store_registry: self.source_store_registry.clone(), source_store_params: self.source_store_params.clone(), blob_pack_file_size_threshold: self.blob_pack_file_size_threshold, + enable_rle_v2: self.enable_rle_v2, }, ) .await? 
@@ -2179,6 +2247,7 @@ mod tests { Arc::new(ObjectStoreRegistry::default()), ObjectStoreParams::default(), None, + false, ); // Create a writer @@ -2297,6 +2366,7 @@ mod tests { Arc::new(ObjectStoreRegistry::default()), ObjectStoreParams::default(), None, + false, ); // Create test batch @@ -3133,6 +3203,54 @@ mod tests { assert_eq!(dataset.schema().fields.len(), 2); } + #[tokio::test] + async fn test_do_write_fragments_rejects_rle_v2_for_existing_unflagged_dataset() { + use arrow_array::record_batch; + + let temp_dir = TempDir::default(); + let dataset_uri = format!("file://{}", temp_dir.std_path().display()); + let batch = record_batch!(("id", Int32, [1, 2, 3])).unwrap(); + let dataset = InsertBuilder::new(&dataset_uri) + .execute(vec![batch.clone()]) + .await + .unwrap(); + + let arrow_schema = batch.schema(); + let schema = Schema::try_from(arrow_schema.as_ref()).unwrap(); + let stream = RecordBatchStreamAdapter::new( + arrow_schema, + futures::stream::iter(vec![Ok::<_, DataFusionError>(batch)]), + ); + + let result = do_write_fragments( + Some(&dataset), + dataset.object_store.clone(), + &dataset.base, + &schema, + Box::pin(stream), + WriteParams { + mode: WriteMode::Append, + enable_rle_v2: true, + ..Default::default() + }, + dataset + .manifest() + .data_storage_format + .lance_file_version() + .unwrap(), + None, + ) + .await; + + assert!(matches!(result, Err(Error::InvalidInput { .. 
}))); + assert!( + result + .unwrap_err() + .to_string() + .contains("only be enabled when creating a new dataset") + ); + } + #[tokio::test] async fn test_disk_full_error() { use std::io::{self, ErrorKind}; diff --git a/rust/lance/src/dataset/write/commit.rs b/rust/lance/src/dataset/write/commit.rs index baad71b3e39..9b0c891ea0e 100644 --- a/rust/lance/src/dataset/write/commit.rs +++ b/rust/lance/src/dataset/write/commit.rs @@ -38,6 +38,7 @@ use tracing::info; pub struct CommitBuilder<'a> { dest: WriteDestination<'a>, use_stable_row_ids: Option, + enable_rle_v2: bool, enable_v2_manifest_paths: bool, storage_format: Option, commit_handler: Option>, @@ -59,6 +60,7 @@ impl<'a> CommitBuilder<'a> { Self { dest: dest.into(), use_stable_row_ids: None, + enable_rle_v2: false, enable_v2_manifest_paths: true, storage_format: None, commit_handler: None, @@ -85,6 +87,15 @@ impl<'a> CommitBuilder<'a> { self } + /// Require the committed dataset version to allow RLE v2 run length widths. + /// + /// This can only be used when creating a new dataset. Existing datasets that + /// already have the RLE v2 feature flag inherit it automatically. + pub fn enable_rle_v2(mut self, enable_rle_v2: bool) -> Self { + self.enable_rle_v2 = enable_rle_v2; + self + } + /// Pass the storage format to use for the dataset. /// /// This is only needed when creating a new empty table. 
If any data files are @@ -345,6 +356,27 @@ impl<'a> CommitBuilder<'a> { } else { self.use_stable_row_ids.unwrap_or(false) }; + let existing_uses_rle_v2 = dest + .dataset() + .map(|ds| ds.manifest.uses_rle_v2()) + .unwrap_or(false); + let requires_rle_v2 = self.enable_rle_v2 || transaction.requires_rle_v2(); + if requires_rle_v2 + && let Some(ds) = dest.dataset() + && !ds.manifest.uses_rle_v2() + { + return Err(Error::invalid_input( + "RLE v2 can only be enabled when creating a new dataset", + )); + } + if requires_rle_v2 + && self + .storage_format + .is_some_and(|version| version < LanceFileVersion::V2_1) + { + return Err(Error::invalid_input("RLE v2 requires file version >= 2.1")); + } + let use_rle_v2 = requires_rle_v2 || existing_uses_rle_v2; // Validate storage format matches existing dataset if let Some(ds) = dest.dataset() @@ -365,6 +397,7 @@ impl<'a> CommitBuilder<'a> { let manifest_config = ManifestWriteConfig { use_stable_row_ids, storage_format: self.storage_format.map(DataStorageFormat::new), + use_rle_v2, ..Default::default() }; @@ -490,6 +523,15 @@ impl<'a> CommitBuilder<'a> { let read_version = transactions.iter().map(|t| t.read_version).min().unwrap(); + let transaction_properties = if transactions.iter().any(Transaction::requires_rle_v2) { + Some(Arc::new(HashMap::from([( + crate::dataset::transaction::TRANSACTION_PROPERTY_REQUIRES_RLE_V2.to_string(), + "true".to_string(), + )]))) + } else { + None + }; + let merged = Transaction { uuid: uuid::Uuid::new_v4().hyphenated().to_string(), operation: Operation::Append { @@ -503,8 +545,7 @@ impl<'a> CommitBuilder<'a> { }, read_version, tag: None, - //TODO: handle batch transaction merges in the future - transaction_properties: None, + transaction_properties, }; let dataset = self.execute(merged.clone()).await?; Ok(BatchCommitResult { dataset, merged }) @@ -525,6 +566,7 @@ mod tests { use arrow::array::{Int32Array, RecordBatch}; use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema}; + 
use lance_core::datatypes::Schema; use lance_io::utils::CachedFileSize; use lance_io::{assert_io_eq, assert_io_gt}; use lance_table::format::{DataFile, Fragment}; @@ -534,6 +576,7 @@ mod tests { use crate::utils::test::ThrottledStoreWrapper; + use crate::dataset::transaction::TRANSACTION_PROPERTY_REQUIRES_RLE_V2; use crate::dataset::{InsertBuilder, WriteParams}; use super::*; @@ -571,6 +614,58 @@ mod tests { } } + fn sample_legacy_fragment() -> Fragment { + let (major_version, minor_version) = LanceFileVersion::Legacy.to_numbers(); + Fragment { + id: 0, + files: vec![DataFile { + path: "file.lance".to_string(), + fields: Arc::from([0]), + column_indices: Arc::from([]), + file_major_version: major_version, + file_minor_version: minor_version, + file_size_bytes: CachedFileSize::new(100), + base_id: None, + }], + deletion_file: None, + row_id_meta: None, + physical_rows: Some(10), + last_updated_at_version_meta: None, + created_at_version_meta: None, + } + } + + #[tokio::test] + async fn test_rle_v2_rejects_inferred_legacy_storage() { + let schema = Schema::try_from(&ArrowSchema::new(vec![ArrowField::new( + "i", + DataType::Int32, + false, + )])) + .unwrap(); + let transaction = Transaction { + uuid: uuid::Uuid::new_v4().hyphenated().to_string(), + operation: Operation::Overwrite { + schema, + fragments: vec![sample_legacy_fragment()], + config_upsert_values: None, + initial_bases: None, + }, + read_version: 0, + tag: None, + transaction_properties: Some(Arc::new(HashMap::from([( + TRANSACTION_PROPERTY_REQUIRES_RLE_V2.to_string(), + "true".to_string(), + )]))), + }; + + let result = CommitBuilder::new("memory://rle-v2-legacy") + .execute(transaction) + .await; + assert!(matches!(result, Err(Error::InvalidInput { .. }))); + assert!(result.unwrap_err().to_string().contains("RLE v2 requires")); + } + #[tokio::test] async fn test_reuse_session() { // Need to use in-memory for accurate IOPS tracking. 
diff --git a/rust/lance/src/dataset/write/insert.rs b/rust/lance/src/dataset/write/insert.rs index bfd702c9c3b..72581930a8b 100644 --- a/rust/lance/src/dataset/write/insert.rs +++ b/rust/lance/src/dataset/write/insert.rs @@ -21,7 +21,9 @@ use object_store::path::Path; use crate::Dataset; use crate::dataset::ReadParams; use crate::dataset::builder::DatasetBuilder; -use crate::dataset::transaction::{Operation, Transaction, TransactionBuilder}; +use crate::dataset::transaction::{ + Operation, TRANSACTION_PROPERTY_REQUIRES_RLE_V2, Transaction, TransactionBuilder, +}; use crate::dataset::write::{validate_and_resolve_target_bases, write_fragments_internal}; use crate::{Error, Result}; use tracing::info; @@ -273,12 +275,36 @@ impl<'a> InsertBuilder<'a> { .unwrap_or(0), operation, ) - .transaction_properties(context.params.transaction_properties.clone()) + .transaction_properties(Self::transaction_properties(context)) .build(); Ok(transaction) } + fn transaction_properties(context: &WriteContext<'_>) -> Option>> { + let requires_rle_v2 = context.params.enable_rle_v2 + || context + .dest + .dataset() + .map(|ds| ds.manifest().uses_rle_v2()) + .unwrap_or(false); + if !requires_rle_v2 { + return context.params.transaction_properties.clone(); + } + + let mut properties = context + .params + .transaction_properties + .as_deref() + .cloned() + .unwrap_or_default(); + properties.insert( + TRANSACTION_PROPERTY_REQUIRES_RLE_V2.to_string(), + "true".to_string(), + ); + Some(Arc::new(properties)) + } + fn validate_write(&self, context: &mut WriteContext, data_schema: &Schema) -> Result<()> { // Write mode match (&context.params.mode, &context.dest) { diff --git a/rust/lance/src/io/commit.rs b/rust/lance/src/io/commit.rs index ce0d29d550b..b11b6c59cb5 100644 --- a/rust/lance/src/io/commit.rs +++ b/rust/lance/src/io/commit.rs @@ -30,6 +30,7 @@ use lance_file::version::LanceFileVersion; use lance_index::metrics::NoOpMetricsCollector; use lance_io::utils::CachedFileSize; use 
lance_select::RowAddrTreeMap; +use lance_table::feature_flags::FLAG_RLE_V2; use lance_table::format::{ DETACHED_VERSION_MASK, DataStorageFormat, DeletionFile, Fragment, IndexMetadata, Manifest, WriterVersion, is_detached_version, list_index_files_with_sizes, pb, @@ -183,13 +184,17 @@ async fn do_commit_new_dataset( .max() .map(|id| *id + 1) .unwrap_or(0); - let new_manifest = source_manifest.shallow_clone( + let mut new_manifest = source_manifest.shallow_clone( ref_name.clone(), ref_path.clone(), new_base_id, branch_name.clone(), transaction_file.clone(), ); + if source_manifest.uses_rle_v2() { + new_manifest.reader_feature_flags |= FLAG_RLE_V2; + new_manifest.writer_feature_flags |= FLAG_RLE_V2; + } let updated_indices = if let Some(index_section_pos) = source_manifest.index_section { let reader = object_store.open(&source_manifest_location.path).await?;