From 311eeef6c9d8277c76cf968bd24d99f5b4221b12 Mon Sep 17 00:00:00 2001 From: XYZhan Date: Fri, 19 Jun 2026 16:31:01 -0400 Subject: [PATCH] fix(compaction): prune fully-deleted fragments from the fragment-reuse index A fragment that has all its rows deleted is removed from the manifest at delete time, so it never enters a compaction task and is absent from every fragment-reuse-index (FRI) group's old_frags. With deferred compaction (defer_index_remap=true) the inline remap is skipped, so FragReuseIndex::remap_row_id passed those addresses through unchanged instead of pruning them. An index that still covered the removed fragment then kept dangling references to it, surfacing as 'take received reference to fragment that does not exist' errors / ghost results after a later physical remap. Record the orphaned fragment IDs (covered by an index but no longer present in the dataset, and not part of any rewrite group) on the FRI version and prune those addresses to None in remap_row_id, fixing both auto-remap at load and physical remap. Index coverage bitmaps are left intact (masked at query time by effective_fragment_bitmap); only the row data is pruned. The new proto field is additive and backward-compatible. Adds a regression test. Fixes #7374 --- protos/table.proto | 11 ++ .../src/system_index/frag_reuse.rs | 53 ++++++ rust/lance/src/dataset/optimize.rs | 152 +++++++++++++++++- rust/lance/src/index/frag_reuse.rs | 2 + 4 files changed, 216 insertions(+), 2 deletions(-) diff --git a/protos/table.proto b/protos/table.proto index d298809d5d8..69f5323c06e 100644 --- a/protos/table.proto +++ b/protos/table.proto @@ -514,6 +514,17 @@ message FragmentReuseIndexDetails { uint64 dataset_version = 1; repeated Group groups = 3; + + // Fragment IDs that some index covered but that were no longer present in + // the dataset when this version was committed, and were not part of any + // rewrite group (e.g. a fragment that was fully deleted before compaction, + // so it never entered a compaction task / Group.old_fragments). + // + // Such fragments are gone from the manifest, so their rows cannot be + // enumerated by address. Their stale index entries are pruned by fragment + // ID instead: the fragment-reuse index maps every row address whose + // fragment ID is in this list to "deleted" (None). + repeated uint32 removed_fragments = 4; } } diff --git a/rust/lance-table/src/system_index/frag_reuse.rs b/rust/lance-table/src/system_index/frag_reuse.rs index 40bbc4f58b6..d61e3a656cc 100644 --- a/rust/lance-table/src/system_index/frag_reuse.rs +++ b/rust/lance-table/src/system_index/frag_reuse.rs @@ -7,6 +7,7 @@ use arrow_array::cast::AsArray; use arrow_array::types::UInt64Type; use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch, UInt64Array}; use lance_core::deepsize::{Context, DeepSizeOf}; +use lance_core::utils::address::RowAddress; use lance_core::{Error, Result}; use lance_select::RowAddrTreeMap; use roaring::{RoaringBitmap, RoaringTreemap}; @@ -105,6 +106,13 @@ impl TryFrom for FragReuseGroup { pub struct FragReuseVersion { pub dataset_version: u64, pub groups: Vec, + /// Fragment IDs that some index covered but that were no longer present in + /// the dataset when this version was committed and were not part of any + /// rewrite group (e.g. a fragment fully deleted before compaction). Their + /// stale index entries are pruned by fragment ID, since the fragments are + /// gone from the manifest and their rows can no longer be enumerated by + /// address. + pub removed_frags: Vec, } impl From<&FragReuseVersion> for pb::fragment_reuse_index_details::Version { @@ -112,6 +120,7 @@ impl From<&FragReuseVersion> for pb::fragment_reuse_index_details::Version { Self { dataset_version: version.dataset_version, groups: version.groups.iter().map(|g| g.into()).collect(), + removed_fragments: version.removed_frags.clone(), } } } @@ -127,6 +136,7 @@ impl TryFrom for FragReuseVersion { .into_iter() .map(FragReuseGroup::try_from) .collect::>()?, + removed_frags: version.removed_fragments, }) } } @@ -194,6 +204,17 @@ impl FragReuseIndexDetails { .flat_map(|v| v.new_frag_ids().into_iter().map(|id| id as u32)), ) } + + /// Union of `removed_frags` across all versions: fragments that were + /// orphaned (covered by an index but absent from the dataset and not part + /// of any rewrite group) and must be pruned from indexes by fragment ID. + pub fn removed_frag_bitmap(&self) -> RoaringBitmap { + RoaringBitmap::from_iter( + self.versions + .iter() + .flat_map(|v| v.removed_frags.iter().copied()), + ) + } } /// An index that stores row ID maps. @@ -204,6 +225,14 @@ pub struct FragReuseIndex { pub uuid: Uuid, pub row_id_maps: Vec>>, pub details: FragReuseIndexDetails, + /// Cached union of `removed_frags` across all versions (derived from + /// `details`). Any row address whose fragment ID is in this set is treated + /// as deleted (mapped to `None`), because the fragment is gone from the + /// manifest but its rows may still be referenced by an index that has not + /// yet been physically remapped. Skipped from (de)serialization since it is + /// always rebuilt via `new` from `details`. + #[serde(skip)] + removed_frags: RoaringBitmap, } impl DeepSizeOf for FragReuseIndex { @@ -218,14 +247,29 @@ impl FragReuseIndex { row_id_maps: Vec>>, details: FragReuseIndexDetails, ) -> Self { + let removed_frags = details.removed_frag_bitmap(); Self { uuid, row_id_maps, details, + removed_frags, } } pub fn remap_row_id(&self, row_id: u64) -> Option { + // A fully-deleted fragment is removed from the manifest at delete time, + // so it never enters a compaction group and is absent from the per-group + // row_id_maps. Prune any address belonging to such an orphaned fragment + // by fragment ID; otherwise its stale index entries pass through here and + // resurface as dangling references after compaction. + if !self.removed_frags.is_empty() + && self + .removed_frags + .contains(RowAddress::new_from_u64(row_id).fragment_id()) + { + return None; + } + let mut mapped_value = Some(row_id); for row_id_map in self.row_id_maps.iter() { if mapped_value.is_some() { @@ -307,6 +351,13 @@ impl FragReuseIndex { } pub fn remap_fragment_bitmap(&self, fragment_bitmap: &mut RoaringBitmap) -> Result<()> { + // Orphaned fragments (fully deleted before compaction) are deliberately + // NOT removed from index coverage here. Their rows are already pruned + // from the index data by `remap_row_id`, and a fragment that no longer + // exists is masked out at query time by `effective_fragment_bitmap`'s + // intersection with the live fragments. Stripping a deleted fragment's + // coverage instead makes the planner treat its (now nonexistent) row + // range as an unindexed flat-scan fallback, which fails on `take`. for version in self.details.versions.iter() { for group in version.groups.iter() { let mut removed = 0; @@ -378,6 +429,7 @@ mod tests { }, ], }], + removed_frags: vec![], }; let version2 = FragReuseVersion { @@ -402,6 +454,7 @@ mod tests { }, ], }], + removed_frags: vec![], }; // Create FragReuseIndexDetails with versions in reverse order diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs index 87dda8e7e57..c97c58f94ad 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -113,7 +113,7 @@ use lance_core::Error; use lance_core::datatypes::{BlobHandling, BlobKind}; use lance_core::utils::tokio::get_num_compute_intensive_cpus; use lance_core::utils::tracing::{DATASET_COMPACTING_EVENT, TRACE_DATASET_EVENTS}; -use lance_index::frag_reuse::FragReuseGroup; +use lance_index::frag_reuse::{FRAG_REUSE_INDEX_NAME, FragReuseGroup}; use lance_table::format::{Fragment, RowIdMeta}; use roaring::{RoaringBitmap, RoaringTreemap}; use serde::{Deserialize, Serialize}; @@ -2027,7 +2027,43 @@ pub async fn commit_compaction( }; let frag_reuse_index = if options.defer_index_remap { - Some(build_new_frag_reuse_index(dataset, frag_reuse_groups, new_fragment_bitmap).await?) + // Compute orphaned fragments: those some index still covers but that are + // no longer present in the dataset and were not part of any rewrite group. + // A fragment fully deleted before compaction is removed from the manifest + // at delete time, so it never enters a compaction task (and thus is absent + // from every group's old_frags) yet may still be referenced by an index + // that has not been physically remapped. Without pruning it by fragment + // ID, those stale entries resurface as dangling references post-compaction. + // + // `dataset` here still holds the pre-commit fragments (the rewrite is not + // committed yet), so the fragments about to be compacted away are still + // present and correctly excluded from the orphaned set. + let current_frags: RoaringBitmap = dataset + .get_fragments() + .iter() + .map(|f| f.id() as u32) + .collect(); + let mut orphaned = RoaringBitmap::new(); + for index in dataset.load_indices().await?.iter() { + if index.name == FRAG_REUSE_INDEX_NAME { + continue; + } + if let Some(bitmap) = &index.fragment_bitmap { + orphaned |= bitmap; + } + } + orphaned -= ¤t_frags; + let removed_frags: Vec = orphaned.into_iter().collect(); + + Some( + build_new_frag_reuse_index( + dataset, + frag_reuse_groups, + removed_frags, + new_fragment_bitmap, + ) + .await?, + ) } else { None }; @@ -3279,6 +3315,118 @@ mod tests { assert_eq!(current_scalar_index.uuid, original_scalar_uuid); } + // Regression test for https://github.com/lancedb/lance/issues/7374 + // "Vector index can become corrupted when compaction is deferred" + // A fully-deleted fragment must be pruned from the vector index. With + // deferred compaction the inline remap is skipped, so the FRI must map every + // address of the fully-deleted fragment to None; otherwise the index later + // returns references to a fragment that was compacted away. + #[tokio::test] + async fn test_defer_index_remap_fully_deleted_fragment() { + let mut data_gen = BatchGenerator::new() + .col(Box::new( + RandomVector::new().vec_width(128).named("vec".to_owned()), + )) + .col(Box::new(IncrementingInt32::new().named("i".to_owned()))); + + // 10 fragments x 1000 rows, i = 0..9999. Fragment 4 holds i in [4000,5000). + let mut dataset = Dataset::write( + data_gen.batch(10_000), + "memory://test/table", + Some(WriteParams { + max_rows_per_file: 1_000, + ..Default::default() + }), + ) + .await + .unwrap(); + assert_eq!(dataset.get_fragments().len(), 10); + + dataset + .create_index( + &["vec"], + IndexType::Vector, + Some("vector".into()), + &VectorIndexParams::ivf_pq(10, 8, 8, MetricType::L2, 50), + false, + ) + .await + .unwrap(); + + // Fully delete fragment 4. This REMOVES fragment 4 from the manifest + // entirely (apply_deletions -> extend_deletions returns None for a + // fully-deleted fragment), so the compaction planner never sees it. + dataset.delete("i >= 4000 AND i < 5000").await.unwrap(); + + let metrics = compact_files( + &mut dataset, + CompactionOptions { + target_rows_per_fragment: 100_000, + materialize_deletions: true, + defer_index_remap: true, + ..Default::default() + }, + None, + ) + .await + .unwrap(); + assert!(metrics.fragments_removed > 0); + assert!(metrics.fragments_added > 0); + + // --- FRI-level check: fragment 4 must be pruned from the index --- + let frag_reuse_index_meta = dataset + .load_index_by_name(FRAG_REUSE_INDEX_NAME) + .await + .unwrap() + .expect("fragment reuse index must exist after deferred compaction"); + let details = load_frag_reuse_index_details(&dataset, &frag_reuse_index_meta) + .await + .unwrap(); + let fri = open_frag_reuse_index(frag_reuse_index_meta.uuid, details.as_ref()) + .await + .unwrap(); + + // The fragment-reuse index must map every address of the fully-deleted + // fragment 4 to None (pruned). Returning Some(addr) means the vector + // index keeps a dangling reference to a fragment that no longer exists. + let mut bad = Vec::new(); + for offset in [0u32, 1, 100, 500, 999] { + let addr = u64::from(RowAddress::new_from_parts(4, offset)); + if let Some(mapped) = fri.remap_row_id(addr) { + bad.push((offset, mapped)); + } + } + assert!( + bad.is_empty(), + "fully-deleted fragment 4 addresses were NOT pruned by the FRI; \ + remap_row_id returned Some for offsets {:?} (should be None)", + bad + ); + + // --- End-to-end: vector search must succeed and not return ghost rows --- + let batch = dataset + .scan() + .nearest("vec", &Float32Array::from(vec![0.0f32; 128]), 200) + .unwrap() + .project(&["i"]) + .unwrap() + .try_into_batch() + .await + .expect("vector search after deferred compaction must succeed"); + let i_col = batch.column(0).as_primitive::(); + let ghosts: Vec = i_col + .values() + .iter() + .copied() + .filter(|&v| (4000..5000).contains(&v)) + .collect(); + assert!( + ghosts.is_empty(), + "vector search returned ids from the fully-deleted fragment range [4000,5000): {:?}", + ghosts + ); + } + #[tokio::test] async fn test_defer_index_remap_multiple_compactions() { let mut data_gen = BatchGenerator::new() diff --git a/rust/lance/src/index/frag_reuse.rs b/rust/lance/src/index/frag_reuse.rs index 23a8fec5145..2ab333c2793 100644 --- a/rust/lance/src/index/frag_reuse.rs +++ b/rust/lance/src/index/frag_reuse.rs @@ -95,11 +95,13 @@ pub(crate) async fn open_frag_reuse_index( pub(crate) async fn build_new_frag_reuse_index( dataset: &mut Dataset, frag_reuse_groups: Vec, + removed_frags: Vec, new_fragment_bitmap: RoaringBitmap, ) -> lance_core::Result { let new_version = FragReuseVersion { dataset_version: dataset.manifest.version, groups: frag_reuse_groups, + removed_frags, }; let index_meta = dataset.load_indices().await.map(|indices| {