Skip to content

Commit b6cdf5a

Browse files
feat: implement ingest method (#4)
Implement the ingest method.
1 parent a4d47ed commit b6cdf5a

3 files changed

Lines changed: 397 additions & 0 deletions

File tree

examples/client_usage.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@
88

99
import os
1010

11+
import pyarrow as pa
12+
1113
from altertable_flightsql import Client
14+
from altertable_flightsql.client import IngestIncrementalOptions
1215

1316
ALTERTABLE_HOST = os.getenv("ALTERTABLE_HOST", "flight.altertable.ai")
1417
ALTERTABLE_PORT = int(os.getenv("ALTERTABLE_PORT", "443"))
@@ -133,6 +136,80 @@ def example_transactions():
133136
print()
134137

135138

139+
def example_bulk_ingest():
    """Bulk ingest data using Arrow Flight."""
    print("=" * 60)
    print("Example: Bulk Data Ingestion")
    print("=" * 60)

    with Client(
        username=ALTERTABLE_USERNAME,
        password=ALTERTABLE_PASSWORD,
        **CONNECTION_SETTINGS,
    ) as client:
        # Table layout shared by every batch below.
        schema = pa.schema(
            [
                ("id", pa.int64()),
                ("name", pa.string()),
                ("created_at", pa.int64()),
            ]
        )

        # Two batches: the second reuses IDs 1 and 2 (updated rows with a
        # later cursor value) and introduces the new ID 4.
        batches = [
            pa.record_batch(
                [
                    [1, 2, 3],
                    ["Alice", "Bob", "Charlie"],
                    [1000, 2000, 3000],
                ],
                schema=schema,
            ),
            pa.record_batch(
                [
                    [1, 2, 4],
                    ["Alice Updated", "Bob Updated", "David"],
                    [1500, 2500, 4000],
                ],
                schema=schema,
            ),
        ]

        # Ingest each batch in its own session. With the same incremental
        # options, the second session acts as an upsert keyed on "id",
        # "created_at" deciding which row wins on conflict.
        for batch in batches:
            with client.ingest(
                table_name="incremental_users",
                schema=schema,
                incremental_options=IngestIncrementalOptions(
                    primary_key=["id"],
                    cursor_field=["created_at"],
                ),
            ) as writer:
                writer.write(batch)

        # Verify - should have 4 rows (3 from first batch, 2 updated, 1 new)
        reader = client.query("SELECT * FROM incremental_users ORDER BY id")
        result = reader.read_pandas()
        print(f"\nIncremental ingestion results ({len(result)} rows):")
        print(result)

        # Cleanup
        client.execute("DROP TABLE IF EXISTS bulk_users")
        client.execute("DROP TABLE IF EXISTS incremental_users")

    print()
211+
212+
136213
def example_metadata():
137214
"""Query database metadata."""
138215
print("=" * 60)
@@ -171,5 +248,6 @@ def example_metadata():
171248
example_updates()
172249
example_basic_query()
173250
example_transactions()
251+
example_bulk_ingest()
174252
example_prepared_statement()
175253
example_metadata()

src/altertable_flightsql/client.py

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@
44
This module provides a high-level Python client for Altertable.
55
"""
66

7+
import json
78
from collections.abc import Mapping, Sequence
9+
from dataclasses import dataclass
10+
from enum import Enum
811
from typing import Any, Optional, Union
912

1013
import pyarrow as pa
@@ -27,6 +30,30 @@ def _unpack_command(bytes, packed):
2730
any_msg.Unpack(packed)
2831

2932

33+
class IngestTableMode(Enum):
    """Mode for ingesting data into a table.

    Passed as the ``mode`` argument of ``Client.ingest`` to control how the
    target table is created and/or appended to (default: CREATE_APPEND).
    """

    CREATE = "CREATE"
    """Create the table if it does not exist, fail if it does"""
    APPEND = "APPEND"
    """Append to the table if it exists, fail if it does not"""
    CREATE_APPEND = "CREATE_APPEND"
    """Create the table if it does not exist, append to it if it does"""
    REPLACE = "REPLACE"
    """Create the table if it does not exist, recreate it if it does"""
44+
45+
46+
@dataclass(frozen=True)
class IngestIncrementalOptions:
    """Options controlling incremental (upsert-style) ingestion.

    When supplied to ``Client.ingest``, rows sharing a primary key are
    deduplicated; the cursor field determines which row to keep when a
    primary-key conflict occurs.
    """

    primary_key: Sequence[str]
    """Columns forming the table's primary key."""

    cursor_field: Sequence[str]
    """Columns used to decide which row to keep on a primary-key conflict."""
55+
56+
3057
class BearerAuthMiddleware(flight.ClientMiddleware):
3158
"""Client middleware that adds Bearer token authentication to all requests."""
3259

@@ -239,6 +266,115 @@ def execute(
239266

240267
return result.record_count
241268

269+
def ingest(
    self,
    *,
    table_name: str,
    schema: pa.Schema,
    schema_name: str = "",
    catalog_name: str = "",
    mode: IngestTableMode = IngestTableMode.CREATE_APPEND,
    incremental_options: Optional[IngestIncrementalOptions] = None,
    transaction: Optional["Transaction"] = None,
) -> flight.FlightStreamWriter:
    """
    Bulk ingest data into a table using Apache Arrow Flight.

    This method provides high-performance bulk data loading by streaming
    Arrow record batches directly to the server. The writer can be used as
    a context manager for automatic resource cleanup.

    Args:
        table_name: Name of the table to ingest data into.
        schema: PyArrow schema defining the table structure.
        schema_name: Optional schema name. If not provided, uses the client's
            default schema.
        catalog_name: Optional catalog name. If not provided, uses the client's
            default catalog.
        mode: Table creation/append mode. Options:
            - CREATE: Create table, fail if it exists
            - APPEND: Append to existing table, fail if it doesn't exist
            - CREATE_APPEND: Create if not exists, append if exists (default)
            - REPLACE: Drop and recreate table if it exists
        incremental_options: Options for incremental ingestion, including:
            - primary_key: Columns to use as primary key
            - cursor_field: Columns used to determine which row to keep in case of conflict on primary key
        transaction: Optional transaction to execute ingestion within.

    Returns:
        FlightStreamWriter for writing record batches to the table.
        The writer should be closed after all data is written, or used
        as a context manager.

    Example:
        >>> # Basic ingestion
        >>> schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
        >>> with client.ingest(table_name="users", schema=schema) as writer:
        ...     batch = pa.record_batch([[1, 2], ["Alice", "Bob"]], schema=schema)
        ...     writer.write(batch)

        >>> # Incremental ingestion with primary key
        >>> from altertable_flightsql.client import IngestIncrementalOptions
        >>> opts = IngestIncrementalOptions(
        ...     primary_key=["id"],
        ...     cursor_field=["updated_at"]
        ... )
        >>> with client.ingest(
        ...     table_name="users",
        ...     schema=schema,
        ...     incremental_options=opts
        ... ) as writer:
        ...     writer.write(batch)
    """
    cmd = sql_pb2.CommandStatementIngest(
        table=table_name,
        table_definition_options=self._ingest_mode_to_table_definition_options(mode),
    )

    # Proto fields are only set when a non-default value was supplied, so the
    # server can fall back to the client's default catalog/schema.
    if catalog_name:
        cmd.catalog = catalog_name

    if schema_name:
        cmd.schema = schema_name

    if txn_id := self._get_transaction_id(transaction):
        cmd.transaction_id = txn_id

    if incremental_options:
        # Normalize to list before serializing: the fields are typed as
        # Sequence[str], and json.dumps only handles list/tuple natively —
        # other Sequence implementations would raise TypeError.
        if incremental_options.primary_key:
            cmd.options["primary_key"] = json.dumps(
                list(incremental_options.primary_key)
            )
        if incremental_options.cursor_field:
            cmd.options["cursor_field"] = json.dumps(
                list(incremental_options.cursor_field)
            )

    descriptor = flight.FlightDescriptor.for_command(_pack_command(cmd))
    writer, _ = self._client.do_put(descriptor, schema)

    return writer
353+
354+
def _ingest_mode_to_table_definition_options(
    self, mode: IngestTableMode
) -> sql_pb2.CommandStatementIngest.TableDefinitionOptions:
    """Translate an IngestTableMode into the FlightSQL TableDefinitionOptions pair.

    Raises:
        ValueError: If ``mode`` is not a recognized IngestTableMode.
        (The previous if/elif chain silently returned None for an unknown
        mode, deferring the failure to protobuf message construction.)
    """
    opts = sql_pb2.CommandStatementIngest.TableDefinitionOptions

    # (if_not_exist, if_exists) behavior for each mode.
    dispatch = {
        IngestTableMode.CREATE: (
            opts.TableNotExistOption.TABLE_NOT_EXIST_OPTION_CREATE,
            opts.TableExistsOption.TABLE_EXISTS_OPTION_FAIL,
        ),
        IngestTableMode.APPEND: (
            opts.TableNotExistOption.TABLE_NOT_EXIST_OPTION_FAIL,
            opts.TableExistsOption.TABLE_EXISTS_OPTION_APPEND,
        ),
        IngestTableMode.CREATE_APPEND: (
            opts.TableNotExistOption.TABLE_NOT_EXIST_OPTION_CREATE,
            opts.TableExistsOption.TABLE_EXISTS_OPTION_APPEND,
        ),
        IngestTableMode.REPLACE: (
            opts.TableNotExistOption.TABLE_NOT_EXIST_OPTION_CREATE,
            opts.TableExistsOption.TABLE_EXISTS_OPTION_REPLACE,
        ),
    }

    try:
        if_not_exist, if_exists = dispatch[mode]
    except KeyError:
        raise ValueError(f"Unsupported ingest mode: {mode!r}") from None

    return opts(if_not_exist=if_not_exist, if_exists=if_exists)
377+
242378
def prepare(
243379
self,
244380
query: str,

0 commit comments

Comments
 (0)