Skip to content

Commit cf91a94

Browse files
style: make linters happy
1 parent de5f65e commit cf91a94

3 files changed

Lines changed: 88 additions & 51 deletions

File tree

examples/client_usage.py

Lines changed: 23 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -148,25 +148,33 @@ def example_bulk_ingest():
148148
**CONNECTION_SETTINGS,
149149
) as client:
150150
# Define schema for the data
151-
schema = pa.schema([
152-
("id", pa.int64()),
153-
("name", pa.string()),
154-
("created_at", pa.int64()),
155-
])
151+
schema = pa.schema(
152+
[
153+
("id", pa.int64()),
154+
("name", pa.string()),
155+
("created_at", pa.int64()),
156+
]
157+
)
156158

157159
# First batch
158-
first_batch = pa.record_batch([
159-
[1, 2, 3],
160-
["Alice", "Bob", "Charlie"],
161-
[1000, 2000, 3000],
162-
], schema=schema)
160+
first_batch = pa.record_batch(
161+
[
162+
[1, 2, 3],
163+
["Alice", "Bob", "Charlie"],
164+
[1000, 2000, 3000],
165+
],
166+
schema=schema,
167+
)
163168

164169
# Second batch with updated data (same IDs 1,2 and new ID 4)
165-
second_batch = pa.record_batch([
166-
[1, 2, 4],
167-
["Alice Updated", "Bob Updated", "David"],
168-
[1500, 2500, 4000],
169-
], schema=schema)
170+
second_batch = pa.record_batch(
171+
[
172+
[1, 2, 4],
173+
["Alice Updated", "Bob Updated", "David"],
174+
[1500, 2500, 4000],
175+
],
176+
schema=schema,
177+
)
170178

