Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions docs/src/format/file/encoding.md
Original file line number Diff line number Diff line change
Expand Up @@ -683,9 +683,10 @@ the default mini-block size is negligible. You should only consider changing thi
confirmed — through profiling — that mini-block read amplification is saturating your available bandwidth
(for example, accessing a remote object store over a constrained network link).

The maximum number of values per mini-block can be lowered via an environment variable:
The maximum number of values per mini-block can be tuned via an environment variable:

- `LANCE_MINIBLOCK_MAX_VALUES` (default `4096`): upper bound on the number of values in a single mini-block chunk.
- `LANCE_MINIBLOCK_MAX_VALUES` (default `4096`, maximum `32768`): upper bound on the number of values in a single mini-block chunk.

Reducing this value produces smaller mini-blocks, which reduces the amount of data fetched per read at the
cost of more mini-blocks and slightly more metadata overhead.
cost of more mini-blocks and slightly more metadata overhead. Increasing it can reduce metadata overhead and
improve throughput for highly compressible data, but it may increase random-read amplification.
81 changes: 63 additions & 18 deletions rust/lance-encoding/src/encodings/logical/primitive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3701,12 +3701,7 @@ struct SerializedFullZip {
//
// If we directly record the size in bytes with 12 bits we would be limited to
// 4KiB which is too small. Since we know each mini-block consists of 8 byte
// words we can store the # of words instead which gives us 32KiB. We want
// at least 24KiB so we can handle even the worst case of
// - 4Ki values compressed into an 8186 byte buffer
// - 4 bytes to describe rep & def lengths
// - 16KiB of rep & def buffer (this will almost never happen but life is easier if we
// plan for it)
// words we can store the # of words instead which gives us 32KiB.
//
// Second, each chunk in a mini-block is aligned to 8 bytes. This allows multi-byte
// values like offsets to be stored in a mini-block and safely read back out. It also
Expand Down Expand Up @@ -3906,9 +3901,9 @@ impl PrimitiveStructuralEncoder {
// 0xA) All blocks except the last must have power-of-two number of values.
// This not only makes metadata smaller but it makes decoding easier since
// batch sizes are typically a power of 2. 4 bits would allow us to express
// up to 16Ki values but we restrict this further to 4Ki values.
// up to 32Ki values.
//
// This means blocks can have 1 to 4Ki values and 8 - 32Ki bytes.
// This means blocks can have 1 to 32Ki values and 8 - 32Ki bytes.
//
// All metadata words are serialized (as little endian) into a single buffer
// of metadata values.
Expand Down Expand Up @@ -4007,7 +4002,13 @@ impl PrimitiveStructuralEncoder {
}
} else {
for &buffer_size in &chunk.buffer_sizes {
data_buffer.extend_from_slice(&(buffer_size as u16).to_le_bytes());
let buffer_size = u16::try_from(buffer_size).map_err(|_| {
Error::internal(format!(
"Mini-block buffer size ({} bytes) too large for 16-bit metadata",
buffer_size
))
})?;
data_buffer.extend_from_slice(&buffer_size.to_le_bytes());
}
}

Expand Down Expand Up @@ -4041,15 +4042,28 @@ impl PrimitiveStructuralEncoder {

let chunk_bytes = data_buffer.len() - start_pos;
let max_chunk_size = if support_large_chunk {
4 * 1024 * 1024 * 1024 // 4GB limit with u32 metadata
1_u64 << 31 // 28 bits of 8-byte words in u32 metadata
} else {
32 * 1024 // 32KiB limit with u16 metadata
};
assert!(chunk_bytes <= max_chunk_size);
assert!(chunk_bytes > 0);
assert_eq!(chunk_bytes % 8, 0);
// 4Ki values max
assert!(chunk.log_num_values <= 12);
if chunk_bytes == 0 || chunk_bytes as u64 > max_chunk_size {
return Err(Error::internal(format!(
"Mini-block chunk size {} bytes exceeds the {} byte metadata limit",
chunk_bytes, max_chunk_size
)));
}
if chunk_bytes % MINIBLOCK_ALIGNMENT != 0 {
return Err(Error::internal(format!(
"Mini-block chunk size {} bytes is not aligned to {} bytes",
chunk_bytes, MINIBLOCK_ALIGNMENT
)));
}
if chunk.log_num_values > 15 {
return Err(Error::internal(format!(
"Mini-block log_num_values {} exceeds the 4-bit metadata limit",
chunk.log_num_values
)));
}
// We subtract 1 here from chunk_bytes because we want to be able to express
// a size of 32KiB and not (32Ki - 8)B which is what we'd get otherwise with
// 0xFFF
Expand Down Expand Up @@ -5768,8 +5782,9 @@ mod tests {
use super::{
ChunkInstructions, DataBlock, DecodeMiniBlockTask, FixedPerValueDecompressor,
FixedWidthDataBlock, FullZipCacheableState, FullZipDecodeDetails, FullZipReadSource,
FullZipRepIndexDetails, FullZipScheduler, MiniBlockRepIndex, PerValueDecompressor,
PreambleAction, StructuralPageScheduler, VariableFullZipDecoder,
FullZipRepIndexDetails, FullZipScheduler, MiniBlockChunk, MiniBlockCompressed,
MiniBlockRepIndex, PerValueDecompressor, PreambleAction, StructuralPageScheduler,
VariableFullZipDecoder,
};
use crate::buffer::LanceBuffer;
use crate::compression::DefaultDecompressionStrategy;
Expand Down Expand Up @@ -6967,7 +6982,7 @@ mod tests {
#[tokio::test]
async fn test_binary_large_minichunk_size_over_max_miniblock_values() {
let mut string_data = Vec::new();
// 128kb/chunk / 6 bytes (t_9999) = 21845 > max 4096 items per chunk
// 128kb/chunk / 6 bytes (t_9999) = 21845 items per chunk
for i in 0..10000 {
string_data.push(Some(format!("t_{}", i)));
}
Expand Down Expand Up @@ -7566,6 +7581,36 @@ mod tests {
);
}

#[test]
fn test_v2_1_miniblock_serializes_log_num_values_15() {
let miniblocks = MiniBlockCompressed {
data: vec![LanceBuffer::from(vec![1_u8; 16])],
chunks: vec![
MiniBlockChunk {
buffer_sizes: vec![8],
log_num_values: 15,
},
MiniBlockChunk {
buffer_sizes: vec![8],
log_num_values: 0,
},
],
num_values: 32_769,
};

let serialized =
PrimitiveStructuralEncoder::serialize_miniblocks(miniblocks, None, None, false)
.unwrap();

let chunk_metadata = serialized.metadata.borrow_to_typed_slice::<u16>();
assert_eq!(chunk_metadata.len(), 2);
assert_eq!(
chunk_metadata[0] & 0x0F,
15,
"V2.1 metadata should use all 4 bits for log_num_values"
);
}

async fn encode_first_page(
field: arrow_schema::Field,
array: ArrayRef,
Expand Down
26 changes: 19 additions & 7 deletions rust/lance-encoding/src/encodings/logical/primitive/miniblock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,14 @@ use lance_core::Result;
pub const MAX_MINIBLOCK_BYTES: u64 = 8 * 1024 - 6;

const DEFAULT_MAX_MINIBLOCK_VALUES: u64 = 4096;
const MAX_CONFIGURABLE_MINIBLOCK_VALUES: u64 = 32768;

fn parse_max_miniblock_values() -> u64 {
let val = std::env::var("LANCE_MINIBLOCK_MAX_VALUES")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(DEFAULT_MAX_MINIBLOCK_VALUES);
val.clamp(1, DEFAULT_MAX_MINIBLOCK_VALUES)
val.clamp(1, MAX_CONFIGURABLE_MINIBLOCK_VALUES)
}

pub static MAX_MINIBLOCK_VALUES: std::sync::LazyLock<u64> =
Expand Down Expand Up @@ -58,9 +59,9 @@ pub struct MiniBlockCompressed {
/// and contain a power-of-two number of values (except for the last chunk)
///
/// By default we limit a chunk to 4Ki values and slightly less than
/// 8KiB of compressed data. This means that even in the extreme case
/// where we have 4 bytes of rep/def then we will have at most 24KiB of
/// data (values, repetition, and definition) per mini-block.
/// 8KiB of compressed value data. The byte budget remains the primary
/// constraint, so only encodings that compress many values into that
/// budget can use larger value counts when explicitly configured.
///
/// The maximum number of values per chunk can be configured via the
/// `LANCE_MINIBLOCK_MAX_VALUES` environment variable. This is only
Expand All @@ -77,8 +78,8 @@ pub struct MiniBlockChunk {
// then this should be 0 (the number of values will be calculated by subtracting the
// size of all other chunks from the total size of the page)
//
// For example, 1 would mean there are 2 values in the chunk and 12 would mean there
// are 4Ki values in the chunk.
// For example, 1 would mean there are 2 values in the chunk and 15 would mean there
// are 32Ki values in the chunk.
//
// This must be <= log2(MAX_MINIBLOCK_VALUES) (i.e. <= 12 at the default of 4096)
pub log_num_values: u8,
Expand Down Expand Up @@ -135,6 +136,14 @@ mod tests {
unsafe { std::env::remove_var("LANCE_MINIBLOCK_MAX_VALUES") };
}

#[test]
#[serial]
fn test_parse_can_raise_to_32k() {
unsafe { std::env::set_var("LANCE_MINIBLOCK_MAX_VALUES", "32768") };
assert_eq!(parse_max_miniblock_values(), 32768);
unsafe { std::env::remove_var("LANCE_MINIBLOCK_MAX_VALUES") };
}

#[test]
#[serial]
fn test_parse_clamps_zero_to_one() {
Expand All @@ -147,7 +156,10 @@ mod tests {
#[serial]
fn test_parse_clamps_above_max() {
unsafe { std::env::set_var("LANCE_MINIBLOCK_MAX_VALUES", "99999") };
assert_eq!(parse_max_miniblock_values(), DEFAULT_MAX_MINIBLOCK_VALUES);
assert_eq!(
parse_max_miniblock_values(),
MAX_CONFIGURABLE_MINIBLOCK_VALUES
);
unsafe { std::env::remove_var("LANCE_MINIBLOCK_MAX_VALUES") };
}

Expand Down
Loading