Skip to content

Commit e52ff2d

Browse files
author
Stephen Buck
committed
Implement write.parquet.row-group-size-bytes in the pyarrow writer
The pyiceberg writer has historically ignored write.parquet.row-group-size-bytes (logging 'not implemented') and used only write.parquet.row-group-limit (rows). For wide tables that means a single row group ends up at gigabytes — e.g. 337 cols × 1,048,576 default rows ≈ 1.7 GiB uncompressed per row group — which drives the polars / pyarrow reader's decode peak into the tens of GiB on production reads. Now write_file resolves row_group_size as min(row_group_limit, row_group_size_bytes / bytes_per_row), where bytes_per_row is approximated from the in-memory arrow_table's nbytes. This matches Spark / parquet-mr 'whichever limit fires first' semantics and lets the existing PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT (128 MiB) actually take effect.
1 parent 5da8186 commit e52ff2d

2 files changed

Lines changed: 99 additions & 2 deletions

File tree

pyiceberg/io/pyarrow.py

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2604,15 +2604,28 @@ def data_file_statistics_from_parquet_metadata(
26042604
)
26052605

26062606

2607+
def _resolve_row_group_size(arrow_table: pa.Table, row_group_limit: int, row_group_size_bytes: int) -> int:
2608+
if row_group_size_bytes <= 0 or arrow_table.num_rows == 0:
2609+
return row_group_limit
2610+
bytes_per_row = max(1, arrow_table.nbytes // arrow_table.num_rows)
2611+
rows_for_byte_budget = max(1, row_group_size_bytes // bytes_per_row)
2612+
return min(row_group_limit, rows_for_byte_budget)
2613+
2614+
26072615
def write_file(io: FileIO, table_metadata: TableMetadata, tasks: Iterator[WriteTask]) -> Iterator[DataFile]:
26082616
from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties
26092617

26102618
parquet_writer_kwargs = _get_parquet_writer_kwargs(table_metadata.properties)
2611-
row_group_size = property_as_int(
2619+
row_group_limit = property_as_int(
26122620
properties=table_metadata.properties,
26132621
property_name=TableProperties.PARQUET_ROW_GROUP_LIMIT,
26142622
default=TableProperties.PARQUET_ROW_GROUP_LIMIT_DEFAULT,
26152623
)
2624+
row_group_size_bytes = property_as_int(
2625+
properties=table_metadata.properties,
2626+
property_name=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
2627+
default=TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES_DEFAULT,
2628+
)
26162629
location_provider = load_location_provider(table_location=table_metadata.location, table_properties=table_metadata.properties)
26172630

26182631
def write_parquet(task: WriteTask) -> DataFile:
@@ -2636,6 +2649,7 @@ def write_parquet(task: WriteTask) -> DataFile:
26362649
for batch in task.record_batches
26372650
]
26382651
arrow_table = pa.Table.from_batches(batches)
2652+
row_group_size = _resolve_row_group_size(arrow_table, row_group_limit, row_group_size_bytes)
26392653
file_path = location_provider.new_data_location(
26402654
data_file_name=task.generate_data_file_filename("parquet"),
26412655
partition_key=task.partition_key,
@@ -2819,7 +2833,6 @@ def _get_parquet_writer_kwargs(table_properties: Properties) -> dict[str, Any]:
28192833
from pyiceberg.table import TableProperties
28202834

28212835
for key_pattern in [
2822-
TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES,
28232836
TableProperties.PARQUET_BLOOM_FILTER_MAX_BYTES,
28242837
f"{TableProperties.PARQUET_BLOOM_FILTER_COLUMN_ENABLED_PREFIX}.*",
28252838
]:

tests/io/test_pyarrow.py

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@
7474
_determine_partitions,
7575
_primitive_to_physical,
7676
_read_deletes,
77+
_resolve_row_group_size,
7778
_task_to_record_batches,
7879
_to_requested_schema,
7980
bin_pack_arrow_table,
@@ -3045,6 +3046,89 @@ def test_write_file_rejects_timestamptz_to_timestamp(tmp_path: Path) -> None:
30453046
list(write_file(io=PyArrowFileIO(), table_metadata=table_metadata, tasks=iter([task])))
30463047

30473048

3049+
@pytest.mark.parametrize(
3050+
"arrow_table,row_group_limit,row_group_size_bytes,expected",
3051+
[
3052+
# Byte limit tighter than row limit — 2 int64 cols => 16 bytes/row,
3053+
# 1024-byte budget => 64 rows/group.
3054+
(pa.table({"a": list(range(1000)), "b": list(range(1000))}), 10_000, 1024, 64),
3055+
# Row limit tighter than byte limit.
3056+
(pa.table({"a": list(range(1000))}), 10, 10**9, 10),
3057+
# Byte limit disabled (0) falls back to the row limit.
3058+
(pa.table({"a": list(range(1000))}), 500, 0, 500),
3059+
# Empty input falls back to the row limit.
3060+
(pa.table({"a": pa.array([], type=pa.int64())}), 500, 1024, 500),
3061+
],
3062+
)
3063+
def test__resolve_row_group_size(arrow_table: pa.Table, row_group_limit: int, row_group_size_bytes: int, expected: int) -> None:
3064+
"""Pick min(row_group_limit, bytes/(bytes_per_row)) when byte limit is set."""
3065+
assert _resolve_row_group_size(arrow_table, row_group_limit, row_group_size_bytes) == expected
3066+
3067+
3068+
def test_write_file_byte_limit_produces_more_row_groups_than_row_limit_alone(tmp_path: Path) -> None:
3069+
"""A tight byte limit splits a single arrow table across multiple row groups."""
3070+
from pyiceberg.table import WriteTask
3071+
3072+
table_schema = Schema(
3073+
NestedField(1, "a", LongType(), required=False),
3074+
NestedField(2, "b", LongType(), required=False),
3075+
)
3076+
arrow_data = pa.table({"a": list(range(10_000)), "b": list(range(10_000))})
3077+
3078+
def _write(properties: dict[str, str], subdir: str) -> Path:
3079+
table_metadata = TableMetadataV2(
3080+
location=f"file://{tmp_path}/{subdir}",
3081+
last_column_id=2,
3082+
format_version=2,
3083+
schemas=[table_schema],
3084+
partition_specs=[PartitionSpec()],
3085+
properties=properties,
3086+
)
3087+
task = WriteTask(
3088+
write_uuid=uuid.uuid4(),
3089+
task_id=0,
3090+
record_batches=arrow_data.to_batches(),
3091+
schema=table_schema,
3092+
)
3093+
data_files = list(write_file(io=PyArrowFileIO(), table_metadata=table_metadata, tasks=iter([task])))
3094+
return Path(data_files[0].file_path.removeprefix("file://"))
3095+
3096+
default_groups = pq.ParquetFile(_write({}, "default")).num_row_groups
3097+
constrained_groups = pq.ParquetFile(
3098+
_write({TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES: "1024"}, "constrained")
3099+
).num_row_groups
3100+
assert default_groups == 1
3101+
assert constrained_groups > 1
3102+
3103+
3104+
def test_write_file_byte_limit_respects_row_limit_upper_bound(tmp_path: Path) -> None:
3105+
"""With an effectively infinite byte target, the row limit caps row groups."""
3106+
from pyiceberg.table import WriteTask
3107+
3108+
table_schema = Schema(NestedField(1, "a", LongType(), required=False))
3109+
arrow_data = pa.table({"a": list(range(10_000))})
3110+
table_metadata = TableMetadataV2(
3111+
location=f"file://{tmp_path}",
3112+
last_column_id=1,
3113+
format_version=2,
3114+
schemas=[table_schema],
3115+
partition_specs=[PartitionSpec()],
3116+
properties={
3117+
TableProperties.PARQUET_ROW_GROUP_LIMIT: "1000",
3118+
TableProperties.PARQUET_ROW_GROUP_SIZE_BYTES: str(10**12),
3119+
},
3120+
)
3121+
task = WriteTask(
3122+
write_uuid=uuid.uuid4(),
3123+
task_id=0,
3124+
record_batches=arrow_data.to_batches(),
3125+
schema=table_schema,
3126+
)
3127+
data_files = list(write_file(io=PyArrowFileIO(), table_metadata=table_metadata, tasks=iter([task])))
3128+
pf = pq.ParquetFile(data_files[0].file_path.removeprefix("file://"))
3129+
assert pf.num_row_groups == 10
3130+
3131+
30483132
def test__to_requested_schema_timestamps(
30493133
arrow_table_schema_with_all_timestamp_precisions: pa.Schema,
30503134
arrow_table_with_all_timestamp_precisions: pa.Table,

0 commit comments

Comments
 (0)