Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions python/python/lance/dataset.py
Original file line number Diff line number Diff line change
Expand Up @@ -6841,6 +6841,28 @@ def optimize_indices(self, **kwargs):
"""
self._dataset._ds.optimize_indices(**kwargs)

def cleanup_frag_reuse_index(self) -> None:
"""Prune obsolete generations from the ``__lance_frag_reuse`` system index.

The fragment-reuse index is created by :meth:`compact_files` when
``defer_index_remap=True``. It retains one generation per deferred
compaction so indices can be remapped lazily. Once every index has caught
up to a generation (for example after rebuilding it or calling
:meth:`optimize_indices`), that generation is obsolete and can be pruned.

This is safe to call at any time: a generation that an index has not yet
caught up to is always retained.

Examples
--------
>>> import lance
>>> import pyarrow as pa
>>> data = pa.table({"id": [1, 2, 3]})
>>> dataset = lance.write_dataset(data, "memory://frag_reuse_example")
>>> dataset.optimize.cleanup_frag_reuse_index()
"""
self._dataset._ds.cleanup_frag_reuse_index()

def enable_auto_cleanup(self, auto_cleanup_config: AutoCleanupConfig, **kwargs):
"""Enable autocleaning for an existing dataset.

Expand Down
1 change: 1 addition & 0 deletions python/python/lance/lance/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,7 @@ class _Dataset:
) -> Tuple[_Dataset, Transaction]: ...
def validate(self): ...
def migrate_manifest_paths_v2(self): ...
def cleanup_frag_reuse_index(self) -> None: ...
def drop_columns(self, columns: List[str]): ...
def add_columns_from_reader(
self, reader: pa.RecordBatchReader, batch_size: Optional[int] = None
Expand Down
56 changes: 56 additions & 0 deletions python/python/tests/test_optimize.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,62 @@ def test_defer_index_remap(tmp_path: Path):
assert any(idx.name == "__lance_frag_reuse" for idx in indices)


def test_cleanup_frag_reuse_index(tmp_path: Path):
"""cleanup_frag_reuse_index prunes only the reuse generations that every user
index has caught up to, and never the ones still in use.

Setup: 6 fragments, one BTREE scalar index. Compact with
defer_index_remap=True so the frag-reuse index is populated. While the
scalar index has not caught up, cleanup must retain the generation. After
rebuilding the scalar index (create_scalar_index with replace=True) it is
newer than the reuse generation, so cleanup may prune it and num_versions
drops to 0.
"""
base_dir = tmp_path / "dataset"
data = pa.table({"i": range(6_000), "val": range(6_000)})
dataset = lance.write_dataset(data, base_dir, max_rows_per_file=1_000)
dataset.create_scalar_index("i", "BTREE")

dataset.delete("i < 500")
dataset.optimize.compact_files(
target_rows_per_fragment=2_000, defer_index_remap=True, num_threads=1
)

dataset = lance.dataset(base_dir)
assert any(
idx.name == "__lance_frag_reuse" for idx in dataset.describe_indices()
), "precondition: defer_index_remap must have created the frag-reuse index"

before_stats = dataset.stats.index_stats("__lance_frag_reuse")
versions_before = before_stats["num_versions"]
assert versions_before >= 1, (
"precondition: frag-reuse index must have at least one version before cleanup"
)

# Negative case: index not yet remapped, so cleanup must retain the generation.
dataset.optimize.cleanup_frag_reuse_index()
dataset = lance.dataset(base_dir)
retained_stats = dataset.stats.index_stats("__lance_frag_reuse")
assert retained_stats["num_versions"] == versions_before, (
f"cleanup_frag_reuse_index must retain generations that an index has not "
f"caught up to, but num_versions went from {versions_before} to "
f"{retained_stats['num_versions']}"
)

# Positive case: rebuilding catches the index up, so cleanup can now prune all.
dataset.create_scalar_index("i", "BTREE", replace=True)
dataset = lance.dataset(base_dir)

dataset.optimize.cleanup_frag_reuse_index()

dataset = lance.dataset(base_dir)
after_stats = dataset.stats.index_stats("__lance_frag_reuse")
assert after_stats["num_versions"] == 0, (
f"cleanup_frag_reuse_index should have pruned all reuse generations "
f"but num_versions={after_stats['num_versions']}"
)


@pytest.mark.parametrize("use_commit_options", [True, False])
def test_defer_index_remap_via_commit_options(tmp_path: Path, use_commit_options: bool):
"""Compaction.commit respects defer_index_remap passed in options.
Expand Down
11 changes: 11 additions & 0 deletions python/src/dataset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2788,6 +2788,17 @@ impl Dataset {
Ok(())
}

fn cleanup_frag_reuse_index(&mut self) -> PyResult<()> {
let mut new_self = self.ds.as_ref().clone();
rt().block_on(
None,
lance::dataset::index::frag_reuse::cleanup_frag_reuse_index(&mut new_self),
)?
.infer_error()?;
self.ds = Arc::new(new_self);
Ok(())
}

fn drop_columns(&mut self, columns: Vec<String>) -> PyResult<()> {
let mut new_self = self.ds.as_ref().clone();
let columns: Vec<_> = columns.iter().map(|s| s.as_str()).collect();
Expand Down
111 changes: 95 additions & 16 deletions rust/lance/src/dataset/index/frag_reuse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,25 @@ pub async fn cleanup_frag_reuse_index(dataset: &mut Dataset) -> lance_core::Resu
return Ok(());
};

let frag_reuse_details = load_frag_reuse_index_details(dataset, frag_reuse_index_meta)
.await
.unwrap();
// Surface corrupt/stale metadata as a normal error instead of panicking on unwrap.
let frag_reuse_details = load_frag_reuse_index_details(dataset, frag_reuse_index_meta).await?;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cleanup path still reads external fragment-reuse details with unchecked offset/size arithmetic. Malformed metadata can still panic or produce an invalid object-store range instead of returning a Python exception.


let mut retained_versions = Vec::new();
let mut fragment_bitmaps = RoaringBitmap::new();
for version in frag_reuse_details.versions.iter() {
let check_results = indices
.iter()
.map(|idx| is_index_remap_caught_up(version, idx))
.collect::<Vec<_>>();

if check_results
.iter()
.any(|r| matches!(r, Err(Error::InvalidInput { .. })))
{
// If the check fails, the reuse version is likely corrupted, do not retain it.
continue;
'versions: for version in frag_reuse_details.versions.iter() {
let mut all_caught_up = true;
for idx in indices.iter() {
match is_index_remap_caught_up(version, idx) {
Ok(true) => {}
Ok(false) => all_caught_up = false,
// An InvalidInput error means the reuse version is likely corrupted; drop it.
Err(Error::InvalidInput { .. }) => continue 'versions,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The cleanup path treats a partially indexed rewrite group as corrupt and drops the reuse generation, while the load path treats the same straddling state as recoverable. Calling the new Python cleanup API on an older dataset can remove the generation that keeps affected indexes safe.

// Any other error is unexpected; surface it instead of panicking.
Err(e) => return Err(e),
}
}

if !check_results.into_iter().all(|r| r.unwrap()) {
if !all_caught_up {
fragment_bitmaps.extend(version.new_frag_bitmap());
retained_versions.push(version.clone());
}
Expand Down Expand Up @@ -212,6 +210,22 @@ mod tests {
is_index_remap_caught_up(&frag_reuse_details.versions[0], scalar_index).unwrap()
);

// Cleanup must not prune a reuse version while an index has not caught up to it.
cleanup_frag_reuse_index(&mut dataset).await.unwrap();
let frag_reuse_index_meta = dataset
.load_index_by_name(FRAG_REUSE_INDEX_NAME)
.await
.unwrap()
.expect("Fragment reuse index must be available");
let frag_reuse_details = load_frag_reuse_index_details(&dataset, &frag_reuse_index_meta)
.await
.unwrap();
assert_eq!(
frag_reuse_details.versions.len(),
1,
"reuse version must be retained while an index is not caught up"
);

// Remap and check index is caught up
remapping::remap_column_index(&mut dataset, &["i"], index_name.clone())
.await
Expand Down Expand Up @@ -244,6 +258,71 @@ mod tests {
));
}

/// Corrupt or stale fragment-reuse metadata must surface as a normal error,
/// not a panic. Regression for the `load_frag_reuse_index_details().unwrap()`
/// that used to crash the (Python) caller on unloadable metadata.
#[tokio::test]
async fn test_cleanup_frag_reuse_index_corrupt_metadata_errors() {
let mut dataset = lance_datagen::gen_batch()
.col("i", lance_datagen::array::step::<Int32Type>())
.into_ram_dataset(FragmentCount::from(6), FragmentRowCount::from(1000))
.await
.unwrap();

dataset
.create_index(
&["i"],
IndexType::Scalar,
Some("scalar".into()),
&ScalarIndexParams::default(),
false,
)
.await
.unwrap();

compact_files(
&mut dataset,
CompactionOptions {
target_rows_per_fragment: 2_000,
defer_index_remap: true,
..Default::default()
},
None,
)
.await
.unwrap();

let frag_reuse_index_meta = dataset
.load_index_by_name(FRAG_REUSE_INDEX_NAME)
.await
.unwrap()
.expect("Fragment reuse index must be available");

// Replace the reuse index with one whose details cannot be loaded.
let corrupt_meta = IndexMetadata {
uuid: uuid::Uuid::new_v4(),
index_details: None,
..frag_reuse_index_meta.clone()
};
let transaction = Transaction::new(
dataset.manifest.version,
Operation::CreateIndex {
new_indices: vec![corrupt_meta],
removed_indices: vec![frag_reuse_index_meta],
},
None,
);
dataset
.apply_commit(transaction, &Default::default(), &Default::default())
.await
.unwrap();

assert!(
cleanup_frag_reuse_index(&mut dataset).await.is_err(),
"corrupt frag-reuse metadata must surface as an error, not a panic"
);
}

/// With more than one index on the table, remapping every index must catch
/// all of them up so the reuse index can be trimmed.
///
Expand Down
Loading