Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions daft_lance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,22 @@
from ._lance import (
compact_files,
create_scalar_index,
delete,
merge_columns,
merge_columns_df,
merge_insert,
read_lance,
update,
)

__all__ = [
"compact_files",
"create_scalar_index",
"delete",
"merge_columns",
"merge_columns_df",
"merge_insert",
"read_lance",
"take_blobs",
"update",
]
258 changes: 257 additions & 1 deletion daft_lance/_lance.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from __future__ import annotations

import pathlib
from collections.abc import Callable
from collections.abc import Callable, Iterable
from datetime import timedelta
from typing import TYPE_CHECKING, Any

import lance
Expand All @@ -21,6 +22,7 @@
from .utils import construct_lance_dataset

if TYPE_CHECKING:
import pyarrow.compute
from lance.dataset import LanceDataset
from lance.udf import BatchUDF

Expand Down Expand Up @@ -563,3 +565,257 @@ def compact_files(
partition_num=partition_num,
concurrency=concurrency,
)


@PublicAPI
def delete(
uri: str | pathlib.Path,
predicate: str | pyarrow.compute.Expression,
io_config: IOConfig | None = None,
*,
conflict_retries: int = 10,
retry_timeout: timedelta | None = None,
storage_options: dict[str, Any] | None = None,
version: int | str | None = None,
asof: str | None = None,
block_size: int | None = None,
commit_lock: Any | None = None,
index_cache_size: int | None = None,
default_scan_options: dict[str, Any] | None = None,
metadata_cache_size_bytes: int | None = None,
) -> Any:
"""Delete rows matching a predicate from a Lance dataset.

Args:
uri: The URI of the Lance table.
predicate: SQL-like filter expression (e.g. ``"id > 10"``) or a
PyArrow compute expression.
io_config: A custom IOConfig. Defaults to None.
conflict_retries: Number of times to retry on commit conflicts.
retry_timeout: Maximum time to spend retrying. Defaults to 30 seconds.
storage_options: Extra options for storage connection.
version: If specified, load a specific version of the dataset.
asof: If specified, find the latest version created on or earlier.
block_size: Block size in bytes hint for I/O.
commit_lock: A custom commit lock.
index_cache_size: Index cache size.
default_scan_options: Default scan options.
metadata_cache_size_bytes: Size of the metadata cache in bytes.

Returns:
DeleteResult with deletion statistics.

Examples:
Delete rows where score is below a threshold:
>>> daft_lance.delete("/path/to/lance/", "score < 0.5")

Delete rows using a PyArrow expression:
>>> import pyarrow.compute as pc
>>> daft_lance.delete("/path/to/lance/", pc.field("status") == "inactive")
"""
io_config = context.get_context().daft_planning_config.default_io_config if io_config is None else io_config
storage_options = storage_options or io_config_to_storage_options(io_config, str(uri))

lance_ds = construct_lance_dataset(
uri,
storage_options=storage_options,
version=version,
asof=asof,
block_size=block_size,
commit_lock=commit_lock,
index_cache_size=index_cache_size,
default_scan_options=default_scan_options,
metadata_cache_size_bytes=metadata_cache_size_bytes,
)

kwargs: dict[str, Any] = {"conflict_retries": conflict_retries}
if retry_timeout is not None:
kwargs["retry_timeout"] = retry_timeout

return lance_ds.delete(predicate, **kwargs)


@PublicAPI
def update(
uri: str | pathlib.Path,
updates: dict[str, str],
io_config: IOConfig | None = None,
*,
where: str | None = None,
conflict_retries: int = 10,
retry_timeout: timedelta | None = None,
storage_options: dict[str, Any] | None = None,
version: int | str | None = None,
asof: str | None = None,
block_size: int | None = None,
commit_lock: Any | None = None,
index_cache_size: int | None = None,
default_scan_options: dict[str, Any] | None = None,
metadata_cache_size_bytes: int | None = None,
) -> Any:
"""Update row values in a Lance dataset using SQL-like expressions.

Args:
uri: The URI of the Lance table.
updates: A mapping of column names to SQL expressions
(e.g. ``{"price": "price * 1.1", "updated_at": "'2026-01-01'"}``)
io_config: A custom IOConfig. Defaults to None.
where: Optional SQL predicate to filter which rows are updated.
conflict_retries: Number of times to retry on commit conflicts.
retry_timeout: Maximum time to spend retrying. Defaults to 30 seconds.
storage_options: Extra options for storage connection.
version: If specified, load a specific version of the dataset.
asof: If specified, find the latest version created on or earlier.
block_size: Block size in bytes hint for I/O.
commit_lock: A custom commit lock.
index_cache_size: Index cache size.
default_scan_options: Default scan options.
metadata_cache_size_bytes: Size of the metadata cache in bytes.

Returns:
UpdateResult with update statistics.

Examples:
Increase all prices by 10%:
>>> daft_lance.update("/path/to/lance/", {"price": "price * 1.1"})

Update only rows matching a condition:
>>> daft_lance.update(
... "/path/to/lance/",
... {"status": "'archived'"},
... where="last_modified < '2025-01-01'",
... )
"""
io_config = context.get_context().daft_planning_config.default_io_config if io_config is None else io_config
storage_options = storage_options or io_config_to_storage_options(io_config, str(uri))

