Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions protos/table.proto
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,17 @@ message FragmentReuseIndexDetails {
uint64 dataset_version = 1;

repeated Group groups = 3;

// Fragment IDs that some index covered but that were no longer present in
// the dataset when this version was committed, and were not part of any
// rewrite group (e.g. a fragment that was fully deleted before compaction,
// so it never entered a compaction task / Group.old_fragments).
//
// Such fragments are gone from the manifest, so their rows cannot be
// enumerated by address. Their stale index entries are pruned by fragment
// ID instead: the fragment-reuse index maps every row address whose
// fragment ID is in this list to "deleted" (None).
repeated uint32 removed_fragments = 4;
}
}

Expand Down
53 changes: 53 additions & 0 deletions rust/lance-table/src/system_index/frag_reuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use arrow_array::cast::AsArray;
use arrow_array::types::UInt64Type;
use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch, UInt64Array};
use lance_core::deepsize::{Context, DeepSizeOf};
use lance_core::utils::address::RowAddress;
use lance_core::{Error, Result};
use lance_select::RowAddrTreeMap;
use roaring::{RoaringBitmap, RoaringTreemap};
Expand Down Expand Up @@ -105,13 +106,21 @@ impl TryFrom<pb::fragment_reuse_index_details::Group> for FragReuseGroup {
pub struct FragReuseVersion {
pub dataset_version: u64,
pub groups: Vec<FragReuseGroup>,
/// Fragment IDs that some index covered but that were no longer present in
/// the dataset when this version was committed and were not part of any
/// rewrite group (e.g. a fragment fully deleted before compaction). Their
/// stale index entries are pruned by fragment ID, since the fragments are
/// gone from the manifest and their rows can no longer be enumerated by
/// address.
pub removed_frags: Vec<u32>,
}

impl From<&FragReuseVersion> for pb::fragment_reuse_index_details::Version {
fn from(version: &FragReuseVersion) -> Self {
Self {
dataset_version: version.dataset_version,
groups: version.groups.iter().map(|g| g.into()).collect(),
removed_fragments: version.removed_frags.clone(),
}
}
}
Expand All @@ -127,6 +136,7 @@ impl TryFrom<pb::fragment_reuse_index_details::Version> for FragReuseVersion {
.into_iter()
.map(FragReuseGroup::try_from)
.collect::<Result<_>>()?,
removed_frags: version.removed_fragments,
})
}
}
Expand Down Expand Up @@ -194,6 +204,17 @@ impl FragReuseIndexDetails {
.flat_map(|v| v.new_frag_ids().into_iter().map(|id| id as u32)),
)
}

/// Union of `removed_frags` across all versions: fragments that were
/// orphaned (covered by an index but absent from the dataset and not part
/// of any rewrite group) and must be pruned from indexes by fragment ID.
pub fn removed_frag_bitmap(&self) -> RoaringBitmap {
RoaringBitmap::from_iter(
self.versions
.iter()
.flat_map(|v| v.removed_frags.iter().copied()),
)
}
}

/// An index that stores row ID maps.
Expand All @@ -204,6 +225,14 @@ pub struct FragReuseIndex {
pub uuid: Uuid,
pub row_id_maps: Vec<HashMap<u64, Option<u64>>>,
pub details: FragReuseIndexDetails,
/// Cached union of `removed_frags` across all versions (derived from
/// `details`). Any row address whose fragment ID is in this set is treated
/// as deleted (mapped to `None`), because the fragment is gone from the
/// manifest but its rows may still be referenced by an index that has not
/// yet been physically remapped. Skipped from (de)serialization since it is
/// always rebuilt via `new` from `details`.
#[serde(skip)]
removed_frags: RoaringBitmap,
}

