Skip to content

Commit de5f65e

Browse files
docs: add example and docstring to ingest
1 parent 5a7726d commit de5f65e

2 files changed

Lines changed: 119 additions & 0 deletions

File tree

examples/client_usage.py

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@
88

99
import os
1010

11+
import pyarrow as pa
12+
1113
from altertable_flightsql import Client
14+
from altertable_flightsql.client import IngestIncrementalOptions
1215

1316
ALTERTABLE_HOST = os.getenv("ALTERTABLE_HOST", "flight.altertable.ai")
1417
ALTERTABLE_PORT = int(os.getenv("ALTERTABLE_PORT", "443"))
@@ -133,6 +136,72 @@ def example_transactions():
133136
print()
134137

135138

139+
def example_bulk_ingest():
    """Demonstrate incremental (upsert) bulk ingestion over Arrow Flight.

    Streams an initial batch of rows into ``incremental_users``, then a
    second batch that overlaps on the primary key, and prints the merged
    result before cleaning up.
    """
    banner = "=" * 60
    print(banner)
    print("Example: Bulk Data Ingestion")
    print(banner)

    with Client(
        username=ALTERTABLE_USERNAME,
        password=ALTERTABLE_PASSWORD,
        **CONNECTION_SETTINGS,
    ) as client:
        # Table layout: integer key, display name, and a cursor column used
        # to decide which row wins on a primary-key conflict.
        schema = pa.schema(
            [
                ("id", pa.int64()),
                ("name", pa.string()),
                ("created_at", pa.int64()),
            ]
        )

        batches = [
            # Initial rows (ids 1-3).
            pa.record_batch(
                [[1, 2, 3], ["Alice", "Bob", "Charlie"], [1000, 2000, 3000]],
                schema=schema,
            ),
            # Overlapping ids 1 and 2 plus brand-new id 4 — the second pass
            # therefore upserts rather than blindly appending.
            pa.record_batch(
                [[1, 2, 4], ["Alice Updated", "Bob Updated", "David"], [1500, 2500, 4000]],
                schema=schema,
            ),
        ]

        # Open a fresh ingest stream per batch; identical incremental
        # options make the second write an upsert keyed on "id".
        for batch in batches:
            with client.ingest(
                table_name="incremental_users",
                schema=schema,
                incremental_options=IngestIncrementalOptions(
                    primary_key=["id"],
                    cursor_field=["created_at"],
                ),
            ) as writer:
                writer.write(batch)

        # Verify - should have 4 rows (3 from first batch, 2 updated, 1 new)
        reader = client.query("SELECT * FROM incremental_users ORDER BY id")
        result = reader.read_pandas()
        print(f"\nIncremental ingestion results ({len(result)} rows):")
        print(result)

        # Cleanup
        client.execute("DROP TABLE IF EXISTS bulk_users")
        client.execute("DROP TABLE IF EXISTS incremental_users")

        print()
203+
204+
136205
def example_metadata():
137206
"""Query database metadata."""
138207
print("=" * 60)
@@ -171,5 +240,6 @@ def example_metadata():
171240
example_updates()
172241
example_basic_query()
173242
example_transactions()
243+
example_bulk_ingest()
174244
example_prepared_statement()
175245
example_metadata()

src/altertable_flightsql/client.py

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,55 @@ def ingest(
275275
incremental_options: Optional[IngestIncrementalOptions] = None,
276276
transaction: Optional["Transaction"] = None,
277277
) -> flight.FlightStreamWriter:
278+
"""
279+
Bulk ingest data into a table using Apache Arrow Flight.
280+
281+
This method provides high-performance bulk data loading by streaming
282+
Arrow record batches directly to the server. The writer can be used as
283+
a context manager for automatic resource cleanup.
284+
285+
Args:
286+
table_name: Name of the table to ingest data into.
287+
schema: PyArrow schema defining the table structure.
288+
schema_name: Optional schema name. If not provided, uses the client's
289+
default schema.
290+
catalog_name: Optional catalog name. If not provided, uses the client's
291+
default catalog.
292+
mode: Table creation/append mode. Options:
293+
- CREATE: Create table, fail if it exists
294+
- APPEND: Append to existing table, fail if it doesn't exist
295+
- CREATE_APPEND: Create if not exists, append if exists (default)
296+
- REPLACE: Drop and recreate table if it exists
297+
incremental_options: Options for incremental ingestion, including:
298+
- primary_key: Columns to use as primary key
299+
- cursor_field: Columns used to determine which row to keep in case of conflict on primary key
300+
transaction: Optional transaction to execute ingestion within.
301+
302+
Returns:
303+
FlightStreamWriter for writing record batches to the table.
304+
The writer should be closed after all data is written, or used
305+
as a context manager.
306+
307+
Example:
308+
>>> # Basic ingestion
309+
>>> schema = pa.schema([("id", pa.int64()), ("name", pa.string())])
310+
>>> with client.ingest(table_name="users", schema=schema) as writer:
311+
... batch = pa.record_batch([[1, 2], ["Alice", "Bob"]], schema=schema)
312+
... writer.write(batch)
313+
314+
>>> # Incremental ingestion with primary key
315+
>>> from altertable_flightsql.client import IngestIncrementalOptions
316+
>>> opts = IngestIncrementalOptions(
317+
... primary_key=["id"],
318+
... cursor_field=["updated_at"]
319+
... )
320+
>>> with client.ingest(
321+
... table_name="users",
322+
... schema=schema,
323+
... incremental_options=opts
324+
... ) as writer:
325+
... writer.write(batch)
326+
"""
278327
cmd = sql_pb2.CommandStatementIngest(
279328
table=table_name,
280329
table_definition_options=self._ingest_mode_to_table_definition_options(mode),

0 commit comments

Comments
 (0)