Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 127 additions & 15 deletions rust/lance-index/src/scalar/bloomfilter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,14 @@
use crate::scalar::{
BloomFilterQuery, BuiltinIndexType, CreatedIndex, IndexFile, ScalarIndexParams, UpdateCriteria,
};
use crate::{Any, pb};

Check warning on line 17 in rust/lance-index/src/scalar/bloomfilter.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance-index/src/scalar/bloomfilter.rs
use arrow_array::{Array, UInt64Array};
use arrow_schema::{DataType, Field};
use lance_select::RowAddrTreeMap;
use lance_arrow_stats::StatisticsAccumulator;
use lance_core::utils::bloomfilter::as_bytes;
use lance_core::utils::bloomfilter::sbbf::{Sbbf, SbbfBuilder};
use serde::{Deserialize, Serialize};

Check warning on line 24 in rust/lance-index/src/scalar/bloomfilter.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance-index/src/scalar/bloomfilter.rs

use std::sync::LazyLock;

Expand All @@ -43,6 +44,7 @@

const BLOOMFILTER_FILENAME: &str = "bloomfilter.lance";
const BLOOMFILTER_ITEM_META_KEY: &str = "bloomfilter_item";
const NULL_BITMAP_META_KEY: &str = "lance:null_bitmap";
const BLOOMFILTER_PROBABILITY_META_KEY: &str = "bloomfilter_probability";
const BLOOMFILTER_INDEX_VERSION: u32 = 0;

Expand Down Expand Up @@ -79,11 +81,13 @@
number_of_items: u64,
// Probability of false positives, fraction between 0 and 1
probability: f64,
// Exact set of null row addresses; None for older indices without this bitmap.
null_rows: Option<RowAddrTreeMap>,
}

impl DeepSizeOf for BloomFilterIndex {
fn deep_size_of_children(&self, context: &mut lance_core::deepsize::Context) -> usize {
self.zones.deep_size_of_children(context)
self.zones.deep_size_of_children(context) + self.null_rows.deep_size_of_children(context)
}
}

Expand Down Expand Up @@ -111,24 +115,36 @@
.and_then(|bs| bs.parse().ok())
.unwrap_or(*DEFAULT_PROBABILITY);

let null_rows = if let Some(idx_str) = file_schema.metadata.get(NULL_BITMAP_META_KEY) {
let idx = idx_str.parse::<u32>().map_err(|e| {
Error::invalid_input(format!("invalid null bitmap buffer index: {e}"))
})?;
let bytes = index_file.read_global_buffer(idx).await?;
Some(RowAddrTreeMap::deserialize_from(bytes.as_ref())?)
} else {
None
};

Ok(Arc::new(Self::try_from_serialized(
bloom_data,
number_of_items,
probability,
null_rows,
)?))
}

fn try_from_serialized(
data: RecordBatch,
number_of_items: u64,
probability: f64,
null_rows: Option<RowAddrTreeMap>,
) -> Result<Self> {
if data.num_rows() == 0 {
// Return empty index for empty data
return Ok(Self {
zones: Vec::new(),
number_of_items,
probability,
null_rows,
});
}

Expand Down Expand Up @@ -209,6 +225,7 @@
zones: blocks,
number_of_items,
probability,
null_rows,
})
}

Expand Down Expand Up @@ -420,6 +437,11 @@
metrics: &dyn MetricsCollector,
) -> Result<SearchResult> {
let query = query.as_any().downcast_ref::<BloomFilterQuery>().unwrap();
if let BloomFilterQuery::IsNull() = query {
if let Some(null_rows) = &self.null_rows {
return Ok(SearchResult::exact(null_rows.clone()));
}
}
search_zones(&self.zones, metrics, |block| {
self.evaluate_block_against_query(block, query)
})
Expand Down Expand Up @@ -453,18 +475,23 @@

let processor = BloomFilterProcessor::new(params.clone())?;
let trainer = ZoneTrainer::new(processor, params.number_of_items)?;
let updated_blocks = rebuild_zones(&self.zones, trainer, new_data).await?;
let (updated_blocks, new_null_rows) = rebuild_zones(&self.zones, trainer, new_data).await?;

// Merge existing and new null rows
let mut merged_null_rows = self.null_rows.clone().unwrap_or_default();
merged_null_rows |= &new_null_rows;

// Write the combined zones back to storage
let mut builder = BloomFilterIndexBuilder::try_new(params)?;
builder.blocks = updated_blocks;
let file = builder.write_index(dest_store).await?;
builder.null_rows = merged_null_rows;
let files = builder.write_index(dest_store).await?;

Ok(CreatedIndex {
index_details: prost_types::Any::from_msg(&pb::BloomFilterIndexDetails::default())
.unwrap(),
index_version: BLOOMFILTER_INDEX_VERSION,
files: vec![file],
files,
})
}