impl DeepSizeOf for FragReuseIndex {
Expand All @@ -218,14 +247,29 @@ impl FragReuseIndex {
row_id_maps: Vec<HashMap<u64, Option<u64>>>,
details: FragReuseIndexDetails,
) -> Self {
let removed_frags = details.removed_frag_bitmap();
Self {
uuid,
row_id_maps,
details,
removed_frags,
}
}

pub fn remap_row_id(&self, row_id: u64) -> Option<u64> {
// A fully-deleted fragment is removed from the manifest at delete time,
// so it never enters a compaction group and is absent from the per-group
// row_id_maps. Prune any address belonging to such an orphaned fragment
// by fragment ID; otherwise its stale index entries pass through here and
// resurface as dangling references after compaction.
if !self.removed_frags.is_empty()
&& self
.removed_frags
.contains(RowAddress::new_from_u64(row_id).fragment_id())
{
return None;
}

let mut mapped_value = Some(row_id);
for row_id_map in self.row_id_maps.iter() {
if mapped_value.is_some() {
Expand Down Expand Up @@ -307,6 +351,13 @@ impl FragReuseIndex {
}

pub fn remap_fragment_bitmap(&self, fragment_bitmap: &mut RoaringBitmap) -> Result<()> {
// Orphaned fragments (fully deleted before compaction) are deliberately
// NOT removed from index coverage here. Their rows are already pruned
// from the index data by `remap_row_id`, and a fragment that no longer
// exists is masked out at query time by `effective_fragment_bitmap`'s
// intersection with the live fragments. Stripping a deleted fragment's
// coverage instead makes the planner treat its (now nonexistent) row
// range as an unindexed flat-scan fallback, which fails on `take`.
for version in self.details.versions.iter() {
for group in version.groups.iter() {
let mut removed = 0;
Expand Down Expand Up @@ -378,6 +429,7 @@ mod tests {
},
],
}],
removed_frags: vec![],
};

let version2 = FragReuseVersion {
Expand All @@ -402,6 +454,7 @@ mod tests {
},
],
}],
removed_frags: vec![],
};

// Create FragReuseIndexDetails with versions in reverse order
Expand Down
152 changes: 150 additions & 2 deletions rust/lance/src/dataset/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ use lance_core::Error;
use lance_core::datatypes::{BlobHandling, BlobKind};
use lance_core::utils::tokio::get_num_compute_intensive_cpus;
use lance_core::utils::tracing::{DATASET_COMPACTING_EVENT, TRACE_DATASET_EVENTS};
use lance_index::frag_reuse::FragReuseGroup;
use lance_index::frag_reuse::{FRAG_REUSE_INDEX_NAME, FragReuseGroup};
use lance_table::format::{Fragment, RowIdMeta};
use roaring::{RoaringBitmap, RoaringTreemap};
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -2027,7 +2027,43 @@ pub async fn commit_compaction(
};

let frag_reuse_index = if options.defer_index_remap {
Some(build_new_frag_reuse_index(dataset, frag_reuse_groups, new_fragment_bitmap).await?)
// Compute orphaned fragments: those some index still covers but that are
// no longer present in the dataset and were not part of any rewrite group.
// A fragment fully deleted before compaction is removed from the manifest
// at delete time, so it never enters a compaction task (and thus is absent
// from every group's old_frags) yet may still be referenced by an index
// that has not been physically remapped. Without pruning it by fragment
// ID, those stale entries resurface as dangling references post-compaction.
//
// `dataset` here still holds the pre-commit fragments (the rewrite is not
// committed yet), so the fragments about to be compacted away are still
// present and correctly excluded from the orphaned set.
let current_frags: RoaringBitmap = dataset
.get_fragments()
.iter()
.map(|f| f.id() as u32)
.collect();
let mut orphaned = RoaringBitmap::new();
for index in dataset.load_indices().await?.iter() {
if index.name == FRAG_REUSE_INDEX_NAME {
continue;
}
if let Some(bitmap) = &index.fragment_bitmap {
orphaned |= bitmap;
}
}
orphaned -= &current_frags;
let removed_frags: Vec<u32> = orphaned.into_iter().collect();

Some(
build_new_frag_reuse_index(
dataset,
frag_reuse_groups,
removed_frags,
new_fragment_bitmap,
)
.await?,
)
} else {
None
};
Expand Down Expand Up @@ -3279,6 +3315,118 @@ mod tests {
assert_eq!(current_scalar_index.uuid, original_scalar_uuid);
}

