Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions protos/table.proto
Original file line number Diff line number Diff line change
Expand Up @@ -109,10 +109,13 @@ message Manifest {
// should not attempt to read the dataset.
//
// Known flags:
// * 1: deletion files are present
// * 2: row ids are stable and stored as part of the fragment metadata.
// * 4: use v2 format (deprecated)
// * 8: table config is present
// * 1 << 0: deletion files are present
// * 1 << 1: row ids are stable and stored as part of the fragment metadata.
// * 1 << 2: use v2 format (deprecated)
// * 1 << 3: table config is present
// * 1 << 4: dataset uses multiple base paths
// * 1 << 5: transaction file writes are disabled
// * 1 << 6: dataset may contain RLE v2 pages
uint64 reader_feature_flags = 9;

// Feature flags for writers.
Expand Down
176 changes: 147 additions & 29 deletions rust/lance-encoding/src/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ use crate::{
PackedStructVariablePerValueEncoder, VariablePackedStructFieldDecoder,
VariablePackedStructFieldKind,
},
rle::{RleDecompressor, RleEncoder},
rle::{
RleDecompressor, RleEncoder, RunLengthWidth, select_run_length_width,
select_run_length_width_and_size,
},
value::{ValueDecompressor, ValueEncoder},
},
},
Expand Down Expand Up @@ -140,6 +143,8 @@ pub struct DefaultCompressionStrategy {
params: CompressionParams,
/// The lance file version for compatibilities.
version: LanceFileVersion,
/// Whether the writer may emit RLE v2 run length widths.
enable_rle_v2: bool,
}