171179
with client.ingest(
172180
table_name="incremental_users",

src/altertable_flightsql/client.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,10 @@
44
This module provides a high-level Python client for Altertable.
55
"""
66

7+
import json
78
from collections.abc import Mapping, Sequence
89
from dataclasses import dataclass
910
from enum import Enum
10-
import json
1111
from typing import Any, Optional, Union
1212

1313
import pyarrow as pa
@@ -32,6 +32,7 @@ def _unpack_command(bytes, packed):
3232

3333
class IngestTableMode(Enum):
3434
"""Mode for ingesting data into a table."""
35+
3536
CREATE = "CREATE"
3637
"""Create the table if it does not exist, fail if it does"""
3738
APPEND = "APPEND"
@@ -45,6 +46,7 @@ class IngestTableMode(Enum):
4546
@dataclass(frozen=True)
4647
class IngestIncrementalOptions:
4748
"""Options for incremental ingestion."""
49+
4850
primary_key: Sequence[str]
4951
"""Primary key for the table."""
5052

@@ -349,26 +351,28 @@ def ingest(
349351

350352
return writer
351353

352-
def _ingest_mode_to_table_definition_options(self, mode: IngestTableMode) -> sql_pb2.CommandStatementIngest.TableDefinitionOptions:
354+
def _ingest_mode_to_table_definition_options(
355+
self, mode: IngestTableMode
356+
) -> sql_pb2.CommandStatementIngest.TableDefinitionOptions:
353357
if mode == IngestTableMode.CREATE:
354358
return sql_pb2.CommandStatementIngest.TableDefinitionOptions(
355359
if_not_exist=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableNotExistOption.TABLE_NOT_EXIST_OPTION_CREATE,
356-
if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_FAIL
360+
if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_FAIL,
357361
)
358362
elif mode == IngestTableMode.APPEND:
359363
return sql_pb2.CommandStatementIngest.TableDefinitionOptions(
360364
if_not_exist=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableNotExistOption.TABLE_NOT_EXIST_OPTION_FAIL,
361-
if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_APPEND
365+
if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_APPEND,
362366
)
363367
elif mode == IngestTableMode.CREATE_APPEND:
364368
return sql_pb2.CommandStatementIngest.TableDefinitionOptions(
365369
if_not_exist=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableNotExistOption.TABLE_NOT_EXIST_OPTION_CREATE,
366-
if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_APPEND
370+
if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_APPEND,
367371
)
368372
elif mode == IngestTableMode.REPLACE:
369373
return sql_pb2.CommandStatementIngest.TableDefinitionOptions(
370374
if_not_exist=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableNotExistOption.TABLE_NOT_EXIST_OPTION_CREATE,
371-
if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_REPLACE
375+
if_exists=sql_pb2.CommandStatementIngest.TableDefinitionOptions.TableExistsOption.TABLE_EXISTS_OPTION_REPLACE,
372376
)
373377

374378
def prepare(

tests/test_ingest.py

Lines changed: 55 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
"""
66

77
import pyarrow as pa
8-
import pytest
98

109
from altertable_flightsql import Client
1110
from altertable_flightsql.client import IngestIncrementalOptions
@@ -23,18 +22,23 @@ def test_ingest_simple_table(self, altertable_client: Client, test_schema: Schem
2322
fully_qualified_table = f"{test_schema.full_name}.{table_name}"
2423

2524
# Define schema
26-
schema = pa.schema([
27-
("id", pa.int64()),
28-
("name", pa.string()),
29-
("value", pa.float64()),
30-
])
25+
schema = pa.schema(
26+
[
27+
("id", pa.int64()),
28+
("name", pa.string()),
29+
("value", pa.float64()),
30+
]
31+
)
3132

3233
# Create test data
33-
data = pa.record_batch([
34-
[1, 2, 3, 4, 5],
35-
["Alice", "Bob", "Charlie", "David", "Eve"],
36-
[100.5, 200.0, 300.75, 400.25, 500.5],
37-
], schema=schema)
34+
data = pa.record_batch(
35+
[
36+
[1, 2, 3, 4, 5],
37+
["Alice", "Bob", "Charlie", "David", "Eve"],
38+
[100.5, 200.0, 300.75, 400.25, 500.5],
39+
],
40+
schema=schema,
41+
)
3842

3943
try:
4044
# Ingest data
@@ -69,10 +73,12 @@ def test_ingest_multiple_batches(self, altertable_client: Client, test_schema: S
6973
fully_qualified_table = f"{test_schema.full_name}.{table_name}"
7074

7175
# Define schema
72-
schema = pa.schema([
73-
("id", pa.int64()),
74-
("name", pa.string()),
75-
])
76+
schema = pa.schema(
77+
[
78+
("id", pa.int64()),
79+
("name", pa.string()),
80+
]
81+
)
7682

7783
try:
7884
# Ingest data
@@ -106,6 +112,7 @@ def test_ingest_multiple_batches(self, altertable_client: Client, test_schema: S
106112
except Exception as e:
107113
print(f"Warning: Failed to drop table {fully_qualified_table}: {e}")
108114

115+
109116
class TestIngestWithPrimaryKey:
110117
"""Test ingest with primary key specification."""
111118

@@ -117,12 +124,14 @@ def test_ingest_with_primary_key(self, altertable_client: Client, test_schema: S
117124
fully_qualified_table = f"{test_schema.full_name}.{table_name}"
118125

119126
# Define schema
120-
schema = pa.schema([
121-
("id", pa.int64()),
122-
("email", pa.string()),
123-
("name", pa.string()),
124-
("created_at", pa.int64()),
125-
])
127+
schema = pa.schema(
128+
[
129+
("id", pa.int64()),
130+
("email", pa.string()),
131+
("name", pa.string()),
132+
("created_at", pa.int64()),
133+
]
134+
)
126135

127136
try:
128137
# Ingest data with primary key
@@ -131,14 +140,26 @@ def test_ingest_with_primary_key(self, altertable_client: Client, test_schema: S
131140
schema=schema,
132141
schema_name=test_schema.schema,
133142
catalog_name=test_schema.catalog,
134-
incremental_options=IngestIncrementalOptions(primary_key=["id"], cursor_field=["created_at"]),
143+
incremental_options=IngestIncrementalOptions(
144+
primary_key=["id"], cursor_field=["created_at"]
145+
),
135146
) as writer:
136-
writer.write(pa.record_batch([
137-
[1, 2, 3, 1],
138-
["alice@example.com", "bob@example.com", "charlie@example.com", "alice+1@example.com"],
139-
["Alice", "Bob", "Charlie", "Alice"],
140-
[1, 2, 3, 4],
141-
], schema=schema))
147+
writer.write(
148+
pa.record_batch(
149+
[
150+
[1, 2, 3, 1],
151+
[
152+
"alice@example.com",
153+
"bob@example.com",
154+
"charlie@example.com",
155+
"alice+1@example.com",
156+
],
157+
["Alice", "Bob", "Charlie", "Alice"],
158+
[1, 2, 3, 4],
159+
],
160+
schema=schema,
161+
)
162+
)
142163

143164
# Verify data was ingested
144165
reader = altertable_client.query(f"SELECT * FROM {fully_qualified_table} ORDER BY id")
@@ -147,12 +168,16 @@ def test_ingest_with_primary_key(self, altertable_client: Client, test_schema: S
147168
assert result.num_rows == 3
148169
result_df = result.to_pandas()
149170
assert list(result_df["id"]) == [1, 2, 3]
150-
assert list(result_df["email"]) == ["alice+1@example.com", "bob@example.com", "charlie@example.com"]
171+
assert list(result_df["email"]) == [
172+
"alice+1@example.com",
173+
"bob@example.com",
174+
"charlie@example.com",
175+
]
151176
assert list(result_df["name"]) == ["Alice", "Bob", "Charlie"]
152177

153178
finally:
154179
# Cleanup
155180
try:
156181
altertable_client.execute(f"DROP TABLE IF EXISTS {fully_qualified_table}")
157182
except Exception as e:
158-
print(f"Warning: Failed to drop table {fully_qualified_table}: {e}")
183+
print(f"Warning: Failed to drop table {fully_qualified_table}: {e}")

0 commit comments

Comments
 (0)