Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions python/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

21 changes: 21 additions & 0 deletions python/python/lance/lance/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,27 @@ class _Dataset:
def get_transactions(
self, recent_transactions=10
) -> List[Optional[Transaction]]: ...
def hamming_clustering_for_ivf_partition(
self,
index_name: str,
partition_id: int,
hamming_threshold: int,
) -> pa.RecordBatchReader: ...
def get_ivf_partition_info(self, index_name: str) -> List[dict]: ...
def hamming_clustering_for_sample(
self,
column: str,
sample_size: Optional[int],
hamming_threshold: int,
) -> pa.RecordBatchReader: ...
def hamming_clustering_for_range(
self,
column: str,
fragment_id: int,
start_row: int,
num_rows: int,
hamming_threshold: int,
) -> pa.RecordBatchReader: ...

class _MergeInsertBuilder:
def __init__(self, dataset: _Dataset, on: str | Iterable[str]): ...
Expand Down
147 changes: 147 additions & 0 deletions python/python/lance/vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -749,3 +749,150 @@ def _partition_and_pq_codes_assignment() -> Iterable[pa.RecordBatch]:
data_file.path for frag in ds.get_fragments() for data_file in frag.data_files()
]
return dst_dataset_uri, shuffle_buffers


# =============================================================================
# Hamming Distance Clustering
# =============================================================================


def hamming_clustering_for_ivf_partition(
dataset: "LanceDataset",
index_name: str,
partition_id: int,
hamming_threshold: int,
) -> pa.RecordBatchReader:
"""
Perform hamming clustering on a partition of an IVF_FLAT index.

Loads a partition from an IVF_FLAT index on a hash column, computes
pairwise hamming distances between all hashes in the partition,
filters by threshold, and clusters the results using union-find.

Parameters
----------
dataset : LanceDataset
The Lance dataset containing the hash column with an IVF_FLAT index.
index_name : str
Name of the IVF_FLAT index on the hash column
partition_id : int
The partition ID within the IVF_FLAT index
hamming_threshold : int
Maximum hamming distance to consider as similar

Returns
-------
pa.RecordBatchReader
A reader yielding batches with columns:

- 'representative': uint64 - The representative row ID for each cluster
- 'duplicates': list<uint64> - List of duplicate row IDs in each cluster
"""
return dataset._ds.hamming_clustering_for_ivf_partition(
index_name, partition_id, hamming_threshold
)


def get_ivf_partition_info(
dataset: "LanceDataset",
index_name: str,
) -> List[dict]:
"""
Get partition information for an IVF_FLAT index.

Parameters
----------
dataset : LanceDataset
The Lance dataset containing the hash column with an IVF_FLAT index.
index_name : str
Name of the IVF_FLAT index

Returns
-------
list[dict]
List of partition info dicts with 'partition_id' and 'size'
"""
return dataset._ds.get_ivf_partition_info(index_name)


def hamming_clustering_for_sample(
dataset: "LanceDataset",
column: str,
sample_size: Optional[int] = None,
hamming_threshold: int = 10,
) -> pa.RecordBatchReader:
"""
Perform pairwise hamming distance clustering on a sample of the dataset.

Randomly samples rows from the dataset, computes pairwise hamming distances
between all hashes in the sample, filters by threshold, and clusters the
results using union-find.

Parameters
----------
dataset : LanceDataset
The Lance dataset containing the hash column.
column : str
Name of the hash column (must be FixedSizeList<UInt8, 8>)
sample_size : int, optional
Number of rows to sample. If None, uses all rows.
hamming_threshold : int, default 10
Maximum hamming distance to consider as similar

Returns
-------
pa.RecordBatchReader
A reader yielding batches with columns:

- 'representative': uint64 - The representative row ID for each cluster
- 'duplicates': list<uint64> - List of duplicate row IDs in each cluster
"""
return dataset._ds.hamming_clustering_for_sample(
column, sample_size, hamming_threshold
)


def hamming_clustering_for_range(
dataset: "LanceDataset",
column: str,
fragment_id: int,
start_row: int,
num_rows: int,
hamming_threshold: int = 10,
) -> pa.RecordBatchReader:
"""
Perform pairwise hamming distance clustering on a contiguous range of rows.

Reads a contiguous range of rows from a specific fragment, computes pairwise
hamming distances between all hashes in the range, filters by threshold,
and clusters the results using union-find.

Unlike sampling, this reads sequential rows which is useful for distributed
processing where each worker handles a specific range of a fragment.

Parameters
----------
dataset : LanceDataset
The Lance dataset containing the hash column.
column : str
Name of the hash column (must be FixedSizeList<UInt8, 8>)
fragment_id : int
The fragment ID to read from
start_row : int
The starting row offset within the fragment
num_rows : int
Number of rows to read from the start position
hamming_threshold : int, default 10
Maximum hamming distance to consider as similar

Returns
-------
pa.RecordBatchReader
A reader yielding batches with columns:

- 'representative': uint64 - The representative row ID for each cluster
- 'duplicates': list<uint64> - List of duplicate row IDs in each cluster
"""
return dataset._ds.hamming_clustering_for_range(
column, fragment_id, start_row, num_rows, hamming_threshold
)
37 changes: 36 additions & 1 deletion python/python/tests/test_vector.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import numpy as np
import pyarrow as pa
import pytest
from lance.vector import vec_to_table
from lance.vector import hamming_clustering_for_sample, vec_to_table


def test_dict():
Expand Down Expand Up @@ -147,3 +147,38 @@ def test_binary_vectors_invalid_metric(tmp_path):
"metric": "l2",
}
).to_table()


def _hash_table(hashes):
"""Build a table with a ``hash`` column of FixedSizeList<UInt8, 8>.

``hashes`` is a list of 8-byte sequences, one per row.
"""
flat = [byte for row in hashes for byte in row]
values = pa.FixedSizeListArray.from_arrays(
pa.array(flat, type=pa.uint8()), list_size=8
)
return pa.Table.from_arrays([values], names=["hash"])


def test_hamming_clustering_for_sample(tmp_path):
hash_a = [0, 0, 0, 0, 0, 0, 0, 0]
hash_b = [255, 0, 0, 0, 0, 0, 0, 0] # 8 bits from hash_a
hash_c = [1, 2, 3, 4, 5, 6, 7, 8] # far from both
# Rows 0,1,2 share hash_a; rows 3,4 share hash_b; row 5 is unique.
table = _hash_table([hash_a, hash_a, hash_a, hash_b, hash_b, hash_c])
dataset = lance.write_dataset(table, tmp_path / "hashes")

# threshold 0 => only exact-match hashes cluster together. Full scan
# (sample_size=None) yields deterministic row ids 0..5.
result = hamming_clustering_for_sample(dataset, "hash", None, 0).read_all()

clusters = {
rep: sorted(dups)
for rep, dups in zip(
result["representative"].to_pylist(),
result["duplicates"].to_pylist(),
)
}
# Singleton row 5 is not emitted as a cluster.
assert clusters == {0: [1, 2], 3: [4]}
Loading
Loading