lance_ds = construct_lance_dataset(
uri,
storage_options=storage_options,
version=version,
asof=asof,
block_size=block_size,
commit_lock=commit_lock,
index_cache_size=index_cache_size,
default_scan_options=default_scan_options,
metadata_cache_size_bytes=metadata_cache_size_bytes,
)

kwargs: dict[str, Any] = {"conflict_retries": conflict_retries}
if where is not None:
kwargs["where"] = where
if retry_timeout is not None:
kwargs["retry_timeout"] = retry_timeout

return lance_ds.update(updates, **kwargs)


@PublicAPI
def merge_insert(
df: DataFrame,
uri: str | pathlib.Path,
on: str | Iterable[str],
io_config: IOConfig | None = None,
*,
when_matched_update_all: bool = False,
when_matched_update_all_condition: str | None = None,
when_matched_delete: bool = False,
when_not_matched_insert_all: bool = False,
when_not_matched_by_source_delete: bool | str = False,
conflict_retries: int = 10,
retry_timeout: timedelta | None = None,
storage_options: dict[str, Any] | None = None,
version: int | str | None = None,
asof: str | None = None,
block_size: int | None = None,
commit_lock: Any | None = None,
index_cache_size: int | None = None,
default_scan_options: dict[str, Any] | None = None,
metadata_cache_size_bytes: int | None = None,
) -> Any:
"""UPSERT rows from a Daft DataFrame into a Lance dataset.

Builds and executes a ``MergeInsertBuilder`` from the Lance Python API.
The DataFrame is collected to Arrow and passed to
``MergeInsertBuilder.execute()``.

Args:
df: Daft DataFrame containing the rows to upsert.
uri: The URI of the Lance table.
on: Column name(s) used as the join key for matching rows.
io_config: A custom IOConfig. Defaults to None.
when_matched_update_all: If True, update all columns when a row matches.
when_matched_update_all_condition: Optional SQL condition for updates
(only applies when ``when_matched_update_all=True``).
when_matched_delete: If True, delete matched rows instead of updating.
when_not_matched_insert_all: If True, insert new rows that don't match.
when_not_matched_by_source_delete: If True, delete target rows with no
match in the source. If a string, treated as a SQL condition.
conflict_retries: Number of times to retry on commit conflicts.
retry_timeout: Maximum time to spend retrying. Defaults to 30 seconds.
storage_options: Extra options for storage connection.
version: If specified, load a specific version of the dataset.
asof: If specified, find the latest version created on or earlier.
block_size: Block size in bytes hint for I/O.
commit_lock: A custom commit lock.
index_cache_size: Index cache size.
default_scan_options: Default scan options.
metadata_cache_size_bytes: Size of the metadata cache in bytes.

Returns:
Result from ``MergeInsertBuilder.execute()``.

Examples:
Upsert rows (update matched, insert new):
>>> import daft
>>> new_data = daft.from_pydict({"id": [1, 2, 3], "value": [10, 20, 30]})
>>> daft_lance.merge_insert(
... new_data,
... "/path/to/lance/",
... on="id",
... when_matched_update_all=True,
... when_not_matched_insert_all=True,
... )

Delete matched rows (deduplication):
>>> daft_lance.merge_insert(
... duplicates_df,
... "/path/to/lance/",
... on="id",
... when_matched_delete=True,
... )
"""
io_config = context.get_context().daft_planning_config.default_io_config if io_config is None else io_config
storage_options = storage_options or io_config_to_storage_options(io_config, str(uri))

lance_ds = construct_lance_dataset(
uri,
storage_options=storage_options,
version=version,
asof=asof,
block_size=block_size,
commit_lock=commit_lock,
index_cache_size=index_cache_size,
default_scan_options=default_scan_options,
metadata_cache_size_bytes=metadata_cache_size_bytes,
)

builder = lance_ds.merge_insert(on)

if when_matched_update_all:
builder.when_matched_update_all(condition=when_matched_update_all_condition)
if when_matched_delete:
builder.when_matched_delete()
if when_not_matched_insert_all:
builder.when_not_matched_insert_all()
if when_not_matched_by_source_delete is True:
builder.when_not_matched_by_source_delete()
elif isinstance(when_not_matched_by_source_delete, str):
builder.when_not_matched_by_source_delete(when_not_matched_by_source_delete)

builder.conflict_retries(conflict_retries)
if retry_timeout is not None:
builder.retry_timeout(retry_timeout)

arrow_table = df.to_arrow()
return builder.execute(arrow_table)
Loading
Loading