Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 89 additions & 27 deletions daft_lance/lance_scalar_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

logger = logging.getLogger(__name__)

MERGED_SEGMENTED_INDEX_TYPES = {"INVERTED"}


class FragmentIndexHandler:
"""Handler for distributed scalar index creation on fragment batches."""
Expand Down Expand Up @@ -64,10 +66,11 @@ class SegmentedFragmentIndexHandler:

Unlike ``FragmentIndexHandler``, which writes partial index files sharing
a single UUID, this handler builds a fully independent index segment per
worker via the low-level ``_ds.create_index`` binding. The returned
``lance.Index`` metadata (including ``index_details``) is serialised
(pickled) so it can cross Daft process/serialisation boundaries. The
coordinator then commits all segments atomically with
worker via Lance's public ``create_index_uncommitted`` API when available,
with a compatibility fallback for older Lance releases that expose the
method but only support vector columns. The returned ``lance.Index``
metadata is serialised (pickled) so it can cross Daft process/serialisation
boundaries. The coordinator then commits all segments atomically with
``commit_existing_index_segments``.
"""

Expand Down Expand Up @@ -96,21 +99,68 @@ def __call__(self, fragment_ids: list[int]) -> bytes:
self.index_type,
)

# _ds.create_index returns a lance.Index dataclass when fragment_ids
# is provided (uncommitted segment mode).
index_meta: lance.Index = self.lance_ds._ds.create_index( # type: ignore[call-arg]
[self.column],
self.index_type,
index_meta = _create_index_segment(
lance_ds=self.lance_ds,
column=self.column,
index_type=self.index_type,
name=self.name,
replace=self.replace,
train=True,
storage_options=None,
kwargs={"fragment_ids": fragment_ids, **self.kwargs},
fragment_ids=fragment_ids,
**self.kwargs,
)

return pickle.dumps(index_meta)


def _create_index_segment(
lance_ds: lance.LanceDataset,
*,
column: str,
index_type: str,
name: str,
replace: bool,
fragment_ids: list[int],
**kwargs: Any,
) -> lance.Index:
"""Create one uncommitted index segment.

