From 0abe6ecf49da1ab63b84aa9ec12aee991cd00832 Mon Sep 17 00:00:00 2001 From: charlesdong1991 Date: Sun, 10 May 2026 12:29:50 +0200 Subject: [PATCH 1/2] add prefix lookup support in python --- bindings/python/fluss/__init__.pyi | 55 ++++++- bindings/python/src/lib.rs | 2 + bindings/python/src/lookup.rs | 99 +++++++++++++ bindings/python/src/table.rs | 47 ++++++ bindings/python/test/test_kv_table.py | 135 +++++++++++++++--- .../docs/user-guide/python/api-reference.md | 23 ++- 6 files changed, 339 insertions(+), 22 deletions(-) diff --git a/bindings/python/fluss/__init__.pyi b/bindings/python/fluss/__init__.pyi index fc713973..1ffc76d7 100644 --- a/bindings/python/fluss/__init__.pyi +++ b/bindings/python/fluss/__init__.pyi @@ -574,15 +574,43 @@ class TableUpsert: def __repr__(self) -> str: ... class TableLookup: - """Builder for creating a Lookuper. + """Builder for creating a Lookuper or PrefixLookuper. - Obtain via `FlussTable.new_lookup()`, then call `create_lookuper()`. + Obtain via `FlussTable.new_lookup()`, then call `create_lookuper()` + for primary key lookup, or `lookup_by(columns).create_lookuper()` + for prefix key lookup. Example: lookuper = table.new_lookup().create_lookuper() + prefix_lookuper = table.new_lookup().lookup_by(["a", "b"]).create_lookuper() """ def create_lookuper(self) -> Lookuper: ... + def lookup_by(self, column_names: List[str]) -> "TablePrefixLookup": + """Switch to prefix-scan mode for the given lookup columns. + + The columns must be the table's partition keys (if any) plus the + bucket keys, in that order. + + Args: + column_names: List of column names forming the prefix key. + + Returns: + TablePrefixLookup builder. Call `create_lookuper()` to get a PrefixLookuper. + """ + ... + def __repr__(self) -> str: ... + +class TablePrefixLookup: + """Builder for creating a PrefixLookuper. + + Obtain via `TableLookup.lookup_by(columns)`, then call `create_lookuper()`. + + Example: + prefix_lookuper = table.new_lookup().lookup_by(["a", "b"]).create_lookuper() + """ + + def create_lookuper(self) -> "PrefixLookuper": ... def __repr__(self) -> str: ... class AppendWriter: @@ -721,6 +749,29 @@ class Lookuper: ... def __repr__(self) -> str: ... +class PrefixLookuper: + """Lookuper for performing prefix key lookups on a Fluss table. + + Returns all rows whose primary key starts with the given prefix. + Create via `table.new_lookup().lookup_by(columns).create_lookuper()`. + """ + + async def lookup(self, prefix: dict | list | tuple) -> List[Dict[str, object]]: + """Lookup all rows matching a prefix key. + + Args: + prefix: A dict, list, or tuple containing only the prefix key values + (the columns specified in lookup_by()). + For dict: keys are prefix column names. + For list/tuple: values in prefix column order. + + Returns: + A list of dicts, each containing the full row data. + Empty list if no matches. + """ + ... + def __repr__(self) -> str: ... + class LogScanner: """Scanner for reading log data from a Fluss table. diff --git a/bindings/python/src/lib.rs b/bindings/python/src/lib.rs index 6890e088..2d71491a 100644 --- a/bindings/python/src/lib.rs +++ b/bindings/python/src/lib.rs @@ -113,9 +113,11 @@ fn _fluss(m: &Bound<'_, PyModule>) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/bindings/python/src/lookup.rs b/bindings/python/src/lookup.rs index f7bd09a5..196faa1e 100644 --- a/bindings/python/src/lookup.rs +++ b/bindings/python/src/lookup.rs @@ -113,3 +113,102 @@ impl Lookuper { }) } } + +/// Lookuper for performing prefix key lookups on a Fluss table. +/// +/// Returns all rows whose primary key starts with the given prefix. +/// Create once via `table.new_lookup().lookup_by(columns).create_lookuper()` +/// and reuse for multiple lookups. +#[pyclass] +pub struct PrefixLookuper { + inner: Arc>, + table_info: Arc, + lookup_column_indices: Vec, +} + +#[pymethods] +impl PrefixLookuper { + /// Lookup all rows matching a prefix key. + /// + /// Args: + /// prefix: A dict, list, or tuple containing only the prefix key values + /// (the columns specified in lookup_by()). + /// For dict: keys are prefix column names. + /// For list/tuple: values in prefix column order. + /// + /// Returns: + /// A list of dicts, each containing the full row data. Empty list if no matches. + pub fn lookup<'py>( + &self, + py: Python<'py>, + prefix: &Bound<'_, PyAny>, + ) -> PyResult> { + let generic_row = + python_to_dense_generic_row(prefix, &self.table_info, &self.lookup_column_indices)?; + let inner = self.inner.clone(); + let table_info = self.table_info.clone(); + + future_into_py(py, async move { + let result = { + let mut lookuper = inner.lock().await; + lookuper + .lookup(&generic_row) + .await + .map_err(|e| FlussError::from_core_error(&e))? + }; + + let rows = result + .get_rows() + .map_err(|e| FlussError::from_core_error(&e))?; + + Python::attach(|py| { + let py_rows: Vec> = rows + .iter() + .map(|row| internal_row_to_dict(py, row, &table_info)) + .collect::>()?; + Ok(py_rows) + }) + }) + } + + fn __repr__(&self) -> String { + "PrefixLookuper()".to_string() + } +} + +impl PrefixLookuper { + pub fn new( + connection: &Arc, + metadata: Arc, + table_info: fcore::metadata::TableInfo, + lookup_column_names: Vec, + ) -> PyResult { + let row_type = table_info.row_type(); + let lookup_column_indices: Vec = lookup_column_names + .iter() + .map(|name| { + row_type.get_field_index(name).ok_or_else(|| { + FlussError::new_err(format!("Unknown column name '{name}' for prefix lookup")) + }) + }) + .collect::>()?; + + let lookuper = TOKIO_RUNTIME.block_on(async { + let fluss_table = + fcore::client::FlussTable::new(connection, metadata, table_info.clone()); + let table_lookup = fluss_table + .new_lookup() + .map_err(|e| FlussError::from_core_error(&e))?; + table_lookup + .lookup_by(lookup_column_names) + .create_lookuper() + .map_err(|e| FlussError::from_core_error(&e)) + })?; + + Ok(Self { + inner: Arc::new(Mutex::new(lookuper)), + table_info: Arc::new(table_info), + lookup_column_indices, + }) + } +} diff --git a/bindings/python/src/table.rs b/bindings/python/src/table.rs index 98aee5e3..4ec7b8d7 100644 --- a/bindings/python/src/table.rs +++ b/bindings/python/src/table.rs @@ -888,11 +888,58 @@ impl TableLookup { ) } + /// Switch to prefix-scan mode for the given lookup columns. + /// + /// The columns must be the table's partition keys (if any) plus the + /// bucket keys, in that order. + /// + /// Args: + /// column_names: List of column names forming the prefix key. + /// + /// Returns: + /// TablePrefixLookup builder. Call `create_lookuper()` to get a PrefixLookuper. + pub fn lookup_by(&self, column_names: Vec) -> TablePrefixLookup { + TablePrefixLookup { + connection: self.connection.clone(), + metadata: self.metadata.clone(), + table_info: self.table_info.clone(), + lookup_column_names: column_names, + } + } + fn __repr__(&self) -> String { "TableLookup()".to_string() } } +/// Builder for creating a PrefixLookuper. +/// +/// Obtain via `TableLookup.lookup_by(columns)`, then call `create_lookuper()`. +#[pyclass] +pub struct TablePrefixLookup { + connection: Arc, + metadata: Arc, + table_info: fcore::metadata::TableInfo, + lookup_column_names: Vec, +} + +#[pymethods] +impl TablePrefixLookup { + /// Create a PrefixLookuper from this builder. + pub fn create_lookuper(&self) -> PyResult { + crate::PrefixLookuper::new( + &self.connection, + self.metadata.clone(), + self.table_info.clone(), + self.lookup_column_names.clone(), + ) + } + + fn __repr__(&self) -> String { + "TablePrefixLookup()".to_string() + } +} + /// Writer for appending data to a Fluss table #[pyclass] pub struct AppendWriter { diff --git a/bindings/python/test/test_kv_table.py b/bindings/python/test/test_kv_table.py index 39407375..9b3e4e4c 100644 --- a/bindings/python/test/test_kv_table.py +++ b/bindings/python/test/test_kv_table.py @@ -26,6 +26,7 @@ from decimal import Decimal import pyarrow as pa +import pytest import fluss @@ -97,7 +98,7 @@ async def test_upsert_delete_and_lookup(connection, admin): async def test_composite_primary_keys(connection, admin): - """Test upsert and lookup with composite (multi-column) primary keys.""" + """Test upsert/lookup with composite PKs, including prefix lookup.""" table_path = fluss.TablePath("fluss", "py_test_composite_pk") await admin.drop_table(table_path, ignore_if_not_exists=True) @@ -109,47 +110,75 @@ async def test_composite_primary_keys(connection, admin): pa.field("region", pa.string()), pa.field("score", pa.int64()), pa.field("user_id", pa.int32()), + pa.field("event_id", pa.int64()), ] ), - primary_keys=["region", "user_id"], + primary_keys=["region", "user_id", "event_id"], + ) + table_descriptor = fluss.TableDescriptor( + schema, bucket_count=3, bucket_keys=["region", "user_id"] ) - table_descriptor = fluss.TableDescriptor(schema) await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) table = await connection.get_table(table_path) upsert_writer = table.new_upsert().create_writer() test_data = [ - ("US", 1, 100), - ("US", 2, 200), - ("EU", 1, 150), - ("EU", 2, 250), + ("US", 1, 1, 100), + ("US", 1, 2, 200), + ("US", 2, 1, 300), + ("EU", 1, 1, 150), + ("EU", 2, 1, 250), ] - for region, user_id, score in test_data: - upsert_writer.upsert({"region": region, "user_id": user_id, "score": score}) + for region, user_id, event_id, score in test_data: + upsert_writer.upsert( + { + "region": region, + "user_id": user_id, + "event_id": event_id, + "score": score, + } + ) await upsert_writer.flush() lookuper = table.new_lookup().create_lookuper() - # Lookup (US, 1) -> score 100 - result = await lookuper.lookup({"region": "US", "user_id": 1}) + # Lookup (US, 1, 1) -> score 100 + result = await lookuper.lookup({"region": "US", "user_id": 1, "event_id": 1}) assert result is not None assert result["score"] == 100 - # Lookup (EU, 2) -> score 250 - result = await lookuper.lookup({"region": "EU", "user_id": 2}) + # Lookup (EU, 2, 1) -> score 250 + result = await lookuper.lookup({"region": "EU", "user_id": 2, "event_id": 1}) assert result is not None assert result["score"] == 250 - # Update (US, 1) score (await acknowledgment) - handle = upsert_writer.upsert({"region": "US", "user_id": 1, "score": 500}) + # Update (US, 1, 1) score (await acknowledgment) + handle = upsert_writer.upsert( + {"region": "US", "user_id": 1, "event_id": 1, "score": 500} + ) await handle.wait() - result = await lookuper.lookup({"region": "US", "user_id": 1}) + result = await lookuper.lookup({"region": "US", "user_id": 1, "event_id": 1}) assert result is not None assert result["score"] == 500 + prefix_lookuper = table.new_lookup().lookup_by(["region", "user_id"]).create_lookuper() + + # Prefix (US, 1) should match 2 rows (event_id 1 and 2) + rows = await prefix_lookuper.lookup({"region": "US", "user_id": 1}) + assert len(rows) == 2 + event_ids = sorted(row["event_id"] for row in rows) + assert event_ids == [1, 2] + + # Also validate list/tuple prefix input + rows = await prefix_lookuper.lookup(["US", 1]) + assert len(rows) == 2 + rows = await prefix_lookuper.lookup(("EU", 2)) + assert len(rows) == 1 + assert rows[0]["event_id"] == 1 + await admin.drop_table(table_path, ignore_if_not_exists=False) @@ -431,3 +460,77 @@ async def test_all_supported_datatypes(connection, admin): assert result[col] is None, f"{col} should be null" await admin.drop_table(table_path, ignore_if_not_exists=False) + + +async def test_prefix_lookup_validation_errors(connection, admin): + """Test that prefix lookup raises errors for invalid column configurations.""" + table_path = fluss.TablePath("fluss", "py_test_prefix_lookup_validation") + await admin.drop_table(table_path, ignore_if_not_exists=True) + + schema = fluss.Schema( + pa.schema( + [ + pa.field("a", pa.int32()), + pa.field("b", pa.string()), + pa.field("c", pa.int64()), + ] + ), + primary_keys=["a", "b", "c"], + ) + table_descriptor = fluss.TableDescriptor( + schema, bucket_count=3, bucket_keys=["a", "b"] + ) + await admin.create_table(table_path, table_descriptor, ignore_if_exists=False) + + table = await connection.get_table(table_path) + + # lookup_by with columns equal to full PK should error + with pytest.raises(fluss.FlussError, match="prefix lookup"): + table.new_lookup().lookup_by(["a", "b", "c"]).create_lookuper() + + # lookup_by with wrong column names should error + with pytest.raises(fluss.FlussError, match="bucket keys"): + table.new_lookup().lookup_by(["a", "c"]).create_lookuper() + + # lookup_by with unknown column should error + with pytest.raises(fluss.FlussError, match="Unknown column name"): + table.new_lookup().lookup_by(["a", "missing_col"]).create_lookuper() + + await admin.drop_table(table_path, ignore_if_not_exists=False) + + # Partitioned table: lookup columns must include partition keys first, + # followed by bucket keys. + partitioned_table_path = fluss.TablePath("fluss", "py_test_prefix_lookup_validation_pt") + await admin.drop_table(partitioned_table_path, ignore_if_not_exists=True) + + partitioned_schema = fluss.Schema( + pa.schema( + [ + pa.field("region", pa.string()), + pa.field("user_id", pa.int32()), + pa.field("event_id", pa.int64()), + ] + ), + primary_keys=["region", "user_id", "event_id"], + ) + partitioned_table_descriptor = fluss.TableDescriptor( + partitioned_schema, + partition_keys=["region"], + bucket_count=3, + bucket_keys=["user_id"], + ) + await admin.create_table( + partitioned_table_path, partitioned_table_descriptor, ignore_if_exists=False + ) + + partitioned_table = await connection.get_table(partitioned_table_path) + + # Missing partition key in lookup columns. + with pytest.raises(fluss.FlussError, match="partition fields"): + partitioned_table.new_lookup().lookup_by(["user_id"]).create_lookuper() + + # After partition keys, remaining columns must equal bucket keys. + with pytest.raises(fluss.FlussError, match="bucket keys"): + partitioned_table.new_lookup().lookup_by(["region", "event_id"]).create_lookuper() + + await admin.drop_table(partitioned_table_path, ignore_if_not_exists=False) diff --git a/website/docs/user-guide/python/api-reference.md b/website/docs/user-guide/python/api-reference.md index 32f23a59..d5d666a1 100644 --- a/website/docs/user-guide/python/api-reference.md +++ b/website/docs/user-guide/python/api-reference.md @@ -115,11 +115,20 @@ Builder for creating an `UpsertWriter`. Obtain via `FlussTable.new_upsert()`. ## `TableLookup` -Builder for creating a `Lookuper`. Obtain via `FlussTable.new_lookup()`. +Builder for creating a `Lookuper` or `PrefixLookuper`. Obtain via `FlussTable.new_lookup()`. -| Method | Description | -|----------------------------------|---------------------| -| `.create_lookuper() -> Lookuper` | Create the lookuper | +| Method | Description | +|-----------------------------------------------------|-------------------------------------------| +| `.create_lookuper() -> Lookuper` | Create a primary key lookuper | +| `.lookup_by(column_names) -> TablePrefixLookup` | Switch to prefix-scan mode for the given columns (partition keys + bucket keys) | + +## `TablePrefixLookup` + +Builder for creating a `PrefixLookuper`. Obtain via `TableLookup.lookup_by(columns)`. + +| Method | Description | +|--------------------------------------------|---------------------------| +| `.create_lookuper() -> PrefixLookuper` | Create the prefix lookuper | ## `AppendWriter` @@ -151,6 +160,12 @@ Builder for creating a `Lookuper`. Obtain via `FlussTable.new_lookup()`. |-------------------------------------|-----------------------------| | `await .lookup(pk) -> dict \| None` | Lookup a row by primary key | +## `PrefixLookuper` + +| Method | Description | +|-----------------------------------------------|---------------------------------------------| +| `await .lookup(prefix) -> list[dict]` | Lookup all rows matching a prefix key | + ## `LogScanner` | Method | Description | From 2e76f94d64c5b8bb6b6319847e28ca6d46384b51 Mon Sep 17 00:00:00 2001 From: charlesdong1991 Date: Sun, 10 May 2026 14:51:17 +0200 Subject: [PATCH 2/2] add more tests --- bindings/python/test/test_kv_table.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/bindings/python/test/test_kv_table.py b/bindings/python/test/test_kv_table.py index 9b3e4e4c..7cfeff4e 100644 --- a/bindings/python/test/test_kv_table.py +++ b/bindings/python/test/test_kv_table.py @@ -179,6 +179,10 @@ async def test_composite_primary_keys(connection, admin): assert len(rows) == 1 assert rows[0]["event_id"] == 1 + # Validate empty-result case: valid prefix shape but no matching rows. + rows = await prefix_lookuper.lookup({"region": "APAC", "user_id": 999}) + assert rows == [] + await admin.drop_table(table_path, ignore_if_not_exists=False) @@ -529,6 +533,13 @@ async def test_prefix_lookup_validation_errors(connection, admin): with pytest.raises(fluss.FlussError, match="partition fields"): partitioned_table.new_lookup().lookup_by(["user_id"]).create_lookuper() + # A non-existent partition returns empty list. + partitioned_prefix_lookuper = ( + partitioned_table.new_lookup().lookup_by(["region", "user_id"]).create_lookuper() + ) + rows = await partitioned_prefix_lookuper.lookup({"region": "UNKNOWN_REGION", "user_id": 1}) + assert rows == [] + # After partition keys, remaining columns must equal bucket keys. with pytest.raises(fluss.FlussError, match="bucket keys"): partitioned_table.new_lookup().lookup_by(["region", "event_id"]).create_lookuper()