diff --git a/protos/table.proto b/protos/table.proto index d298809d5d8..69f5323c06e 100644 --- a/protos/table.proto +++ b/protos/table.proto @@ -514,6 +514,17 @@ message FragmentReuseIndexDetails { uint64 dataset_version = 1; repeated Group groups = 3; + + // Fragment IDs that some index covered but that were no longer present in + // the dataset when this version was committed, and were not part of any + // rewrite group (e.g. a fragment that was fully deleted before compaction, + // so it never entered a compaction task / Group.old_fragments). + // + // Such fragments are gone from the manifest, so their rows cannot be + // enumerated by address. Their stale index entries are pruned by fragment + // ID instead: the fragment-reuse index maps every row address whose + // fragment ID is in this list to "deleted" (None). + repeated uint32 removed_fragments = 4; } } diff --git a/rust/lance-table/src/system_index/frag_reuse.rs b/rust/lance-table/src/system_index/frag_reuse.rs index 40bbc4f58b6..d61e3a656cc 100644 --- a/rust/lance-table/src/system_index/frag_reuse.rs +++ b/rust/lance-table/src/system_index/frag_reuse.rs @@ -7,6 +7,7 @@ use arrow_array::cast::AsArray; use arrow_array::types::UInt64Type; use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch, UInt64Array}; use lance_core::deepsize::{Context, DeepSizeOf}; +use lance_core::utils::address::RowAddress; use lance_core::{Error, Result}; use lance_select::RowAddrTreeMap; use roaring::{RoaringBitmap, RoaringTreemap}; @@ -105,6 +106,13 @@ impl TryFrom for FragReuseGroup { pub struct FragReuseVersion { pub dataset_version: u64, pub groups: Vec, + /// Fragment IDs that some index covered but that were no longer present in + /// the dataset when this version was committed and were not part of any + /// rewrite group (e.g. a fragment fully deleted before compaction). 
Their + /// stale index entries are pruned by fragment ID, since the fragments are + /// gone from the manifest and their rows can no longer be enumerated by + /// address. + pub removed_frags: Vec, } impl From<&FragReuseVersion> for pb::fragment_reuse_index_details::Version { @@ -112,6 +120,7 @@ impl From<&FragReuseVersion> for pb::fragment_reuse_index_details::Version { Self { dataset_version: version.dataset_version, groups: version.groups.iter().map(|g| g.into()).collect(), + removed_fragments: version.removed_frags.clone(), } } } @@ -127,6 +136,7 @@ impl TryFrom for FragReuseVersion { .into_iter() .map(FragReuseGroup::try_from) .collect::>()?, + removed_frags: version.removed_fragments, }) } } @@ -194,6 +204,17 @@ impl FragReuseIndexDetails { .flat_map(|v| v.new_frag_ids().into_iter().map(|id| id as u32)), ) } + + /// Union of `removed_frags` across all versions: fragments that were + /// orphaned (covered by an index but absent from the dataset and not part + /// of any rewrite group) and must be pruned from indexes by fragment ID. + pub fn removed_frag_bitmap(&self) -> RoaringBitmap { + RoaringBitmap::from_iter( + self.versions + .iter() + .flat_map(|v| v.removed_frags.iter().copied()), + ) + } } /// An index that stores row ID maps. @@ -204,6 +225,14 @@ pub struct FragReuseIndex { pub uuid: Uuid, pub row_id_maps: Vec>>, pub details: FragReuseIndexDetails, + /// Cached union of `removed_frags` across all versions (derived from + /// `details`). Any row address whose fragment ID is in this set is treated + /// as deleted (mapped to `None`), because the fragment is gone from the + /// manifest but its rows may still be referenced by an index that has not + /// yet been physically remapped. Skipped from (de)serialization since it is + /// always rebuilt via `new` from `details`. 
+ #[serde(skip)] + removed_frags: RoaringBitmap, } impl DeepSizeOf for FragReuseIndex { @@ -218,14 +247,29 @@ impl FragReuseIndex { row_id_maps: Vec>>, details: FragReuseIndexDetails, ) -> Self { + let removed_frags = details.removed_frag_bitmap(); Self { uuid, row_id_maps, details, + removed_frags, } } pub fn remap_row_id(&self, row_id: u64) -> Option { + // A fully-deleted fragment is removed from the manifest at delete time, + // so it never enters a compaction group and is absent from the per-group + // row_id_maps. Prune any address belonging to such an orphaned fragment + // by fragment ID; otherwise its stale index entries pass through here and + // resurface as dangling references after compaction. + if !self.removed_frags.is_empty() + && self + .removed_frags + .contains(RowAddress::new_from_u64(row_id).fragment_id()) + { + return None; + } + let mut mapped_value = Some(row_id); for row_id_map in self.row_id_maps.iter() { if mapped_value.is_some() { @@ -307,6 +351,13 @@ impl FragReuseIndex { } pub fn remap_fragment_bitmap(&self, fragment_bitmap: &mut RoaringBitmap) -> Result<()> { + // Orphaned fragments (fully deleted before compaction) are deliberately + // NOT removed from index coverage here. Their rows are already pruned + // from the index data by `remap_row_id`, and a fragment that no longer + // exists is masked out at query time by `effective_fragment_bitmap`'s + // intersection with the live fragments. Stripping a deleted fragment's + // coverage instead makes the planner treat its (now nonexistent) row + // range as an unindexed flat-scan fallback, which fails on `take`. 
for version in self.details.versions.iter() { for group in version.groups.iter() { let mut removed = 0; @@ -378,6 +429,7 @@ mod tests { }, ], }], + removed_frags: vec![], }; let version2 = FragReuseVersion { @@ -402,6 +454,7 @@ mod tests { }, ], }], + removed_frags: vec![], }; // Create FragReuseIndexDetails with versions in reverse order diff --git a/rust/lance/src/dataset/optimize.rs b/rust/lance/src/dataset/optimize.rs index 87dda8e7e57..c97c58f94ad 100644 --- a/rust/lance/src/dataset/optimize.rs +++ b/rust/lance/src/dataset/optimize.rs @@ -113,7 +113,7 @@ use lance_core::Error; use lance_core::datatypes::{BlobHandling, BlobKind}; use lance_core::utils::tokio::get_num_compute_intensive_cpus; use lance_core::utils::tracing::{DATASET_COMPACTING_EVENT, TRACE_DATASET_EVENTS}; -use lance_index::frag_reuse::FragReuseGroup; +use lance_index::frag_reuse::{FRAG_REUSE_INDEX_NAME, FragReuseGroup}; use lance_table::format::{Fragment, RowIdMeta}; use roaring::{RoaringBitmap, RoaringTreemap}; use serde::{Deserialize, Serialize}; @@ -2027,7 +2027,43 @@ pub async fn commit_compaction( }; let frag_reuse_index = if options.defer_index_remap { - Some(build_new_frag_reuse_index(dataset, frag_reuse_groups, new_fragment_bitmap).await?) + // Compute orphaned fragments: those some index still covers but that are + // no longer present in the dataset and were not part of any rewrite group. + // A fragment fully deleted before compaction is removed from the manifest + // at delete time, so it never enters a compaction task (and thus is absent + // from every group's old_frags) yet may still be referenced by an index + // that has not been physically remapped. Without pruning it by fragment + // ID, those stale entries resurface as dangling references post-compaction. + // + // `dataset` here still holds the pre-commit fragments (the rewrite is not + // committed yet), so the fragments about to be compacted away are still + // present and correctly excluded from the orphaned set. 
+ let current_frags: RoaringBitmap = dataset + .get_fragments() + .iter() + .map(|f| f.id() as u32) + .collect(); + let mut orphaned = RoaringBitmap::new(); + for index in dataset.load_indices().await?.iter() { + if index.name == FRAG_REUSE_INDEX_NAME { + continue; + } + if let Some(bitmap) = &index.fragment_bitmap { + orphaned |= bitmap; + } + } + orphaned -= &current_frags; + let removed_frags: Vec = orphaned.into_iter().collect(); + + Some( + build_new_frag_reuse_index( + dataset, + frag_reuse_groups, + removed_frags, + new_fragment_bitmap, + ) + .await?, + ) } else { None }; @@ -3279,6 +3315,118 @@ mod tests { assert_eq!(current_scalar_index.uuid, original_scalar_uuid); } + // Regression test for https://github.com/lancedb/lance/issues/7374 + // "Vector index can become corrupted when compaction is deferred" + // A fully-deleted fragment must be pruned from the vector index. With + // deferred compaction the inline remap is skipped, so the FRI must map every + // address of the fully-deleted fragment to None; otherwise the index later + // returns references to a fragment that was compacted away. + #[tokio::test] + async fn test_defer_index_remap_fully_deleted_fragment() { + let mut data_gen = BatchGenerator::new() + .col(Box::new( + RandomVector::new().vec_width(128).named("vec".to_owned()), + )) + .col(Box::new(IncrementingInt32::new().named("i".to_owned()))); + + // 10 fragments x 1000 rows, i = 0..9999. Fragment 4 holds i in [4000,5000). + let mut dataset = Dataset::write( + data_gen.batch(10_000), + "memory://test/table", + Some(WriteParams { + max_rows_per_file: 1_000, + ..Default::default() + }), + ) + .await + .unwrap(); + assert_eq!(dataset.get_fragments().len(), 10); + + dataset + .create_index( + &["vec"], + IndexType::Vector, + Some("vector".into()), + &VectorIndexParams::ivf_pq(10, 8, 8, MetricType::L2, 50), + false, + ) + .await + .unwrap(); + + // Fully delete fragment 4. 
This REMOVES fragment 4 from the manifest + // entirely (apply_deletions -> extend_deletions returns None for a + // fully-deleted fragment), so the compaction planner never sees it. + dataset.delete("i >= 4000 AND i < 5000").await.unwrap(); + + let metrics = compact_files( + &mut dataset, + CompactionOptions { + target_rows_per_fragment: 100_000, + materialize_deletions: true, + defer_index_remap: true, + ..Default::default() + }, + None, + ) + .await + .unwrap(); + assert!(metrics.fragments_removed > 0); + assert!(metrics.fragments_added > 0); + + // --- FRI-level check: fragment 4 must be pruned from the index --- + let frag_reuse_index_meta = dataset + .load_index_by_name(FRAG_REUSE_INDEX_NAME) + .await + .unwrap() + .expect("fragment reuse index must exist after deferred compaction"); + let details = load_frag_reuse_index_details(&dataset, &frag_reuse_index_meta) + .await + .unwrap(); + let fri = open_frag_reuse_index(frag_reuse_index_meta.uuid, details.as_ref()) + .await + .unwrap(); + + // The fragment-reuse index must map every address of the fully-deleted + // fragment 4 to None (pruned). Returning Some(addr) means the vector + // index keeps a dangling reference to a fragment that no longer exists. 
+ let mut bad = Vec::new(); + for offset in [0u32, 1, 100, 500, 999] { + let addr = u64::from(RowAddress::new_from_parts(4, offset)); + if let Some(mapped) = fri.remap_row_id(addr) { + bad.push((offset, mapped)); + } + } + assert!( + bad.is_empty(), + "fully-deleted fragment 4 addresses were NOT pruned by the FRI; \ + remap_row_id returned Some for offsets {:?} (should be None)", + bad + ); + + // --- End-to-end: vector search must succeed and not return ghost rows --- + let batch = dataset + .scan() + .nearest("vec", &Float32Array::from(vec![0.0f32; 128]), 200) + .unwrap() + .project(&["i"]) + .unwrap() + .try_into_batch() + .await + .expect("vector search after deferred compaction must succeed"); + let i_col = batch.column(0).as_primitive::(); + let ghosts: Vec = i_col + .values() + .iter() + .copied() + .filter(|&v| (4000..5000).contains(&v)) + .collect(); + assert!( + ghosts.is_empty(), + "vector search returned ids from the fully-deleted fragment range [4000,5000): {:?}", + ghosts + ); + } + #[tokio::test] async fn test_defer_index_remap_multiple_compactions() { let mut data_gen = BatchGenerator::new() diff --git a/rust/lance/src/index/frag_reuse.rs b/rust/lance/src/index/frag_reuse.rs index 23a8fec5145..2ab333c2793 100644 --- a/rust/lance/src/index/frag_reuse.rs +++ b/rust/lance/src/index/frag_reuse.rs @@ -95,11 +95,13 @@ pub(crate) async fn open_frag_reuse_index( pub(crate) async fn build_new_frag_reuse_index( dataset: &mut Dataset, frag_reuse_groups: Vec, + removed_frags: Vec, new_fragment_bitmap: RoaringBitmap, ) -> lance_core::Result { let new_version = FragReuseVersion { dataset_version: dataset.manifest.version, groups: frag_reuse_groups, + removed_frags, }; let index_meta = dataset.load_indices().await.map(|indices| {