diff --git a/rust/lance-index/src/scalar.rs b/rust/lance-index/src/scalar.rs index a287d277a81..3a6834129b3 100644 --- a/rust/lance-index/src/scalar.rs +++ b/rust/lance-index/src/scalar.rs @@ -960,6 +960,17 @@ impl OldIndexDataFilter { .collect(), } } + + /// Apply this filter in place to a set of existing (old) row ids/addresses, + /// retaining only the rows the filter selects to keep. Used by index types + /// that merge old postings directly (e.g. bitmap) instead of re-scanning a + /// row-id array through [`Self::filter_row_ids`]. + pub fn retain_old_rows(&self, rows: &mut RowAddrTreeMap) { + match self { + Self::Fragments { to_keep, .. } => rows.retain_fragments(to_keep.iter()), + Self::RowIds(valid_row_ids) => *rows &= valid_row_ids, + } + } } impl UpdateCriteria { diff --git a/rust/lance-index/src/scalar/bitmap.rs b/rust/lance-index/src/scalar/bitmap.rs index c2a6e80e82b..ca77c0a42dc 100644 --- a/rust/lance-index/src/scalar/bitmap.rs +++ b/rust/lance-index/src/scalar/bitmap.rs @@ -812,13 +812,14 @@ impl ScalarIndex for BitmapIndex { &self, new_data: SendableRecordBatchStream, dest_store: &dyn IndexStore, - _old_data_filter: Option, + old_data_filter: Option, ) -> Result { let file = BitmapIndexPlugin::streaming_build_and_write( new_data, Some(self), dest_store, BITMAP_LOOKUP_NAME, + old_data_filter.as_ref(), ) .await?; @@ -1197,6 +1198,19 @@ async fn cleanup_bitmap_shard_files(store: &dyn IndexStore, shard_files: &[Strin #[derive(Debug, Default)] pub struct BitmapIndexPlugin; +/// Drop the rows an old posting should no longer expose -- rows whose fragment +/// was removed, or (under stable row ids) rows rewritten by an update -- keeping +/// only those `filter` still considers valid. A no-op when `filter` is `None`. +fn retain_valid( + mut bitmap: RowAddrTreeMap, + filter: Option<&super::OldIndexDataFilter>, +) -> RowAddrTreeMap { + if let Some(filter) = filter { + filter.retain_old_rows(&mut bitmap); + } + bitmap +} + impl BitmapIndexPlugin { fn get_batch_from_arrays( keys: Arc, @@ -1328,7 +1342,7 @@ impl BitmapIndexPlugin { data: SendableRecordBatchStream, index_store: &dyn IndexStore, ) -> Result { - Self::streaming_build_and_write(data, None, index_store, BITMAP_LOOKUP_NAME).await + Self::streaming_build_and_write(data, None, index_store, BITMAP_LOOKUP_NAME, None).await } async fn train_bitmap_shard( @@ -1343,7 +1357,8 @@ impl BitmapIndexPlugin { progress .stage_start("build_bitmap_shard", None, "rows") .await?; - let file = Self::streaming_build_and_write(data, None, index_store, &file_name).await?; + let file = + Self::streaming_build_and_write(data, None, index_store, &file_name, None).await?; progress.stage_complete("build_bitmap_shard").await?; Ok(file) } @@ -1360,6 +1375,7 @@ impl BitmapIndexPlugin { old_index: Option<&BitmapIndex>, index_store: &dyn IndexStore, output_file_name: &str, + old_data_filter: Option<&super::OldIndexDataFilter>, ) -> Result { let value_type = data_source.schema().field(0).data_type().clone(); @@ -1406,6 +1422,7 @@ impl BitmapIndexPlugin { &mut old_pos, &mut emitted_null, &mut writer, + old_data_filter, ) .await?; } @@ -1428,6 +1445,7 @@ impl BitmapIndexPlugin { &mut old_pos, &mut emitted_null, &mut writer, + old_data_filter, ) .await?; } @@ -1435,7 +1453,13 @@ impl BitmapIndexPlugin { // Emit any remaining old-only entries. if let Some(idx) = old_index { while old_pos < old_keys.len() { - let old_bitmap = idx.load_bitmap(&old_keys[old_pos], None).await?; + let old_bitmap = retain_valid( + idx.load_bitmap(&old_keys[old_pos], None) + .await? + .as_ref() + .clone(), + old_data_filter, + ); writer .emit(old_keys[old_pos].0.clone(), &old_bitmap) .await?; @@ -1450,7 +1474,8 @@ impl BitmapIndexPlugin { { let null_key = new_null_array(&value_type, 1); let null_key = ScalarValue::try_from_array(null_key.as_ref(), 0)?; - writer.emit(null_key, &idx.null_map).await?; + let null_bitmap = retain_valid((*idx.null_map).clone(), old_data_filter); + writer.emit(null_key, &null_bitmap).await?; } writer.finish().await @@ -1459,6 +1484,7 @@ impl BitmapIndexPlugin { /// Flush a completed value-run from the new data stream, emitting any /// old-only entries that sort before it and merging the old bitmap if the /// key exists in both old and new. + #[allow(clippy::too_many_arguments)] async fn finish_run( key: ScalarValue, bitmap: &mut RowAddrTreeMap, @@ -1467,13 +1493,14 @@ impl BitmapIndexPlugin { old_pos: &mut usize, emitted_null: &mut bool, writer: &mut BitmapBatchWriter, + old_data_filter: Option<&super::OldIndexDataFilter>, ) -> Result<()> { if key.is_null() { // Null values are stored separately in the old index's null_map. if let Some(idx) = old_index && !idx.null_map.is_empty() { - *bitmap |= &*idx.null_map; + *bitmap |= &retain_valid((*idx.null_map).clone(), old_data_filter); } *emitted_null = true; writer.emit(key, bitmap).await?; @@ -1482,7 +1509,13 @@ impl BitmapIndexPlugin { // Emit old-only entries that sort before this key. while *old_pos < old_keys.len() && old_keys[*old_pos] < orderable { - let old_bitmap = idx.load_bitmap(&old_keys[*old_pos], None).await?; + let old_bitmap = retain_valid( + idx.load_bitmap(&old_keys[*old_pos], None) + .await? + .as_ref() + .clone(), + old_data_filter, + ); writer .emit(old_keys[*old_pos].0.clone(), &old_bitmap) .await?; @@ -1491,8 +1524,13 @@ impl BitmapIndexPlugin { // If the old index also has this key, merge its bitmap. if *old_pos < old_keys.len() && old_keys[*old_pos] == orderable { - let old_bitmap = idx.load_bitmap(&old_keys[*old_pos], None).await?; - *bitmap |= &*old_bitmap; + *bitmap |= &retain_valid( + idx.load_bitmap(&old_keys[*old_pos], None) + .await? + .as_ref() + .clone(), + old_data_filter, + ); *old_pos += 1; } diff --git a/rust/lance/src/index/append.rs b/rust/lance/src/index/append.rs index c4a83bf5b31..1b3c5cf35f9 100644 --- a/rust/lance/src/index/append.rs +++ b/rust/lance/src/index/append.rs @@ -65,16 +65,89 @@ async fn build_stable_row_id_filter( .try_collect::>() .await?; - let row_id_maps = row_id_sequences - .iter() - .map(|(_, seq)| RowAddrTreeMap::from(seq.as_ref())) - .collect::>(); + let frag_by_id: std::collections::HashMap = dataset + .get_fragments() + .into_iter() + .map(|f| (f.id() as u32, f)) + .collect(); + + let mut row_id_maps = Vec::with_capacity(row_id_sequences.len()); + for (frag_id, seq) in &row_id_sequences { + row_id_maps.push(live_row_ids(frag_by_id.get(frag_id), seq).await?); + } let row_id_map_refs = row_id_maps.iter().collect::>(); // Merge all fragment-local row-id sets into one exact membership structure. Ok(::union_all(&row_id_map_refs)) } +/// The fragment's live row ids: its persisted row-id sequence minus the rows +/// its deletion vector marks gone. A persisted sequence covers every row the +/// fragment ever held, so a row whose old copy was deleted (e.g. rewritten by an +/// update under the same stable row id) would otherwise be retained as a stale +/// old-index entry. +async fn live_row_ids( + fragment: Option<&crate::dataset::fragment::FileFragment>, + seq: &lance_table::rowids::RowIdSequence, +) -> Result { + let deletion_vector = match fragment { + Some(f) if f.metadata().deletion_file.is_some() => { + f.get_deletion_vector().await.ok().flatten() + } + _ => None, + }; + Ok(match deletion_vector { + Some(dv) => seq + .iter() + .enumerate() + .filter(|(offset, _)| !dv.contains(*offset as u32)) + .map(|(_, row_id)| row_id) + .collect(), + None => RowAddrTreeMap::from(seq), + }) +} + +/// Open the selected inverted (FTS) segments and merge `new_data` into them +/// through the segment-merge primitive, which materializes each old partition +/// and applies `old_data_filter` (dropping stale rows -- e.g. updated rows under +/// stable row ids). The fast `ScalarIndex::update` path only references old +/// partitions by id and cannot honor a row-level `RowIds` filter, so it must not +/// be used when old rows need to be removed. +async fn open_and_merge_inverted_segments( + dataset: &Dataset, + field_path: &str, + segments: &[&IndexMetadata], + new_data: datafusion::execution::SendableRecordBatchStream, + new_store: &LanceIndexStore, + old_data_filter: Option, +) -> Result { + let mut source_indices = Vec::with_capacity(segments.len()); + for &segment in segments { + let scalar_index = dataset + .open_scalar_index(field_path, &segment.uuid, &NoOpMetricsCollector) + .await?; + let inverted = scalar_index + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::index(format!( + "Inverted merge: expected inverted segment {}, got {:?}", + segment.uuid, + scalar_index.index_type() + )) + })?; + source_indices.push(Arc::new(inverted.clone())); + } + InvertedIndex::merge_segments( + &source_indices, + new_data, + new_store, + old_data_filter, + Arc::new(NoopIndexBuildProgress), + ) + .await +} + /// Build the [`OldIndexDataFilter`] that must be applied to existing index /// rows when their owning fragments have been pruned by compaction or /// deletions. @@ -256,6 +329,17 @@ async fn merge_scalar_indices<'a>( ) .await? } + IndexType::Inverted => { + open_and_merge_inverted_segments( + dataset.as_ref(), + field_path, + selected_old_indices, + new_data_stream, + &new_store, + old_data_filter, + ) + .await? + } _ => { reference_index .update(new_data_stream, &new_store, old_data_filter) @@ -1877,6 +1961,141 @@ mod tests { assert_eq!(query_id_count(&dataset, "song-42").await, 1); } + /// Under stable row ids, updating an indexed column and then calling + /// `optimize_indices` must not leave stale entries (old value -> updated row) + /// in the scalar index. An update deletes the old copy of each row and + /// rewrites it under the same stable row id, so the old index entry is stale + /// and must be dropped on merge. Covers BTree, Bitmap, and Inverted (FTS), + /// which take three different merge paths. + #[tokio::test] + async fn test_optimize_scalar_index_drops_stale_rows_after_update() { + use crate::dataset::UpdateBuilder; + use arrow_array::Int32Array; + use lance_index::scalar::FullTextSearchQuery; + use lance_index::scalar::inverted::InvertedIndexParams; + + let test_dir = TempStrDir::default(); + let test_uri = test_dir.as_str(); + + // 100 rows: num == id; cat = "A" for id<50 else "B"; body = "alpha" for + // id<50 else "beta". + let schema = Arc::new(Schema::new(vec![ + Field::new("id", DataType::Int32, false), + Field::new("num", DataType::Int32, false), + Field::new("cat", DataType::Utf8, false), + Field::new("body", DataType::Utf8, false), + ])); + let batch = RecordBatch::try_new( + schema.clone(), + vec![ + Arc::new(Int32Array::from_iter_values(0..100)), + Arc::new(Int32Array::from_iter_values(0..100)), + Arc::new(StringArray::from_iter_values( + (0..100).map(|i| if i < 50 { "A" } else { "B" }), + )), + Arc::new(StringArray::from_iter_values( + (0..100).map(|i| if i < 50 { "alpha" } else { "beta" }), + )), + ], + ) + .unwrap(); + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone()); + let mut dataset = Dataset::write( + reader, + test_uri, + Some(WriteParams { + enable_stable_row_ids: true, + ..Default::default() + }), + ) + .await + .unwrap(); + + dataset + .create_index( + &["num"], + IndexType::BTree, + None, + &ScalarIndexParams::default(), + true, + ) + .await + .unwrap(); + dataset + .create_index( + &["cat"], + IndexType::Bitmap, + None, + &ScalarIndexParams::default(), + true, + ) + .await + .unwrap(); + dataset + .create_index( + &["body"], + IndexType::Inverted, + None, + &InvertedIndexParams::default(), + true, + ) + .await + .unwrap(); + + // Update the first 25 rows (id < 25): num -> -1, cat -> 'B', body -> 'beta'. + let res = UpdateBuilder::new(Arc::new(dataset.clone())) + .update_where("id < 25") + .unwrap() + .set("num", "-1") + .unwrap() + .set("cat", "'B'") + .unwrap() + .set("body", "'beta'") + .unwrap() + .build() + .unwrap() + .execute() + .await + .unwrap(); + dataset = res.new_dataset.as_ref().clone(); + + dataset + .optimize_indices(&OptimizeOptions::default()) + .await + .unwrap(); + let dataset = DatasetBuilder::from_uri(test_uri).load().await.unwrap(); + + // BTree: `num >= 0` matches ids 25..99 (75 rows); the 25 updated rows + // hold num = -1 and must not appear. + let btree_count = dataset + .scan() + .filter("num >= 0") + .unwrap() + .count_rows() + .await + .unwrap(); + assert_eq!(btree_count, 75, "btree returned stale/incorrect rows"); + + // Bitmap: only the 25 rows (ids 25..49) that still carry cat = 'A' match; + // the 25 rows updated to 'B' must not. + let bitmap_count = dataset + .scan() + .filter("cat = 'A'") + .unwrap() + .count_rows() + .await + .unwrap(); + assert_eq!(bitmap_count, 25, "bitmap returned stale rows"); + + // FTS: only the 25 rows (ids 25..49) whose body still reads "alpha" match; + // the 25 rows updated to "beta" must not. + let mut scan = dataset.scan(); + scan.full_text_search(FullTextSearchQuery::new("alpha".to_owned())) + .unwrap(); + let fts_count = scan.count_rows().await.unwrap(); + assert_eq!(fts_count, 25, "FTS index returned stale rows"); + } + #[tokio::test] async fn test_optimize_scalar_no_unindexed_fragments() { let test_dir = TempStrDir::default();