diff --git a/python/python/tests/test_vector_index.py b/python/python/tests/test_vector_index.py index 4e3addfedb8..6e8acfa1d3c 100644 --- a/python/python/tests/test_vector_index.py +++ b/python/python/tests/test_vector_index.py @@ -1911,6 +1911,99 @@ def test_optimize_indices(indexed_dataset): assert stats["num_indices"] == 2 +def test_no_stale_duplicate_after_partial_column_update(tmp_path): + # Regression test: updating an indexed vector column in place (via the + # low-level fragment.update_columns API + LanceOperation.Update) and then + # delta-optimizing the index must not leave a stale copy of the row in the + # original index segment. + # + # Mechanism: update_columns rewrites only the column data file, keeping the + # fragment id and row address. Committing the Update prunes the fragment + # from the old index segment's fragment_bitmap, but that segment's index + # file still physically holds the row's OLD vector. optimize_indices then + # builds a new delta segment with the NEW vector. Before the fix a KNN query + # searched both segments and returned the updated row TWICE - once with the + # stale vector (old segment) and once with the new value (delta segment). + np.random.seed(42) + ndim = 16 + + # Fragment 0: a "far" cluster bounded to [-1, 1]. No bulk vector is close to + # the query (all-10.8), so the bulk cannot crowd the stale copy out of top-k. + n_bulk = 1000 + bulk = np.random.uniform(-1, 1, (n_bulk, ndim)).astype(np.float32) + table0 = pa.table( + { + "id": pa.array(range(n_bulk), type=pa.int64()), + "vector": pa.FixedSizeListArray.from_arrays( + pa.array(bulk.reshape(-1), type=pa.float32()), list_size=ndim + ), + } + ) + ds = lance.write_dataset(table0, tmp_path, mode="create") + + # Fragment 1: a single row whose ORIGINAL vector (all 2.0) is closer to the + # query than any bulk vector, so its stale copy ranks well inside top-k. + orig = np.full((1, ndim), 2.0, dtype=np.float32) + table1 = pa.table( + { + "id": pa.array([10_000], type=pa.int64()), + "vector": pa.FixedSizeListArray.from_arrays( + pa.array(orig.reshape(-1), type=pa.float32()), list_size=ndim + ), + } + ) + ds = lance.write_dataset(table1, tmp_path, mode="append") + assert len(ds.get_fragments()) == 2 + + # One index segment covering BOTH fragments {0, 1}. + ds = ds.create_index( + "vector", + index_type="IVF_PQ", + metric="l2", + num_partitions=1, + num_sub_vectors=ndim, + ) + + # Overwrite fragment 1's vector in place and commit Update(fields_modified). + new_vec = [10.8] * ndim + frag = ds.get_fragment(1) + rowids = frag.to_table(columns=["id"], with_row_id=True)["_rowid"].to_pylist() + update_data = pa.table( + { + "_rowid": pa.array(rowids, type=pa.uint64()), + "vector": pa.array( + [new_vec] * len(rowids), type=pa.list_(pa.float32(), ndim) + ), + } + ) + updated_fragment, fields_modified = frag.update_columns(update_data) + op = lance.LanceOperation.Update( + updated_fragments=[updated_fragment], + fields_modified=fields_modified, + ) + ds = lance.LanceDataset.commit(ds.uri, op, read_version=ds.version) + + # Delta-optimize: appends a new segment for the updated fragment; the old + # segment is left intact, still physically holding the stale vector. + ds.optimize.optimize_indices(num_indices_to_merge=0) + ds = lance.dataset(ds.uri) + assert ds.stats.index_stats("vector_idx")["num_indices"] == 2 + + # KNN near the NEW value via the default vector search (searches all + # segments). The updated row must appear EXACTLY ONCE. + q = np.array(new_vec, dtype=np.float32) + res = ds.to_table( + columns=["id"], + nearest={"column": "vector", "q": q, "k": 10}, + with_row_id=True, + ).to_pandas() + dupes = res[res["id"] == 10_000] + assert len(dupes) == 1, ( + f"updated row id=10000 returned {len(dupes)} times " + f"(stale index segment not masked); rowids={res['_rowid'].tolist()}" + ) + + @pytest.mark.skip(reason="retrain is deprecated") def test_retrain_indices(indexed_dataset): data = create_table() diff --git a/rust/lance/src/io/exec/knn.rs b/rust/lance/src/io/exec/knn.rs index 0ceddf7c5ee..e8fa89da425 100644 --- a/rust/lance/src/io/exec/knn.rs +++ b/rust/lance/src/io/exec/knn.rs @@ -56,7 +56,9 @@ use lance_index::vector::{ }; use lance_linalg::distance::DistanceType; use lance_linalg::kernels::normalize_arrow; +use lance_select::RowAddrMask; use lance_table::format::IndexMetadata; +use roaring::RoaringBitmap; use tokio::sync::Notify; use uuid::Uuid; @@ -1227,6 +1229,7 @@ impl ANNIvfSubIndexExec { metrics: Arc, state: Arc, target_partitions: usize, + seg_mask: Option>, ) -> impl Stream> { let stream = futures::stream::once(async move { let max_nprobes = query @@ -1260,20 +1263,29 @@ impl ANNIvfSubIndexExec { // This next if check should be true, because we wouldn't get max_results otherwise if let Some(iter_addrs) = prefilter_mask.iter_addrs() { - // We only run this on the first delta because the prefilter mask is shared - // by all deltas and we don't want to duplicate the rows. - if state - .took_no_rows_shortcut - .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) - .is_ok() - { + // Emit the prefilter rows that the partition search did not reach. + // + // The prefilter mask is shared by all deltas. When a per-segment + // restriction is in effect (`seg_mask` is `Some`) each delta emits only + // the addresses its own segment owns; the segments partition the + // fragments, so each address is emitted by exactly one delta. Without a + // restriction the mask is global, so only the first delta emits (guarded + // by a shared flag) to avoid duplicating rows across deltas. + let should_emit = seg_mask.is_some() + || state + .took_no_rows_shortcut + .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed) + .is_ok(); + if should_emit { let initial_addrs = state.initial_ids.lock().unwrap(); let found_addrs = HashSet::<_>::from_iter(initial_addrs.iter().copied()); drop(initial_addrs); - let mask_addrs = HashSet::from_iter(iter_addrs.map(u64::from)); - let not_found_addrs = mask_addrs.difference(&found_addrs); - let not_found_addrs = - UInt64Array::from_iter_values(not_found_addrs.copied()); + let not_found_addrs = UInt64Array::from_iter_values( + iter_addrs.map(u64::from).filter(|addr| { + !found_addrs.contains(addr) + && seg_mask.as_ref().is_none_or(|m| m.selected(*addr)) + }), + ); let not_found_distance = Float32Array::from_value(f32::INFINITY, not_found_addrs.len()); let not_found_batch = RecordBatch::try_new( @@ -1283,8 +1295,8 @@ impl ANNIvfSubIndexExec { .unwrap(); return futures::stream::once(async move { Ok(not_found_batch) }).boxed(); } else { - // We meet all the criteria for an early exit, but we aren't first - // delta so we just return an empty stream and skip the late search + // We meet all the criteria for an early exit, but we aren't the first + // delta and the mask is global, so skip to avoid duplicate rows. return futures::stream::empty().boxed(); } } @@ -1519,6 +1531,19 @@ impl ExecutionPlan for ANNIvfSubIndexExec { let ds = self.dataset.clone(); let column = self.query.column.clone(); let indices = self.indices.clone(); + // Per-segment fragment restriction (applied as a post-filter below). + // Only enabled when every segment has a fragment_bitmap, mirroring the + // `all_have_bitmaps` gate in DatasetPreFilter::new so we never restrict + // more aggressively than the shared prefilter's fallback. + let segment_bitmaps: Arc> = + Arc::new(if indices.iter().all(|idx| idx.fragment_bitmap.is_some()) { + indices + .iter() + .map(|idx| (idx.uuid, idx.fragment_bitmap.clone().unwrap())) + .collect() + } else { + HashMap::new() + }); let prefilter_source = self.prefilter_source.clone(); let metrics = Arc::new(AnnIndexMetrics::new(&self.metrics, partition)); let metrics_clone = metrics.clone(); @@ -1593,6 +1618,7 @@ impl ExecutionPlan for ANNIvfSubIndexExec { let metrics = metrics.clone(); let pre_filter = pre_filter.clone(); let state = state.clone(); + let segment_bitmaps = segment_bitmaps.clone(); let mut query = query.clone(); let pruned_nprobes = early_pruning(q_c_dists.values(), query.k); adjust_probes(&mut query, pruned_nprobes); @@ -1602,6 +1628,28 @@ impl ExecutionPlan for ANNIvfSubIndexExec { .await?; let query = normalize_query_for_index(raw_index.as_ref(), query)?; + // A segment's index file may still physically contain rows for + // fragments that were pruned from its fragment_bitmap (e.g. after an + // in-place column update via update_columns). Once a newer delta + // segment owns such a fragment, the stale rows in this segment must + // not be returned, otherwise the same row is emitted by two segments. + // Build a per-segment restriction mask, reusing the scheme-aware + // helper so it is correct for both row-address and stable-row-id + // datasets. The shared prefilter is built from the union of all + // segment bitmaps and cannot express this per-segment rule. + let seg_mask = match segment_bitmaps.get(&index_uuid).cloned() { + Some(bitmap) => { + match DatasetPreFilter::create_restricted_deletion_mask( + ds.clone(), + bitmap, + ) { + Some(fut) => Some(fut.await?), + None => None, + } + } + None => None, + }; + let early_search = Self::initial_search( raw_index.clone(), query.clone(), @@ -1621,8 +1669,34 @@ impl ExecutionPlan for ANNIvfSubIndexExec { metrics, state, target_partitions, + seg_mask.clone(), ); - DataFusionResult::Ok(early_search.chain(late_search).boxed()) + let combined = early_search.chain(late_search); + // Drop stale rows from this segment's search output (rows whose + // fragment the segment no longer owns). The shortcut path in + // late_search restricts its emitted rows with the same mask. + let restricted = combined.map(move |batch_res| { + let batch = batch_res?; + let Some(seg_mask) = seg_mask.as_ref() else { + return Ok(batch); + }; + if batch.num_rows() == 0 { + return Ok(batch); + } + let row_ids = batch[ROW_ID].as_primitive::(); + let keep = BooleanArray::from_iter( + row_ids + .values() + .iter() + .map(|&id| Some(seg_mask.selected(id))), + ); + if keep.false_count() == 0 { + return Ok(batch); + } + arrow::compute::filter_record_batch(&batch, &keep) + .map_err(|e| DataFusionError::ArrowError(Box::new(e), None)) + }); + DataFusionResult::Ok(restricted.boxed()) } }) // Must use flatten_unordered to avoid deadlock. @@ -2547,6 +2621,7 @@ mod tests { prepared_metrics(), state.clone(), usize::MAX, + None, ) .try_collect::>() .await