diff --git a/rust/lance-index/src/scalar/bloomfilter.rs b/rust/lance-index/src/scalar/bloomfilter.rs index 856f08af772..27dbcedbcc4 100644 --- a/rust/lance-index/src/scalar/bloomfilter.rs +++ b/rust/lance-index/src/scalar/bloomfilter.rs @@ -17,6 +17,7 @@ use crate::scalar::{ use crate::{Any, pb}; use arrow_array::{Array, UInt64Array}; use arrow_schema::{DataType, Field}; +use lance_select::RowAddrTreeMap; use lance_arrow_stats::StatisticsAccumulator; use lance_core::utils::bloomfilter::as_bytes; use lance_core::utils::bloomfilter::sbbf::{Sbbf, SbbfBuilder}; @@ -43,6 +44,7 @@ use super::zoned::{ZoneBound, ZoneProcessor, ZoneTrainer, rebuild_zones, search_ const BLOOMFILTER_FILENAME: &str = "bloomfilter.lance"; const BLOOMFILTER_ITEM_META_KEY: &str = "bloomfilter_item"; +const NULL_BITMAP_META_KEY: &str = "lance:null_bitmap"; const BLOOMFILTER_PROBABILITY_META_KEY: &str = "bloomfilter_probability"; const BLOOMFILTER_INDEX_VERSION: u32 = 0; @@ -79,11 +81,13 @@ pub struct BloomFilterIndex { number_of_items: u64, // Probability of false positives, fraction between 0 and 1 probability: f64, + // Exact set of null row addresses; None for older indices without this bitmap. + null_rows: Option, } impl DeepSizeOf for BloomFilterIndex { fn deep_size_of_children(&self, context: &mut lance_core::deepsize::Context) -> usize { - self.zones.deep_size_of_children(context) + self.zones.deep_size_of_children(context) + self.null_rows.deep_size_of_children(context) } } @@ -111,10 +115,21 @@ impl BloomFilterIndex { .and_then(|bs| bs.parse().ok()) .unwrap_or(*DEFAULT_PROBABILITY); + let null_rows = if let Some(idx_str) = file_schema.metadata.get(NULL_BITMAP_META_KEY) { + let idx = idx_str.parse::().map_err(|e| { + Error::invalid_input(format!("invalid null bitmap buffer index: {e}")) + })?; + let bytes = index_file.read_global_buffer(idx).await?; + Some(RowAddrTreeMap::deserialize_from(bytes.as_ref())?) + } else { + None + }; + Ok(Arc::new(Self::try_from_serialized( bloom_data, number_of_items, probability, + null_rows, )?)) } @@ -122,13 +137,14 @@ impl BloomFilterIndex { data: RecordBatch, number_of_items: u64, probability: f64, + null_rows: Option, ) -> Result { if data.num_rows() == 0 { - // Return empty index for empty data return Ok(Self { zones: Vec::new(), number_of_items, probability, + null_rows, }); } @@ -209,6 +225,7 @@ impl BloomFilterIndex { zones: blocks, number_of_items, probability, + null_rows, }) } @@ -420,6 +437,11 @@ impl ScalarIndex for BloomFilterIndex { metrics: &dyn MetricsCollector, ) -> Result { let query = query.as_any().downcast_ref::().unwrap(); + if let BloomFilterQuery::IsNull() = query { + if let Some(null_rows) = &self.null_rows { + return Ok(SearchResult::exact(null_rows.clone())); + } + } search_zones(&self.zones, metrics, |block| { self.evaluate_block_against_query(block, query) }) @@ -453,18 +475,23 @@ impl ScalarIndex for BloomFilterIndex { let processor = BloomFilterProcessor::new(params.clone())?; let trainer = ZoneTrainer::new(processor, params.number_of_items)?; - let updated_blocks = rebuild_zones(&self.zones, trainer, new_data).await?; + let (updated_blocks, new_null_rows) = rebuild_zones(&self.zones, trainer, new_data).await?; + + // Merge existing and new null rows + let mut merged_null_rows = self.null_rows.clone().unwrap_or_default(); + merged_null_rows |= &new_null_rows; // Write the combined zones back to storage let mut builder = BloomFilterIndexBuilder::try_new(params)?; builder.blocks = updated_blocks; - let file = builder.write_index(dest_store).await?; + builder.null_rows = merged_null_rows; + let files = builder.write_index(dest_store).await?; Ok(CreatedIndex { index_details: prost_types::Any::from_msg(&pb::BloomFilterIndexDetails::default()) .unwrap(), index_version: BLOOMFILTER_INDEX_VERSION, - files: vec![file], + files, }) } @@ -547,6 +574,7 @@ impl BloomFilterIndexBuilderParams { pub struct BloomFilterIndexBuilder { params: BloomFilterIndexBuilderParams, blocks: Vec, + null_rows: RowAddrTreeMap, } impl BloomFilterIndexBuilder { @@ -554,6 +582,7 @@ impl BloomFilterIndexBuilder { Ok(Self { params, blocks: Vec::new(), + null_rows: RowAddrTreeMap::new(), }) } @@ -563,7 +592,9 @@ impl BloomFilterIndexBuilder { pub async fn train(&mut self, batches_source: SendableRecordBatchStream) -> Result<()> { let processor = BloomFilterProcessor::new(self.params.clone())?; let trainer = ZoneTrainer::new(processor, self.params.number_of_items)?; - self.blocks = trainer.train(batches_source).await?; + let (blocks, null_rows) = trainer.train(batches_source).await?; + self.blocks = blocks; + self.null_rows = null_rows; Ok(()) } @@ -620,7 +651,7 @@ impl BloomFilterIndexBuilder { Ok(RecordBatch::try_new(schema, columns)?) } - pub async fn write_index(self, index_store: &dyn IndexStore) -> Result { + pub async fn write_index(self, index_store: &dyn IndexStore) -> Result> { let record_batch = self.bloomfilter_stats_as_batch()?; let mut file_schema = record_batch.schema().as_ref().clone(); @@ -628,7 +659,6 @@ impl BloomFilterIndexBuilder { BLOOMFILTER_ITEM_META_KEY.to_string(), self.params.number_of_items.to_string(), ); - file_schema.metadata.insert( BLOOMFILTER_PROBABILITY_META_KEY.to_string(), self.params.probability.to_string(), @@ -638,7 +668,21 @@ impl BloomFilterIndexBuilder { .new_index_file(BLOOMFILTER_FILENAME, Arc::new(file_schema)) .await?; index_file.write_record_batch(record_batch).await?; - index_file.finish().await + + let mut null_bitmap_bytes = Vec::with_capacity(self.null_rows.serialized_size()); + self.null_rows.serialize_into(&mut null_bitmap_bytes)?; + let null_bitmap_idx = index_file + .add_global_buffer(bytes::Bytes::from(null_bitmap_bytes)) + .await?; + + let bloomfilter_file = index_file + .finish_with_metadata(HashMap::from([( + NULL_BITMAP_META_KEY.to_string(), + null_bitmap_idx.to_string(), + )])) + .await?; + + Ok(vec![bloomfilter_file]) } } @@ -985,7 +1029,7 @@ impl BloomFilterIndexPlugin { batches_source: SendableRecordBatchStream, index_store: &dyn IndexStore, options: Option, - ) -> Result { + ) -> Result> { let mut builder = BloomFilterIndexBuilder::try_new(options.unwrap_or_default())?; builder.train(batches_source).await?; @@ -1074,12 +1118,12 @@ impl ScalarIndexPlugin for BloomFilterIndexPlugin { "must provide training request created by new_training_request".into(), ) })?; - let file = Self::train_bloomfilter_index(data, index_store, Some(request.params)).await?; + let files = Self::train_bloomfilter_index(data, index_store, Some(request.params)).await?; Ok(CreatedIndex { index_details: prost_types::Any::from_msg(&pb::BloomFilterIndexDetails::default()) .unwrap(), index_version: BLOOMFILTER_INDEX_VERSION, - files: vec![file], + files, }) } @@ -1167,7 +1211,7 @@ mod tests { use lance_select::RowAddrTreeMap; use crate::scalar::{ - BloomFilterQuery, ScalarIndex, SearchResult, + BloomFilterQuery, IndexStore, ScalarIndex, SearchResult, bloomfilter::{BloomFilterIndex, BloomFilterIndexBuilderParams}, lance_format::LanceIndexStore, }; @@ -2035,10 +2079,10 @@ mod tests { expected.insert_range(500..750); // Should match the zone containing 500 assert_eq!(result, SearchResult::at_most(expected)); - // Test IsNull query + // Test IsNull query (no nulls in data, should return exact empty set) let query = BloomFilterQuery::IsNull(); let result = index.search(&query, &NoOpMetricsCollector).await.unwrap(); - assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new())); // No nulls in the data + assert_eq!(result, SearchResult::exact(RowAddrTreeMap::new())); // Test IsIn query let query = BloomFilterQuery::IsIn(vec![ @@ -2131,4 +2175,72 @@ mod tests { _ => panic!("Expected AtMost search result from bloomfilter"), } } + + // Writes a bloomfilter file in the legacy format (no null bitmap global buffer), + // simulating an index created before the null bitmap feature was added. + async fn write_legacy_bloomfilter(store: &dyn IndexStore, has_null: bool) { + use arrow_array::BooleanArray; + use crate::scalar::bloomfilter::{ + BLOOMFILTER_FILENAME, BLOOMFILTER_ITEM_META_KEY, BLOOMFILTER_PROBABILITY_META_KEY, + }; + let schema = Arc::new(Schema::new(vec![ + Field::new("fragment_id", DataType::UInt64, false), + Field::new("zone_start", DataType::UInt64, false), + Field::new("zone_length", DataType::UInt64, false), + Field::new("has_null", DataType::Boolean, false), + Field::new("bloom_filter_data", DataType::Binary, false), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(UInt64Array::from(vec![0u64])) as _, + Arc::new(UInt64Array::from(vec![0u64])) as _, + Arc::new(UInt64Array::from(vec![3u64])) as _, + Arc::new(BooleanArray::from(vec![has_null])) as _, + Arc::new(arrow_array::BinaryArray::from_vec(vec![b"".as_ref()])) as _, + ], + ) + .unwrap(); + let mut file_schema = schema.as_ref().clone(); + file_schema + .metadata + .insert(BLOOMFILTER_ITEM_META_KEY.to_string(), "1000".to_string()); + file_schema + .metadata + .insert(BLOOMFILTER_PROBABILITY_META_KEY.to_string(), "0.01".to_string()); + let mut writer = store + .new_index_file(BLOOMFILTER_FILENAME, Arc::new(file_schema)) + .await + .unwrap(); + writer.write_record_batch(batch).await.unwrap(); + writer.finish().await.unwrap(); + } + + #[tokio::test] + async fn test_legacy_bloomfilter_no_null_bitmap() { + let tmpdir = TempObjDir::default(); + let store = Arc::new(LanceIndexStore::new( + Arc::new(ObjectStore::local()), + tmpdir.clone(), + Arc::new(LanceCache::no_cache()), + )); + + write_legacy_bloomfilter(store.as_ref(), true).await; + + let index = BloomFilterIndex::load(store, None, &LanceCache::no_cache()) + .await + .expect("failed to load legacy bloomfilter"); + + assert!(index.null_rows.is_none(), "legacy index should have no null bitmap"); + + // IS NULL should fall back to the has_null zone scan and return AtMost, not Exact. + let result = index + .search(&BloomFilterQuery::IsNull(), &NoOpMetricsCollector) + .await + .unwrap(); + assert!( + !result.is_exact(), + "IS NULL on a legacy index should not be exact" + ); + } } diff --git a/rust/lance-index/src/scalar/zoned.rs b/rust/lance-index/src/scalar/zoned.rs index 7ceed851bae..9c090f5e1d8 100644 --- a/rust/lance-index/src/scalar/zoned.rs +++ b/rust/lance-index/src/scalar/zoned.rs @@ -92,13 +92,14 @@ where pub async fn train( mut self, stream: SendableRecordBatchStream, - ) -> Result> { + ) -> Result<(Vec, RowAddrTreeMap)> { let zone_size = usize::try_from(self.zone_capacity).map_err(|_| { Error::invalid_input("zone capacity does not fit into usize on this platform") })?; let mut batches = chunk_concat_stream(stream, zone_size); let mut zones = Vec::new(); + let mut null_rows = RowAddrTreeMap::new(); let mut current_fragment_id: Option = None; let mut current_zone_len: usize = 0; let mut zone_start_offset: Option = None; @@ -155,6 +156,16 @@ where self.processor .process_chunk(&values.slice(batch_offset, take))?; + // Record exact row addresses for null values in this chunk. + let chunk = values.slice(batch_offset, take); + if chunk.null_count() > 0 { + for i in 0..take { + if values.is_null(batch_offset + i) { + null_rows.insert(row_addr_col.value(batch_offset + i)); + } + } + } + // Track the first and last row offsets to handle non-contiguous offsets // after deletions. Zone length (offset span) is computed as (last - first + 1), // not the actual row count. @@ -200,7 +211,7 @@ where } } - Ok(zones) + Ok((zones, null_rows)) } /// Flushes a non-empty zone and resets the processor state. @@ -266,21 +277,21 @@ where } /// Helper that retrains zones from `stream` and appends them to the existing -/// statistics. Useful for index update paths that need to merge new fragments -/// into an existing zone list. +/// statistics. Returns the combined zone list and the null-row bitmap for the +/// new data only — callers are responsible for merging with any existing bitmap. pub async fn rebuild_zones

( existing: &[P::ZoneStatistics], trainer: ZoneTrainer

, stream: SendableRecordBatchStream, -) -> Result> +) -> Result<(Vec, RowAddrTreeMap)> where P: ZoneProcessor, P::ZoneStatistics: Clone, { let mut combined = existing.to_vec(); - let mut new_zones = trainer.train(stream).await?; + let (mut new_zones, null_rows) = trainer.train(stream).await?; combined.append(&mut new_zones); - Ok(combined) + Ok((combined, null_rows)) } #[cfg(test)] @@ -362,7 +373,7 @@ mod tests { let processor = MockProcessor::new(); let trainer = ZoneTrainer::new(processor, 4).unwrap(); - let stats = trainer.train(stream).await.unwrap(); + let (stats, _) = trainer.train(stream).await.unwrap(); // Three zones: offsets [0..=3], [4..=7], [8..=9] assert_eq!(stats.len(), 3); @@ -393,7 +404,7 @@ mod tests { let processor = MockProcessor::new(); let trainer = ZoneTrainer::new(processor, 10).unwrap(); - let stats = trainer.train(stream).await.unwrap(); + let (stats, _) = trainer.train(stream).await.unwrap(); // Two zones, one per fragment (capacity=10 is large enough) assert_eq!(stats.len(), 2); @@ -447,7 +458,7 @@ mod tests { let processor = MockProcessor::new(); let trainer = ZoneTrainer::new(processor, 10).unwrap(); - let stats = trainer.train(stream).await.unwrap(); + let (stats, _) = trainer.train(stream).await.unwrap(); // One zone containing the 3 valid rows (empty batches skipped) assert_eq!(stats.len(), 1); @@ -469,7 +480,7 @@ mod tests { let processor = MockProcessor::new(); let trainer = ZoneTrainer::new(processor, 1).unwrap(); - let stats = trainer.train(stream).await.unwrap(); + let (stats, _) = trainer.train(stream).await.unwrap(); // Three zones, one per row (capacity=1) assert_eq!(stats.len(), 3); @@ -494,7 +505,7 @@ mod tests { let processor = MockProcessor::new(); let trainer = ZoneTrainer::new(processor, 10000).unwrap(); - let stats = trainer.train(stream).await.unwrap(); + let (stats, _) = trainer.train(stream).await.unwrap(); // One zone containing all 100 rows (capacity is large enough) assert_eq!(stats.len(), 1); @@ -530,7 +541,7 @@ mod tests { let processor = MockProcessor::new(); let trainer = ZoneTrainer::new(processor, 4).unwrap(); - let stats = trainer.train(stream).await.unwrap(); + let (stats, _) = trainer.train(stream).await.unwrap(); // Two zones: first 4 rows, then remaining 2 rows assert_eq!(stats.len(), 2); @@ -561,7 +572,7 @@ mod tests { let processor = MockProcessor::new(); let trainer = ZoneTrainer::new(processor, 3).unwrap(); - let stats = trainer.train(stream).await.unwrap(); + let (stats, _) = trainer.train(stream).await.unwrap(); // Three zones: frag 0 full zone, frag 0 partial (flushed at boundary), frag 1 assert_eq!(stats.len(), 3); @@ -602,7 +613,7 @@ mod tests { let processor = MockProcessor::new(); let trainer = ZoneTrainer::new(processor, 4).unwrap(); - let stats = trainer.train(stream).await.unwrap(); + let (stats, _) = trainer.train(stream).await.unwrap(); // Should create 2 zones (capacity=4): // Zone 0: rows at offsets [0, 1, 5, 7] (4 rows) @@ -637,7 +648,7 @@ mod tests { let processor = MockProcessor::new(); let trainer = ZoneTrainer::new(processor, 10).unwrap(); - let stats = trainer.train(stream).await.unwrap(); + let (stats, _) = trainer.train(stream).await.unwrap(); // One zone with 3 rows, but offset span [0..=200] so length=201 due to large gaps assert_eq!(stats.len(), 1); @@ -663,7 +674,7 @@ mod tests { let processor = MockProcessor::new(); let trainer = ZoneTrainer::new(processor, 10).unwrap(); - let stats = trainer.train(stream).await.unwrap(); + let (stats, _) = trainer.train(stream).await.unwrap(); // Should create 3 zones (one per fragment) assert_eq!(stats.len(), 3); @@ -810,7 +821,7 @@ mod tests { )); let trainer = ZoneTrainer::new(MockProcessor::new(), 2).unwrap(); - let rebuilt = rebuild_zones(&existing, trainer, stream).await.unwrap(); + let (rebuilt, _) = rebuild_zones(&existing, trainer, stream).await.unwrap(); // Existing zone should remain unchanged and new stats appended afterwards assert_eq!(rebuilt.len(), 2); assert_eq!(rebuilt[0].sum, 50); @@ -840,7 +851,7 @@ mod tests { )); let trainer = ZoneTrainer::new(MockProcessor::new(), 2).unwrap(); - let rebuilt = rebuild_zones(&existing, trainer, stream).await.unwrap(); + let (rebuilt, _) = rebuild_zones(&existing, trainer, stream).await.unwrap(); // Existing zone plus two new fragments should yield three total zones assert_eq!(rebuilt.len(), 3); assert_eq!(rebuilt[0].bound.fragment_id, 0); diff --git a/rust/lance-index/src/scalar/zonemap.rs b/rust/lance-index/src/scalar/zonemap.rs index 9f2228740c2..c641ba94de9 100644 --- a/rust/lance-index/src/scalar/zonemap.rs +++ b/rust/lance-index/src/scalar/zonemap.rs @@ -27,10 +27,9 @@ use lance_core::cache::{LanceCache, WeakLanceCache}; use serde::{Deserialize, Serialize}; use std::sync::LazyLock; -use arrow_array::{ - ArrayRef, RecordBatch, UInt32Array, UInt64Array, new_empty_array, new_null_array, -}; +use arrow_array::{ArrayRef, RecordBatch, UInt32Array, UInt64Array, new_empty_array, new_null_array}; use arrow_schema::{DataType, Field}; +use lance_select::RowAddrTreeMap; use datafusion::execution::SendableRecordBatchStream; use datafusion_common::ScalarValue; use std::{collections::HashMap, sync::Arc}; @@ -50,6 +49,7 @@ const ROWS_PER_ZONE_DEFAULT: u64 = 8192; // 1 zone every two batches const ZONEMAP_FILENAME: &str = "zonemap.lance"; const ZONEMAP_SIZE_META_KEY: &str = "rows_per_zone"; +const NULL_BITMAP_META_KEY: &str = "lance:null_bitmap"; const ZONEMAP_INDEX_VERSION: u32 = 0; /// Basic stats about zonemap index @@ -110,6 +110,9 @@ pub struct ZoneMapIndex { store: Arc, fri: Option>, index_cache: WeakLanceCache, + // Exact set of null row addresses across all zones; None when loaded from an + // older index that did not persist this bitmap. + null_rows: Option, } impl std::fmt::Debug for ZoneMapIndex { @@ -127,7 +130,7 @@ impl std::fmt::Debug for ZoneMapIndex { impl DeepSizeOf for ZoneMapIndex { fn deep_size_of_children(&self, context: &mut lance_core::deepsize::Context) -> usize { - self.zones.deep_size_of_children(context) + self.zones.deep_size_of_children(context) + self.null_rows.deep_size_of_children(context) } } @@ -436,12 +439,24 @@ impl ZoneMapIndex { .get(ZONEMAP_SIZE_META_KEY) .and_then(|bs| bs.parse().ok()) .unwrap_or(ROWS_PER_ZONE_DEFAULT); + + let null_rows = if let Some(idx_str) = file_schema.metadata.get(NULL_BITMAP_META_KEY) { + let idx = idx_str.parse::().map_err(|e| { + Error::invalid_input(format!("invalid null bitmap buffer index: {e}")) + })?; + let bytes = index_file.read_global_buffer(idx).await?; + Some(RowAddrTreeMap::deserialize_from(bytes.as_ref())?) + } else { + None + }; + Ok(Arc::new(Self::try_from_serialized( zone_maps, store, fri, index_cache, rows_per_zone, + null_rows, )?)) } @@ -451,6 +466,7 @@ impl ZoneMapIndex { fri: Option>, index_cache: &LanceCache, rows_per_zone: u64, + null_rows: Option, ) -> Result { // The RecordBatch should have columns: min, max, null_count let min_col = data @@ -512,6 +528,7 @@ impl ZoneMapIndex { store, fri, index_cache: WeakLanceCache::from(index_cache), + null_rows, }); } @@ -543,6 +560,7 @@ impl ZoneMapIndex { store, fri, index_cache: WeakLanceCache::from(index_cache), + null_rows, }) } } @@ -599,6 +617,11 @@ impl ScalarIndex for ZoneMapIndex { metrics: &dyn MetricsCollector, ) -> Result { let query = query.as_any().downcast_ref::().unwrap(); + if let SargableQuery::IsNull() = query { + if let Some(null_rows) = &self.null_rows { + return Ok(SearchResult::exact(null_rows.clone())); + } + } search_zones(&self.zones, metrics, |zone| { self.evaluate_zone_against_query(zone, query) }) @@ -633,19 +656,24 @@ impl ScalarIndex for ZoneMapIndex { let options = ZoneMapIndexBuilderParams::new(self.rows_per_zone); let processor = ZoneMapProcessor::new(value_type.clone())?; let trainer = ZoneTrainer::new(processor, self.rows_per_zone)?; - let updated_zones = rebuild_zones(&self.zones, trainer, new_data).await?; + let (updated_zones, new_null_rows) = rebuild_zones(&self.zones, trainer, new_data).await?; + + // Merge existing and new null rows + let mut merged_null_rows = self.null_rows.clone().unwrap_or_default(); + merged_null_rows |= &new_null_rows; // Serialize the combined zones back into the index file let mut builder = ZoneMapIndexBuilder::try_new(options, self.data_type.clone())?; builder.options.rows_per_zone = self.rows_per_zone; builder.maps = updated_zones; - let file = builder.write_index(dest_store).await?; + builder.null_rows = merged_null_rows; + let files = builder.write_index(dest_store).await?; Ok(CreatedIndex { index_details: prost_types::Any::from_msg(&pbold::ZoneMapIndexDetails::default()) .unwrap(), index_version: ZONEMAP_INDEX_VERSION, - files: vec![file], + files, }) } @@ -674,6 +702,8 @@ pub async fn merge_zonemap_indices( let data_type = first.data_type.clone(); let mut zones = Vec::new(); + let mut merged_null_rows = RowAddrTreeMap::new(); + let mut any_null_bitmap = false; for source in source_indices { if source.rows_per_zone != rows_per_zone { return Err(Error::invalid_input(format!( @@ -697,18 +727,27 @@ pub async fn merge_zonemap_indices( }) .cloned(), ); + if let Some(null_rows) = &source.null_rows { + any_null_bitmap = true; + let mut filtered = null_rows.clone(); + filtered.retain_fragments(fragment_filter.iter()); + merged_null_rows |= &filtered; + } } zones.sort_by_key(|zone| (zone.bound.fragment_id, zone.bound.start)); let mut builder = ZoneMapIndexBuilder::try_new(ZoneMapIndexBuilderParams::new(rows_per_zone), data_type)?; builder.maps = zones; - builder.write_index(dest_store).await?; + if any_null_bitmap { + builder.null_rows = merged_null_rows; + } + let files = builder.write_index(dest_store).await?; Ok(CreatedIndex { index_details: prost_types::Any::from_msg(&pbold::ZoneMapIndexDetails::default()).unwrap(), index_version: ZONEMAP_INDEX_VERSION, - files: dest_store.list_files_with_sizes().await?, + files, }) } @@ -753,6 +792,7 @@ pub struct ZoneMapIndexBuilder { items_type: DataType, maps: Vec, + null_rows: RowAddrTreeMap, } impl ZoneMapIndexBuilder { @@ -761,6 +801,7 @@ impl ZoneMapIndexBuilder { options, items_type, maps: Vec::new(), + null_rows: RowAddrTreeMap::new(), }) } @@ -770,7 +811,9 @@ impl ZoneMapIndexBuilder { pub async fn train(&mut self, batches_source: SendableRecordBatchStream) -> Result<()> { let processor = ZoneMapProcessor::new(self.items_type.clone())?; let trainer = ZoneTrainer::new(processor, self.options.rows_per_zone)?; - self.maps = trainer.train(batches_source).await?; + let (maps, null_rows) = trainer.train(batches_source).await?; + self.maps = maps; + self.null_rows = null_rows; Ok(()) } @@ -823,7 +866,7 @@ impl ZoneMapIndexBuilder { Ok(RecordBatch::try_new(schema, columns)?) } - pub async fn write_index(self, index_store: &dyn IndexStore) -> Result { + pub async fn write_index(self, index_store: &dyn IndexStore) -> Result> { let record_batch = self.zonemap_stats_as_batch()?; let mut file_schema = record_batch.schema().as_ref().clone(); @@ -836,7 +879,21 @@ impl ZoneMapIndexBuilder { .new_index_file(ZONEMAP_FILENAME, Arc::new(file_schema)) .await?; index_file.write_record_batch(record_batch).await?; - index_file.finish().await + + let mut null_bitmap_bytes = Vec::with_capacity(self.null_rows.serialized_size()); + self.null_rows.serialize_into(&mut null_bitmap_bytes)?; + let null_bitmap_idx = index_file + .add_global_buffer(bytes::Bytes::from(null_bitmap_bytes)) + .await?; + + let zonemap_file = index_file + .finish_with_metadata(HashMap::from([( + NULL_BITMAP_META_KEY.to_string(), + null_bitmap_idx.to_string(), + )])) + .await?; + + Ok(vec![zonemap_file]) } } @@ -941,8 +998,7 @@ impl ZoneMapIndexPlugin { batches_source: SendableRecordBatchStream, index_store: &dyn IndexStore, options: Option, - ) -> Result { - // train_zonemap_index: calling scan_aligned_chunks + ) -> Result> { let value_type = batches_source.schema().field(0).data_type().clone(); let mut builder = ZoneMapIndexBuilder::try_new(options.unwrap_or_default(), value_type)?; @@ -1033,12 +1089,12 @@ impl ScalarIndexPlugin for ZoneMapIndexPlugin { "must provide training request created by new_training_request".into(), ) })?; - let file = Self::train_zonemap_index(data, index_store, Some(request.params)).await?; + let files = Self::train_zonemap_index(data, index_store, Some(request.params)).await?; Ok(CreatedIndex { index_details: prost_types::Any::from_msg(&pbold::ZoneMapIndexDetails::default()) .unwrap(), index_version: ZONEMAP_INDEX_VERSION, - files: vec![file], + files, }) } @@ -1077,7 +1133,7 @@ mod tests { use lance_datagen::ArrayGeneratorExt; use lance_datagen::{BatchCount, RowCount, array}; use lance_io::object_store::ObjectStore; - use lance_select::{NullableRowAddrSet, RowAddrTreeMap}; + use lance_select::RowAddrTreeMap; use crate::scalar::{ SargableQuery, ScalarIndex, SearchResult, @@ -1507,7 +1563,7 @@ mod tests { // Test IsNull query (should match nothing since there are no null values) let query = SargableQuery::IsNull(); let result = index.search(&query, &NoOpMetricsCollector).await.unwrap(); - assert_eq!(result, SearchResult::AtMost(NullableRowAddrSet::empty())); + assert_eq!(result, SearchResult::exact(RowAddrTreeMap::new())); // Test range queries with NaN bounds // Range with NaN as start bound (included) @@ -1550,7 +1606,7 @@ mod tests { ); let result = index.search(&query, &NoOpMetricsCollector).await.unwrap(); // Should match nothing since nothing is greater than NaN - assert_eq!(result, SearchResult::AtMost(NullableRowAddrSet::empty())); + assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new())); // Test IsIn query with mixed float types (Float16, Float32, Float64) let query = SargableQuery::IsIn(vec![ @@ -1734,7 +1790,7 @@ mod tests { // 8. IsNull query (no nulls in data, should match nothing) let query = SargableQuery::IsNull(); let result = index.search(&query, &NoOpMetricsCollector).await.unwrap(); - assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new())); + assert_eq!(result, SearchResult::exact(RowAddrTreeMap::new())); // 9. IsIn query: [0, 100, 101, 50] let query = SargableQuery::IsIn(vec![ ScalarValue::Int32(Some(0)), @@ -2443,6 +2499,7 @@ mod tests { store: test_store, fri: None, index_cache: WeakLanceCache::from(&LanceCache::no_cache()), + null_rows: None, }; // Test LikePrefix query for "foo" @@ -2515,6 +2572,7 @@ mod tests { store: test_store, fri: None, index_cache: WeakLanceCache::from(&LanceCache::no_cache()), + null_rows: None, }; // Test LikePrefix "test" @@ -2582,6 +2640,7 @@ mod tests { store: test_store, fri: None, index_cache: WeakLanceCache::from(&LanceCache::no_cache()), + null_rows: None, }; // Test LikePrefix with LargeUtf8 @@ -2634,4 +2693,71 @@ mod tests { // All max characters assert_eq!(compute_next_prefix("\u{10FFFF}\u{10FFFF}"), None); } + + // Writes a zonemap file in the legacy format (no null bitmap global buffer), + // simulating an index created before the null bitmap feature was added. + async fn write_legacy_zonemap(store: &dyn IndexStore, null_count: u32) { + use arrow_array::{Int32Array, UInt32Array}; + let schema = Arc::new(Schema::new(vec![ + Field::new("min", DataType::Int32, true), + Field::new("max", DataType::Int32, true), + Field::new("null_count", DataType::UInt32, false), + Field::new("nan_count", DataType::UInt32, false), + Field::new("fragment_id", DataType::UInt64, false), + Field::new("zone_start", DataType::UInt64, false), + Field::new("zone_length", DataType::UInt64, false), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from(vec![Some(0)])) as _, + Arc::new(Int32Array::from(vec![Some(99)])) as _, + Arc::new(UInt32Array::from(vec![null_count])) as _, + Arc::new(UInt32Array::from(vec![0u32])) as _, + Arc::new(UInt64Array::from(vec![0u64])) as _, + Arc::new(UInt64Array::from(vec![0u64])) as _, + Arc::new(UInt64Array::from(vec![100u64])) as _, + ], + ) + .unwrap(); + let mut file_schema = schema.as_ref().clone(); + file_schema + .metadata + .insert(ZONEMAP_SIZE_META_KEY.to_string(), "8192".to_string()); + let mut writer = store + .new_index_file(ZONEMAP_FILENAME, Arc::new(file_schema)) + .await + .unwrap(); + writer.write_record_batch(batch).await.unwrap(); + writer.finish().await.unwrap(); + } + + #[tokio::test] + async fn test_legacy_zonemap_no_null_bitmap() { + let tmpdir = TempObjDir::default(); + let store = Arc::new(LanceIndexStore::new( + Arc::new(ObjectStore::local()), + tmpdir.clone(), + Arc::new(LanceCache::no_cache()), + )); + + // Write a legacy index with one zone that has nulls but no null bitmap. + write_legacy_zonemap(store.as_ref(), 10).await; + + let index = ZoneMapIndex::load(store, None, &LanceCache::no_cache()) + .await + .expect("failed to load legacy zonemap"); + + assert!(index.null_rows.is_none(), "legacy index should have no null bitmap"); + + // IS NULL should fall back to the zone-scan path and return AtMost, not Exact. + let result = index + .search(&SargableQuery::IsNull(), &NoOpMetricsCollector) + .await + .unwrap(); + assert!( + !result.is_exact(), + "IS NULL on a legacy index should not be exact" + ); + } }