Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 93 additions & 0 deletions python/python/tests/test_vector_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -1911,6 +1911,99 @@ def test_optimize_indices(indexed_dataset):
assert stats["num_indices"] == 2


def test_no_stale_duplicate_after_partial_column_update(tmp_path):
    # Regression guard for stale index entries after an in-place column update.
    #
    # Scenario: an indexed vector column is rewritten through the low-level
    # fragment.update_columns API and committed with LanceOperation.Update,
    # then the index is delta-optimized. The fragment keeps its id and row
    # addresses; only the column data file changes. The commit removes the
    # fragment from the old segment's fragment_bitmap, yet that segment's
    # index file still stores the row's previous vector. optimize_indices then
    # adds a delta segment holding the new vector.
    #
    # Failure mode being guarded against: a KNN query that searches both
    # segments returns the updated row twice - once from the old segment
    # (stale vector) and once from the delta segment (new vector).
    np.random.seed(42)
    ndim = 16
    updated_id = 10_000

    def build_table(ids, vectors):
        # Two-column table: int64 "id" plus a fixed-size-list float32 "vector"
        # of width `ndim`, built from the row-major flattening of `vectors`.
        return pa.table(
            {
                "id": pa.array(ids, type=pa.int64()),
                "vector": pa.FixedSizeListArray.from_arrays(
                    pa.array(vectors.reshape(-1), type=pa.float32()),
                    list_size=ndim,
                ),
            }
        )

    # Fragment 0: bulk rows drawn uniformly from [-1, 1]. All of them are far
    # from the query (every component 10.8), so they cannot push the stale
    # copy of the updated row out of the top-k.
    bulk_count = 1000
    bulk_vectors = np.random.uniform(-1, 1, (bulk_count, ndim)).astype(np.float32)
    ds = lance.write_dataset(
        build_table(range(bulk_count), bulk_vectors), tmp_path, mode="create"
    )

    # Fragment 1: one row whose initial vector (every component 2.0) sits
    # closer to the query than any bulk row, so a stale copy of it would land
    # comfortably inside the top-k.
    initial_vector = np.full((1, ndim), 2.0, dtype=np.float32)
    ds = lance.write_dataset(
        build_table([updated_id], initial_vector), tmp_path, mode="append"
    )
    assert len(ds.get_fragments()) == 2

    # Build a single index segment that covers both fragments.
    ds = ds.create_index(
        "vector",
        index_type="IVF_PQ",
        metric="l2",
        num_partitions=1,
        num_sub_vectors=ndim,
    )

    # Replace fragment 1's vector column in place, then commit the change as
    # an Update operation carrying the modified field ids.
    new_vec = [10.8] * ndim
    target_fragment = ds.get_fragment(1)
    row_ids = target_fragment.to_table(columns=["id"], with_row_id=True)[
        "_rowid"
    ].to_pylist()
    replacement = pa.table(
        {
            "_rowid": pa.array(row_ids, type=pa.uint64()),
            "vector": pa.array(
                [new_vec] * len(row_ids), type=pa.list_(pa.float32(), ndim)
            ),
        }
    )
    updated_fragment, fields_modified = target_fragment.update_columns(replacement)
    update_op = lance.LanceOperation.Update(
        updated_fragments=[updated_fragment],
        fields_modified=fields_modified,
    )
    ds = lance.LanceDataset.commit(ds.uri, update_op, read_version=ds.version)

    # Delta-optimize without merging: a new segment is added for the updated
    # fragment, and the original segment (with the stale vector) is kept.
    ds.optimize.optimize_indices(num_indices_to_merge=0)
    ds = lance.dataset(ds.uri)
    assert ds.stats.index_stats("vector_idx")["num_indices"] == 2

    # Query at the NEW vector using the default search path, which visits
    # every segment. The updated row has to show up exactly once.
    query = np.array(new_vec, dtype=np.float32)
    hits = ds.to_table(
        columns=["id"],
        nearest={"column": "vector", "q": query, "k": 10},
        with_row_id=True,
    ).to_pandas()
    matches = hits[hits["id"] == updated_id]
    assert len(matches) == 1, (
        f"updated row id=10000 returned {len(matches)} times "
        f"(stale index segment not masked); rowids={hits['_rowid'].tolist()}"
    )