Prefer Lance's public ``create_index_uncommitted`` API. ``pylance 7.0.0``
exposes that method but only accepts vector columns, so scalar indexes need
the existing low-level fallback until the scalar public API is available in
released wheels.
"""
try:
return lance_ds.create_index_uncommitted(
column=column,
index_type=index_type,
name=name,
replace=replace,
train=True,
fragment_ids=fragment_ids,
**kwargs,
)
except TypeError as exc:
message = str(exc)
if "Vector column" not in message or "FixedSizeListArray" not in message:
raise

logger.info(
"Falling back to Lance's low-level uncommitted index segment API for %s; public scalar segment API is unavailable in this pylance version.",
index_type,
)

raw_dataset = cast(Any, lance_ds._ds)
return cast(lance.Index, raw_dataset.create_index(
[column],
index_type,
name=name,
replace=replace,
train=True,
storage_options=None,
kwargs={"fragment_ids": fragment_ids, **kwargs},
))


def create_scalar_index_internal(
lance_ds: lance.LanceDataset,
uri: str | pathlib.Path,
Expand All @@ -134,11 +184,10 @@ def create_scalar_index_internal(
inverted full-text index type.

When ``segmented=True`` and ``index_type`` is ``BTREE``, a cleaner segmented workflow
is used instead: each worker builds a fully independent index segment via the low-level
``_ds.create_index`` binding (which returns ``lance.Index`` metadata including
``index_details``), and the coordinator commits them atomically with
``commit_existing_index_segments``. This resolves a known issue where ``index_details``
was left empty in the legacy path, preventing ``describe_indices()`` from working.
is used instead: each worker builds a fully independent index segment, and the
coordinator commits them atomically with ``commit_existing_index_segments``. This
resolves a known issue where ``index_details`` was left empty in the legacy path,
preventing ``describe_indices()`` from working.
"""
if not column:
raise ValueError("Column name cannot be empty")
Expand Down Expand Up @@ -240,7 +289,7 @@ def create_scalar_index_internal(
# Choose between the segmented workflow and the legacy partitioned-and-merged
# workflow. Segmented mode produces proper ``index_details`` so
# ``describe_indices()`` works correctly.
if segmented and index_type == "BTREE":
if segmented and index_type in ("BTREE", "INVERTED"):
_create_segmented_index(
lance_ds=lance_ds,
uri=uri,
Expand Down Expand Up @@ -289,11 +338,10 @@ def _create_segmented_index(
) -> None:
"""Segmented index workflow: each worker builds an independent segment.

Workers call the low-level ``_ds.create_index`` binding (which returns
``lance.Index`` metadata with ``index_details`` populated), pickle the
result so it can traverse Daft serialisation boundaries, and return it.
The coordinator unpickles all segments and commits them atomically via
``commit_existing_index_segments``.
Workers call Lance's uncommitted index segment API, pickle the returned
``lance.Index`` metadata so it can traverse Daft serialisation boundaries,
and return it. The coordinator unpickles all segments and commits them
atomically via ``commit_existing_index_segments``.
"""
handler_cls = daft.cls(
SegmentedFragmentIndexHandler,
Expand Down Expand Up @@ -322,20 +370,34 @@ def _create_segmented_index(
pickle.loads(raw) for raw in collected.to_pydict()["index_meta"]
]

# Reload dataset to pick up the latest version (segment files were written
# by workers against the version that was current at their invocation time).
lance_ds = lance.LanceDataset(uri, storage_options=storage_options)
index_metas = _prepare_index_segments_for_commit(lance_ds, index_type, index_metas)

logger.info(
"Collected %d index segments; committing as segmented index %s",
len(index_metas),
name,
)

# Reload dataset to pick up the latest version (segment files were written
# by workers against the version that was current at their invocation time).
lance_ds = lance.LanceDataset(uri, storage_options=storage_options)
lance_ds.commit_existing_index_segments(name, column, index_metas)

logger.info("Segmented index %s committed successfully", name)


def _prepare_index_segments_for_commit(
lance_ds: lance.LanceDataset,
index_type: str,
index_metas: list[lance.Index | lance.indices.IndexSegment],
) -> list[lance.Index | lance.indices.IndexSegment]:
"""Prepare worker-built segments for the final manifest commit."""
if index_type not in MERGED_SEGMENTED_INDEX_TYPES or len(index_metas) <= 1:
return index_metas

merged = lance_ds.merge_existing_index_segments([cast(lance.Index, segment) for segment in index_metas])
return [merged]


def _create_partitioned_index(
lance_ds: lance.LanceDataset,
uri: str | pathlib.Path,
Expand Down
8 changes: 6 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@
name = "daft-lance"
version = "0.4.0"
description = "Lance integration for Daft - compaction, indexing, merge columns, and REST operations"
requires-python = ">=3.10"
requires-python = ">=3.10,<3.14"
license = "Apache-2.0"
readme = "README.md"
dependencies = [
"lance-namespace>=0.6.0",
"lance-namespace-urllib3-client>=0.6.0",
"pylance>=7.0.0"
"pylance>=8.0.0b11"
]

[dependency-groups]
Expand All @@ -34,6 +34,10 @@ ignore_missing_imports = true
warn_return_any = true
warn_unused_configs = true

[tool.uv]
extra-index-url = ["https://pypi.fury.io/lance-format"]
index-strategy = "unsafe-best-match"

[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
125 changes: 119 additions & 6 deletions tests/io/lancedb/test_lancedb_scalar_index.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from __future__ import annotations

import pickle
import tempfile
from pathlib import Path

Expand All @@ -9,6 +10,7 @@
import daft
from daft.dependencies import pd
from daft_lance import create_scalar_index
from daft_lance.lance_scalar_index import SegmentedFragmentIndexHandler, _prepare_index_segments_for_commit


@pytest.fixture
Expand Down Expand Up @@ -601,6 +603,121 @@ def test_build_distributed_index_zonemap_invalid_string_column(self, temp_dir):
class TestSegmentedBTreeIndex:
"""Test cases for segmented BTree index functionality."""

def test_segmented_handler_uses_public_uncommitted_index_api(self):
"""Test that segmented workers use Lance's public uncommitted index API."""

class FakeLanceDataset:
def __init__(self):
self.calls = []

@property
def _ds(self):
raise AssertionError("segmented index creation must not use private _ds.create_index")

def create_index_uncommitted(self, **kwargs):
self.calls.append(kwargs)
return {"segment": "metadata"}

fake_ds = FakeLanceDataset()
handler = SegmentedFragmentIndexHandler(
lance_ds=fake_ds,
column="price",
index_type="BTREE",
name="price_idx",
replace=True,
custom="value",
)

raw_segment = handler([1, 2])

assert pickle.loads(raw_segment) == {"segment": "metadata"}
assert fake_ds.calls == [
{
"column": "price",
"index_type": "BTREE",
"name": "price_idx",
"replace": True,
"train": True,
"fragment_ids": [1, 2],
"custom": "value",
}
]

def test_segmented_handler_falls_back_when_public_scalar_api_is_unavailable(self):
"""Test compatibility with Lance versions whose public API is vector-only."""

class FakeInnerDataset:
def __init__(self):
self.calls = []

def create_index(self, *args, **kwargs):
self.calls.append((args, kwargs))
return {"segment": "fallback-metadata"}

class FakeLanceDataset:
def __init__(self):
self._ds = FakeInnerDataset()

def create_index_uncommitted(self, **kwargs):
raise TypeError("Vector column price must be FixedSizeListArray, got double")

fake_ds = FakeLanceDataset()
handler = SegmentedFragmentIndexHandler(
lance_ds=fake_ds,
column="price",
index_type="BTREE",
name="price_idx",
replace=True,
)

raw_segment = handler([1, 2])

assert pickle.loads(raw_segment) == {"segment": "fallback-metadata"}
assert fake_ds._ds.calls == [
(
(["price"], "BTREE"),
{
"name": "price_idx",
"replace": True,
"train": True,
"storage_options": None,
"kwargs": {"fragment_ids": [1, 2]},
},
)
]

def test_segmented_inverted_segments_are_merged_before_commit(self):
"""Test that INVERTED segments are merged into one physical segment before commit."""

class FakeLanceDataset:
def __init__(self):
self.calls = []

def merge_existing_index_segments(self, segments):
self.calls.append(segments)
return {"segment": "merged"}

fake_ds = FakeLanceDataset()
segments = [{"segment": "a"}, {"segment": "b"}]

prepared = _prepare_index_segments_for_commit(fake_ds, "INVERTED", segments)

assert prepared == [{"segment": "merged"}]
assert fake_ds.calls == [segments]

def test_segmented_btree_segments_are_committed_without_merge(self):
"""Test that non-merged segmented index types keep their physical segments."""

class FakeLanceDataset:
def merge_existing_index_segments(self, segments):
raise AssertionError("BTREE segments must not be merged")

segments = [{"segment": "a"}, {"segment": "b"}]

prepared = _prepare_index_segments_for_commit(FakeLanceDataset(), "BTREE", segments)

assert prepared is segments

def test_segmented_btree_basic(self, temp_dir):
"""Test basic segmented BTree index creation and query."""
data = {
Expand Down Expand Up @@ -822,14 +939,10 @@ def test_segmented_false_uses_legacy_flow(self, temp_dir):
index_names = [idx["name"] for idx in indices]
assert "price_legacy_idx" in index_names

def test_segmented_inverted_falls_back_to_legacy(self, multi_fragment_lance_dataset):
"""Test that segmented=True with INVERTED still uses the legacy flow.

The segmented workflow currently only supports BTREE.
"""
def test_segmented_inverted_creates_index(self, multi_fragment_lance_dataset):
"""Test that segmented=True with INVERTED creates an index."""
dataset_uri = multi_fragment_lance_dataset

# segmented=True but INVERTED => legacy flow is used
create_scalar_index(
uri=dataset_uri,
column="text",
Expand Down
Loading
Loading