Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions daft_lance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

from ._blob import take_blobs
from ._lance import (
cleanup_old_versions,
compact_files,
create_scalar_index,
merge_columns,
Expand All @@ -13,6 +14,7 @@
)

__all__ = [
"cleanup_old_versions",
"compact_files",
"create_scalar_index",
"merge_columns",
Expand Down
54 changes: 54 additions & 0 deletions daft_lance/_lance.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import pathlib
from collections.abc import Callable
from datetime import timedelta
from typing import TYPE_CHECKING, Any

import lance
Expand Down Expand Up @@ -486,6 +487,59 @@ def create_scalar_index(
)


@PublicAPI
def cleanup_old_versions(
uri: str | pathlib.Path,
io_config: IOConfig | None = None,
*,
storage_options: dict[str, Any] | None = None,
older_than: timedelta | None = None,
retain_versions: int | None = None,
delete_unverified: bool = False,
error_if_tagged_old_versions: bool = True,
delete_rate_limit: int | None = None,
version: int | str | None = None,
asof: str | None = None,
block_size: int | None = None,
commit_lock: Any | None = None,
index_cache_size: int | None = None,
default_scan_options: dict[str, Any] | None = None,
metadata_cache_size_bytes: int | None = None,
base_store_params: dict[str, dict[str, str]] | None = None,
) -> Any:
"""Clean up old Lance dataset versions and unreferenced files.

This is a thin wrapper around :meth:`lance.LanceDataset.cleanup_old_versions`.
It removes versions selected by ``older_than`` / ``retain_versions`` and the
data files referenced only by those removed versions.
"""
io_config = context.get_context().daft_planning_config.default_io_config if io_config is None else io_config
storage_options = storage_options or io_config_to_storage_options(
io_config, str(uri) if isinstance(uri, pathlib.Path) else uri
)

lance_ds = construct_lance_dataset(
uri,
storage_options=storage_options,
version=version,
asof=asof,
block_size=block_size,
commit_lock=commit_lock,
index_cache_size=index_cache_size,
default_scan_options=default_scan_options,
metadata_cache_size_bytes=metadata_cache_size_bytes,
base_store_params=base_store_params,
)

return lance_ds.cleanup_old_versions(
older_than=older_than,
retain_versions=retain_versions,
delete_unverified=delete_unverified,
error_if_tagged_old_versions=error_if_tagged_old_versions,
delete_rate_limit=delete_rate_limit,
)


@PublicAPI
def compact_files(
uri: str | pathlib.Path,
Expand Down
28 changes: 28 additions & 0 deletions tests/io/lancedb/test_lancedb_cleanup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from __future__ import annotations

from pathlib import Path

import lance
import pyarrow as pa

from daft_lance import cleanup_old_versions


def test_cleanup_old_versions_retains_latest_version(tmp_path: Path) -> None:
dataset_path = tmp_path / "cleanup_dataset"
lance.write_dataset(pa.table({"id": [1]}), dataset_path)
lance.write_dataset(pa.table({"id": [2]}), dataset_path, mode="overwrite")

assert [version["version"] for version in lance.dataset(str(dataset_path)).versions()] == [1, 2]

stats = cleanup_old_versions(
str(dataset_path),
retain_versions=1,
delete_unverified=True,
)

assert stats.old_versions == 1
assert stats.data_files_removed == 1
ds = lance.dataset(str(dataset_path))
assert [version["version"] for version in ds.versions()] == [2]
assert ds.to_table().to_pydict() == {"id": [2]}
Loading