From d90747b9349ee2a7aa48b8055086e41ee876ea6b Mon Sep 17 00:00:00 2001 From: gstamatakis95 <126914070+gstamatakis95@users.noreply.github.com> Date: Thu, 11 Jun 2026 00:28:01 +0200 Subject: [PATCH 1/3] fix(python): added cleanup_frag_reuse_index --- python/python/lance/dataset.py | 4 +++ python/python/lance/lance/__init__.pyi | 1 + python/python/tests/test_optimize.py | 43 ++++++++++++++++++++++++++ python/src/dataset.rs | 11 +++++++ 4 files changed, 59 insertions(+) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index 831e194f9f3..3f6b553bf5f 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -4622,6 +4622,10 @@ def migrate_manifest_paths_v2(self): """ self._ds.migrate_manifest_paths_v2() + def cleanup_frag_reuse_index(self) -> None: + """Prune obsolete generations from the ``__lance_frag_reuse`` system index.""" + self._ds.cleanup_frag_reuse_index() + def delete_config_keys(self, keys: list[str]) -> None: """Delete specified configuration keys from the dataset. diff --git a/python/python/lance/lance/__init__.pyi b/python/python/lance/lance/__init__.pyi index 38d82738063..9af7167940e 100644 --- a/python/python/lance/lance/__init__.pyi +++ b/python/python/lance/lance/__init__.pyi @@ -446,6 +446,7 @@ class _Dataset: ) -> Tuple[_Dataset, Transaction]: ... def validate(self): ... def migrate_manifest_paths_v2(self): ... + def cleanup_frag_reuse_index(self) -> None: ... def drop_columns(self, columns: List[str]): ... def add_columns_from_reader( self, reader: pa.RecordBatchReader, batch_size: Optional[int] = None diff --git a/python/python/tests/test_optimize.py b/python/python/tests/test_optimize.py index ccd889db116..ec3fbe53431 100644 --- a/python/python/tests/test_optimize.py +++ b/python/python/tests/test_optimize.py @@ -324,6 +324,49 @@ def test_defer_index_remap(tmp_path: Path): assert any(idx.name == "__lance_frag_reuse" for idx in indices) +def test_cleanup_frag_reuse_index(tmp_path: Path): + """cleanup_frag_reuse_index prunes all reuse generations that every user + index has caught up to. + + Setup: 6 fragments, one BTREE scalar index. Compact with + defer_index_remap=True so the frag-reuse index is populated. Rebuild the + scalar index (create_scalar_index with replace=True) so it is newer than the + reuse generation, meaning the generation can be pruned. Then call + cleanup_frag_reuse_index and verify that num_versions drops to 0. + """ + base_dir = tmp_path / "dataset" + data = pa.table({"i": range(6_000), "val": range(6_000)}) + dataset = lance.write_dataset(data, base_dir, max_rows_per_file=1_000) + dataset.create_scalar_index("i", "BTREE") + + dataset.delete("i < 500") + dataset.optimize.compact_files( + target_rows_per_fragment=2_000, defer_index_remap=True, num_threads=1 + ) + + dataset = lance.dataset(base_dir) + assert any( + idx.name == "__lance_frag_reuse" for idx in dataset.describe_indices() + ), "precondition: defer_index_remap must have created the frag-reuse index" + + before_stats = dataset.stats.index_stats("__lance_frag_reuse") + assert before_stats["num_versions"] >= 1, ( + "precondition: frag-reuse index must have at least one version before cleanup" + ) + + dataset.create_scalar_index("i", "BTREE", replace=True) + dataset = lance.dataset(base_dir) + + dataset.cleanup_frag_reuse_index() + + dataset = lance.dataset(base_dir) + after_stats = dataset.stats.index_stats("__lance_frag_reuse") + assert after_stats["num_versions"] == 0, ( + f"cleanup_frag_reuse_index should have pruned all reuse generations " + f"but num_versions={after_stats['num_versions']}" + ) + + @pytest.mark.filterwarnings("ignore::DeprecationWarning") def test_describe_indices_matches_list_indices_for_frag_reuse(tmp_path: Path): """describe_indices() and list_indices() must agree on the index_type diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 8bfa81aeae4..e8e864d300a 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -2788,6 +2788,17 @@ impl Dataset { Ok(()) } + fn cleanup_frag_reuse_index(&mut self) -> PyResult<()> { + let mut new_self = self.ds.as_ref().clone(); + rt().block_on( + None, + lance::dataset::index::frag_reuse::cleanup_frag_reuse_index(&mut new_self), + )? + .map_err(|err: lance::Error| PyIOError::new_err(err.to_string()))?; + self.ds = Arc::new(new_self); + Ok(()) + } + fn drop_columns(&mut self, columns: Vec) -> PyResult<()> { let mut new_self = self.ds.as_ref().clone(); let columns: Vec<_> = columns.iter().map(|s| s.as_str()).collect(); From 8e65b016d45a3d9ec23ef37db63c9dfabb66099b Mon Sep 17 00:00:00 2001 From: gstamatakis95 <126914070+gstamatakis95@users.noreply.github.com> Date: Sat, 20 Jun 2026 14:15:38 +0200 Subject: [PATCH 2/3] fix(frag-reuse): surface cleanup metadata load failure instead of panicking --- python/python/tests/test_optimize.py | 27 ++++++++++++++++------ rust/lance/src/dataset/index/frag_reuse.rs | 21 ++++++++++++++--- 2 files changed, 38 insertions(+), 10 deletions(-) diff --git a/python/python/tests/test_optimize.py b/python/python/tests/test_optimize.py index ec3fbe53431..18534c5837d 100644 --- a/python/python/tests/test_optimize.py +++ b/python/python/tests/test_optimize.py @@ -325,14 +325,15 @@ def test_defer_index_remap(tmp_path: Path): def test_cleanup_frag_reuse_index(tmp_path: Path): - """cleanup_frag_reuse_index prunes all reuse generations that every user - index has caught up to. + """cleanup_frag_reuse_index prunes only the reuse generations that every user + index has caught up to, and never the ones still in use. Setup: 6 fragments, one BTREE scalar index. Compact with - defer_index_remap=True so the frag-reuse index is populated. Rebuild the - scalar index (create_scalar_index with replace=True) so it is newer than the - reuse generation, meaning the generation can be pruned. Then call - cleanup_frag_reuse_index and verify that num_versions drops to 0. + defer_index_remap=True so the frag-reuse index is populated. While the + scalar index has not caught up, cleanup must retain the generation. After + rebuilding the scalar index (create_scalar_index with replace=True) it is + newer than the reuse generation, so cleanup may prune it and num_versions + drops to 0. """ base_dir = tmp_path / "dataset" data = pa.table({"i": range(6_000), "val": range(6_000)}) @@ -350,10 +351,22 @@ def test_cleanup_frag_reuse_index(tmp_path: Path): ), "precondition: defer_index_remap must have created the frag-reuse index" before_stats = dataset.stats.index_stats("__lance_frag_reuse") - assert before_stats["num_versions"] >= 1, ( + versions_before = before_stats["num_versions"] + assert versions_before >= 1, ( "precondition: frag-reuse index must have at least one version before cleanup" ) + # Negative case: index not yet remapped, so cleanup must retain the generation. + dataset.cleanup_frag_reuse_index() + dataset = lance.dataset(base_dir) + retained_stats = dataset.stats.index_stats("__lance_frag_reuse") + assert retained_stats["num_versions"] == versions_before, ( + f"cleanup_frag_reuse_index must retain generations that an index has not " + f"caught up to, but num_versions went from {versions_before} to " + f"{retained_stats['num_versions']}" + ) + + # Positive case: rebuilding catches the index up, so cleanup can now prune all. dataset.create_scalar_index("i", "BTREE", replace=True) dataset = lance.dataset(base_dir) diff --git a/rust/lance/src/dataset/index/frag_reuse.rs b/rust/lance/src/dataset/index/frag_reuse.rs index 4fbefcd4725..969087177bd 100644 --- a/rust/lance/src/dataset/index/frag_reuse.rs +++ b/rust/lance/src/dataset/index/frag_reuse.rs @@ -38,9 +38,8 @@ pub async fn cleanup_frag_reuse_index(dataset: &mut Dataset) -> lance_core::Resu return Ok(()); }; - let frag_reuse_details = load_frag_reuse_index_details(dataset, frag_reuse_index_meta) - .await - .unwrap(); + // Surface corrupt/stale metadata as a normal error instead of panicking on unwrap. + let frag_reuse_details = load_frag_reuse_index_details(dataset, frag_reuse_index_meta).await?; let mut retained_versions = Vec::new(); let mut fragment_bitmaps = RoaringBitmap::new(); @@ -212,6 +211,22 @@ mod tests { is_index_remap_caught_up(&frag_reuse_details.versions[0], scalar_index).unwrap() ); + // Cleanup must not prune a reuse version while an index has not caught up to it. + cleanup_frag_reuse_index(&mut dataset).await.unwrap(); + let frag_reuse_index_meta = dataset + .load_index_by_name(FRAG_REUSE_INDEX_NAME) + .await + .unwrap() + .expect("Fragment reuse index must be available"); + let frag_reuse_details = load_frag_reuse_index_details(&dataset, &frag_reuse_index_meta) + .await + .unwrap(); + assert_eq!( + frag_reuse_details.versions.len(), + 1, + "reuse version must be retained while an index is not caught up" + ); + // Remap and check index is caught up remapping::remap_column_index(&mut dataset, &["i"], index_name.clone()) .await From a26d17c2900051f334b193956e4ba096f371462a Mon Sep 17 00:00:00 2001 From: gstamatakis95 <126914070+gstamatakis95@users.noreply.github.com> Date: Sat, 20 Jun 2026 15:24:34 +0200 Subject: [PATCH 3/3] fix(frag-reuse): address review feedback on cleanup_frag_reuse_index --- python/python/lance/dataset.py | 26 ++++++- python/python/tests/test_optimize.py | 4 +- python/src/dataset.rs | 2 +- rust/lance/src/dataset/index/frag_reuse.rs | 90 ++++++++++++++++++---- 4 files changed, 102 insertions(+), 20 deletions(-) diff --git a/python/python/lance/dataset.py b/python/python/lance/dataset.py index ec9406b9fe4..5bcf3edc037 100644 --- a/python/python/lance/dataset.py +++ b/python/python/lance/dataset.py @@ -4660,10 +4660,6 @@ def migrate_manifest_paths_v2(self): """ self._ds.migrate_manifest_paths_v2() - def cleanup_frag_reuse_index(self) -> None: - """Prune obsolete generations from the ``__lance_frag_reuse`` system index.""" - self._ds.cleanup_frag_reuse_index() - def delete_config_keys(self, keys: list[str]) -> None: """Delete specified configuration keys from the dataset. @@ -6845,6 +6841,28 @@ def optimize_indices(self, **kwargs): """ self._dataset._ds.optimize_indices(**kwargs) + def cleanup_frag_reuse_index(self) -> None: + """Prune obsolete generations from the ``__lance_frag_reuse`` system index. + + The fragment-reuse index is created by :meth:`compact_files` when + ``defer_index_remap=True``. It retains one generation per deferred + compaction so indices can be remapped lazily. Once every index has caught + up to a generation (for example after rebuilding it or calling + :meth:`optimize_indices`), that generation is obsolete and can be pruned. + + This is safe to call at any time: a generation that an index has not yet + caught up to is always retained. + + Examples + -------- + >>> import lance + >>> import pyarrow as pa + >>> data = pa.table({"id": [1, 2, 3]}) + >>> dataset = lance.write_dataset(data, "memory://frag_reuse_example") + >>> dataset.optimize.cleanup_frag_reuse_index() + """ + self._dataset._ds.cleanup_frag_reuse_index() + def enable_auto_cleanup(self, auto_cleanup_config: AutoCleanupConfig, **kwargs): """Enable autocleaning for an existing dataset. diff --git a/python/python/tests/test_optimize.py b/python/python/tests/test_optimize.py index ad4f6b6e34a..0bb7ecda587 100644 --- a/python/python/tests/test_optimize.py +++ b/python/python/tests/test_optimize.py @@ -357,7 +357,7 @@ def test_cleanup_frag_reuse_index(tmp_path: Path): ) # Negative case: index not yet remapped, so cleanup must retain the generation. - dataset.cleanup_frag_reuse_index() + dataset.optimize.cleanup_frag_reuse_index() dataset = lance.dataset(base_dir) retained_stats = dataset.stats.index_stats("__lance_frag_reuse") assert retained_stats["num_versions"] == versions_before, ( @@ -370,7 +370,7 @@ def test_cleanup_frag_reuse_index(tmp_path: Path): dataset.create_scalar_index("i", "BTREE", replace=True) dataset = lance.dataset(base_dir) - dataset.cleanup_frag_reuse_index() + dataset.optimize.cleanup_frag_reuse_index() dataset = lance.dataset(base_dir) after_stats = dataset.stats.index_stats("__lance_frag_reuse") diff --git a/python/src/dataset.rs b/python/src/dataset.rs index a917719a2f6..c44bb19e2a9 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -2794,7 +2794,7 @@ impl Dataset { None, lance::dataset::index::frag_reuse::cleanup_frag_reuse_index(&mut new_self), )? - .map_err(|err: lance::Error| PyIOError::new_err(err.to_string()))?; + .infer_error()?; self.ds = Arc::new(new_self); Ok(()) } diff --git a/rust/lance/src/dataset/index/frag_reuse.rs b/rust/lance/src/dataset/index/frag_reuse.rs index 191c9bb3b2d..df5fe568e41 100644 --- a/rust/lance/src/dataset/index/frag_reuse.rs +++ b/rust/lance/src/dataset/index/frag_reuse.rs @@ -43,21 +43,20 @@ pub async fn cleanup_frag_reuse_index(dataset: &mut Dataset) -> lance_core::Resu let mut retained_versions = Vec::new(); let mut fragment_bitmaps = RoaringBitmap::new(); - for version in frag_reuse_details.versions.iter() { - let check_results = indices - .iter() - .map(|idx| is_index_remap_caught_up(version, idx)) - .collect::>(); - - if check_results - .iter() - .any(|r| matches!(r, Err(Error::InvalidInput { .. }))) - { - // If the check fails, the reuse version is likely corrupted, do not retain it. - continue; + 'versions: for version in frag_reuse_details.versions.iter() { + let mut all_caught_up = true; + for idx in indices.iter() { + match is_index_remap_caught_up(version, idx) { + Ok(true) => {} + Ok(false) => all_caught_up = false, + // An InvalidInput error means the reuse version is likely corrupted; drop it. + Err(Error::InvalidInput { .. }) => continue 'versions, + // Any other error is unexpected; surface it instead of panicking. + Err(e) => return Err(e), + } } - if !check_results.into_iter().all(|r| r.unwrap()) { + if !all_caught_up { fragment_bitmaps.extend(version.new_frag_bitmap()); retained_versions.push(version.clone()); } @@ -259,6 +258,71 @@ mod tests { )); } + /// Corrupt or stale fragment-reuse metadata must surface as a normal error, + /// not a panic. Regression for the `load_frag_reuse_index_details().unwrap()` + /// that used to crash the (Python) caller on unloadable metadata. + #[tokio::test] + async fn test_cleanup_frag_reuse_index_corrupt_metadata_errors() { + let mut dataset = lance_datagen::gen_batch() + .col("i", lance_datagen::array::step::()) + .into_ram_dataset(FragmentCount::from(6), FragmentRowCount::from(1000)) + .await + .unwrap(); + + dataset + .create_index( + &["i"], + IndexType::Scalar, + Some("scalar".into()), + &ScalarIndexParams::default(), + false, + ) + .await + .unwrap(); + + compact_files( + &mut dataset, + CompactionOptions { + target_rows_per_fragment: 2_000, + defer_index_remap: true, + ..Default::default() + }, + None, + ) + .await + .unwrap(); + + let frag_reuse_index_meta = dataset + .load_index_by_name(FRAG_REUSE_INDEX_NAME) + .await + .unwrap() + .expect("Fragment reuse index must be available"); + + // Replace the reuse index with one whose details cannot be loaded. + let corrupt_meta = IndexMetadata { + uuid: uuid::Uuid::new_v4(), + index_details: None, + ..frag_reuse_index_meta.clone() + }; + let transaction = Transaction::new( + dataset.manifest.version, + Operation::CreateIndex { + new_indices: vec![corrupt_meta], + removed_indices: vec![frag_reuse_index_meta], + }, + None, + ); + dataset + .apply_commit(transaction, &Default::default(), &Default::default()) + .await + .unwrap(); + + assert!( + cleanup_frag_reuse_index(&mut dataset).await.is_err(), + "corrupt frag-reuse metadata must surface as an error, not a panic" + ); + } + /// With more than one index on the table, remapping every index must catch /// all of them up so the reuse index can be trimmed. ///