From ec02d9aed8f22861b12a25e69797f7c0d13b935e Mon Sep 17 00:00:00 2001 From: Brendan Clement Date: Fri, 19 Jun 2026 16:44:22 -0400 Subject: [PATCH] feat: support hamming clustering Add SIMD-accelerated pairwise hamming distance over 64-bit binary hashes, plus union-find clustering to group binary vectors within a hamming-distance threshold (near-duplicate detection). - lance-linalg: pairwise_hamming_distance[_parallel] with AVX-512 / AVX2 / scalar kernels, PairwiseResult, UnionFind, Cluster/ClusteringResult, extract_hashes_from_fixed_list, cluster_edges/cluster_pairwise_result. - lance: hamming_clustering_for_ivf_partition / for_sample / for_range / from_hashes and get_ivf_partition_info, returning a RecordBatchReader of (representative, duplicates) clusters. - python: thin bindings + wrappers in lance.vector, type stubs, and a test. Recreates #6265 (originally authored by Jack Ye) on top of current main, updating imports/signatures for upstream API drift. Co-Authored-By: Jack Ye --- Cargo.lock | 1 + python/Cargo.lock | 1 + python/python/lance/lance/__init__.pyi | 21 + python/python/lance/vector.py | 147 +++ python/python/tests/test_vector.py | 37 +- python/src/dataset.rs | 182 +++ rust/lance-linalg/Cargo.toml | 5 +- rust/lance-linalg/benches/hamming.rs | 52 - rust/lance-linalg/src/distance.rs | 6 +- rust/lance-linalg/src/distance/hamming.rs | 1323 ++++++++++++++++++++- rust/lance/Cargo.toml | 4 + rust/lance/benches/hamming.rs | 228 ++++ rust/lance/src/index/vector.rs | 1 + rust/lance/src/index/vector/hamming.rs | 938 +++++++++++++++ 14 files changed, 2885 insertions(+), 61 deletions(-) delete mode 100644 rust/lance-linalg/benches/hamming.rs create mode 100644 rust/lance/benches/hamming.rs create mode 100644 rust/lance/src/index/vector/hamming.rs diff --git a/Cargo.lock b/Cargo.lock index 89d20bdf647..11a5fb65a7b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4905,6 +4905,7 @@ dependencies = [ "num-traits", "proptest", "rand 0.9.4", + "rayon", ] [[package]] diff 
--git a/python/Cargo.lock b/python/Cargo.lock index 01d2edda1c8..126714795cc 100644 --- a/python/Cargo.lock +++ b/python/Cargo.lock @@ -4488,6 +4488,7 @@ dependencies = [ "lance-core", "num-traits", "rand 0.9.4", + "rayon", ] [[package]] diff --git a/python/python/lance/lance/__init__.pyi b/python/python/lance/lance/__init__.pyi index 74db076db41..26ad75a27b7 100644 --- a/python/python/lance/lance/__init__.pyi +++ b/python/python/lance/lance/__init__.pyi @@ -463,6 +463,27 @@ class _Dataset: def get_transactions( self, recent_transactions=10 ) -> List[Optional[Transaction]]: ... + def hamming_clustering_for_ivf_partition( + self, + index_name: str, + partition_id: int, + hamming_threshold: int, + ) -> pa.RecordBatchReader: ... + def get_ivf_partition_info(self, index_name: str) -> List[dict]: ... + def hamming_clustering_for_sample( + self, + column: str, + sample_size: Optional[int], + hamming_threshold: int, + ) -> pa.RecordBatchReader: ... + def hamming_clustering_for_range( + self, + column: str, + fragment_id: int, + start_row: int, + num_rows: int, + hamming_threshold: int, + ) -> pa.RecordBatchReader: ... class _MergeInsertBuilder: def __init__(self, dataset: _Dataset, on: str | Iterable[str]): ... 
diff --git a/python/python/lance/vector.py b/python/python/lance/vector.py index 34a6154a321..5ce5e8b61e5 100644 --- a/python/python/lance/vector.py +++ b/python/python/lance/vector.py @@ -749,3 +749,150 @@ def _partition_and_pq_codes_assignment() -> Iterable[pa.RecordBatch]: data_file.path for frag in ds.get_fragments() for data_file in frag.data_files() ] return dst_dataset_uri, shuffle_buffers + + +# ============================================================================= +# Hamming Distance Clustering +# ============================================================================= + + +def hamming_clustering_for_ivf_partition( + dataset: "LanceDataset", + index_name: str, + partition_id: int, + hamming_threshold: int, +) -> pa.RecordBatchReader: + """ + Perform hamming clustering on a partition of an IVF_FLAT index. + + Loads a partition from an IVF_FLAT index on a hash column, computes + pairwise hamming distances between all hashes in the partition, + filters by threshold, and clusters the results using union-find. + + Parameters + ---------- + dataset : LanceDataset + The Lance dataset containing the hash column with an IVF_FLAT index. + index_name : str + Name of the IVF_FLAT index on the hash column + partition_id : int + The partition ID within the IVF_FLAT index + hamming_threshold : int + Maximum hamming distance to consider as similar + + Returns + ------- + pa.RecordBatchReader + A reader yielding batches with columns: + + - 'representative': uint64 - The representative row ID for each cluster + - 'duplicates': list - List of duplicate row IDs in each cluster + """ + return dataset._ds.hamming_clustering_for_ivf_partition( + index_name, partition_id, hamming_threshold + ) + + +def get_ivf_partition_info( + dataset: "LanceDataset", + index_name: str, +) -> List[dict]: + """ + Get partition information for an IVF_FLAT index. + + Parameters + ---------- + dataset : LanceDataset + The Lance dataset containing the hash column with an IVF_FLAT index. 
+ index_name : str + Name of the IVF_FLAT index + + Returns + ------- + list[dict] + List of partition info dicts with 'partition_id' and 'size' + """ + return dataset._ds.get_ivf_partition_info(index_name) + + +def hamming_clustering_for_sample( + dataset: "LanceDataset", + column: str, + sample_size: Optional[int] = None, + hamming_threshold: int = 10, +) -> pa.RecordBatchReader: + """ + Perform pairwise hamming distance clustering on a sample of the dataset. + + Randomly samples rows from the dataset, computes pairwise hamming distances + between all hashes in the sample, filters by threshold, and clusters the + results using union-find. + + Parameters + ---------- + dataset : LanceDataset + The Lance dataset containing the hash column. + column : str + Name of the hash column (must be FixedSizeList) + sample_size : int, optional + Number of rows to sample. If None, uses all rows. + hamming_threshold : int, default 10 + Maximum hamming distance to consider as similar + + Returns + ------- + pa.RecordBatchReader + A reader yielding batches with columns: + + - 'representative': uint64 - The representative row ID for each cluster + - 'duplicates': list - List of duplicate row IDs in each cluster + """ + return dataset._ds.hamming_clustering_for_sample( + column, sample_size, hamming_threshold + ) + + +def hamming_clustering_for_range( + dataset: "LanceDataset", + column: str, + fragment_id: int, + start_row: int, + num_rows: int, + hamming_threshold: int = 10, +) -> pa.RecordBatchReader: + """ + Perform pairwise hamming distance clustering on a contiguous range of rows. + + Reads a contiguous range of rows from a specific fragment, computes pairwise + hamming distances between all hashes in the range, filters by threshold, + and clusters the results using union-find. + + Unlike sampling, this reads sequential rows which is useful for distributed + processing where each worker handles a specific range of a fragment. 
+ + Parameters + ---------- + dataset : LanceDataset + The Lance dataset containing the hash column. + column : str + Name of the hash column (must be FixedSizeList) + fragment_id : int + The fragment ID to read from + start_row : int + The starting row offset within the fragment + num_rows : int + Number of rows to read from the start position + hamming_threshold : int, default 10 + Maximum hamming distance to consider as similar + + Returns + ------- + pa.RecordBatchReader + A reader yielding batches with columns: + + - 'representative': uint64 - The representative row ID for each cluster + - 'duplicates': list - List of duplicate row IDs in each cluster + """ + return dataset._ds.hamming_clustering_for_range( + column, fragment_id, start_row, num_rows, hamming_threshold + ) diff --git a/python/python/tests/test_vector.py b/python/python/tests/test_vector.py index c02c8312f88..4ea4e7d425e 100644 --- a/python/python/tests/test_vector.py +++ b/python/python/tests/test_vector.py @@ -5,7 +5,7 @@ import numpy as np import pyarrow as pa import pytest -from lance.vector import vec_to_table +from lance.vector import hamming_clustering_for_sample, vec_to_table def test_dict(): @@ -147,3 +147,38 @@ def test_binary_vectors_invalid_metric(tmp_path): "metric": "l2", } ).to_table() + + +def _hash_table(hashes): + """Build a table with a ``hash`` column of FixedSizeList. + + ``hashes`` is a list of 8-byte sequences, one per row. + """ + flat = [byte for row in hashes for byte in row] + values = pa.FixedSizeListArray.from_arrays( + pa.array(flat, type=pa.uint8()), list_size=8 + ) + return pa.Table.from_arrays([values], names=["hash"]) + + +def test_hamming_clustering_for_sample(tmp_path): + hash_a = [0, 0, 0, 0, 0, 0, 0, 0] + hash_b = [255, 0, 0, 0, 0, 0, 0, 0] # 8 bits from hash_a + hash_c = [1, 2, 3, 4, 5, 6, 7, 8] # far from both + # Rows 0,1,2 share hash_a; rows 3,4 share hash_b; row 5 is unique. 
+ table = _hash_table([hash_a, hash_a, hash_a, hash_b, hash_b, hash_c]) + dataset = lance.write_dataset(table, tmp_path / "hashes") + + # threshold 0 => only exact-match hashes cluster together. Full scan + # (sample_size=None) yields deterministic row ids 0..5. + result = hamming_clustering_for_sample(dataset, "hash", None, 0).read_all() + + clusters = { + rep: sorted(dups) + for rep, dups in zip( + result["representative"].to_pylist(), + result["duplicates"].to_pylist(), + ) + } + # Singleton row 5 is not emitted as a cluster. + assert clusters == {0: [1, 2], 3: [4]} diff --git a/python/src/dataset.rs b/python/src/dataset.rs index 8bfa81aeae4..31eaa96a654 100644 --- a/python/src/dataset.rs +++ b/python/src/dataset.rs @@ -3428,6 +3428,188 @@ impl Dataset { self.ds.clone(), )) } + + /// Perform pairwise hamming distance clustering on a partition of an IVF_FLAT index. + /// + /// This function loads a specific partition from an IVF_FLAT index on a hash column, + /// computes pairwise hamming distances between all hashes in the partition, + /// filters by threshold, and clusters the results using union-find. 
+ /// + /// Parameters + /// ---------- + /// index_name : str + /// Name of the IVF_FLAT index on the hash column + /// partition_id : int + /// The partition ID within the IVF_FLAT index + /// hamming_threshold : int + /// Maximum hamming distance to consider as similar + /// + /// Returns + /// ------- + /// pyarrow.RecordBatchReader + /// A reader yielding batches with columns: + /// - 'representative': uint64 - The representative row ID for each cluster + /// - 'duplicates': list - List of duplicate row IDs in each cluster + #[pyo3(signature = (index_name, partition_id, hamming_threshold))] + fn hamming_clustering_for_ivf_partition( + &self, + py: Python<'_>, + index_name: &str, + partition_id: usize, + hamming_threshold: u32, + ) -> PyResult>> { + use lance::index::vector::hamming::hamming_clustering_for_ivf_partition; + + let ds = self.ds.as_ref(); + let reader = rt() + .block_on( + Some(py), + hamming_clustering_for_ivf_partition( + ds, + index_name, + partition_id, + hamming_threshold, + ), + )? + .map_err(|err| PyValueError::new_err(err.to_string()))?; + + Ok(PyArrowType(reader)) + } + + /// Get partition information for an IVF_FLAT index. + /// + /// Parameters + /// ---------- + /// index_name : str + /// Name of the IVF_FLAT index + /// + /// Returns + /// ------- + /// List[dict] + /// List of partition info dicts with 'partition_id' and 'size' + #[pyo3(signature = (index_name))] + fn get_ivf_partition_info( + &self, + py: Python<'_>, + index_name: &str, + ) -> PyResult>> { + use lance::index::vector::hamming::get_ivf_partition_info; + + let ds = self.ds.as_ref(); + let result = rt() + .block_on(Some(py), get_ivf_partition_info(ds, index_name))? 
+ .map_err(|err| PyValueError::new_err(err.to_string()))?; + + let partitions: PyResult> = result + .iter() + .map(|p| { + let dict = PyDict::new(py); + dict.set_item("partition_id", p.partition_id)?; + dict.set_item("size", p.size)?; + Ok(dict.into()) + }) + .collect(); + + partitions + } + + /// Perform pairwise hamming distance clustering on sampled rows from a dataset. + /// + /// This function samples N rows randomly from the dataset, extracts hashes, + /// computes pairwise hamming distances, and clusters the results. + /// It's useful for benchmarking and testing without requiring an IVF index. + /// + /// Parameters + /// ---------- + /// column : str + /// Name of the hash column (must be FixedSizeList) + /// sample_size : int, optional + /// Number of rows to sample (if None or >= total rows, uses all rows) + /// hamming_threshold : int + /// Maximum hamming distance to consider as similar + /// + /// Returns + /// ------- + /// pyarrow.RecordBatchReader + /// A reader yielding batches with columns: + /// - 'representative': uint64 - The representative row ID for each cluster + /// - 'duplicates': list - List of duplicate row IDs in each cluster + #[pyo3(signature = (column, sample_size, hamming_threshold))] + fn hamming_clustering_for_sample( + &self, + py: Python<'_>, + column: &str, + sample_size: Option, + hamming_threshold: u32, + ) -> PyResult>> { + use lance::index::vector::hamming::hamming_clustering_for_sample; + + let ds = self.ds.as_ref(); + let reader = rt() + .block_on( + Some(py), + hamming_clustering_for_sample(ds, column, sample_size, hamming_threshold), + )? + .map_err(|err| PyValueError::new_err(err.to_string()))?; + + Ok(PyArrowType(reader)) + } + + /// Perform pairwise hamming distance clustering on a contiguous range of rows from a fragment. + /// + /// This function reads a contiguous range of rows from a specific fragment, + /// extracts hashes, computes pairwise hamming distances, and clusters the results. 
+ /// Unlike sampling, this reads sequential rows which is useful for distributed + /// processing where each worker handles a specific range of a fragment. + /// + /// Parameters + /// ---------- + /// column : str + /// Name of the hash column (must be FixedSizeList) + /// fragment_id : int + /// The fragment ID to read from + /// start_row : int + /// The starting row offset within the fragment + /// num_rows : int + /// Number of rows to read from the start position + /// hamming_threshold : int + /// Maximum hamming distance to consider as similar + /// + /// Returns + /// ------- + /// pyarrow.RecordBatchReader + /// A reader yielding batches with columns: + /// - 'representative': uint64 - The representative row ID for each cluster + /// - 'duplicates': list - List of duplicate row IDs in each cluster + #[pyo3(signature = (column, fragment_id, start_row, num_rows, hamming_threshold))] + fn hamming_clustering_for_range( + &self, + py: Python<'_>, + column: &str, + fragment_id: usize, + start_row: usize, + num_rows: usize, + hamming_threshold: u32, + ) -> PyResult>> { + use lance::index::vector::hamming::hamming_clustering_for_range; + + let ds = self.ds.as_ref(); + let reader = rt() + .block_on( + Some(py), + hamming_clustering_for_range( + ds, + column, + fragment_id, + start_row, + num_rows, + hamming_threshold, + ), + )? 
+ .map_err(|err| PyValueError::new_err(err.to_string()))?; + + Ok(PyArrowType(reader)) + } } #[pyclass(name = "SqlQuery", module = "_lib", subclass, skip_from_py_object)] diff --git a/rust/lance-linalg/Cargo.toml b/rust/lance-linalg/Cargo.toml index cf91deb69d7..6a188ec3c62 100644 --- a/rust/lance-linalg/Cargo.toml +++ b/rust/lance-linalg/Cargo.toml @@ -18,6 +18,7 @@ lance-arrow = { workspace = true } lance-core = { workspace = true } num-traits = { workspace = true } rand = { workspace = true } +rayon = { workspace = true } [dev-dependencies] approx = { workspace = true } @@ -50,10 +51,6 @@ harness = false name = "cosine" harness = false -[[bench]] -name = "hamming" -harness = false - [[bench]] name = "norm_l2" harness = false diff --git a/rust/lance-linalg/benches/hamming.rs b/rust/lance-linalg/benches/hamming.rs deleted file mode 100644 index 9af3bf4614b..00000000000 --- a/rust/lance-linalg/benches/hamming.rs +++ /dev/null @@ -1,52 +0,0 @@ -// SPDX-License-Identifier: Apache-2.0 -// SPDX-FileCopyrightText: Copyright The Lance Authors - -use std::iter::repeat_with; - -use std::hint::black_box; - -use criterion::{Criterion, criterion_group, criterion_main}; -use lance_linalg::distance::hamming::{hamming, hamming_scalar}; -use rand::Rng; - -const DIMENSION: usize = 1024; -const TOTAL: usize = 1024 * 1024; // 1M vectors - -fn bench_hamming(c: &mut Criterion) { - let mut rng = rand::rng(); - - let key = repeat_with(|| rng.random::()) - .take(DIMENSION) - .collect::>(); - let target = repeat_with(|| rng.random::()) - .take(TOTAL * DIMENSION) - .collect::>(); - - c.bench_function("hamming,scalar", |b| { - b.iter(|| { - black_box( - target - .chunks_exact(DIMENSION) - .map(|tgt| hamming_scalar(&key, tgt)) - .sum::(), - ); - }) - }); - - c.bench_function("hamming,auto_vec", |b| { - b.iter(|| { - black_box( - target - .chunks_exact(DIMENSION) - .map(|tgt| hamming(&key, tgt)) - .sum::(), - ); - }) - }); -} - -criterion_group!( - name=benches; - config = 
Criterion::default().significance_level(0.1).sample_size(10); - targets = bench_hamming); -criterion_main!(benches); diff --git a/rust/lance-linalg/src/distance.rs b/rust/lance-linalg/src/distance.rs index a356d5c1225..23d1cae2d63 100644 --- a/rust/lance-linalg/src/distance.rs +++ b/rust/lance-linalg/src/distance.rs @@ -27,7 +27,11 @@ pub mod norm_l2; pub use cosine::*; pub use dot::*; -use hamming::hamming_distance_arrow_batch; +pub use hamming::{ + Cluster, ClusteringResult, PairwiseResult, UnionFind, cluster_edges, cluster_pairwise_result, + extract_hashes_from_fixed_list, hamming_distance_arrow_batch, hamming_u64, + pairwise_hamming_distance, pairwise_hamming_distance_parallel, +}; pub use l2::*; use lance_core::deepsize::DeepSizeOf; pub use norm_l2::*; diff --git a/rust/lance-linalg/src/distance/hamming.rs b/rust/lance-linalg/src/distance/hamming.rs index d8fd60f4054..a6f4b038195 100644 --- a/rust/lance-linalg/src/distance/hamming.rs +++ b/rust/lance-linalg/src/distance/hamming.rs @@ -2,14 +2,24 @@ // SPDX-FileCopyrightText: Copyright The Lance Authors //! Hamming distance. +//! +//! This module provides hamming distance computation for binary vectors, +//! including SIMD-accelerated pairwise hamming distance for 64-bit hashes. +use std::collections::HashMap; use std::sync::Arc; -use crate::{Error, Result}; +use arrow_array::builder::{ListBuilder, UInt64Builder}; use arrow_array::cast::AsArray; use arrow_array::types::UInt8Type; -use arrow_array::{Array, FixedSizeListArray, Float32Array}; -use arrow_schema::DataType; +use arrow_array::{ + Array, ArrayRef, FixedSizeListArray, Float32Array, RecordBatch, RecordBatchIterator, + RecordBatchReader, UInt32Array, UInt64Array, +}; +use arrow_schema::{DataType, Field, Schema, SchemaRef}; +use rayon::prelude::*; + +use crate::{Error, Result}; pub trait Hamming { /// Hamming distance between two vectors. 
@@ -86,6 +96,640 @@ pub fn hamming_distance_arrow_batch( ))) } +/// Compute hamming distance between two 64-bit values using POPCNT. +#[inline(always)] +pub fn hamming_u64(a: u64, b: u64) -> u32 { + (a ^ b).count_ones() +} + +/// Result of pairwise hamming distance computation. +#[derive(Debug, Clone)] +pub struct PairwiseResult { + pub row_id_a: Vec<u64>, + pub row_id_b: Vec<u64>, + pub distances: Vec<u32>, +} + +impl PairwiseResult { + pub fn new() -> Self { + Self { + row_id_a: Vec::new(), + row_id_b: Vec::new(), + distances: Vec::new(), + } + } + + pub fn with_capacity(capacity: usize) -> Self { + Self { + row_id_a: Vec::with_capacity(capacity), + row_id_b: Vec::with_capacity(capacity), + distances: Vec::with_capacity(capacity), + } + } + + pub fn push(&mut self, a: u64, b: u64, dist: u32) { + self.row_id_a.push(a); + self.row_id_b.push(b); + self.distances.push(dist); + } + + pub fn len(&self) -> usize { + self.row_id_a.len() + } + + pub fn is_empty(&self) -> bool { + self.row_id_a.is_empty() + } + + pub fn extend(&mut self, other: Self) { + self.row_id_a.extend(other.row_id_a); + self.row_id_b.extend(other.row_id_b); + self.distances.extend(other.distances); + } + + /// Convert to Arrow RecordBatch, consuming self. + pub fn into_record_batch(self) -> RecordBatch { + let schema = Arc::new(Schema::new(vec![ + Field::new("row_id_a", DataType::UInt64, false), + Field::new("row_id_b", DataType::UInt64, false), + Field::new("distance", DataType::UInt32, false), + ])); + + let row_id_a = Arc::new(UInt64Array::from(self.row_id_a)); + let row_id_b = Arc::new(UInt64Array::from(self.row_id_b)); + let distances = Arc::new(UInt32Array::from(self.distances)); + + RecordBatch::try_new(schema, vec![row_id_a, row_id_b, distances]) + .expect("Failed to create RecordBatch") + } +} + +impl Default for PairwiseResult { + fn default() -> Self { + Self::new() + } +} + +/// Compute hamming distances for a query against multiple targets. +/// Uses SIMD acceleration when available.
+#[inline] +pub fn hamming_batch_u64(query: u64, targets: &[u64], results: &mut [u32]) { + debug_assert_eq!(targets.len(), results.len()); + hamming_batch_simd(query, targets, results); +} + +/// SIMD-accelerated batch hamming distance computation. +#[inline] +fn hamming_batch_simd(query: u64, targets: &[u64], results: &mut [u32]) { + #[cfg(target_arch = "x86_64")] + { + if is_x86_feature_detected!("avx512vpopcntdq") && is_x86_feature_detected!("avx512f") { + unsafe { + hamming_batch_avx512(query, targets, results); + } + return; + } + if is_x86_feature_detected!("avx2") { + unsafe { + hamming_batch_avx2(query, targets, results); + } + return; + } + } + + // Scalar fallback (LLVM auto-vectorizes well on Apple Silicon) + hamming_batch_scalar(query, targets, results); +} + +/// Scalar fallback using count_ones() which compiles to POPCNT. +#[inline] +fn hamming_batch_scalar(query: u64, targets: &[u64], results: &mut [u32]) { + // Unroll for better auto-vectorization + let n = targets.len(); + let chunks = n / 8; + let mut i = 0; + + for _ in 0..chunks { + results[i] = (query ^ targets[i]).count_ones(); + results[i + 1] = (query ^ targets[i + 1]).count_ones(); + results[i + 2] = (query ^ targets[i + 2]).count_ones(); + results[i + 3] = (query ^ targets[i + 3]).count_ones(); + results[i + 4] = (query ^ targets[i + 4]).count_ones(); + results[i + 5] = (query ^ targets[i + 5]).count_ones(); + results[i + 6] = (query ^ targets[i + 6]).count_ones(); + results[i + 7] = (query ^ targets[i + 7]).count_ones(); + i += 8; + } + + // Handle remainder + while i < n { + results[i] = (query ^ targets[i]).count_ones(); + i += 1; + } +} + +/// AVX-512 VPOPCNTDQ: Process 8 x 64-bit values at once. 
+#[cfg(target_arch = "x86_64")] +#[target_feature(enable = "avx512f", enable = "avx512vpopcntdq")] +unsafe fn hamming_batch_avx512(query: u64, targets: &[u64], results: &mut [u32]) { + use std::arch::x86_64::*; + + let n = targets.len(); + let query_vec = _mm512_set1_epi64(query as i64); + + let chunks = n / 8; + let remainder = n % 8; + + for i in 0..chunks { + let offset = i * 8; + let targets_ptr = targets.as_ptr().add(offset) as *const __m512i; + let target_vec = _mm512_loadu_si512(targets_ptr); + + let xor_result = _mm512_xor_si512(query_vec, target_vec); + let popcount = _mm512_popcnt_epi64(xor_result); + let popcount_32 = _mm512_cvtepi64_epi32(popcount); + + _mm256_storeu_si256( + results.as_mut_ptr().add(offset) as *mut __m256i, + popcount_32, + ); + } + + if remainder > 0 { + let offset = chunks * 8; + for j in 0..remainder { + results[offset + j] = (query ^ targets[offset + j]).count_ones(); + } + } +} + +/// AVX2 popcount using lookup table (Harley-Seal / PSHUFB method). +#[cfg(target_arch = "x86_64")] +#[target_feature(enable = "avx2")] +unsafe fn hamming_batch_avx2(query: u64, targets: &[u64], results: &mut [u32]) { + use std::arch::x86_64::*; + + let n = targets.len(); + + let lookup = _mm256_setr_epi8( + 0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, 3, 4, 0, 1, 1, 2, 1, 2, 2, 3, 1, 2, 2, 3, 2, 3, + 3, 4, + ); + let low_mask = _mm256_set1_epi8(0x0f); + let query_vec = _mm256_set1_epi64x(query as i64); + + let chunks = n / 4; + let remainder = n % 4; + + for i in 0..chunks { + let offset = i * 4; + let targets_ptr = targets.as_ptr().add(offset) as *const __m256i; + let target_vec = _mm256_loadu_si256(targets_ptr); + + let xor_result = _mm256_xor_si256(query_vec, target_vec); + + // Popcount using nibble lookup + let lo = _mm256_and_si256(xor_result, low_mask); + let hi = _mm256_and_si256(_mm256_srli_epi16(xor_result, 4), low_mask); + let popcnt_lo = _mm256_shuffle_epi8(lookup, lo); + let popcnt_hi = _mm256_shuffle_epi8(lookup, hi); + let popcnt_bytes = 
_mm256_add_epi8(popcnt_lo, popcnt_hi); + let popcount = _mm256_sad_epu8(popcnt_bytes, _mm256_setzero_si256()); + + let results_ptr = results.as_mut_ptr().add(offset); + *results_ptr = _mm256_extract_epi32::<0>(popcount) as u32; + *results_ptr.add(1) = _mm256_extract_epi32::<2>(popcount) as u32; + *results_ptr.add(2) = _mm256_extract_epi32::<4>(popcount) as u32; + *results_ptr.add(3) = _mm256_extract_epi32::<6>(popcount) as u32; + } + + if remainder > 0 { + let offset = chunks * 4; + for j in 0..remainder { + results[offset + j] = (query ^ targets[offset + j]).count_ones(); + } + } +} + +/// Compute pairwise hamming distances for all pairs of hashes. +/// +/// Returns pairs where distance <= threshold (if provided). +/// +/// # Arguments +/// * `hashes` - Vector of 64-bit hash values +/// * `row_ids` - Optional row IDs (defaults to indices if None) +/// * `threshold` - Optional maximum distance to include in results +pub fn pairwise_hamming_distance( + hashes: &[u64], + row_ids: Option<&[u64]>, + threshold: Option<u32>, +) -> PairwiseResult { + let n = hashes.len(); + if n < 2 { + return PairwiseResult::new(); + } + + let threshold = threshold.unwrap_or(u32::MAX); + let num_pairs = n * (n - 1) / 2; + let mut result = PairwiseResult::with_capacity(num_pairs.min(1_000_000)); + + for i in 0..n { + for j in (i + 1)..n { + let dist = hamming_u64(hashes[i], hashes[j]); + if dist <= threshold { + let id_a = row_ids.map_or(i as u64, |ids| ids[i]); + let id_b = row_ids.map_or(j as u64, |ids| ids[j]); + result.push(id_a, id_b, dist); + } + } + } + + result +} + +/// Compute pairwise hamming distances in parallel using rayon + SIMD. +/// +/// Uses chunked parallelization for balanced workload distribution.
+pub fn pairwise_hamming_distance_parallel( + hashes: &[u64], + row_ids: Option<&[u64]>, + threshold: Option<u32>, +) -> PairwiseResult { + let n = hashes.len(); + if n < 2 { + return PairwiseResult::new(); + } + + let threshold = threshold.unwrap_or(u32::MAX); + let total_pairs = n * (n - 1) / 2; + + // For small datasets, use sequential to avoid thread overhead + if total_pairs < 10_000 { + return pairwise_hamming_distance(hashes, row_ids, Some(threshold)); + } + + let threads = rayon::current_num_threads(); + let pairs_per_chunk = total_pairs.div_ceil(threads); + let chunks = compute_balanced_chunks(n, pairs_per_chunk); + + let results: Vec<PairwiseResult> = chunks + .into_par_iter() + .map(|(start_row, end_row)| { + process_row_range(hashes, row_ids, threshold, start_row, end_row) + }) + .collect(); + + let mut combined = PairwiseResult::new(); + for r in results { + combined.extend(r); + } + combined +} + +/// Compute balanced chunks for parallel processing. +fn compute_balanced_chunks(n: usize, target_pairs_per_chunk: usize) -> Vec<(usize, usize)> { + let mut chunks = Vec::new(); + let mut current_start = 0; + let mut current_pairs = 0; + + for i in 0..n { + let pairs_for_row = n - i - 1; + current_pairs += pairs_for_row; + + if current_pairs >= target_pairs_per_chunk || i == n - 1 { + chunks.push((current_start, i + 1)); + current_start = i + 1; + current_pairs = 0; + } + } + + chunks +} + +/// Process a range of rows for pairwise comparison using SIMD.
+fn process_row_range( + hashes: &[u64], + row_ids: Option<&[u64]>, + threshold: u32, + start_row: usize, + end_row: usize, +) -> PairwiseResult { + let n = hashes.len(); + let mut result = PairwiseResult::new(); + + for i in start_row..end_row { + let remaining = n - i - 1; + if remaining == 0 { + continue; + } + + let mut distances = vec![0u32; remaining]; + hamming_batch_u64(hashes[i], &hashes[i + 1..], &mut distances); + + let id_a = row_ids.map_or(i as u64, |ids| ids[i]); + for (j_offset, &dist) in distances.iter().enumerate() { + if dist <= threshold { + let j = i + 1 + j_offset; + let id_b = row_ids.map_or(j as u64, |ids| ids[j]); + result.push(id_a, id_b, dist); + } + } + } + + result +} + +/// Extract u64 hashes from a FixedSizeList Arrow array. +pub fn extract_hashes_from_fixed_list(array: &FixedSizeListArray) -> Result> { + let list_size = array.value_length(); + if list_size != 8 { + return Err(Error::InvalidArgumentError(format!( + "Expected FixedSizeList with size 8, got size {}", + list_size + ))); + } + + let values = array + .values() + .as_any() + .downcast_ref::() + .ok_or_else(|| { + Error::InvalidArgumentError("Expected UInt8Array values in FixedSizeList".to_string()) + })?; + + let n = array.len(); + let mut hashes = Vec::with_capacity(n); + + for i in 0..n { + let start = i * 8; + let bytes = &values.values()[start..start + 8]; + let mut arr = [0u8; 8]; + arr.copy_from_slice(bytes); + hashes.push(u64::from_le_bytes(arr)); + } + + Ok(hashes) +} + +/// Union-Find data structure with path compression for clustering. +pub struct UnionFind { + parent: HashMap, + rank: HashMap, +} + +impl UnionFind { + pub fn new() -> Self { + Self { + parent: HashMap::new(), + rank: HashMap::new(), + } + } + + pub fn with_capacity(capacity: usize) -> Self { + Self { + parent: HashMap::with_capacity(capacity), + rank: HashMap::with_capacity(capacity), + } + } + + /// Find the root of a node with path compression. 
+ pub fn find(&mut self, x: u64) -> u64 { + if let std::collections::hash_map::Entry::Vacant(e) = self.parent.entry(x) { + e.insert(x); + self.rank.insert(x, 0); + return x; + } + + let mut current = x; + let mut path = Vec::new(); + + while self.parent[&current] != current { + path.push(current); + current = self.parent[&current]; + } + let root = current; + + for node in path { + self.parent.insert(node, root); + } + + root + } + + /// Union two nodes, using union by rank. + pub fn union(&mut self, a: u64, b: u64) -> bool { + let root_a = self.find(a); + let root_b = self.find(b); + + if root_a == root_b { + return false; + } + + let rank_a = self.rank[&root_a]; + let rank_b = self.rank[&root_b]; + + if rank_a < rank_b { + self.parent.insert(root_a, root_b); + } else if rank_a > rank_b { + self.parent.insert(root_b, root_a); + } else if root_a < root_b { + self.parent.insert(root_b, root_a); + *self.rank.get_mut(&root_a).unwrap() += 1; + } else { + self.parent.insert(root_a, root_b); + *self.rank.get_mut(&root_b).unwrap() += 1; + } + + true + } + + pub fn nodes(&self) -> impl Iterator<Item = &u64> { + self.parent.keys() + } + + pub fn len(&self) -> usize { + self.parent.len() + } + + pub fn is_empty(&self) -> bool { + self.parent.is_empty() + } +} + +impl Default for UnionFind { + fn default() -> Self { + Self::new() + } +} + +/// A cluster with representative and duplicates. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct Cluster { + /// The representative row ID (smallest in the cluster). + pub representative: u64, + /// List of duplicate row IDs (excludes the representative). + pub duplicates: Vec<u64>, +} + +impl Cluster { + pub fn size(&self) -> usize { + 1 + self.duplicates.len() + } +} + +/// Result of the clustering operation. +#[derive(Debug, Clone)] +pub struct ClusteringResult { + /// List of clusters, each with a representative and duplicates.
+ pub clusters: Vec<Cluster>, +} + +impl ClusteringResult { + pub fn num_clusters(&self) -> usize { + self.clusters.len() + } + + pub fn num_duplicates(&self) -> usize { + self.clusters.iter().map(|c| c.duplicates.len()).sum() + } + + pub fn num_unique(&self) -> usize { + self.clusters.len() + } + + /// Get the schema for clustering result batches. + pub fn schema() -> SchemaRef { + Arc::new(Schema::new(vec![ + Field::new("representative", DataType::UInt64, false), + Field::new( + "duplicates", + DataType::List(Arc::new(Field::new("item", DataType::UInt64, true))), + false, + ), + ])) + } + + /// Convert to Arrow RecordBatch with columns: + /// - `representative`: `UInt64` + /// - `duplicates`: `List<UInt64>` + pub fn to_record_batch(&self) -> RecordBatch { + let schema = Self::schema(); + + let mut representatives = Vec::with_capacity(self.clusters.len()); + let mut duplicates_builder = ListBuilder::new(UInt64Builder::new()); + + for cluster in &self.clusters { + representatives.push(cluster.representative); + for &dup in &cluster.duplicates { + duplicates_builder.values().append_value(dup); + } + duplicates_builder.append(true); + } + + let representative_array: ArrayRef = Arc::new(UInt64Array::from(representatives)); + let duplicates_array: ArrayRef = Arc::new(duplicates_builder.finish()); + + RecordBatch::try_new(schema, vec![representative_array, duplicates_array]) + .expect("Failed to create RecordBatch") + } + + /// Convert to a RecordBatchReader that yields batches of the specified size.
+ /// + /// # Arguments + /// * `batch_size` - Number of clusters per batch (default: 10000) + pub fn into_reader(self, batch_size: Option) -> Box { + let batch_size = batch_size.unwrap_or(10_000); + let schema = Self::schema(); + + if self.clusters.is_empty() { + // Return empty reader + let batches: Vec> = vec![]; + return Box::new(RecordBatchIterator::new(batches, schema)); + } + + let batches: Vec> = self + .clusters + .chunks(batch_size) + .map(|chunk| { + let mut representatives = Vec::with_capacity(chunk.len()); + let mut duplicates_builder = ListBuilder::new(UInt64Builder::new()); + + for cluster in chunk { + representatives.push(cluster.representative); + for &dup in &cluster.duplicates { + duplicates_builder.values().append_value(dup); + } + duplicates_builder.append(true); + } + + let representative_array: ArrayRef = Arc::new(UInt64Array::from(representatives)); + let duplicates_array: ArrayRef = Arc::new(duplicates_builder.finish()); + + RecordBatch::try_new(Self::schema(), vec![representative_array, duplicates_array]) + }) + .collect(); + + Box::new(RecordBatchIterator::new(batches, schema)) + } +} + +/// Cluster edges using union-find algorithm. +/// +/// Takes a list of edges (row_id_a, row_id_b) and groups connected nodes +/// into clusters. Each cluster has a representative (smallest row ID) +/// and a list of duplicates. 
+pub fn cluster_edges<I>(edges: I) -> ClusteringResult
+where
+    I: IntoIterator<Item = (u64, u64)>,
+{
+    let mut uf = UnionFind::new();
+
+    for (a, b) in edges {
+        uf.union(a, b);
+    }
+
+    let mut clusters_map: HashMap<u64, Vec<u64>> = HashMap::new();
+    let nodes: Vec<u64> = uf.nodes().copied().collect();
+
+    for node in nodes {
+        let root = uf.find(node);
+        clusters_map.entry(root).or_default().push(node);
+    }
+
+    let mut clusters = Vec::new();
+    for (_root, mut members) in clusters_map {
+        members.sort_unstable();
+
+        if members.len() > 1 {
+            let representative = members[0]; // sorted above, so index 0 is the minimum
+            let duplicates: Vec<u64> = members
+                .into_iter()
+                .filter(|&m| m != representative)
+                .collect();
+
+            clusters.push(Cluster {
+                representative,
+                duplicates,
+            });
+        }
+    }
+
+    clusters.sort_by_key(|c| c.representative);
+
+    ClusteringResult { clusters }
+}
+
+/// Cluster the (row_id_a, row_id_b) pairs of a PairwiseResult via `cluster_edges`.
+pub fn cluster_pairwise_result(result: &PairwiseResult) -> ClusteringResult {
+    let edges = result
+        .row_id_a
+        .iter()
+        .zip(result.row_id_b.iter())
+        .map(|(&a, &b)| (a, b));
+
+    cluster_edges(edges)
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -102,4 +746,677 @@ mod tests {
         let y = vec![0b1101_1010, 0b1010_1010, 0b1010_1001];
         assert_eq!(hamming(&x, &y), 2.0);
     }
+
+    #[test]
+    fn test_hamming_u64() {
+        assert_eq!(hamming_u64(0, 0), 0);
+        assert_eq!(hamming_u64(0, 1), 1);
+        assert_eq!(hamming_u64(0b1111, 0b0000), 4);
+        assert_eq!(hamming_u64(u64::MAX, 0), 64);
+        assert_eq!(hamming_u64(0xAAAAAAAAAAAAAAAA, 0x5555555555555555), 64);
+    }
+
+    #[test]
+    fn test_hamming_batch_u64() {
+        let query = 0u64;
+        let targets: Vec<u64> = (0..128).collect();
+        let mut results = vec![0u32; 128];
+
+        hamming_batch_u64(query, &targets, &mut results);
+
+        assert_eq!(results[0], 0);
+        assert_eq!(results[1], 1);
+        assert_eq!(results[3], 2); // 0b11 has 2 bits set
+        assert_eq!(results[7], 3); // 0b111 has 3 bits set
+    }
+
+    #[test]
+    fn test_pairwise_basic() {
+        let hashes = vec![0b0000u64, 0b0001, 0b0011, 0b0111];
+        let result = pairwise_hamming_distance(&hashes, None, None);
+
+        assert_eq!(result.len(), 6); // C(4,2) = 6 pairs
+        assert!(result.distances.iter().all(|&d| d <= 3));
+    }
+
+    #[test]
+    fn test_pairwise_with_threshold() {
+        let hashes = vec![0b0000u64, 0b0001, 0b1111];
+        let result = pairwise_hamming_distance(&hashes, None, Some(1));
+
+        assert_eq!(result.len(), 1);
+        assert_eq!(result.row_id_a[0], 0);
+        assert_eq!(result.row_id_b[0], 1);
+        assert_eq!(result.distances[0], 1);
+    }
+
+    #[test]
+    fn test_pairwise_with_row_ids() {
+        let hashes = vec![0b0000u64, 0b0001];
+        let row_ids = vec![100u64, 200u64];
+        let result = pairwise_hamming_distance(&hashes, Some(&row_ids), None);
+
+        assert_eq!(result.len(), 1);
+        assert_eq!(result.row_id_a[0], 100);
+        assert_eq!(result.row_id_b[0], 200);
+    }
+
+    #[test]
+    fn test_pairwise_parallel() {
+        let hashes: Vec<u64> = (0..100).collect();
+        let result_seq = pairwise_hamming_distance(&hashes, None, None);
+        let result_par = pairwise_hamming_distance_parallel(&hashes, None, None);
+
+        assert_eq!(result_seq.len(), result_par.len());
+    }
+
+    #[test]
+    fn test_union_find_basic() {
+        let mut uf = UnionFind::new();
+
+        assert_eq!(uf.find(1), 1);
+        assert_eq!(uf.find(2), 2);
+        assert_eq!(uf.find(3), 3);
+
+        assert!(uf.union(1, 2));
+        assert_eq!(uf.find(1), uf.find(2));
+
+        assert!(uf.union(2, 3));
+        assert_eq!(uf.find(1), uf.find(3));
+
+        assert!(!uf.union(1, 3));
+    }
+
+    #[test]
+    fn test_cluster_edges_simple() {
+        let edges = vec![(1, 2), (2, 3), (4, 5)];
+        let result = cluster_edges(edges);
+
+        assert_eq!(result.num_clusters(), 2);
+
+        let c1 = result
+            .clusters
+            .iter()
+            .find(|c| c.representative == 1)
+            .unwrap();
+        assert_eq!(c1.duplicates.len(), 2);
+        assert!(c1.duplicates.contains(&2));
+        assert!(c1.duplicates.contains(&3));
+
+        let c2 = result
+            .clusters
+            .iter()
+            .find(|c| c.representative == 4)
+            .unwrap();
+        assert_eq!(c2.duplicates.len(), 1);
+        assert!(c2.duplicates.contains(&5));
+    }
+
+    #[test]
+    fn test_cluster_pairwise_result() {
+        let hashes = vec![0b0000u64, 0b0001, 0b0011]; // distances: (0,1)=1, (0,2)=2, (1,2)=1
+        let pairwise = pairwise_hamming_distance(&hashes, None, Some(1)); // threshold 1
+
+        // Only pairs with distance <= 1: (0,1) and (1,2)
+        assert_eq!(pairwise.len(), 2);
+
+        let clustering = cluster_pairwise_result(&pairwise);
+        // All three should be in one cluster since 0-1-2 are connected
+        assert_eq!(clustering.num_clusters(), 1);
+        assert_eq!(clustering.clusters[0].representative, 0);
+        assert_eq!(clustering.clusters[0].duplicates.len(), 2);
+    }
+
+    #[test]
+    fn test_into_record_batch() {
+        let hashes = vec![0b0000u64, 0b0001, 0b0011];
+        let result = pairwise_hamming_distance(&hashes, None, None);
+        let batch = result.into_record_batch();
+
+        assert_eq!(batch.num_rows(), 3);
+        assert_eq!(batch.num_columns(), 3);
+        assert_eq!(batch.schema().field(0).name(), "row_id_a");
+        assert_eq!(batch.schema().field(1).name(), "row_id_b");
+        assert_eq!(batch.schema().field(2).name(), "distance");
+    }
+
+    // =========================================================================
+    // Additional tests from pairwise-hamming reference implementation
+    // =========================================================================
+
+    /// Reference implementation for validation - simple O(n²) nested loop
+    fn reference_pairwise(hashes: &[u64], threshold: Option<u32>) -> Vec<(usize, usize, u32)> {
+        let threshold = threshold.unwrap_or(u32::MAX);
+        let mut results = Vec::new();
+        for i in 0..hashes.len() {
+            for j in (i + 1)..hashes.len() {
+                let dist = (hashes[i] ^ hashes[j]).count_ones();
+                if dist <= threshold {
+                    results.push((i, j, dist));
+                }
+            }
+        }
+        results
+    }
+
+    /// Convert PairwiseResult to sorted vec for comparison
+    fn result_to_sorted_vec(result: &PairwiseResult) -> Vec<(u64, u64, u32)> {
+        let mut v: Vec<_> = result
+            .row_id_a
+            .iter()
+            .zip(result.row_id_b.iter())
+            .zip(result.distances.iter())
+            .map(|((&a, &b), &d)| (a, b, d))
+            .collect();
+        v.sort();
+        v
+    }
+
+    #[test]
+    fn test_pairwise_correctness_small() {
+        // Deterministic hashes with known distances
+        let hashes = vec![
+            0b0000_0000u64, // 0
+            0b0000_0001u64, // 1 bit from 0
+            0b0000_0011u64, // 2 bits from 0, 1 bit from 1
+            0b0000_0111u64, // 3 bits from 0, 2 bits from 1, 1 bit from 2
+            0b0000_1111u64, // 4 bits from 0, 3 bits from 1, 2 bits from 2, 1 bit from 3
+        ];
+
+        let result = pairwise_hamming_distance(&hashes, None, None);
+        let reference = reference_pairwise(&hashes, None);
+
+        assert_eq!(result.len(), reference.len());
+        assert_eq!(result.len(), 10); // C(5,2) = 10 pairs
+
+        // Verify specific distances
+        let result_vec = result_to_sorted_vec(&result);
+        for (i, j, expected_dist) in &reference {
+            let found = result_vec
+                .iter()
+                .find(|(a, b, _)| *a == *i as u64 && *b == *j as u64);
+            assert!(found.is_some(), "Missing pair ({}, {})", i, j);
+            assert_eq!(
+                found.unwrap().2,
+                *expected_dist,
+                "Wrong distance for pair ({}, {})",
+                i,
+                j
+            );
+        }
+    }
+
+    #[test]
+    fn test_pairwise_correctness_1000_deterministic() {
+        // Generate deterministic hashes using simple linear pattern
+        let hashes: Vec<u64> = (0u64..1000)
+            .map(|i| i.wrapping_mul(0x123456789ABCDEF))
+            .collect();
+
+        let result_seq = pairwise_hamming_distance(&hashes, None, Some(10));
+        let result_par = pairwise_hamming_distance_parallel(&hashes, None, Some(10));
+        let reference = reference_pairwise(&hashes, Some(10));
+
+        // Both implementations should match reference
+        assert_eq!(
+            result_seq.len(),
+            reference.len(),
+            "Sequential result count mismatch"
+        );
+        assert_eq!(
+            result_par.len(),
+            reference.len(),
+            "Parallel result count mismatch"
+        );
+
+        // Verify all pairs match
+        let seq_sorted = result_to_sorted_vec(&result_seq);
+        let par_sorted = result_to_sorted_vec(&result_par);
+
+        for (i, j, dist) in &reference {
+            let seq_found = seq_sorted
+                .iter()
+                .find(|(a, b, _)| *a == *i as u64 && *b == *j as u64);
+            let par_found = par_sorted
+                .iter()
+                .find(|(a, b, _)| *a == *i as u64 && *b == *j as u64);
+
+            assert!(
+                seq_found.is_some(),
+                "Sequential missing pair ({}, {})",
+                i,
+                j
+            );
+            assert!(par_found.is_some(), "Parallel missing pair ({}, {})", i, j);
+            assert_eq!(seq_found.unwrap().2, *dist);
+            assert_eq!(par_found.unwrap().2, *dist);
+        }
+    }
+
+    #[test]
+    fn test_pairwise_correctness_10000_deterministic() {
+        // Larger test with 10K hashes
+        let hashes: Vec<u64> = (0u64..10_000)
+            .map(|i| {
+                // Mix bits using a simple hash-like transformation
+                let x = i.wrapping_mul(0xDEADBEEFCAFEBABE);
+                x ^ (x >> 17) ^ (x << 13)
+            })
+            .collect();
+
+        let result_seq = pairwise_hamming_distance(&hashes, None, Some(5));
+        let result_par = pairwise_hamming_distance_parallel(&hashes, None, Some(5));
+
+        // Both should find the same number of pairs
+        assert_eq!(
+            result_seq.len(),
+            result_par.len(),
+            "10K test: sequential found {} pairs, parallel found {} pairs",
+            result_seq.len(),
+            result_par.len()
+        );
+
+        // Verify they contain the same pairs (sorted comparison)
+        let seq_sorted = result_to_sorted_vec(&result_seq);
+        let par_sorted = result_to_sorted_vec(&result_par);
+        assert_eq!(seq_sorted, par_sorted, "10K test: pair contents differ");
+    }
+
+    #[test]
+    fn test_pairwise_total_pairs_count() {
+        // Without threshold, should return exactly n*(n-1)/2 pairs
+        for n in [10, 50, 100, 500] {
+            let hashes: Vec<u64> = (0..n).map(|i| i as u64).collect();
+            let result = pairwise_hamming_distance_parallel(&hashes, None, None);
+            let expected = n * (n - 1) / 2;
+            assert_eq!(
+                result.len(),
+                expected,
+                "n={}: expected {} pairs, got {}",
+                n,
+                expected,
+                result.len()
+            );
+        }
+    }
+
+    #[test]
+    fn test_pairwise_threshold_filtering() {
+        // All identical hashes should have distance 0
+        let hashes = vec![0xABCDEF0123456789u64; 100];
+        let result = pairwise_hamming_distance_parallel(&hashes, None, Some(0));
+
+        // All pairs should be included (distance 0)
+        assert_eq!(result.len(), 100 * 99 / 2);
+        assert!(result.distances.iter().all(|&d| d == 0));
+
+        // With threshold 0 and all different hashes, should find fewer pairs
+        let different_hashes: Vec<u64> = (0u64..100).collect();
+        let result2 = pairwise_hamming_distance_parallel(&different_hashes, None, Some(0));
+        // Distinct values differ in at least one bit, so not every pair can match
+        assert!(result2.len() < 100 * 99 / 2);
+    }
+
+    #[test]
+    fn test_pairwise_row_ids_preserved() {
+        let hashes: Vec<u64> = (0u64..100).collect();
+        let row_ids: Vec<u64> = (1000u64..1100).collect(); // offset row IDs
+
+        let result = pairwise_hamming_distance_parallel(&hashes, Some(&row_ids), Some(5));
+
+        // All row IDs should be in range [1000, 1100)
+        for &id in &result.row_id_a {
+            assert!((1000..1100).contains(&id), "row_id_a {} out of range", id);
+        }
+        for &id in &result.row_id_b {
+            assert!((1000..1100).contains(&id), "row_id_b {} out of range", id);
+        }
+        // row_id_a should always be less than row_id_b (upper triangular)
+        for (&a, &b) in result.row_id_a.iter().zip(result.row_id_b.iter()) {
+            assert!(a < b, "Expected row_id_a < row_id_b, got {} >= {}", a, b);
+        }
+    }
+
+    #[test]
+    fn test_pairwise_distance_bounds() {
+        // All distances should be in [0, 64] for u64 hashes
+        let hashes: Vec<u64> = (0u64..1000).map(|i| i.wrapping_mul(0x123456789)).collect();
+
+        let result = pairwise_hamming_distance_parallel(&hashes, None, None);
+
+        for &d in &result.distances {
+            assert!(d <= 64, "Distance {} exceeds maximum 64", d);
+        }
+    }
+
+    #[test]
+    fn test_pairwise_symmetry() {
+        // Hamming distance is symmetric: d(a,b) = d(b,a)
+        let hashes: Vec<u64> = vec![
+            0x0000000000000000,
+            0xFFFFFFFFFFFFFFFF,
+            0xAAAAAAAAAAAAAAAA,
+            0x5555555555555555,
+            0x123456789ABCDEF0,
+        ];
+
+        let result = pairwise_hamming_distance(&hashes, None, None);
+
+        // For each pair (i,j), verify distance matches manual calculation
+        for idx in 0..result.len() {
+            let i = result.row_id_a[idx] as usize;
+            let j = result.row_id_b[idx] as usize;
+            let dist = result.distances[idx];
+
+            let expected = (hashes[i] ^ hashes[j]).count_ones();
+            assert_eq!(dist, expected, "Distance mismatch for pair ({}, {})", i, j);
+        }
+    }
+
+    #[test]
+    fn test_balanced_chunks() {
+        // Verify chunks are reasonably balanced
+        let n = 10000;
+        let total_pairs = n * (n - 1) / 2;
+        let target_per_chunk = total_pairs / 16;
+
+        let chunks = compute_balanced_chunks(n, target_per_chunk);
+
+        // Should have roughly 16 chunks
+        assert!(
+            chunks.len() >= 14 && chunks.len() <= 18,
+            "Expected ~16 chunks, got {}",
+            chunks.len()
+        );
+
+        // Each chunk should have roughly equal work
+        for (start, end) in &chunks {
+            let mut chunk_pairs = 0usize;
+            for i in *start..*end {
+                chunk_pairs += n - i - 1;
+            }
+            // Allow 20% deviation from target
+            let lower = target_per_chunk * 80 / 100;
+            // last chunk may be smaller
+            assert!(
+                chunk_pairs >= lower || *end == n,
+                "Chunk [{}, {}) has {} pairs, expected ~{}",
+                start,
+                end,
+                chunk_pairs,
+                target_per_chunk
+            );
+        }
+
+        // Chunks should cover all rows without gaps
+        assert_eq!(chunks[0].0, 0);
+        assert_eq!(chunks.last().unwrap().1, n);
+        for i in 1..chunks.len() {
+            assert_eq!(chunks[i].0, chunks[i - 1].1, "Gap between chunks");
+        }
+    }
+
+    // =========================================================================
+    // SIMD-specific tests
+    // =========================================================================
+
+    #[test]
+    #[cfg(target_arch = "x86_64")]
+    fn test_avx2_popcount() {
+        if !is_x86_feature_detected!("avx2") {
+            return;
+        }
+
+        let query = 0u64;
+        let targets = vec![0u64, 1, 3, 7, 15, 31, 63, 127];
+        let mut results = vec![0u32; 8];
+
+        unsafe {
+            hamming_batch_avx2(query, &targets, &mut results);
+        }
+
+        assert_eq!(results[0], 0); // 0 ^ 0 = 0 bits
+        assert_eq!(results[1], 1); // 0 ^ 1 = 1 bit
+        assert_eq!(results[2], 2); // 0 ^ 3 = 2 bits
+        assert_eq!(results[3], 3); // 0 ^ 7 = 3 bits
+        assert_eq!(results[4], 4); // 0 ^ 15 = 4 bits
+        assert_eq!(results[5], 5); // 0 ^ 31 = 5 bits
+        assert_eq!(results[6], 6); // 0 ^ 63 = 6 bits
+        assert_eq!(results[7], 7); // 0 ^ 127 = 7 bits
+    }
+
+    #[test]
+    #[cfg(target_arch = "x86_64")]
+    fn test_avx2_max_distance() {
+        if !is_x86_feature_detected!("avx2") {
+            return;
+        }
+
+        let query = 0u64;
+        let targets = vec![u64::MAX; 4];
+        let mut results = vec![0u32; 4];
+
+        unsafe {
+            hamming_batch_avx2(query, &targets, &mut results);
+        }
+
+        for &r in &results {
+            assert_eq!(r, 64);
+        }
+    }
+
+    #[test]
+    #[cfg(target_arch = "x86_64")]
+    fn test_avx512_popcount() {
+        if !is_x86_feature_detected!("avx512vpopcntdq") || !is_x86_feature_detected!("avx512f") {
+            return;
+        }
+
+        let query = 0u64;
+        let targets = vec![0u64, 1, 3, 7, 15, 31, 63, 127];
+        let mut results = vec![0u32; 8];
+
+        unsafe {
+            hamming_batch_avx512(query, &targets, &mut results);
+        }
+
+        assert_eq!(results[0], 0);
+        assert_eq!(results[1], 1);
+        assert_eq!(results[2], 2);
+        assert_eq!(results[3], 3);
+        assert_eq!(results[4], 4);
+        assert_eq!(results[5], 5);
+        assert_eq!(results[6], 6);
+        assert_eq!(results[7], 7);
+    }
+
+    // =========================================================================
+    // Additional clustering tests
+    // =========================================================================
+
+    #[test]
+    fn test_union_find_path_compression() {
+        let mut uf = UnionFind::new();
+
+        // Create a chain: 1 -> 2 -> 3 -> 4 -> 5
+        uf.union(4, 5);
+        uf.union(3, 4);
+        uf.union(2, 3);
+        uf.union(1, 2);
+
+        // All should have the same root
+        let root = uf.find(1);
+        assert_eq!(uf.find(2), root);
+        assert_eq!(uf.find(3), root);
+        assert_eq!(uf.find(4), root);
+        assert_eq!(uf.find(5), root);
+    }
+
+    #[test]
+    fn test_cluster_edges_single_cluster() {
+        // All connected: 1-2-3-4-5
+        let edges = vec![(1, 2), (2, 3), (3, 4), (4, 5)];
+        let result = cluster_edges(edges);
+
+        assert_eq!(result.num_clusters(), 1);
+        let cluster = &result.clusters[0];
+        assert_eq!(cluster.representative, 1);
+        assert_eq!(cluster.duplicates.len(), 4);
+        assert_eq!(cluster.size(), 5);
+    }
+
+    #[test]
+    fn test_cluster_edges_no_duplicates() {
+        // No edges means no clusters
+        let edges: Vec<(u64, u64)> = vec![];
+        let result = cluster_edges(edges);
+
+        assert_eq!(result.num_clusters(), 0);
+        assert_eq!(result.num_duplicates(), 0);
+    }
+
+    #[test]
+    fn test_cluster_edges_self_loop() {
+        // Self-loop shouldn't create a cluster (size 1)
+        let edges = vec![(1, 1), (2, 3)];
+        let result = cluster_edges(edges);
+
+        // Only {2,3} should be a cluster
+        assert_eq!(result.num_clusters(), 1);
+        assert_eq!(result.clusters[0].representative, 2);
+    }
+
+    #[test]
+    fn test_cluster_edges_duplicate_edges() {
+        // Duplicate edges should be handled correctly
+        let edges = vec![(1, 2), (1, 2), (2, 3), (2, 3), (3, 1)];
+        let result = cluster_edges(edges);
+
+        assert_eq!(result.num_clusters(), 1);
+        assert_eq!(result.clusters[0].size(), 3);
+    }
+
+    #[test]
+    fn test_cluster_edges_large() {
+        // Create 100 clusters of size 10 each
+        let mut edges = Vec::new();
+        for cluster_id in 0..100u64 {
+            let base = cluster_id * 10;
+            for i in 0..9 {
+                edges.push((base + i, base + i + 1));
+            }
+        }
+
+        let result = cluster_edges(edges);
+
+        assert_eq!(result.num_clusters(), 100);
+        for cluster in &result.clusters {
+            assert_eq!(cluster.size(), 10);
+            assert_eq!(cluster.duplicates.len(), 9);
+        }
+    }
+
+    #[test]
+    fn test_cluster_edges_random_order() {
+        // Same edges in different order should produce same result
+        let edges1 = vec![(1, 2), (2, 3), (4, 5), (3, 4)];
+        let edges2 = vec![(4, 5), (1, 2), (3, 4), (2, 3)];
+        let edges3 = vec![(3, 4), (4, 5), (2, 3), (1, 2)];
+
+        let r1 = cluster_edges(edges1);
+        let r2 = cluster_edges(edges2);
+        let r3 = cluster_edges(edges3);
+
+        // All should produce the same single cluster
+        assert_eq!(r1.num_clusters(), 1);
+        assert_eq!(r2.num_clusters(), 1);
+        assert_eq!(r3.num_clusters(), 1);
+
+        assert_eq!(r1.clusters[0].representative, 1);
+        assert_eq!(r2.clusters[0].representative, 1);
+        assert_eq!(r3.clusters[0].representative, 1);
+
+        assert_eq!(r1.clusters[0].size(), 5);
+        assert_eq!(r2.clusters[0].size(), 5);
+        assert_eq!(r3.clusters[0].size(), 5);
+    }
+
+    #[test]
+    fn test_cluster_edges_non_contiguous_ids() {
+        // Row IDs don't need to be contiguous
+        let edges = vec![(100, 200), (200, 500), (1000, 2000)];
+        let result = cluster_edges(edges);
+
+        assert_eq!(result.num_clusters(), 2);
+
+        let c1 = result
+            .clusters
+            .iter()
+            .find(|c| c.representative == 100)
+            .unwrap();
+        assert_eq!(c1.duplicates, vec![200, 500]);
+
+        let c2 = result
+            .clusters
+            .iter()
+            .find(|c| c.representative == 1000)
+            .unwrap();
+        assert_eq!(c2.duplicates, vec![2000]);
+    }
+
+    #[test]
+    fn test_cluster_representative_is_minimum() {
+        // Representative should always be the minimum row ID in cluster
+        let edges = vec![
+            (5, 3),
+            (3, 7),
+            (7, 1), // 1 is minimum
+            (100, 50),
+            (50, 75), // 50 is minimum
+        ];
+        let result = cluster_edges(edges);
+
+        assert_eq!(result.num_clusters(), 2);
+
+        let c1 = result
+            .clusters
+            .iter()
+            .find(|c| c.duplicates.contains(&7))
+            .unwrap();
+        assert_eq!(c1.representative, 1);
+
+        let c2 = result
+            .clusters
+            .iter()
+            .find(|c| c.duplicates.contains(&100))
+            .unwrap();
+        assert_eq!(c2.representative, 50);
+    }
+
+    #[test]
+    fn test_cluster_duplicates_sorted() {
+        // Duplicates should be sorted
+        let edges = vec![(1, 5), (1, 3), (1, 7), (1, 2)];
+        let result = cluster_edges(edges);
+
+        assert_eq!(result.num_clusters(), 1);
+        assert_eq!(result.clusters[0].representative, 1);
+        assert_eq!(result.clusters[0].duplicates, vec![2, 3, 5, 7]);
+    }
+
+    #[test]
+    fn test_clustering_result_stats() {
+        let edges = vec![
+            (1, 2),
+            (2, 3), // cluster of 3
+            (10, 20),
+            (20, 30),
+            (30, 40), // cluster of 4
+        ];
+        let result = cluster_edges(edges);
+
+        assert_eq!(result.num_clusters(), 2);
+        assert_eq!(result.num_duplicates(), 5); // 2 + 3
+        assert_eq!(result.num_unique(), 2);
+    }
 }
diff --git a/rust/lance/Cargo.toml b/rust/lance/Cargo.toml
index 440c3fb301a..6586c928de7 100644
--- a/rust/lance/Cargo.toml
+++ b/rust/lance/Cargo.toml
@@ -300,5 +300,9 @@ harness = false
 name = "concurrent_append"
 harness = false
 
+[[bench]]
+name = "hamming"
+harness = false
+
 [lints]
 workspace = true
diff --git a/rust/lance/benches/hamming.rs b/rust/lance/benches/hamming.rs
new file mode 100644
index 00000000000..7e926a795db
--- /dev/null
+++ b/rust/lance/benches/hamming.rs
@@ -0,0 +1,228 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+//! Benchmark for hamming distance clustering.
+//!
+//! This benchmark tests the pairwise hamming distance computation and clustering
+//! performance at various scales.
+//!
+//! Run with: cargo bench -p lance --bench hamming
+//!
+//! Environment variables:
+//! - DATASET_URI: Path to a dataset with a hash column (optional, generates random if not set)
+//! - HASH_COLUMN: Name of the hash column (default: "hash")
+//! - SAMPLE_SIZE: Number of rows to sample (default: 10000)
+//! - THRESHOLD: Hamming distance threshold (default: 10)
+
+#![allow(clippy::print_stdout)]
+
+use std::env;
+use std::sync::Arc;
+use std::time::Instant;
+
+use arrow_array::{FixedSizeListArray, RecordBatch, RecordBatchIterator, UInt8Array};
+use arrow_schema::{DataType, Field, FieldRef, Schema};
+use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
+use lance_arrow::FixedSizeListArrayExt;
+use rand::Rng;
+
+use lance::index::vector::hamming::{
+    hamming_clustering_for_sample, hamming_clustering_from_hashes,
+};
+use lance::{Dataset, dataset::WriteParams};
+use lance_linalg::distance::pairwise_hamming_distance_parallel;
+
+#[cfg(target_os = "linux")]
+use lance_testing::pprof::{Output, PProfProfiler};
+
+/// Generate random 64-bit hashes.
+fn generate_random_hashes(n: usize) -> Vec<u64> {
+    let mut rng = rand::rng();
+    (0..n).map(|_| rng.random()).collect()
+}
+
+/// Generate random hash dataset as Arrow arrays.
+fn generate_hash_batch(num_rows: usize) -> RecordBatch {
+    let mut rng = rand::rng();
+
+    // Generate random bytes for the hashes (8 bytes per hash)
+    let bytes: Vec<u8> = (0..num_rows * 8).map(|_| rng.random()).collect();
+    let values = UInt8Array::from(bytes);
+
+    let hash_array = FixedSizeListArray::try_new_from_values(values, 8).unwrap();
+
+    let schema = Arc::new(Schema::new(vec![Field::new(
+        "hash",
+        DataType::FixedSizeList(FieldRef::new(Field::new("item", DataType::UInt8, true)), 8),
+        false,
+    )]));
+
+    RecordBatch::try_new(schema, vec![Arc::new(hash_array)]).unwrap()
+}
+
+/// Create a test dataset with random hashes.
+async fn create_hash_dataset(path: &std::path::Path, num_rows: usize) {
+    let batch = generate_hash_batch(num_rows);
+    let schema = batch.schema();
+
+    let write_params = WriteParams {
+        max_rows_per_file: num_rows,
+        max_rows_per_group: 10_000,
+        ..Default::default()
+    };
+
+    let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
+    Dataset::write(reader, path.to_str().unwrap(), Some(write_params))
+        .await
+        .unwrap();
+}
+
+/// Benchmark pure pairwise hamming computation (no I/O).
+fn bench_pairwise_compute(c: &mut Criterion) {
+    let mut group = c.benchmark_group("hamming_pairwise_compute");
+
+    for size in [1_000, 5_000, 10_000, 20_000] {
+        let hashes = generate_random_hashes(size);
+        let total_pairs = (size as u64) * (size as u64 - 1) / 2;
+
+        group.throughput(Throughput::Elements(total_pairs));
+        group.bench_with_input(BenchmarkId::new("parallel", size), &hashes, |b, hashes| {
+            b.iter(|| {
+                std::hint::black_box(pairwise_hamming_distance_parallel(hashes, None, Some(10)));
+            });
+        });
+    }
+
+    group.finish();
+}
+
+/// Benchmark full clustering pipeline (compute + cluster).
+fn bench_cluster_hashes(c: &mut Criterion) {
+    let mut group = c.benchmark_group("hamming_cluster");
+
+    for size in [1_000, 5_000, 10_000] {
+        let hashes = generate_random_hashes(size);
+
+        group.bench_with_input(
+            BenchmarkId::new("full_pipeline", size),
+            &hashes,
+            |b, hashes| {
+                b.iter(|| {
+                    std::hint::black_box(hamming_clustering_from_hashes(hashes, None, 10));
+                });
+            },
+        );
+    }
+
+    group.finish();
+}
+
+/// Benchmark with dataset I/O (uses DATASET_URI if set, otherwise a generated temp dataset).
+fn bench_dataset_cluster(c: &mut Criterion) {
+    let rt = tokio::runtime::Runtime::new().unwrap();
+
+    // Check if we should use an external dataset
+    let dataset_uri = env::var("DATASET_URI").ok();
+    let hash_column = env::var("HASH_COLUMN").unwrap_or_else(|_| "hash".to_string());
+    let sample_size: usize = env::var("SAMPLE_SIZE")
+        .ok()
+        .and_then(|s| s.parse().ok())
+        .unwrap_or(10_000);
+    let threshold: u32 = env::var("THRESHOLD")
+        .ok()
+        .and_then(|s| s.parse().ok())
+        .unwrap_or(10);
+
+    let mut group = c.benchmark_group("hamming_dataset");
+
+    if let Some(uri) = dataset_uri {
+        // Use external dataset
+        println!("Using external dataset: {}", uri);
+        println!(
+            "Column: {}, Sample: {}, Threshold: {}",
+            hash_column, sample_size, threshold
+        );
+
+        let dataset = rt.block_on(async { Dataset::open(&uri).await.unwrap() });
+
+        group.bench_function(format!("external_sample_{}", sample_size), |b| {
+            b.to_async(&rt).iter(|| async {
+                hamming_clustering_for_sample(&dataset, &hash_column, Some(sample_size), threshold)
+                    .await
+                    .unwrap()
+            });
+        });
+    } else {
+        // Create temporary dataset with random hashes
+        let temp_dir = tempfile::tempdir().unwrap();
+        let uri = temp_dir.path().join("bench_hashes.lance");
+
+        rt.block_on(async {
+            create_hash_dataset(&uri, 100_000).await;
+        });
+
+        let dataset = rt.block_on(async { Dataset::open(uri.to_str().unwrap()).await.unwrap() });
+
+        for sample in [1_000, 5_000, 10_000] {
+            group.bench_function(format!("generated_sample_{}", sample), |b| {
+                let ds = dataset.clone();
+                b.to_async(&rt).iter(|| {
+                    let ds = ds.clone();
+                    async move {
+                        hamming_clustering_for_sample(&ds, "hash", Some(sample), 10)
+                            .await
+                            .unwrap()
+                    }
+                });
+            });
+        }
+    }
+
+    group.finish();
+}
+
+/// Quick standalone benchmark that prints results (for quick testing).
+#[allow(dead_code)]
+fn run_quick_bench() {
+    println!("=== Hamming Distance Clustering Benchmark ===\n");
+
+    let sizes = [1_000, 5_000, 10_000, 20_000];
+
+    for &size in &sizes {
+        let hashes = generate_random_hashes(size);
+        let total_pairs = (size as u64) * (size as u64 - 1) / 2;
+
+        println!("Size: {} rows, {} pairs", size, total_pairs);
+        let start = Instant::now();
+        let reader = hamming_clustering_from_hashes(&hashes, None, 10);
+        // Consume the reader to count clusters
+        let cluster_count: usize = reader.map(|b| b.unwrap().num_rows()).sum();
+        let elapsed = start.elapsed();
+
+        let pairs_per_sec = total_pairs as f64 / elapsed.as_secs_f64();
+        println!(
+            "  Total time: {:?} ({:.2}M pairs/sec)",
+            elapsed,
+            pairs_per_sec / 1_000_000.0
+        );
+        println!("  Total clusters: {}", cluster_count);
+        println!();
+    }
+}
+
+#[cfg(target_os = "linux")]
+criterion_group! {
+    name = benches;
+    config = Criterion::default().with_profiler(PProfProfiler::new(100, Output::Flamegraph(None)));
+    targets = bench_pairwise_compute, bench_cluster_hashes, bench_dataset_cluster
+}
+
+#[cfg(not(target_os = "linux"))]
+criterion_group!(
+    benches,
+    bench_pairwise_compute,
+    bench_cluster_hashes,
+    bench_dataset_cluster
+);
+
+criterion_main!(benches);
diff --git a/rust/lance/src/index/vector.rs b/rust/lance/src/index/vector.rs
index 0eb66ea2ede..af48bc94c41 100644
--- a/rust/lance/src/index/vector.rs
+++ b/rust/lance/src/index/vector.rs
@@ -9,6 +9,7 @@ use std::{any::Any, collections::HashMap};
 
 pub mod builder;
 pub(crate) mod details;
+pub mod hamming;
 pub mod ivf;
 pub mod pq;
 pub mod utils;
diff --git a/rust/lance/src/index/vector/hamming.rs b/rust/lance/src/index/vector/hamming.rs
new file mode 100644
index 00000000000..ba6ea98c42d
--- /dev/null
+++ b/rust/lance/src/index/vector/hamming.rs
@@ -0,0 +1,938 @@
+// SPDX-License-Identifier: Apache-2.0
+// SPDX-FileCopyrightText: Copyright The Lance Authors
+
+//! Hamming distance clustering for IVF_FLAT indices.
+//!
+//! This module provides functionality to perform pairwise hamming distance
+//! computation and clustering on specific partitions of IVF_FLAT indices.
+
+use std::time::Instant;
+
+use arrow_array::RecordBatchReader;
+use arrow_array::cast::AsArray;
+use arrow_array::types::UInt64Type;
+use arrow_schema::DataType;
+use lance_core::{Error, Result};
+use lance_index::metrics::NoOpMetricsCollector;
+use lance_index::vector::VectorIndex;
+use lance_index::vector::flat::index::{FlatBinQuantizer, FlatIndex};
+use lance_index::vector::flat::storage::FLAT_COLUMN;
+use lance_index::vector::storage::VectorStore;
+use lance_linalg::distance::{
+    ClusteringResult, cluster_pairwise_result, extract_hashes_from_fixed_list,
+    pairwise_hamming_distance_parallel,
+};
+use rand::rng;
+use rand::seq::index::sample;
+
+use crate::dataset::Dataset;
+use crate::index::{DatasetIndexExt, DatasetIndexInternalExt};
+
+use super::ivf::v2::IVFIndex;
+
+/// Perform pairwise hamming distance clustering on a partition of an IVF_FLAT index.
+///
+/// This function loads a specific partition from an IVF_FLAT index on a hash column,
+/// computes pairwise hamming distances between all hashes in the partition,
+/// filters by threshold, and clusters the results using union-find.
+///
+/// # Arguments
+///
+/// * `dataset` - The Lance dataset
+/// * `index_name` - Name of the IVF_FLAT index on the hash column
+/// * `partition_id` - The partition ID within the IVF_FLAT index
+/// * `hamming_threshold` - Maximum hamming distance to consider as similar
+///
+/// # Returns
+///
+/// A `RecordBatchReader` yielding batches with columns:
+/// - `representative`: UInt64 - The representative row ID for each cluster
+/// - `duplicates`: `List<UInt64>` - List of duplicate row IDs in each cluster
+///
+/// # Errors
+///
+/// Returns an error if:
+/// - The index doesn't exist or is not an IVF_FLAT index
+/// - The indexed column has wrong type (must be `FixedSizeList<UInt8, 8>`)
+/// - The partition ID is out of range
+pub async fn hamming_clustering_for_ivf_partition(
+    dataset: &Dataset,
+    index_name: &str,
+    partition_id: usize,
+    hamming_threshold: u32,
+) -> Result<Box<dyn RecordBatchReader + Send>> {
+    // Load indices and find the IVF_FLAT index
+    let indices = dataset.load_indices().await?;
+    let index_meta = indices
+        .iter()
+        .find(|idx| idx.name == index_name)
+        .ok_or_else(|| {
+            Error::invalid_input(format!("Index '{}' not found on dataset", index_name))
+        })?;
+
+    // Get the column name from the index metadata
+    let schema = dataset.schema();
+    let field_id = index_meta
+        .fields
+        .first()
+        .ok_or_else(|| Error::invalid_input(format!("Index '{}' has no fields", index_name)))?;
+    let field = schema.field_by_id(*field_id).ok_or_else(|| {
+        Error::invalid_input(format!(
+            "Field with id {} not found in schema for index '{}'",
+            field_id, index_name
+        ))
+    })?;
+    let column = &field.name;
+
+    // Check column is FixedSizeList<UInt8, 8>
+    let data_type = field.data_type();
+    match data_type {
+        DataType::FixedSizeList(inner, 8) => {
+            if *inner.data_type() != DataType::UInt8 {
+                return Err(Error::invalid_input(format!(
+                    "Column '{}' must be FixedSizeList<UInt8, 8>, got FixedSizeList<{:?}, 8>",
+                    column,
+                    inner.data_type()
+                )));
+            }
+        }
+        _ => {
+            return Err(Error::invalid_input(format!(
+                "Column '{}' must be FixedSizeList<UInt8, 8>, got {:?}",
+                column, data_type
+            )));
+        }
+    }
+
+    // Open the vector index
+    let index = dataset
+        .open_vector_index(column, &index_meta.uuid, &NoOpMetricsCollector)
+        .await?;
+
+    // Try to downcast to IVFIndex<FlatIndex, FlatBinQuantizer> (IVF_FLAT for binary data)
+    let ivf_index = index
+        .as_any()
+        .downcast_ref::<IVFIndex<FlatIndex, FlatBinQuantizer>>()
+        .ok_or_else(|| {
+            Error::invalid_input(format!(
+                "Index '{}' is not an IVF_FLAT index for binary data",
+                index_name
+            ))
+        })?;
+
+    // Check partition ID is valid
+    let num_partitions = ivf_index.ivf_model().num_partitions();
+    if partition_id >= num_partitions {
+        return Err(Error::invalid_input(format!(
+            "Partition ID {} is out of range (0..{})",
+            partition_id, num_partitions
+        )));
+    }
+
+    // Load the partition storage
+    let storage = ivf_index.load_partition_storage(partition_id, None).await?;
+
+    // Get row IDs
+    let row_id_slice: Vec<u64> = storage.row_ids().copied().collect();
+
+    if row_id_slice.is_empty() {
+        let empty = ClusteringResult {
+            clusters: Vec::new(),
+        };
+        return Ok(empty.into_reader(None));
+    }
+
+    // Get vectors from the storage batches
+    let batches: Vec<_> = storage.to_batches()?.collect();
+    if batches.is_empty() {
+        let empty = ClusteringResult {
+            clusters: Vec::new(),
+        };
+        return Ok(empty.into_reader(None));
+    }
+
+    // Extract the hash vectors from the FLAT_COLUMN
+    let mut all_hashes = Vec::new();
+    for batch in &batches {
+        let vectors = batch
+            .column_by_name(FLAT_COLUMN)
+            .ok_or_else(|| {
+                Error::invalid_input(format!("Column '{}' not found in storage", FLAT_COLUMN))
+            })?
+            .as_fixed_size_list();
+        let hashes = extract_hashes_from_fixed_list(vectors)?;
+        all_hashes.extend(hashes);
+    }
+
+    // Compute pairwise hamming distances with threshold filtering
+    let pairwise_result = pairwise_hamming_distance_parallel(
+        &all_hashes,
+        Some(&row_id_slice),
+        Some(hamming_threshold),
+    );
+
+    // Cluster the results
+    let clustering = cluster_pairwise_result(&pairwise_result);
+
+    Ok(clustering.into_reader(None))
+}
+
+/// Get partition statistics for an IVF_FLAT index.
+pub async fn get_ivf_partition_info(
+    dataset: &Dataset,
+    index_name: &str,
+) -> Result<Vec<PartitionInfo>> {
+    let indices = dataset.load_indices().await?;
+    let index_meta = indices
+        .iter()
+        .find(|idx| idx.name == index_name)
+        .ok_or_else(|| {
+            Error::invalid_input(format!("Index '{}' not found on dataset", index_name))
+        })?;
+
+    // Get the column name from the index metadata
+    let schema = dataset.schema();
+    let field_id = index_meta
+        .fields
+        .first()
+        .ok_or_else(|| Error::invalid_input(format!("Index '{}' has no fields", index_name)))?;
+    let field = schema.field_by_id(*field_id).ok_or_else(|| {
+        Error::invalid_input(format!(
+            "Field with id {} not found in schema for index '{}'",
+            field_id, index_name
+        ))
+    })?;
+    let column = &field.name;
+
+    let index = dataset
+        .open_vector_index(column, &index_meta.uuid, &NoOpMetricsCollector)
+        .await?;
+
+    let ivf_index = index
+        .as_any()
+        .downcast_ref::<IVFIndex<FlatIndex, FlatBinQuantizer>>()
+        .ok_or_else(|| {
+            Error::invalid_input(format!(
+                "Index '{}' is not an IVF_FLAT index for binary data",
+                index_name
+            ))
+        })?;
+
+    let num_partitions = ivf_index.ivf_model().num_partitions();
+    let mut partition_infos = Vec::with_capacity(num_partitions);
+
+    for i in 0..num_partitions {
+        partition_infos.push(PartitionInfo {
+            partition_id: i,
+            size: ivf_index.ivf_model().partition_size(i),
+        });
+    }
+
+    Ok(partition_infos)
+}
+
+/// Information about an IVF partition.
+#[derive(Debug, Clone)] +pub struct PartitionInfo { + pub partition_id: usize, + pub size: usize, +} + +/// Perform pairwise hamming distance clustering on sampled rows from a dataset. +/// +/// This function samples N rows randomly from the dataset, extracts hashes, +/// computes pairwise hamming distances, and clusters the results. +/// It's useful for benchmarking and testing without requiring an IVF index. +/// +/// # Arguments +/// +/// * `dataset` - The Lance dataset +/// * `column` - Name of the hash column (must be `FixedSizeList<UInt8, 8>`) +/// * `sample_size` - Number of rows to sample (if None or >= total rows, uses all rows) +/// * `hamming_threshold` - Maximum hamming distance to consider as similar +/// +/// # Returns +/// +/// A `RecordBatchReader` yielding batches with columns: +/// - `representative`: UInt64 - The representative row ID for each cluster +/// - `duplicates`: `List<UInt64>` - List of duplicate row IDs in each cluster +/// +/// The reader is empty (no clusters) when fewer than two hashes are read. +/// +/// # Errors +/// +/// Returns an error if: +/// - The column doesn't exist or has wrong type (must be `FixedSizeList<UInt8, 8>`) +/// - Reading the rows (`take` or scan) or extracting the hashes fails +pub async fn hamming_clustering_for_sample( + dataset: &Dataset, + column: &str, + sample_size: Option, + hamming_threshold: u32, +) -> Result> { + // Validate column exists and has correct type + let schema = dataset.schema(); + let field = schema.field(column).ok_or_else(|| { + Error::invalid_input(format!("Column '{}' not found in dataset schema", column)) + })?; + + // Check column is FixedSizeList<UInt8, 8> (eight bytes, i.e. one 64-bit hash per row) + let data_type = field.data_type(); + match data_type { + DataType::FixedSizeList(inner, 8) => { + if *inner.data_type() != DataType::UInt8 { + return Err(Error::invalid_input(format!( + "Column '{}' must be FixedSizeList, got FixedSizeList<{:?}, 8>", + column, + inner.data_type() + ))); + } + } + _ => { + return Err(Error::invalid_input(format!( + "Column '{}' must be FixedSizeList, got {:?}", + column, data_type + ))); + } + } + + // Get total row count + // NOTE(review): this sums `physical_rows` and silently skips fragments where it + // is `None`; presumably deleted rows are still counted, so sampled indices could + // exceed what `take()` accepts — confirm against `Dataset::take` semantics + let total_rows: usize = dataset + .get_fragments() + .iter() + .filter_map(|f| f.metadata().physical_rows) + .sum(); + + // Sample only when a size was given and it is smaller than the dataset + let use_sampling = sample_size.is_some_and(|s| s < 
total_rows); + let effective_sample = sample_size.unwrap_or(total_rows).min(total_rows); + + // Read data: either a random sample via take(), or a full scan + let (hashes, row_ids) = if use_sampling { + // Random sample using take() with _rowid (take uses positional indices) + let indices: Vec = sample(&mut rng(), total_rows, effective_sample) + .iter() + .map(|i| i as u64) + .collect(); + + let batch = dataset + .take( + &indices, + crate::dataset::ProjectionRequest::from_columns( + [column, "_rowid"], + dataset.schema(), + ), + ) + .await?; + + // Row IDs and hashes are read from the same batch, so they stay aligned + let rowid_col = batch.column_by_name("_rowid").ok_or_else(|| { + Error::invalid_input("_rowid column not found in take result".to_string()) + })?; + let row_ids = rowid_col.as_primitive::(); + let row_id_vec: Vec = row_ids.values().to_vec(); + + let hash_col = batch.column_by_name(column).ok_or_else(|| { + Error::invalid_input(format!("Column '{}' not found in result", column)) + })?; + let hashes_arr = hash_col.as_fixed_size_list(); + let hashes = extract_hashes_from_fixed_list(hashes_arr)?; + + (hashes, row_id_vec) + } else { + // Full scan: read the hash column plus row IDs for every row into one batch + let batch = dataset + .scan() + .project(&[column])? 
+ .with_row_id() + .try_into_batch() + .await?; + + let rowid_col = batch.column_by_name("_rowid").ok_or_else(|| { + Error::invalid_input("_rowid column not found in scan result".to_string()) + })?; + let row_ids = rowid_col.as_primitive::(); + let row_id_vec: Vec = row_ids.values().to_vec(); + + let hash_col = batch.column_by_name(column).ok_or_else(|| { + Error::invalid_input(format!("Column '{}' not found in result", column)) + })?; + let hashes_arr = hash_col.as_fixed_size_list(); + let hashes = extract_hashes_from_fixed_list(hashes_arr)?; + + (hashes, row_id_vec) + }; + + if hashes.len() < 2 { + let empty = ClusteringResult { + clusters: Vec::new(), + }; + return Ok(empty.into_reader(None)); + } + + // Compute pairwise hamming distances (at least two hashes are present here) + let pairwise = + pairwise_hamming_distance_parallel(&hashes, Some(&row_ids), Some(hamming_threshold)); + + // Cluster edges + let clustering = cluster_pairwise_result(&pairwise); + + Ok(clustering.into_reader(None)) +} + +/// Perform pairwise hamming distance clustering on a contiguous range of rows from a fragment. +/// +/// This function reads a contiguous range of rows from a specific fragment, +/// extracts hashes, computes pairwise hamming distances, and clusters the results. +/// Unlike sampling, this reads sequential rows which is useful for distributed +/// processing where each worker handles a specific range of a fragment. 
+/// +/// # Arguments +/// +/// * `dataset` - The Lance dataset +/// * `column` - Name of the hash column (must be `FixedSizeList<UInt8, 8>`) +/// * `fragment_id` - The fragment ID to read from +/// * `start_row` - The starting row offset within the fragment +/// * `num_rows` - Number of rows to read from the start position (clamped to the +/// rows remaining in the fragment) +/// * `hamming_threshold` - Maximum hamming distance to consider as similar +/// +/// # Returns +/// +/// A `RecordBatchReader` yielding batches with columns: +/// - `representative`: UInt64 - The representative row ID for each cluster +/// - `duplicates`: `List<UInt64>` - List of duplicate row IDs in each cluster +/// +/// The reader is empty (no clusters) when fewer than two rows are read. +/// +/// # Errors +/// +/// Returns an error if: +/// - The fragment doesn't exist or has no `physical_rows` metadata +/// - The column doesn't exist or has wrong type (must be `FixedSizeList<UInt8, 8>`) +/// - `start_row` is at or beyond the fragment's physical row count (an oversized +/// `num_rows` is clamped rather than rejected) +pub async fn hamming_clustering_for_range( + dataset: &Dataset, + column: &str, + fragment_id: usize, + start_row: usize, + num_rows: usize, + hamming_threshold: u32, +) -> Result> { + // Validate column exists and has correct type + let schema = dataset.schema(); + let field = schema.field(column).ok_or_else(|| { + Error::invalid_input(format!("Column '{}' not found in dataset schema", column)) + })?; + + // Check column is FixedSizeList<UInt8, 8> (eight bytes, i.e. one 64-bit hash per row) + let data_type = field.data_type(); + match data_type { + DataType::FixedSizeList(inner, 8) => { + if *inner.data_type() != DataType::UInt8 { + return Err(Error::invalid_input(format!( + "Column '{}' must be FixedSizeList, got FixedSizeList<{:?}, 8>", + column, + inner.data_type() + ))); + } + } + _ => { + return Err(Error::invalid_input(format!( + "Column '{}' must be FixedSizeList, got {:?}", + column, data_type + ))); + } + } + + // Get the fragment + let fragment = dataset.get_fragment(fragment_id).ok_or_else(|| { + Error::invalid_input(format!("Fragment with ID {} not found", fragment_id)) + })?; + + // Get fragment metadata for physical row count + let fragment_meta = fragment.metadata().clone(); + let physical_rows = fragment_meta 
+ .physical_rows + .ok_or_else(|| Error::invalid_input("Fragment has no physical_rows metadata"))?; + + // Validate the range + if start_row >= physical_rows { + return Err(Error::invalid_input(format!( + "start_row {} is out of range for fragment with {} physical rows", + start_row, physical_rows + ))); + } + + // Adjust num_rows if it exceeds available rows + // (start_row < physical_rows was checked above, so the subtraction cannot underflow) + let effective_num_rows = num_rows.min(physical_rows - start_row); + + // Only reachable when num_rows == 0, since at least one row remains after start_row + if effective_num_rows == 0 { + let empty = ClusteringResult { + clusters: Vec::new(), + }; + return Ok(empty.into_reader(None)); + } + + // Use scanner with the specific fragment and limit/offset + // NOTE(review): the bounds above are based on `physical_rows`, while limit/offset + // presumably counts the rows the scan actually yields — confirm the behaviour for + // fragments that contain deleted rows + let batch = dataset + .scan() + .with_fragments(vec![fragment_meta]) + .project(&[column])? + .with_row_id() + .limit(Some(effective_num_rows as i64), Some(start_row as i64))? + .try_into_batch() + .await?; + + // Extract row IDs + let rowid_col = batch.column_by_name("_rowid").ok_or_else(|| { + Error::invalid_input("_rowid column not found in scan result".to_string()) + })?; + let row_ids = rowid_col.as_primitive::(); + let row_id_vec: Vec = row_ids.values().to_vec(); + + // Extract hashes + let hash_col = batch + .column_by_name(column) + .ok_or_else(|| Error::invalid_input(format!("Column '{}' not found in result", column)))?; + let hashes_arr = hash_col.as_fixed_size_list(); + let hashes = extract_hashes_from_fixed_list(hashes_arr)?; + + // Fewer than two hashes means there is no pair to compare + if hashes.len() < 2 { + let empty = ClusteringResult { + clusters: Vec::new(), + }; + return Ok(empty.into_reader(None)); + } + + // Compute pairwise hamming distances + let pairwise = + pairwise_hamming_distance_parallel(&hashes, Some(&row_id_vec), Some(hamming_threshold)); + + // Cluster edges + let clustering = cluster_pairwise_result(&pairwise); + + Ok(clustering.into_reader(None)) +} + +/// Perform pairwise hamming distance clustering on provided hashes (no I/O). +/// +/// This is useful for benchmarking the pure compute performance without I/O. +/// Logs timing information via tracing. 
+/// +/// # Arguments +/// +/// * `hashes` - Slice of 64-bit hash values +/// * `row_ids` - Optional row IDs (defaults to indices if None); presumably must +/// have the same length as `hashes` when provided — TODO confirm +/// * `hamming_threshold` - Maximum hamming distance to consider as similar +/// +/// # Returns +/// +/// A `RecordBatchReader` yielding batches with columns: +/// - `representative`: UInt64 - The representative row ID for each cluster +/// - `duplicates`: `List<UInt64>` - List of duplicate row IDs in each cluster +/// +/// The reader is empty (no clusters) when fewer than two hashes are given; in that +/// case nothing is computed or logged. +pub fn hamming_clustering_from_hashes( + hashes: &[u64], + row_ids: Option<&[u64]>, + hamming_threshold: u32, +) -> Box { + let num_rows = hashes.len(); + if num_rows < 2 { + let empty = ClusteringResult { + clusters: Vec::new(), + }; + return empty.into_reader(None); + } + + // Number of unordered pairs, n * (n - 1) / 2; used only for the throughput log below + let total_pairs = (num_rows as u64) * (num_rows as u64 - 1) / 2; + + // Compute pairwise hamming distances + let t_compute_start = Instant::now(); + let pairwise = pairwise_hamming_distance_parallel(hashes, row_ids, Some(hamming_threshold)); + let compute_time = t_compute_start.elapsed(); + + // Cluster edges + let t_cluster_start = Instant::now(); + let clustering = cluster_pairwise_result(&pairwise); + let cluster_time = t_cluster_start.elapsed(); + + // Log timing info (report 0.0 pairs/sec instead of dividing by a zero duration) + let pairs_per_sec = if compute_time.as_secs_f64() > 0.0 { + total_pairs as f64 / compute_time.as_secs_f64() + } else { + 0.0 + }; + tracing::info!( + num_rows, + total_pairs, + edges = pairwise.len(), + compute_time_ms = compute_time.as_millis(), + cluster_time_ms = cluster_time.as_millis(), + pairs_per_sec_millions = pairs_per_sec / 1_000_000.0, + num_clusters = clustering.num_clusters(), + num_duplicates = clustering.num_duplicates(), + "Hamming clustering completed" + ); + + clustering.into_reader(None) +} + +#[cfg(test)] +mod tests { + use super::*; + use arrow_array::Array; + + /// Helper to collect all clusters from a reader. + /// + /// Returns one `(representative, duplicates)` tuple per row of every batch. + /// Panics if a batch is an error or a column does not downcast as expected. 
+ fn collect_clusters(reader: Box) -> Vec<(u64, Vec)> { + let mut clusters = Vec::new(); + for batch in reader { + let batch = batch.unwrap(); + let reps = batch + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + let dups = batch + .column(1) + .as_any() + .downcast_ref::() + .unwrap(); + + for i in 0..batch.num_rows() { + let rep = reps.value(i); + let dup_arr = dups.value(i); + let dup_values = dup_arr + .as_any() + .downcast_ref::() + .unwrap(); + let duplicates: Vec = dup_values.values().to_vec(); + clusters.push((rep, duplicates)); + } + } + clusters + } + + #[test] + fn test_hamming_clustering_from_hashes_basic() { + // Create some test hashes with known distances + let hashes = vec![ + 0b0000u64, // hash 0 + 0b0001u64, // hash 1 - distance 1 from hash 0 + 0b0011u64, // hash 2 - distance 1 from hash 1, distance 2 from hash 0 + 0b1111u64, // hash 3 - distance 2 from hash 2, distance 4 from hash 0 + ]; + + let reader = hamming_clustering_from_hashes(&hashes, None, 1); + let clusters = collect_clusters(reader); + + // With threshold 1, pairs (0,1) and (1,2) should be connected + // This forms one cluster: {0, 1, 2}; hash 3 is at distance >= 2 from every + // other hash, so it stays unclustered + assert_eq!(clusters.len(), 1); + assert_eq!(clusters[0].1.len(), 2); // 2 duplicates in the cluster + } + + /// Hashes whose pairwise distance is at least 32 must not be clustered at threshold 5. + #[test] + fn test_hamming_clustering_from_hashes_no_clusters() { + // All hashes are far apart + let hashes = vec![ + 0x0000000000000000u64, + 0xFFFFFFFFFFFFFFFFu64, + 0xAAAAAAAAAAAAAAAAu64, + ]; + + let reader = hamming_clustering_from_hashes(&hashes, None, 5); + let clusters = collect_clusters(reader); + + // With threshold 5, no pairs should be connected (min distance is 32) + assert_eq!(clusters.len(), 0); + } + + /// Caller-supplied row IDs, not positional indices, must appear in the output. + #[test] + fn test_hamming_clustering_from_hashes_with_row_ids() { + let hashes = vec![0b0000u64, 0b0001u64]; + let row_ids = vec![100u64, 200u64]; + + let reader = hamming_clustering_from_hashes(&hashes, Some(&row_ids), 1); + let clusters = collect_clusters(reader); + + assert_eq!(clusters.len(), 1); + 
assert_eq!(clusters[0].0, 100); // representative + assert_eq!(clusters[0].1, vec![200]); // duplicates + } + + /// Smoke test: build an IVF_FLAT hamming index and cluster one of its partitions. + #[tokio::test] + async fn test_hamming_clustering_for_ivf_partition() { + use arrow_array::{FixedSizeListArray, RecordBatchIterator, UInt8Array}; + use arrow_schema::{Field, Schema}; + use lance_arrow::FixedSizeListArrayExt; + use lance_index::vector::ivf::IvfBuildParams; + use std::sync::Arc; + use tempfile::tempdir; + + // Create test data with hash column (FixedSizeList<UInt8, 8>) + let schema = Arc::new(Schema::new(vec![Field::new( + "hash", + arrow_schema::DataType::FixedSizeList( + Arc::new(Field::new("item", arrow_schema::DataType::UInt8, true)), + 8, + ), + false, + )])); + + // Generate hashes with some duplicates (similar hashes) + let num_rows = 100; + let mut hash_bytes = Vec::with_capacity(num_rows * 8); + for i in 0..num_rows { + // Create groups of similar hashes: within a group only the low 4 bits + // differ (variation < 10), so members are at hamming distance <= 4 + let base = (i / 10) as u64; // 10 groups + let variation = (i % 10) as u64; + let hash = base.wrapping_mul(0x123456789) ^ variation; + hash_bytes.extend_from_slice(&hash.to_le_bytes()); + } + let values = UInt8Array::from(hash_bytes); + let hash_array = + FixedSizeListArray::try_new_from_values(values, 8).expect("create hash array"); + + let batch = + arrow_array::RecordBatch::try_new(schema.clone(), vec![Arc::new(hash_array)]).unwrap(); + + // Write dataset + let temp_dir = tempdir().unwrap(); + let uri = temp_dir.path().to_str().unwrap(); + + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); + let mut dataset = crate::Dataset::write(reader, uri, None).await.unwrap(); + + // Create IVF_FLAT index with 4 partitions + let ivf_params = IvfBuildParams::new(4); + let params = crate::index::vector::VectorIndexParams::with_ivf_flat_params( + lance_linalg::distance::MetricType::Hamming, + ivf_params, + ); + + dataset + .create_index( + &["hash"], + crate::index::IndexType::Vector, + None, + // NOTE(review): the "¶ms" argument below looks like a mis-encoded + // `&params` (text-extraction damage) — confirm against the original patch + ¶ms, + false, + ) + .await + .unwrap(); + + // Load and test + let dataset = 
crate::Dataset::open(uri).await.unwrap(); + let indices = dataset.load_indices().await.unwrap(); + let index_name = &indices[0].name; + + // Test clustering on partition 0 + let reader = hamming_clustering_for_ivf_partition(&dataset, index_name, 0, 10) + .await + .unwrap(); + let clusters = collect_clusters(reader); + + // Verify we get valid results (may or may not have clusters depending on data distribution) + // At minimum, verify no panics and valid schema + for (rep, dups) in &clusters { + assert!(*rep < num_rows as u64 * 10); // loose sanity bound on row IDs, not an exact expectation + for dup in dups { + assert!(*dup < num_rows as u64 * 10); + } + } + } + + /// An unknown index name must be reported as a "not found" error. + #[tokio::test] + async fn test_hamming_clustering_for_ivf_partition_invalid_index() { + use arrow_array::{FixedSizeListArray, RecordBatchIterator, UInt8Array}; + use arrow_schema::{Field, Schema}; + use lance_arrow::FixedSizeListArrayExt; + use std::sync::Arc; + use tempfile::tempdir; + + let schema = Arc::new(Schema::new(vec![Field::new( + "hash", + arrow_schema::DataType::FixedSizeList( + Arc::new(Field::new("item", arrow_schema::DataType::UInt8, true)), + 8, + ), + false, + )])); + + let values = UInt8Array::from(vec![0u8; 80]); // 10 rows * 8 bytes + let hash_array = FixedSizeListArray::try_new_from_values(values, 8).unwrap(); + let batch = + arrow_array::RecordBatch::try_new(schema.clone(), vec![Arc::new(hash_array)]).unwrap(); + + let temp_dir = tempdir().unwrap(); + let uri = temp_dir.path().to_str().unwrap(); + + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); + let dataset = crate::Dataset::write(reader, uri, None).await.unwrap(); + + // Test with non-existent index (no index was ever created on this dataset) + let result = hamming_clustering_for_ivf_partition(&dataset, "nonexistent", 0, 10).await; + assert!(result.is_err()); + let err = result.err().unwrap(); + assert!(err.to_string().contains("not found"), "Error: {}", err); + } + + /// Full-scan and sampled clustering over groups of identical hashes. + #[tokio::test] + async fn test_hamming_clustering_for_sample_integration() { + use arrow_array::{FixedSizeListArray, 
RecordBatchIterator, UInt8Array}; + use arrow_schema::{Field, Schema}; + use lance_arrow::FixedSizeListArrayExt; + use std::sync::Arc; + use tempfile::tempdir; + + let schema = Arc::new(Schema::new(vec![Field::new( + "hash", + arrow_schema::DataType::FixedSizeList( + Arc::new(Field::new("item", arrow_schema::DataType::UInt8, true)), + 8, + ), + false, + )])); + + // Create 50 rows with some duplicate hashes + let num_rows = 50; + let mut hash_bytes = Vec::with_capacity(num_rows * 8); + for i in 0..num_rows { + // Create some identical hashes (groups of 5) + let hash = (i / 5) as u64; + hash_bytes.extend_from_slice(&hash.to_le_bytes()); + } + let values = UInt8Array::from(hash_bytes); + let hash_array = FixedSizeListArray::try_new_from_values(values, 8).unwrap(); + let batch = + arrow_array::RecordBatch::try_new(schema.clone(), vec![Arc::new(hash_array)]).unwrap(); + + let temp_dir = tempdir().unwrap(); + let uri = temp_dir.path().to_str().unwrap(); + + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); + crate::Dataset::write(reader, uri, None).await.unwrap(); + + let dataset = crate::Dataset::open(uri).await.unwrap(); + + // Test full scan (no sampling) + let reader = hamming_clustering_for_sample(&dataset, "hash", None, 0) + .await + .unwrap(); + let clusters = collect_clusters(reader); + + // With threshold 0 (exact match) and groups of 5 identical hashes, + // we should have 10 clusters with 4 duplicates each + assert_eq!(clusters.len(), 10); + for (_, dups) in &clusters { + assert_eq!(dups.len(), 4); + } + + // Test with sampling (20 of the 50 rows) + let reader = hamming_clustering_for_sample(&dataset, "hash", Some(20), 0) + .await + .unwrap(); + let clusters = collect_clusters(reader); + // With sampling, we may get fewer clusters; the sample is random, so only an + // upper bound is asserted + assert!(clusters.len() <= 10); + } + + /// Range-based clustering over the two halves of a single 50-row fragment. + #[tokio::test] + async fn test_hamming_clustering_for_range_integration() { + use arrow_array::{FixedSizeListArray, RecordBatchIterator, UInt8Array}; + use arrow_schema::{Field, Schema}; + use 
lance_arrow::FixedSizeListArrayExt; + use std::sync::Arc; + use tempfile::tempdir; + + let schema = Arc::new(Schema::new(vec![Field::new( + "hash", + arrow_schema::DataType::FixedSizeList( + Arc::new(Field::new("item", arrow_schema::DataType::UInt8, true)), + 8, + ), + false, + )])); + + // Create 50 rows with some duplicate hashes (groups of 5 identical hashes) + let num_rows = 50; + let mut hash_bytes = Vec::with_capacity(num_rows * 8); + for i in 0..num_rows { + let hash = (i / 5) as u64; + hash_bytes.extend_from_slice(&hash.to_le_bytes()); + } + let values = UInt8Array::from(hash_bytes); + let hash_array = FixedSizeListArray::try_new_from_values(values, 8).unwrap(); + let batch = + arrow_array::RecordBatch::try_new(schema.clone(), vec![Arc::new(hash_array)]).unwrap(); + + let temp_dir = tempdir().unwrap(); + let uri = temp_dir.path().to_str().unwrap(); + + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); + crate::Dataset::write(reader, uri, None).await.unwrap(); + + let dataset = crate::Dataset::open(uri).await.unwrap(); + + // Get fragment info + let fragments = dataset.get_fragments(); + assert_eq!(fragments.len(), 1); + let fragment_id = fragments[0].id() as usize; + + // Test reading range from the fragment + // Reading rows 0..25 (start 0, 25 rows) should cover groups 0-4 (5 groups, each with 5 rows) + let reader = hamming_clustering_for_range(&dataset, "hash", fragment_id, 0, 25, 0) + .await + .unwrap(); + let clusters = collect_clusters(reader); + + // With threshold 0 and 25 rows (groups 0-4), we should have 5 clusters + // Each cluster has 4 duplicates (5 identical hashes - 1 representative = 4 duplicates) + assert_eq!(clusters.len(), 5); + for (_, dups) in &clusters { + assert_eq!(dups.len(), 4); + } + + // Test reading a different range (rows 25..50, i.e. start 25, 25 rows) + let reader = hamming_clustering_for_range(&dataset, "hash", fragment_id, 25, 25, 0) + .await + .unwrap(); + let clusters = collect_clusters(reader); + + // Should have 5 clusters (groups 5-9), again with 4 duplicates each + 
assert_eq!(clusters.len(), 5); + for (_, dups) in &clusters { + assert_eq!(dups.len(), 4); + } + } + + #[tokio::test] + async fn test_hamming_clustering_for_range_invalid_fragment() { + use arrow_array::{FixedSizeListArray, RecordBatchIterator, UInt8Array}; + use arrow_schema::{Field, Schema}; + use lance_arrow::FixedSizeListArrayExt; + use std::sync::Arc; + use tempfile::tempdir; + + let schema = Arc::new(Schema::new(vec![Field::new( + "hash", + arrow_schema::DataType::FixedSizeList( + Arc::new(Field::new("item", arrow_schema::DataType::UInt8, true)), + 8, + ), + false, + )])); + + let values = UInt8Array::from(vec![0u8; 80]); // 10 rows * 8 bytes + let hash_array = FixedSizeListArray::try_new_from_values(values, 8).unwrap(); + let batch = + arrow_array::RecordBatch::try_new(schema.clone(), vec![Arc::new(hash_array)]).unwrap(); + + let temp_dir = tempdir().unwrap(); + let uri = temp_dir.path().to_str().unwrap(); + + let reader = RecordBatchIterator::new(vec![Ok(batch)], schema); + crate::Dataset::write(reader, uri, None).await.unwrap(); + + let dataset = crate::Dataset::open(uri).await.unwrap(); + + // Test with non-existent fragment + let result = hamming_clustering_for_range(&dataset, "hash", 999, 0, 10, 0).await; + assert!(result.is_err()); + let err = result.err().unwrap(); + assert!(err.to_string().contains("not found"), "Error: {}", err); + + // Test with out-of-range start_row (1000 is past the 10 rows written above; + // assumes the single fragment has ID 0) + let result = hamming_clustering_for_range(&dataset, "hash", 0, 1000, 10, 0).await; + assert!(result.is_err()); + let err = result.err().unwrap(); + assert!(err.to_string().contains("out of range"), "Error: {}", err); + } +}