Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions daft_lance/_lance.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ def read_lance(
metadata_cache_size_bytes: int | None = None,
fragment_group_size: int | None = None,
include_fragment_id: bool | None = None,
scan_options: dict[str, Any] | None = None,
checkpoint: CheckpointConfig | None = None,
) -> DataFrame:
"""Create a DataFrame from a LanceDB table.
Expand Down Expand Up @@ -94,6 +95,22 @@ def read_lance(
include_fragment_id : Optional, bool
Whether to display fragment_id.
if you have the behavior of 'merge_columns_df' or 'write_lance(mode = 'merge')', the `include_fragment_id` must be set to True
scan_options : optional, dict
Additional keyword arguments forwarded to ``lance.LanceDataset.scanner()``
at scan time. Use this to tune scan performance, e.g.::

scan_options = {
"batch_size": 8192,
"batch_readahead": 16,
"fragment_readahead": 4,
"scan_in_order": False,
}

Supported keys include ``batch_size``, ``batch_readahead``,
``fragment_readahead``, ``scan_in_order``, ``late_materialization``,
``prefilter``, ``offset``, ``use_scalar_index``, and ``io_buffer_size``.
Unknown keys are forwarded as-is so that future Lance scanner parameters
work without a daft-lance upgrade.
checkpoint: Optional :class:`daft.CheckpointConfig` for progress tracking across runs. Bundles the
checkpoint store, the source key column (``on=``), and optional anti-join tuning. Rows whose key
already exists in the store are skipped on re-run. Requires the Ray runner.
Expand Down Expand Up @@ -122,6 +139,10 @@ def read_lance(
>>> io_config = IOConfig(s3=S3Config(region="us-west-2", anonymous=True))
>>> df = daft.read_lance("s3://daft-oss-public-data/lance/words-test-dataset", io_config=io_config)
>>> df.show()

Read a LanceDB table with scan performance tuning:
>>> df = daft.read_lance("/path/to/lance/data/", scan_options={"batch_size": 8192, "scan_in_order": False})
>>> df.show()
"""
uri_str = str(uri)
if uri_str.startswith("rest://"):
Expand Down Expand Up @@ -151,6 +172,7 @@ def read_lance(
ds,
fragment_group_size=fragment_group_size,
include_fragment_id=include_fragment_id,
scan_options=scan_options,
)

handle = ScanOperatorHandle.from_python_scan_operator(lance_operator)
Expand Down
17 changes: 13 additions & 4 deletions daft_lance/lance_scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def _lancedb_table_factory_function(
limit: int | None = None,
include_fragment_id: bool | None = False,
nearest: dict[str, Any] | None = None,
scan_options: dict[str, Any] | None = None,
) -> Iterator[PyRecordBatch]:
if fragment_ids is not None and nearest is not None:
raise ValueError(
Expand Down Expand Up @@ -59,13 +60,15 @@ def _iter_batches() -> Iterator[PyRecordBatch]:
if limit is not None:
fragment_limit = limit - rows_yielded

scanner = ds.scanner(
scanner_kwargs = dict(scan_options or {})
scanner_kwargs.setdefault("blob_handling", "blobs_descriptions")
scanner_kwargs.update(
fragments=[fragment],
columns=cols or None,
filter=filter,
limit=fragment_limit,
blob_handling="blobs_descriptions",
)
scanner = ds.scanner(**scanner_kwargs)

for rb in scanner.to_batches():
# If we have a limit, we may need to truncate this batch
Expand All @@ -89,13 +92,15 @@ def _iter_batches() -> Iterator[PyRecordBatch]:

# If fragment_ids is None, let Lance choose fragments via index; omit the fragments parameter.
if fragment_ids is None:
scanner = ds.scanner(
scanner_kwargs = dict(scan_options or {})
scanner_kwargs.setdefault("blob_handling", "blobs_descriptions")
scanner_kwargs.update(
columns=required_columns,
filter=filter,
limit=limit,
nearest=nearest,
blob_handling="blobs_descriptions",
)
scanner = ds.scanner(**scanner_kwargs)

def _batches() -> Iterator[PyRecordBatch]:
for rb in scanner.to_batches():
Expand Down Expand Up @@ -134,12 +139,14 @@ def __init__(
ds: lance.LanceDataset,
fragment_group_size: int | None = None,
include_fragment_id: bool | None = False,
scan_options: dict[str, Any] | None = None,
):
self._ds = ds
self._pushed_filters: list[PyExpr] | None = None
self._remaining_filters: list[PyExpr] | None = None
self._fragment_group_size = fragment_group_size
self._include_fragment_id = include_fragment_id
self._scan_options = scan_options
self._enable_strict_filter_pushdown = get_context().daft_planning_config.enable_strict_filter_pushdown
base = self._ds.schema
if self._include_fragment_id:
Expand Down Expand Up @@ -304,6 +311,7 @@ def _create_scan_tasks_with_limit_and_no_filters(
rows_to_scan,
self._include_fragment_id,
None,
self._scan_options,
),
schema=task_schema._schema,
num_rows=rows_to_scan,
Expand Down Expand Up @@ -339,6 +347,7 @@ def _python_factory_func_scan_task(
self._compute_limit_pushdown_with_filter(pushdowns),
self._include_fragment_id,
nearest_option,
self._scan_options,
),
schema=self.schema()._schema,
num_rows=num_rows,
Expand Down
50 changes: 50 additions & 0 deletions tests/io/lancedb/test_lancedb_reads.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,3 +415,53 @@ def test_lancedb_limit_with_filter_and_fragment_grouping_single_task(large_lance

result = df.to_pydict()
assert result == {"big_int": [999]}


def test_lancedb_read_with_scan_options(lance_dataset_path):
"""Test that scan_options are accepted and forwarded to the Lance scanner."""
import daft_lance

df = daft_lance.read_lance(
lance_dataset_path,
scan_options={
"batch_size": 512,
"scan_in_order": True,
},
)
assert df.to_pydict() == data


def test_lancedb_read_scan_options_with_filter_and_limit(large_lance_dataset_path):
"""Test scan_options work together with filters and limits."""
import daft_lance

df = daft_lance.read_lance(
large_lance_dataset_path,
scan_options={"batch_size": 256, "scan_in_order": False},
)
df = df.filter("big_int < 100").limit(50).select("big_int")

result = df.to_pydict()
assert len(result["big_int"]) == 50


def test_lancedb_read_scan_options_with_fragment_grouping(large_lance_dataset_path):
"""Test scan_options combined with fragment grouping."""
import daft_lance

df = daft_lance.read_lance(
large_lance_dataset_path,
fragment_group_size=5,
scan_options={"batch_size": 1024},
)
result = df.to_pydict()
assert len(result["vector"]) == 10000
assert len(result["big_int"]) == 10000


def test_lancedb_read_scan_options_none(lance_dataset_path):
"""Test that scan_options=None (default) behaves identically to no scan_options."""
import daft_lance

df = daft_lance.read_lance(lance_dataset_path, scan_options=None)
assert df.to_pydict() == data
Loading
Loading