feat: Add ArrivalOrder to ArrowScan for bounded-memory concurrent reads
Backport of apache/iceberg-python#3046. Adds a new `order` parameter to
`to_arrow_batch_reader()` with TaskOrder (default) and ArrivalOrder
implementations to support bounded-memory concurrent reads.
mkdocs/docs/api.md (80 additions, 0 deletions)
```python
for buf in tbl.scan().to_arrow_batch_reader():
    print(f"Buffer contains {len(buf)} rows")
```
By default, each file's batches are materialized in memory before being yielded (`TaskOrder()`). For large files that may exceed available memory, use `ArrivalOrder()` to yield batches as they are produced without materializing entire files:

```python
from pyiceberg.table import ArrivalOrder

for buf in tbl.scan().to_arrow_batch_reader(order=ArrivalOrder()):
    print(f"Buffer contains {len(buf)} rows")
```
For maximum throughput, tune `concurrent_streams` to read multiple files in parallel with arrival order. Batches are yielded as they arrive from any file — ordering across files is not guaranteed:

```python
from pyiceberg.table import ArrivalOrder

for buf in tbl.scan().to_arrow_batch_reader(order=ArrivalOrder(concurrent_streams=4)):
    print(f"Buffer contains {len(buf)} rows")
```

**Ordering semantics:**

| Configuration | Ordering across files | Ordering within a file |
|---|---|---|
| `TaskOrder()` (default) | Batches grouped by file, in task submission order | Row order |
| `ArrivalOrder(concurrent_streams=1)` | Sequential, one file at a time | Row order |
| `ArrivalOrder(concurrent_streams>1)` | Interleaved across files (no grouping guarantee) | Row order within each file |
The `limit` parameter is enforced correctly regardless of configuration.
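Enforcing a limit over a streaming reader amounts to counting rows as batches arrive and truncating the final batch. A minimal sketch of the idea (not pyiceberg's actual code; plain lists stand in for Arrow record batches):

```python
def apply_limit(batches, limit):
    """Yield batches until `limit` rows have been produced, truncating the last one."""
    remaining = limit
    for batch in batches:
        if remaining <= 0:
            break
        if len(batch) > remaining:
            batch = batch[:remaining]  # a real reader would slice the RecordBatch
        remaining -= len(batch)
        yield batch

# Works regardless of how batches are sized or interleaved.
print(list(apply_limit([[1, 2, 3], [4, 5], [6]], limit=4)))  # → [[1, 2, 3], [4]]
```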
**Which configuration should I use?**

| Use case | Recommended config |
|---|---|
| Small tables, simple queries | Default — no extra args needed |
| Large tables, maximum throughput with bounded memory | `order=ArrivalOrder(concurrent_streams=N)` — tune N to balance throughput vs memory |
| Fine-grained memory control | `order=ArrivalOrder(concurrent_streams=N, batch_size=M, max_buffered_batches=K)` — tune all parameters |
**Memory usage and performance characteristics:**

- **TaskOrder (default)**: Uses full file materialization. Each file is loaded entirely into memory before yielding batches. Memory usage depends on file sizes.
- **ArrivalOrder**: Uses streaming with controlled memory usage. Memory is bounded by the batch buffering mechanism.
    - `concurrent_streams`: number of files read in parallel (default: 8)
    - `batch_size`: number of rows per batch (default: 131,072; can be set via the `ArrivalOrder` constructor)
    - `max_buffered_batches`: internal buffering parameter (default: 16; can be tuned for advanced use cases)
    - Average row size depends on your schema and data; multiply the row counts above by it to estimate bytes.
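As a rough back-of-the-envelope illustration (the 100-byte average row size is an arbitrary assumption, and the exact buffering behavior is an implementation detail):

```python
# Estimate streaming memory for the ArrivalOrder defaults (illustrative only).
batch_size = 131_072         # default rows per batch
max_buffered_batches = 16    # default internal buffer depth
avg_row_bytes = 100          # assumed; depends entirely on your schema and data

bytes_per_batch = batch_size * avg_row_bytes
peak_buffered = bytes_per_batch * max_buffered_batches

print(f"{bytes_per_batch / 2**20:.1f} MiB per batch")     # 12.5 MiB
print(f"{peak_buffered / 2**20:.1f} MiB peak buffered")   # 200.0 MiB
```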
**Note:** `ArrivalOrder()` yields batches in arrival order (interleaved across files when `concurrent_streams > 1`). For deterministic file ordering, use the default `TaskOrder()` mode. The `batch_size` parameter in `ArrivalOrder` controls streaming memory usage, while `TaskOrder` uses full file materialization regardless of batch size.
To avoid any type inconsistencies during writing, you can convert the Iceberg table schema to Arrow:
```python
table.scan(
    ...
).to_arrow_batch_reader()
```
To avoid materializing entire files in memory, use `ArrivalOrder`, which yields batches as they are produced by PyArrow:
When using `concurrent_streams > 1`, batches from different files may be interleaved. Within each file, batches are always in row order. See the ordering semantics table in the [Apache Arrow section](#apache-arrow) above for details.
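The bounded-memory mechanism behind this can be sketched independently of Iceberg: one reader thread per file feeds a size-capped queue, and the consumer drains batches in whatever order they arrive. This is an illustrative model, not pyiceberg's implementation; `read_file` and the `(file_id, batch_index)` tuples are stand-ins for real file tasks and record batches:

```python
import queue
import threading

def read_file(file_id, num_batches, buf):
    # Stand-in for reading one data file. put() blocks when the buffer
    # is full, which is what bounds memory.
    for i in range(num_batches):
        buf.put((file_id, i))

def arrival_order(files, max_buffered_batches=16):
    buf = queue.Queue(maxsize=max_buffered_batches)
    threads = [
        threading.Thread(target=read_file, args=(fid, n, buf))
        for fid, n in files
    ]
    for t in threads:
        t.start()
    total = sum(n for _, n in files)
    for _ in range(total):  # total batch count is known up front
        yield buf.get()
    for t in threads:
        t.join()

batches = list(arrival_order([("a.parquet", 3), ("b.parquet", 2)]))
# Across files, batches may interleave; within each file they stay in row order.
```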