diff --git a/daft_lance/__init__.py b/daft_lance/__init__.py index cb07862..71d9f94 100644 --- a/daft_lance/__init__.py +++ b/daft_lance/__init__.py @@ -5,6 +5,7 @@ from ._blob import take_blobs from ._lance import ( + cleanup_old_versions, compact_files, create_scalar_index, merge_columns, @@ -13,6 +14,7 @@ ) __all__ = [ + "cleanup_old_versions", "compact_files", "create_scalar_index", "merge_columns", diff --git a/daft_lance/_lance.py b/daft_lance/_lance.py index 21b7c67..13d1439 100644 --- a/daft_lance/_lance.py +++ b/daft_lance/_lance.py @@ -2,6 +2,7 @@ import pathlib from collections.abc import Callable +from datetime import timedelta from typing import TYPE_CHECKING, Any import lance @@ -486,6 +487,59 @@ def create_scalar_index( ) +@PublicAPI +def cleanup_old_versions( + uri: str | pathlib.Path, + io_config: IOConfig | None = None, + *, + storage_options: dict[str, Any] | None = None, + older_than: timedelta | None = None, + retain_versions: int | None = None, + delete_unverified: bool = False, + error_if_tagged_old_versions: bool = True, + delete_rate_limit: int | None = None, + version: int | str | None = None, + asof: str | None = None, + block_size: int | None = None, + commit_lock: Any | None = None, + index_cache_size: int | None = None, + default_scan_options: dict[str, Any] | None = None, + metadata_cache_size_bytes: int | None = None, + base_store_params: dict[str, dict[str, str]] | None = None, +) -> Any: + """Clean up old Lance dataset versions and unreferenced files. + + This is a thin wrapper around :meth:`lance.LanceDataset.cleanup_old_versions`. + It removes versions selected by ``older_than`` / ``retain_versions`` and the + data files referenced only by those removed versions. + """ + io_config = context.get_context().daft_planning_config.default_io_config if io_config is None else io_config + storage_options = storage_options or io_config_to_storage_options( + io_config, str(uri) if isinstance(uri, pathlib.Path) else uri + ) + + lance_ds = construct_lance_dataset( + uri, + storage_options=storage_options, + version=version, + asof=asof, + block_size=block_size, + commit_lock=commit_lock, + index_cache_size=index_cache_size, + default_scan_options=default_scan_options, + metadata_cache_size_bytes=metadata_cache_size_bytes, + base_store_params=base_store_params, + ) + + return lance_ds.cleanup_old_versions( + older_than=older_than, + retain_versions=retain_versions, + delete_unverified=delete_unverified, + error_if_tagged_old_versions=error_if_tagged_old_versions, + delete_rate_limit=delete_rate_limit, + ) + + @PublicAPI def compact_files( uri: str | pathlib.Path, diff --git a/tests/io/lancedb/test_lancedb_cleanup.py b/tests/io/lancedb/test_lancedb_cleanup.py new file mode 100644 index 0000000..595a037 --- /dev/null +++ b/tests/io/lancedb/test_lancedb_cleanup.py @@ -0,0 +1,28 @@ +from __future__ import annotations + +from pathlib import Path + +import lance +import pyarrow as pa + +from daft_lance import cleanup_old_versions + + +def test_cleanup_old_versions_retains_latest_version(tmp_path: Path) -> None: + dataset_path = tmp_path / "cleanup_dataset" + lance.write_dataset(pa.table({"id": [1]}), dataset_path) + lance.write_dataset(pa.table({"id": [2]}), dataset_path, mode="overwrite") + + assert [version["version"] for version in lance.dataset(str(dataset_path)).versions()] == [1, 2] + + stats = cleanup_old_versions( + str(dataset_path), + retain_versions=1, + delete_unverified=True, + ) + + assert stats.old_versions == 1 + assert stats.data_files_removed == 1 + ds = lance.dataset(str(dataset_path)) + assert [version["version"] for version in ds.versions()] == [2] + assert ds.to_table().to_pydict() == {"id": [2]}