@pytest.mark.skip(reason="retrain is deprecated")
def test_retrain_indices(indexed_dataset):
data = create_table()
Expand Down
103 changes: 89 additions & 14 deletions rust/lance/src/io/exec/knn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,9 @@ use lance_index::vector::{
};
use lance_linalg::distance::DistanceType;
use lance_linalg::kernels::normalize_arrow;
use lance_select::RowAddrMask;
use lance_table::format::IndexMetadata;
use roaring::RoaringBitmap;
use tokio::sync::Notify;
use uuid::Uuid;

Expand Down Expand Up @@ -1227,6 +1229,7 @@ impl ANNIvfSubIndexExec {
metrics: Arc<AnnIndexMetrics>,
state: Arc<ANNIvfEarlySearchResults>,
target_partitions: usize,
seg_mask: Option<Arc<RowAddrMask>>,
) -> impl Stream<Item = DataFusionResult<RecordBatch>> {
let stream = futures::stream::once(async move {
let max_nprobes = query
Expand Down Expand Up @@ -1260,20 +1263,29 @@ impl ANNIvfSubIndexExec {

// This next if check should be true, because we wouldn't get max_results otherwise
if let Some(iter_addrs) = prefilter_mask.iter_addrs() {
// We only run this on the first delta because the prefilter mask is shared
// by all deltas and we don't want to duplicate the rows.
if state
.took_no_rows_shortcut
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok()
{
// Emit the prefilter rows that the partition search did not reach.
//
// The prefilter mask is shared by all deltas. When a per-segment
// restriction is in effect (`seg_mask` is `Some`) each delta emits only
// the addresses its own segment owns; the segments partition the
// fragments, so each address is emitted by exactly one delta. Without a
// restriction the mask is global, so only the first delta emits (guarded
// by a shared flag) to avoid duplicating rows across deltas.
let should_emit = seg_mask.is_some()
|| state
.took_no_rows_shortcut
.compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
.is_ok();
if should_emit {
let initial_addrs = state.initial_ids.lock().unwrap();
let found_addrs = HashSet::<_>::from_iter(initial_addrs.iter().copied());
drop(initial_addrs);
let mask_addrs = HashSet::from_iter(iter_addrs.map(u64::from));
let not_found_addrs = mask_addrs.difference(&found_addrs);
let not_found_addrs =
UInt64Array::from_iter_values(not_found_addrs.copied());
let not_found_addrs = UInt64Array::from_iter_values(
iter_addrs.map(u64::from).filter(|addr| {
!found_addrs.contains(addr)
&& seg_mask.as_ref().is_none_or(|m| m.selected(*addr))
}),
);
let not_found_distance =
Float32Array::from_value(f32::INFINITY, not_found_addrs.len());
let not_found_batch = RecordBatch::try_new(
Expand All @@ -1283,8 +1295,8 @@ impl ANNIvfSubIndexExec {
.unwrap();
return futures::stream::once(async move { Ok(not_found_batch) }).boxed();
} else {
// We meet all the criteria for an early exit, but we aren't first
// delta so we just return an empty stream and skip the late search
// We meet all the criteria for an early exit, but we aren't the first
// delta and the mask is global, so skip to avoid duplicate rows.
return futures::stream::empty().boxed();
}
}
Expand Down Expand Up @@ -1519,6 +1531,19 @@ impl ExecutionPlan for ANNIvfSubIndexExec {
let ds = self.dataset.clone();
let column = self.query.column.clone();
let indices = self.indices.clone();
// Per-segment fragment restriction (applied as a post-filter below).
// Only enabled when every segment has a fragment_bitmap, mirroring the
// `all_have_bitmaps` gate in DatasetPreFilter::new so we never restrict
// more aggressively than the shared prefilter's fallback.
let segment_bitmaps: Arc<HashMap<Uuid, RoaringBitmap>> =
Arc::new(if indices.iter().all(|idx| idx.fragment_bitmap.is_some()) {
indices
.iter()
.map(|idx| (idx.uuid, idx.fragment_bitmap.clone().unwrap()))
.collect()
} else {
HashMap::new()
});
let prefilter_source = self.prefilter_source.clone();
let metrics = Arc::new(AnnIndexMetrics::new(&self.metrics, partition));
let metrics_clone = metrics.clone();
Expand Down Expand Up @@ -1593,6 +1618,7 @@ impl ExecutionPlan for ANNIvfSubIndexExec {
let metrics = metrics.clone();
let pre_filter = pre_filter.clone();
let state = state.clone();
let segment_bitmaps = segment_bitmaps.clone();
let mut query = query.clone();
let pruned_nprobes = early_pruning(q_c_dists.values(), query.k);
adjust_probes(&mut query, pruned_nprobes);
Expand All @@ -1602,6 +1628,28 @@ impl ExecutionPlan for ANNIvfSubIndexExec {
.await?;
let query = normalize_query_for_index(raw_index.as_ref(), query)?;

// A segment's index file may still physically contain rows for
// fragments that were pruned from its fragment_bitmap (e.g. after an
// in-place column update via update_columns). Once a newer delta
// segment owns such a fragment, the stale rows in this segment must
// not be returned, otherwise the same row is emitted by two segments.
// Build a per-segment restriction mask, reusing the scheme-aware
// helper so it is correct for both row-address and stable-row-id
// datasets. The shared prefilter is built from the union of all
// segment bitmaps and cannot express this per-segment rule.
let seg_mask = match segment_bitmaps.get(&index_uuid).cloned() {
Some(bitmap) => {
match DatasetPreFilter::create_restricted_deletion_mask(
ds.clone(),
bitmap,
) {
Some(fut) => Some(fut.await?),
None => None,
}
}
None => None,
};

let early_search = Self::initial_search(
raw_index.clone(),
query.clone(),
Expand All @@ -1621,8 +1669,34 @@ impl ExecutionPlan for ANNIvfSubIndexExec {
metrics,
state,
target_partitions,
seg_mask.clone(),
);
DataFusionResult::Ok(early_search.chain(late_search).boxed())
let combined = early_search.chain(late_search);
// Drop stale rows from this segment's search output (rows whose
// fragment the segment no longer owns). The shortcut path in
// late_search restricts its emitted rows with the same mask.
let restricted = combined.map(move |batch_res| {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This path applies the segment restriction only after the shared early/late search accounting has already counted the unmasked results. A stale hit can satisfy the coordinator and then be dropped, so the query can return fewer than k valid rows or miss the fresh row from the owning segment.

let batch = batch_res?;
let Some(seg_mask) = seg_mask.as_ref() else {
return Ok(batch);
};
if batch.num_rows() == 0 {
return Ok(batch);
}
let row_ids = batch[ROW_ID].as_primitive::<UInt64Type>();
let keep = BooleanArray::from_iter(
row_ids
.values()
.iter()
.map(|&id| Some(seg_mask.selected(id))),
);
if keep.false_count() == 0 {
return Ok(batch);
}
arrow::compute::filter_record_batch(&batch, &keep)
.map_err(|e| DataFusionError::ArrowError(Box::new(e), None))
});
DataFusionResult::Ok(restricted.boxed())
}
})
// Must use flatten_unordered to avoid deadlock.
Expand Down Expand Up @@ -2547,6 +2621,7 @@ mod tests {
prepared_metrics(),
state.clone(),
usize::MAX,
None,
)
.try_collect::<Vec<_>>()
.await
Expand Down
Loading