Skip to content

Commit 5a7726d

Browse files
feat: implement ingest support
1 parent a4d47ed commit 5a7726d

2 files changed

Lines changed: 241 additions & 0 deletions

File tree

src/altertable_flightsql/client.py

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@
55
"""
66

77
from collections.abc import Mapping, Sequence
8+
from dataclasses import dataclass
9+
from enum import Enum
10+
import json
811
from typing import Any, Optional, Union
912

1013
import pyarrow as pa
@@ -27,6 +30,28 @@ def _unpack_command(bytes, packed):
2730
any_msg.Unpack(packed)
2831

2932

33+
class IngestTableMode(Enum):
    """How the server should treat the target table during ingestion.

    Members:
        CREATE: Create the table if it does not exist, fail if it does.
        APPEND: Append to the table if it exists, fail if it does not.
        CREATE_APPEND: Create the table if it does not exist, append to it if it does.
        REPLACE: Create the table if it does not exist, recreate it if it does.
    """

    CREATE = "CREATE"
    APPEND = "APPEND"
    CREATE_APPEND = "CREATE_APPEND"
    REPLACE = "REPLACE"
43+
44+
45+
@dataclass(frozen=True)
class IngestIncrementalOptions:
    """Options for incremental ingestion.

    Attributes:
        primary_key: Primary key for the table.
        cursor_field: Cursor field for the table.
    """

    # Column names forming the table's primary key.
    primary_key: Sequence[str]
    # Column names used as the incremental cursor.
    cursor_field: Sequence[str]
53+
54+
3055
class BearerAuthMiddleware(flight.ClientMiddleware):
3156
"""Client middleware that adds Bearer token authentication to all requests."""
3257

@@ -239,6 +264,64 @@ def execute(
239264

240265
return result.record_count
241266

267+
def ingest(
    self,
    *,
    table_name: str,
    schema: pa.Schema,
    schema_name: str = "",
    catalog_name: str = "",
    mode: IngestTableMode = IngestTableMode.CREATE_APPEND,
    incremental_options: Optional[IngestIncrementalOptions] = None,
    transaction: Optional["Transaction"] = None,
) -> flight.FlightStreamWriter:
    """Open a bulk-ingest stream into *table_name*.

    Args:
        table_name: Name of the destination table (unqualified).
        schema: Arrow schema of the record batches that will be written.
        schema_name: Optional schema (namespace) containing the table.
        catalog_name: Optional catalog containing the table.
        mode: How to handle table existence; defaults to creating the
            table if missing and appending otherwise.
        incremental_options: Optional primary-key / cursor-field settings
            for incremental ingestion.
        transaction: Optional transaction to perform the ingest within.

    Returns:
        A FlightStreamWriter. Write record batches matching ``schema`` to
        it, then close it (or use it as a context manager) to complete
        the ingest.
    """
    cmd = sql_pb2.CommandStatementIngest(
        table=table_name,
        table_definition_options=self._ingest_mode_to_table_definition_options(mode),
    )

    # Only set the optional string fields when the caller provided them.
    if catalog_name:
        cmd.catalog = catalog_name

    if schema_name:
        cmd.schema = schema_name

    if txn_id := self._get_transaction_id(transaction):
        cmd.transaction_id = txn_id

    if incremental_options:
        # json.dumps only serializes list/tuple sequences, so coerce the
        # Sequence fields to lists before encoding them into the options.
        if incremental_options.primary_key:
            cmd.options["primary_key"] = json.dumps(list(incremental_options.primary_key))
        if incremental_options.cursor_field:
            cmd.options["cursor_field"] = json.dumps(list(incremental_options.cursor_field))

    descriptor = flight.FlightDescriptor.for_command(_pack_command(cmd))
    writer, _ = self._client.do_put(descriptor, schema)

    return writer
302+
303+
def _ingest_mode_to_table_definition_options(self, mode: IngestTableMode) -> sql_pb2.CommandStatementIngest.TableDefinitionOptions:
    """Translate an IngestTableMode into Flight SQL TableDefinitionOptions.

    Args:
        mode: The requested table-handling behavior.

    Returns:
        The proto message carrying the corresponding ``if_not_exist`` /
        ``if_exists`` option pair.

    Raises:
        ValueError: If ``mode`` is not a known IngestTableMode. (The
        previous if/elif ladder silently returned ``None`` here.)
    """
    # Local aliases keep the mapping below readable.
    opts_cls = sql_pb2.CommandStatementIngest.TableDefinitionOptions
    not_exist = opts_cls.TableNotExistOption
    exists = opts_cls.TableExistsOption

    # Mode -> (if_not_exist, if_exists) behavior pairs.
    behavior = {
        IngestTableMode.CREATE: (not_exist.TABLE_NOT_EXIST_OPTION_CREATE, exists.TABLE_EXISTS_OPTION_FAIL),
        IngestTableMode.APPEND: (not_exist.TABLE_NOT_EXIST_OPTION_FAIL, exists.TABLE_EXISTS_OPTION_APPEND),
        IngestTableMode.CREATE_APPEND: (not_exist.TABLE_NOT_EXIST_OPTION_CREATE, exists.TABLE_EXISTS_OPTION_APPEND),
        IngestTableMode.REPLACE: (not_exist.TABLE_NOT_EXIST_OPTION_CREATE, exists.TABLE_EXISTS_OPTION_REPLACE),
    }
    try:
        if_not_exist, if_exists = behavior[mode]
    except KeyError:
        raise ValueError(f"Unsupported ingest mode: {mode!r}") from None

    return opts_cls(if_not_exist=if_not_exist, if_exists=if_exists)
324+
242325
def prepare(
243326
self,
244327
query: str,

tests/test_ingest.py

Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
"""
2+
Integration tests for data ingestion.
3+
4+
Tests the ingest method for bulk data loading.
5+
"""
6+
7+
import pyarrow as pa
8+
import pytest
9+
10+
from altertable_flightsql import Client
11+
from altertable_flightsql.client import IngestIncrementalOptions
12+
from tests.conftest import SchemaInfo
13+
14+
15+
class TestBasicIngest:
    """Test basic ingest functionality."""

    def test_ingest_simple_table(self, altertable_client: Client, test_schema: SchemaInfo):
        """Test ingesting data into a new table."""
        import uuid

        table_name = f"test_ingest_{uuid.uuid4().hex[:8]}"
        fully_qualified_table = f"{test_schema.full_name}.{table_name}"

        # Layout of the table we are about to create.
        arrow_schema = pa.schema([
            ("id", pa.int64()),
            ("name", pa.string()),
            ("value", pa.float64()),
        ])

        # Five sample rows, column-wise.
        batch = pa.record_batch([
            [1, 2, 3, 4, 5],
            ["Alice", "Bob", "Charlie", "David", "Eve"],
            [100.5, 200.0, 300.75, 400.25, 500.5],
        ], schema=arrow_schema)

        try:
            # Stream the batch into a freshly created table.
            with altertable_client.ingest(
                table_name=table_name,
                schema=arrow_schema,
                schema_name=test_schema.schema,
                catalog_name=test_schema.catalog,
            ) as writer:
                writer.write(batch)

            # Read everything back and confirm the rows round-tripped.
            table = altertable_client.query(f"SELECT * FROM {fully_qualified_table} ORDER BY id").read_all()

            assert table.num_rows == 5
            df = table.to_pandas()
            assert list(df["id"]) == [1, 2, 3, 4, 5]
            assert list(df["name"]) == ["Alice", "Bob", "Charlie", "David", "Eve"]

        finally:
            # Best-effort cleanup; a failed drop must not mask the test result.
            try:
                altertable_client.execute(f"DROP TABLE IF EXISTS {fully_qualified_table}")
            except Exception as e:
                print(f"Warning: Failed to drop table {fully_qualified_table}: {e}")

    def test_ingest_multiple_batches(self, altertable_client: Client, test_schema: SchemaInfo):
        """Test ingesting multiple batches of data."""
        import uuid

        table_name = f"test_ingest_{uuid.uuid4().hex[:8]}"
        fully_qualified_table = f"{test_schema.full_name}.{table_name}"

        # Layout of the table we are about to create.
        arrow_schema = pa.schema([
            ("id", pa.int64()),
            ("name", pa.string()),
        ])

        try:
            with altertable_client.ingest(
                table_name=table_name,
                schema=arrow_schema,
                schema_name=test_schema.schema,
                catalog_name=test_schema.catalog,
            ) as writer:
                # Several small batches written through one stream should all
                # land in the same table.
                batches = [
                    pa.record_batch([[1, 2], ["Alice", "Bob"]], schema=arrow_schema),
                    pa.record_batch([[3, 4], ["Charlie", "David"]], schema=arrow_schema),
                    pa.record_batch([[5], ["Eve"]], schema=arrow_schema),
                ]
                for chunk in batches:
                    writer.write(chunk)

            table = altertable_client.query(f"SELECT * FROM {fully_qualified_table} ORDER BY id").read_all()

            assert table.num_rows == 5
            df = table.to_pandas()
            assert list(df["id"]) == [1, 2, 3, 4, 5]
            assert list(df["name"]) == ["Alice", "Bob", "Charlie", "David", "Eve"]

        finally:
            # Best-effort cleanup; a failed drop must not mask the test result.
            try:
                altertable_client.execute(f"DROP TABLE IF EXISTS {fully_qualified_table}")
            except Exception as e:
                print(f"Warning: Failed to drop table {fully_qualified_table}: {e}")
108+
109+
class TestIngestWithPrimaryKey:
    """Test ingest with primary key specification."""

    def test_ingest_with_primary_key(self, altertable_client: Client, test_schema: SchemaInfo):
        """Test ingesting data with primary key constraint."""
        import uuid

        table_name = f"test_ingest_{uuid.uuid4().hex[:8]}"
        fully_qualified_table = f"{test_schema.full_name}.{table_name}"

        # Layout of the table we are about to create.
        arrow_schema = pa.schema([
            ("id", pa.int64()),
            ("email", pa.string()),
            ("name", pa.string()),
            ("created_at", pa.int64()),
        ])

        # Two rows share id=1; with a primary key on "id" and created_at as
        # cursor, the later row (created_at=4) should win.
        batch = pa.record_batch([
            [1, 2, 3, 1],
            ["alice@example.com", "bob@example.com", "charlie@example.com", "alice+1@example.com"],
            ["Alice", "Bob", "Charlie", "Alice"],
            [1, 2, 3, 4],
        ], schema=arrow_schema)

        try:
            with altertable_client.ingest(
                table_name=table_name,
                schema=arrow_schema,
                schema_name=test_schema.schema,
                catalog_name=test_schema.catalog,
                incremental_options=IngestIncrementalOptions(primary_key=["id"], cursor_field=["created_at"]),
            ) as writer:
                writer.write(batch)

            # Read back: duplicates on the primary key should be collapsed.
            table = altertable_client.query(f"SELECT * FROM {fully_qualified_table} ORDER BY id").read_all()

            assert table.num_rows == 3
            df = table.to_pandas()
            assert list(df["id"]) == [1, 2, 3]
            assert list(df["email"]) == ["alice+1@example.com", "bob@example.com", "charlie@example.com"]
            assert list(df["name"]) == ["Alice", "Bob", "Charlie"]

        finally:
            # Best-effort cleanup; a failed drop must not mask the test result.
            try:
                altertable_client.execute(f"DROP TABLE IF EXISTS {fully_qualified_table}")
            except Exception as e:
                print(f"Warning: Failed to drop table {fully_qualified_table}: {e}")

0 commit comments

Comments
 (0)