// Regression test for https://github.com/lancedb/lance/issues/7374
// "Vector index can become corrupted when compaction is deferred"
// A fully-deleted fragment must be pruned from the vector index. With
// deferred compaction the inline remap is skipped, so the FRI must map every
// address of the fully-deleted fragment to None; otherwise the index later
// returns references to a fragment that was compacted away.
#[tokio::test]
async fn test_defer_index_remap_fully_deleted_fragment() {
let mut data_gen = BatchGenerator::new()
.col(Box::new(
RandomVector::new().vec_width(128).named("vec".to_owned()),
))
.col(Box::new(IncrementingInt32::new().named("i".to_owned())));

// 10 fragments x 1000 rows, i = 0..9999. Fragment 4 holds i in [4000,5000).
let mut dataset = Dataset::write(
data_gen.batch(10_000),
"memory://test/table",
Some(WriteParams {
max_rows_per_file: 1_000,
..Default::default()
}),
)
.await
.unwrap();
assert_eq!(dataset.get_fragments().len(), 10);

dataset
.create_index(
&["vec"],
IndexType::Vector,
Some("vector".into()),
&VectorIndexParams::ivf_pq(10, 8, 8, MetricType::L2, 50),
false,
)
.await
.unwrap();

// Fully delete fragment 4. This REMOVES fragment 4 from the manifest
// entirely (apply_deletions -> extend_deletions returns None for a
// fully-deleted fragment), so the compaction planner never sees it.
dataset.delete("i >= 4000 AND i < 5000").await.unwrap();

let metrics = compact_files(
&mut dataset,
CompactionOptions {
target_rows_per_fragment: 100_000,
materialize_deletions: true,
defer_index_remap: true,
..Default::default()
},
None,
)
.await
.unwrap();
assert!(metrics.fragments_removed > 0);
assert!(metrics.fragments_added > 0);

// --- FRI-level check: fragment 4 must be pruned from the index ---
let frag_reuse_index_meta = dataset
.load_index_by_name(FRAG_REUSE_INDEX_NAME)
.await
.unwrap()
.expect("fragment reuse index must exist after deferred compaction");
let details = load_frag_reuse_index_details(&dataset, &frag_reuse_index_meta)
.await
.unwrap();
let fri = open_frag_reuse_index(frag_reuse_index_meta.uuid, details.as_ref())
.await
.unwrap();

// The fragment-reuse index must map every address of the fully-deleted
// fragment 4 to None (pruned). Returning Some(addr) means the vector
// index keeps a dangling reference to a fragment that no longer exists.
let mut bad = Vec::new();
for offset in [0u32, 1, 100, 500, 999] {
let addr = u64::from(RowAddress::new_from_parts(4, offset));
if let Some(mapped) = fri.remap_row_id(addr) {
bad.push((offset, mapped));
}
}
assert!(
bad.is_empty(),
"fully-deleted fragment 4 addresses were NOT pruned by the FRI; \
remap_row_id returned Some for offsets {:?} (should be None)",
bad
);

// --- End-to-end: vector search must succeed and not return ghost rows ---
let batch = dataset
.scan()
.nearest("vec", &Float32Array::from(vec![0.0f32; 128]), 200)
.unwrap()
.project(&["i"])
.unwrap()
.try_into_batch()
.await
.expect("vector search after deferred compaction must succeed");
let i_col = batch.column(0).as_primitive::<Int32Type>();
let ghosts: Vec<i32> = i_col
.values()
.iter()
.copied()
.filter(|&v| (4000..5000).contains(&v))
.collect();
assert!(
ghosts.is_empty(),
"vector search returned ids from the fully-deleted fragment range [4000,5000): {:?}",
ghosts
);
}

#[tokio::test]
async fn test_defer_index_remap_multiple_compactions() {
let mut data_gen = BatchGenerator::new()
Expand Down
2 changes: 2 additions & 0 deletions rust/lance/src/index/frag_reuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,13 @@ pub(crate) async fn open_frag_reuse_index(
pub(crate) async fn build_new_frag_reuse_index(
dataset: &mut Dataset,
frag_reuse_groups: Vec<FragReuseGroup>,
removed_frags: Vec<u32>,
new_fragment_bitmap: RoaringBitmap,
) -> lance_core::Result<IndexMetadata> {
let new_version = FragReuseVersion {
dataset_version: dataset.manifest.version,
groups: frag_reuse_groups,
removed_frags,
};

let index_meta = dataset.load_indices().await.map(|indices| {
Expand Down
Loading