Expand Down Expand Up @@ -547,13 +574,15 @@
pub struct BloomFilterIndexBuilder {
params: BloomFilterIndexBuilderParams,
blocks: Vec<BloomFilterStatistics>,
null_rows: RowAddrTreeMap,
}

impl BloomFilterIndexBuilder {
pub fn try_new(params: BloomFilterIndexBuilderParams) -> Result<Self> {
Ok(Self {
params,
blocks: Vec::new(),
null_rows: RowAddrTreeMap::new(),
})
}

Expand All @@ -563,7 +592,9 @@
pub async fn train(&mut self, batches_source: SendableRecordBatchStream) -> Result<()> {
let processor = BloomFilterProcessor::new(self.params.clone())?;
let trainer = ZoneTrainer::new(processor, self.params.number_of_items)?;
self.blocks = trainer.train(batches_source).await?;
let (blocks, null_rows) = trainer.train(batches_source).await?;
self.blocks = blocks;
self.null_rows = null_rows;
Ok(())
}

Expand Down Expand Up @@ -620,15 +651,14 @@
Ok(RecordBatch::try_new(schema, columns)?)
}

pub async fn write_index(self, index_store: &dyn IndexStore) -> Result<IndexFile> {
pub async fn write_index(self, index_store: &dyn IndexStore) -> Result<Vec<IndexFile>> {
let record_batch = self.bloomfilter_stats_as_batch()?;

let mut file_schema = record_batch.schema().as_ref().clone();
file_schema.metadata.insert(
BLOOMFILTER_ITEM_META_KEY.to_string(),
self.params.number_of_items.to_string(),
);

file_schema.metadata.insert(
BLOOMFILTER_PROBABILITY_META_KEY.to_string(),
self.params.probability.to_string(),
Expand All @@ -638,7 +668,21 @@
.new_index_file(BLOOMFILTER_FILENAME, Arc::new(file_schema))
.await?;
index_file.write_record_batch(record_batch).await?;
index_file.finish().await

let mut null_bitmap_bytes = Vec::with_capacity(self.null_rows.serialized_size());
self.null_rows.serialize_into(&mut null_bitmap_bytes)?;
let null_bitmap_idx = index_file
.add_global_buffer(bytes::Bytes::from(null_bitmap_bytes))
.await?;

let bloomfilter_file = index_file
.finish_with_metadata(HashMap::from([(
NULL_BITMAP_META_KEY.to_string(),
null_bitmap_idx.to_string(),
)]))
.await?;

Ok(vec![bloomfilter_file])
}
}

Expand Down Expand Up @@ -985,7 +1029,7 @@
batches_source: SendableRecordBatchStream,
index_store: &dyn IndexStore,
options: Option<BloomFilterIndexBuilderParams>,
) -> Result<IndexFile> {
) -> Result<Vec<IndexFile>> {
let mut builder = BloomFilterIndexBuilder::try_new(options.unwrap_or_default())?;

builder.train(batches_source).await?;
Expand Down Expand Up @@ -1074,12 +1118,12 @@
"must provide training request created by new_training_request".into(),
)
})?;
let file = Self::train_bloomfilter_index(data, index_store, Some(request.params)).await?;
let files = Self::train_bloomfilter_index(data, index_store, Some(request.params)).await?;
Ok(CreatedIndex {
index_details: prost_types::Any::from_msg(&pb::BloomFilterIndexDetails::default())
.unwrap(),
index_version: BLOOMFILTER_INDEX_VERSION,
files: vec![file],
files,
})
}

Expand Down Expand Up @@ -1167,7 +1211,7 @@
use lance_select::RowAddrTreeMap;