fn try_bss_for_mini_block(
Expand All @@ -165,6 +170,7 @@ fn try_bss_for_mini_block(
fn try_rle_for_mini_block(
data: &FixedWidthDataBlock,
params: &CompressionFieldParams,
enable_rle_v2: bool,
) -> Option<Box<dyn MiniBlockCompressor>> {
let bits = data.bits_per_value;
if !matches!(bits, 8 | 16 | 32 | 64) {
Expand All @@ -188,16 +194,22 @@ fn try_rle_for_mini_block(
return None;
}

// Estimate the encoded size.
//
// RLE stores (value, run_length) pairs. Run lengths are u8 and long runs are split into
// multiple entries of up to 255 values. We don't know the run length distribution here,
// so we conservatively account for splitting with an upper bound.
let num_values = data.num_values;
let estimated_pairs = (run_count.saturating_add(num_values / 255)).min(num_values);

let raw_bytes = (num_values as u128) * (type_size as u128);
let rle_bytes = (estimated_pairs as u128) * ((type_size + 1) as u128);
let (run_length_width, rle_bytes) = if enable_rle_v2 {
select_run_length_width_and_size(&data.data, data.num_values, data.bits_per_value).ok()?
} else {
// Estimate the encoded size.
//
// Compatibility RLE stores (value, u8 run_length) pairs. Long runs are split into
// multiple entries of up to 255 values. We don't know the run length distribution here,
// so we conservatively account for splitting with an upper bound.
let estimated_pairs = (run_count.saturating_add(num_values / 255)).min(num_values);
(
RunLengthWidth::U8,
(estimated_pairs as u128) * ((type_size + 1) as u128),
)
};

if rle_bytes < raw_bytes {
#[cfg(feature = "bitpacking")]
Expand All @@ -208,7 +220,9 @@ fn try_rle_for_mini_block(
return None;
}
}
return Some(Box::new(RleEncoder::new()));
return Some(Box::new(RleEncoder::with_run_length_width(
run_length_width,
)));
}
None
}
Expand All @@ -217,14 +231,15 @@ fn try_rle_for_block(
data: &FixedWidthDataBlock,
version: LanceFileVersion,
params: &CompressionFieldParams,
) -> Option<(Box<dyn BlockCompressor>, CompressiveEncoding)> {
enable_rle_v2: bool,
) -> Result<Option<(Box<dyn BlockCompressor>, CompressiveEncoding)>> {
if version < LanceFileVersion::V2_2 {
return None;
return Ok(None);
}

let bits = data.bits_per_value;
if !matches!(bits, 8 | 16 | 32 | 64) {
return None;
return Ok(None);
}

let run_count = data.expect_single_stat::<UInt64Type>(Stat::RunCount);
Expand All @@ -233,14 +248,19 @@ fn try_rle_for_block(
.unwrap_or(DEFAULT_RLE_COMPRESSION_THRESHOLD);

if (run_count as f64) < (data.num_values as f64) * threshold {
let compressor = Box::new(RleEncoder::new());
let run_length_width = if enable_rle_v2 {
select_run_length_width(&data.data, data.num_values, data.bits_per_value)?
} else {
RunLengthWidth::U8
};
let compressor = Box::new(RleEncoder::with_run_length_width(run_length_width));
let encoding = ProtobufUtils21::rle(
ProtobufUtils21::flat(bits, None),
ProtobufUtils21::flat(/*bits_per_value=*/ 8, None),
ProtobufUtils21::flat(run_length_width.bits_per_value(), None),
);
return Some((compressor, encoding));
return Ok(Some((compressor, encoding)));
}
None
Ok(None)
}

fn try_bitpack_for_mini_block(_data: &FixedWidthDataBlock) -> Option<Box<dyn MiniBlockCompressor>> {
Expand Down Expand Up @@ -384,6 +404,7 @@ impl DefaultCompressionStrategy {
Self {
params,
version: LanceFileVersion::default(),
enable_rle_v2: false,
}
}

Expand All @@ -393,6 +414,12 @@ impl DefaultCompressionStrategy {
self
}

/// Allow this strategy to emit RLE v2 run length widths.
pub fn with_rle_v2(mut self) -> Self {
self.enable_rle_v2 = true;
self
}

/// Parse compression parameters from field metadata
fn parse_field_metadata(field: &Field, version: &LanceFileVersion) -> CompressionFieldParams {
let mut params = CompressionFieldParams::default();
Expand Down Expand Up @@ -456,7 +483,7 @@ impl DefaultCompressionStrategy {
}

let base = try_bss_for_mini_block(data, params)
.or_else(|| try_rle_for_mini_block(data, params))
.or_else(|| try_rle_for_mini_block(data, params, self.enable_rle_v2))
.or_else(|| try_bitpack_for_mini_block(data))
.unwrap_or_else(|| Box::new(ValueEncoder::default()));

Expand Down Expand Up @@ -664,7 +691,7 @@ impl CompressionStrategy for DefaultCompressionStrategy {
match data {
DataBlock::FixedWidth(fixed_width) => {
if let Some((compressor, encoding)) =
try_rle_for_block(fixed_width, self.version, &field_params)
try_rle_for_block(fixed_width, self.version, &field_params, self.enable_rle_v2)?
{
return Ok((compressor, encoding));
}
Expand Down Expand Up @@ -815,8 +842,11 @@ impl DecompressionStrategy for DefaultDecompressionStrategy {
Ok(Box::new(ValueDecompressor::from_fsl(fsl)))
}
Compression::Rle(rle) => {
let bits_per_value = validate_rle_compression(rle)?;
Ok(Box::new(RleDecompressor::new(bits_per_value)))
let (bits_per_value, run_length_width) = validate_rle_compression(rle)?;
Ok(Box::new(RleDecompressor::with_run_length_width(
bits_per_value,
run_length_width,
)))
}
Compression::ByteStreamSplit(bss) => {
let Compression::Flat(values) =
Expand Down Expand Up @@ -1005,15 +1035,18 @@ impl DecompressionStrategy for DefaultDecompressionStrategy {
Ok(Box::new(general_decompressor))
}
Compression::Rle(rle) => {
let bits_per_value = validate_rle_compression(rle)?;
Ok(Box::new(RleDecompressor::new(bits_per_value)))
let (bits_per_value, run_length_width) = validate_rle_compression(rle)?;
Ok(Box::new(RleDecompressor::with_run_length_width(
bits_per_value,
run_length_width,
)))
}
_ => todo!(),
}
}
}
/// Validates RLE compression format and extracts bits_per_value
fn validate_rle_compression(rle: &crate::format::pb21::Rle) -> Result<u64> {
/// Validates RLE compression format and extracts value and run length widths.
fn validate_rle_compression(rle: &crate::format::pb21::Rle) -> Result<(u64, RunLengthWidth)> {
let values = rle
.values
.as_ref()
Expand Down Expand Up @@ -1043,14 +1076,22 @@ fn validate_rle_compression(rle: &crate::format::pb21::Rle) -> Result<u64> {
));
};

if run_lengths.bits_per_value != 8 {
if !matches!(values.bits_per_value, 8 | 16 | 32 | 64) {
return Err(Error::invalid_input(format!(
"RLE compression only supports 8-bit run lengths, got {}",
run_lengths.bits_per_value
"RLE compression only supports 8, 16, 32, or 64-bit values, got {}",
values.bits_per_value
)));
}

Ok(values.bits_per_value)
let run_length_width =
RunLengthWidth::from_bits(run_lengths.bits_per_value).ok_or_else(|| {
Error::invalid_input(format!(
"RLE compression only supports 8, 16, or 32-bit run lengths, got {}",
run_lengths.bits_per_value
))
})?;

Ok((values.bits_per_value, run_length_width))
}

#[cfg(test)]
Expand Down Expand Up @@ -1659,6 +1700,83 @@ mod tests {
assert!(debug_str.contains("RleEncoder"));
}

/// With RLE v2 enabled, a single run of 1000 identical Int32 values must be
/// mini-block encoded as RLE whose run lengths are stored flat at 16 bits.
#[test]
fn test_rle_v2_miniblock_selects_u16_run_lengths() {
    // Force RLE to be considered and keep byte-stream-split out of the picture.
    let mut field = create_test_field("test_column", DataType::Int32);
    field.metadata = HashMap::from([
        (RLE_THRESHOLD_META_KEY.to_string(), "1.0".to_string()),
        (BSS_META_KEY.to_string(), "off".to_string()),
    ]);

    let mut block = FixedWidthDataBlock {
        bits_per_value: 32,
        data: LanceBuffer::reinterpret_vec(vec![7i32; 1000]),
        num_values: 1000,
        block_info: BlockInfo::default(),
    };
    block.compute_stat();
    let block = DataBlock::FixedWidth(block);

    let strategy = DefaultCompressionStrategy::new().with_rle_v2();
    let compressor = strategy
        .create_miniblock_compressor(&field, &block)
        .unwrap();
    let (_compressed, encoding) = compressor.compress(block).unwrap();

    // Drill into the encoding description: RLE -> run lengths -> flat.
    let rle = match encoding.compression.as_ref().unwrap() {
        Compression::Rle(rle) => rle,
        _ => panic!("expected RLE encoding"),
    };
    let run_lengths_encoding = rle.run_lengths.as_ref().unwrap();
    let run_lengths = match run_lengths_encoding.compression.as_ref().unwrap() {
        Compression::Flat(flat) => flat,
        _ => panic!("expected flat run lengths"),
    };
    assert_eq!(run_lengths.bits_per_value, 16);
}

/// With RLE v2 enabled, 4096 identical Int32 zeros must still pick the RLE
/// encoder for the mini-block, and the resulting encoding must describe flat
/// 16-bit run lengths.
#[test]
fn test_rle_v2_uses_selected_width_cost_before_bitpacking() {
    // Force RLE to be considered and keep byte-stream-split out of the picture.
    let mut field = create_test_field("test_column", DataType::Int32);
    field.metadata = HashMap::from([
        (RLE_THRESHOLD_META_KEY.to_string(), "1.0".to_string()),
        (BSS_META_KEY.to_string(), "off".to_string()),
    ]);

    let mut block = FixedWidthDataBlock {
        bits_per_value: 32,
        data: LanceBuffer::reinterpret_vec(vec![0i32; 4096]),
        num_values: 4096,
        block_info: BlockInfo::default(),
    };
    block.compute_stat();
    let block = DataBlock::FixedWidth(block);

    let strategy = DefaultCompressionStrategy::new().with_rle_v2();
    let compressor = strategy
        .create_miniblock_compressor(&field, &block)
        .unwrap();

    // The chosen compressor must be the RLE encoder.
    let compressor_debug = format!("{compressor:?}");
    assert!(compressor_debug.contains("RleEncoder"));

    let (_compressed, encoding) = compressor.compress(block).unwrap();

    // Drill into the encoding description: RLE -> run lengths -> flat.
    let rle = match encoding.compression.as_ref().unwrap() {
        Compression::Rle(rle) => rle,
        _ => panic!("expected RLE encoding"),
    };
    let run_lengths_encoding = rle.run_lengths.as_ref().unwrap();
    let run_lengths = match run_lengths_encoding.compression.as_ref().unwrap() {
        Compression::Flat(flat) => flat,
        _ => panic!("expected flat run lengths"),
    };
    assert_eq!(run_lengths.bits_per_value, 16);
}

#[test]
fn test_field_metadata_override_params() {
// Set up params with one configuration
Expand Down
16 changes: 8 additions & 8 deletions rust/lance-encoding/src/encodings/physical/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ mod tests {
// Small data with RLE - should not compress due to size threshold
TestCase {
name: "small_rle_data",
inner_encoder: Box::new(RleEncoder),
inner_encoder: Box::new(RleEncoder::new()),
compression: CompressionConfig {
scheme: CompressionScheme::Lz4,
level: None,
Expand All @@ -173,7 +173,7 @@ mod tests {
// Large repeated data with RLE + LZ4
TestCase {
name: "large_rle_lz4",
inner_encoder: Box::new(RleEncoder),
inner_encoder: Box::new(RleEncoder::new()),
compression: CompressionConfig {
scheme: CompressionScheme::Lz4,
level: None,
Expand All @@ -185,7 +185,7 @@ mod tests {
// Large repeated data with RLE + Zstd
TestCase {
name: "large_rle_zstd",
inner_encoder: Box::new(RleEncoder),
inner_encoder: Box::new(RleEncoder::new()),
compression: CompressionConfig {
scheme: CompressionScheme::Zstd,
level: Some(3),
Expand Down Expand Up @@ -403,7 +403,7 @@ mod tests {
// Test that small buffers don't get compressed
let small_test = TestCase {
name: "small_buffer_no_compression",
inner_encoder: Box::new(RleEncoder),
inner_encoder: Box::new(RleEncoder::new()),
compression: CompressionConfig {
scheme: CompressionScheme::Lz4,
level: None,
Expand Down Expand Up @@ -496,7 +496,7 @@ mod tests {
// RLE produces 2 buffers (values and lengths), test that both are handled correctly
let data = create_repeated_i32_block(vec![1; 100]);
let compressor = GeneralMiniBlockCompressor::new(
Box::new(RleEncoder),
Box::new(RleEncoder::new()),
CompressionConfig {
scheme: CompressionScheme::Lz4,
level: None,
Expand All @@ -519,7 +519,7 @@ mod tests {
// Test case 1: 32-bit RLE data
let test_32 = TestCase {
name: "rle_32bit_with_general_wrapper",
inner_encoder: Box::new(RleEncoder),
inner_encoder: Box::new(RleEncoder::new()),
compression: CompressionConfig {
scheme: CompressionScheme::Lz4,
level: None,
Expand All @@ -532,7 +532,7 @@ mod tests {
// For 32-bit RLE, the compression strategy should automatically wrap it
// Let's directly test the compressor
let compressor = GeneralMiniBlockCompressor::new(
Box::new(RleEncoder),
Box::new(RleEncoder::new()),
CompressionConfig {
scheme: CompressionScheme::Lz4,
level: None,
Expand Down Expand Up @@ -589,7 +589,7 @@ mod tests {
let block_64 = DataBlock::from_array(array_64);

let compressor_64 = GeneralMiniBlockCompressor::new(
Box::new(RleEncoder),
Box::new(RleEncoder::new()),
CompressionConfig {
scheme: CompressionScheme::Lz4,
level: None,
Expand Down
Loading
Loading