use crate::scalar::{
BloomFilterQuery, ScalarIndex, SearchResult,
BloomFilterQuery, IndexStore, ScalarIndex, SearchResult,
bloomfilter::{BloomFilterIndex, BloomFilterIndexBuilderParams},
lance_format::LanceIndexStore,
};
Expand Down Expand Up @@ -2035,10 +2079,10 @@
expected.insert_range(500..750); // Should match the zone containing 500
assert_eq!(result, SearchResult::at_most(expected));

// Test IsNull query
// Test IsNull query (no nulls in data, should return exact empty set)
let query = BloomFilterQuery::IsNull();
let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new())); // No nulls in the data
assert_eq!(result, SearchResult::exact(RowAddrTreeMap::new()));

// Test IsIn query
let query = BloomFilterQuery::IsIn(vec![
Expand Down Expand Up @@ -2131,4 +2175,72 @@
_ => panic!("Expected AtMost search result from bloomfilter"),
}
}

// Writes a bloomfilter file in the legacy format (no null bitmap global buffer),

Check warning on line 2179 in rust/lance-index/src/scalar/bloomfilter.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance-index/src/scalar/bloomfilter.rs
// simulating an index created before the null bitmap feature was added.
async fn write_legacy_bloomfilter(store: &dyn IndexStore, has_null: bool) {
use arrow_array::BooleanArray;
use crate::scalar::bloomfilter::{
BLOOMFILTER_FILENAME, BLOOMFILTER_ITEM_META_KEY, BLOOMFILTER_PROBABILITY_META_KEY,
};
let schema = Arc::new(Schema::new(vec![

Check warning on line 2186 in rust/lance-index/src/scalar/bloomfilter.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance-index/src/scalar/bloomfilter.rs
Field::new("fragment_id", DataType::UInt64, false),
Field::new("zone_start", DataType::UInt64, false),
Field::new("zone_length", DataType::UInt64, false),
Field::new("has_null", DataType::Boolean, false),
Field::new("bloom_filter_data", DataType::Binary, false),
]));
let batch = RecordBatch::try_new(
schema.clone(),
vec![
Arc::new(UInt64Array::from(vec![0u64])) as _,
Arc::new(UInt64Array::from(vec![0u64])) as _,
Arc::new(UInt64Array::from(vec![3u64])) as _,
Arc::new(BooleanArray::from(vec![has_null])) as _,
Arc::new(arrow_array::BinaryArray::from_vec(vec![b"".as_ref()])) as _,
],
)
.unwrap();
let mut file_schema = schema.as_ref().clone();
file_schema

Check warning on line 2205 in rust/lance-index/src/scalar/bloomfilter.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance-index/src/scalar/bloomfilter.rs
.metadata
.insert(BLOOMFILTER_ITEM_META_KEY.to_string(), "1000".to_string());
file_schema
.metadata
.insert(BLOOMFILTER_PROBABILITY_META_KEY.to_string(), "0.01".to_string());
let mut writer = store
.new_index_file(BLOOMFILTER_FILENAME, Arc::new(file_schema))
.await
.unwrap();
writer.write_record_batch(batch).await.unwrap();
writer.finish().await.unwrap();
}

#[tokio::test]
async fn test_legacy_bloomfilter_no_null_bitmap() {
let tmpdir = TempObjDir::default();
let store = Arc::new(LanceIndexStore::new(
Arc::new(ObjectStore::local()),
tmpdir.clone(),
Arc::new(LanceCache::no_cache()),
));

write_legacy_bloomfilter(store.as_ref(), true).await;

let index = BloomFilterIndex::load(store, None, &LanceCache::no_cache())
.await

Check warning on line 2231 in rust/lance-index/src/scalar/bloomfilter.rs

View workflow job for this annotation

GitHub Actions / format

Diff in /home/runner/work/lance/lance/rust/lance-index/src/scalar/bloomfilter.rs
.expect("failed to load legacy bloomfilter");

assert!(index.null_rows.is_none(), "legacy index should have no null bitmap");

// IS NULL should fall back to the has_null zone scan and return AtMost, not Exact.
let result = index
.search(&BloomFilterQuery::IsNull(), &NoOpMetricsCollector)
.await
.unwrap();
assert!(
!result.is_exact(),
"IS NULL on a legacy index should not be exact"
);
}
}
Loading
Loading