Closed

Changes from 20 commits (of 32 commits)

Commits
5ab0fd1
feat: forward batch_size parameter to PyArrow Scanner
sumedhsakdeo Feb 14, 2026
c1ece14
style: fix ruff formatting in residual_evaluator lambda
sumedhsakdeo Feb 15, 2026
70af67f
chore: remove unintended vendor directory changes
sumedhsakdeo Feb 15, 2026
2474b12
feat: add ScanOrder enum to ArrowScan.to_record_batches
sumedhsakdeo Feb 14, 2026
48b332a
feat: add concurrent_files flag for bounded concurrent streaming
sumedhsakdeo Feb 14, 2026
b360ae8
fix: remove unused imports in test_bounded_concurrent_batches
sumedhsakdeo Feb 15, 2026
4186713
refactor: simplify _bounded_concurrent_batches with per-scan executor
sumedhsakdeo Feb 17, 2026
7c415d4
refactor: replace streaming param with order=ScanOrder in concurrent …
sumedhsakdeo Feb 17, 2026
70d5a99
feat: add read throughput micro-benchmark for ArrowScan configurations
sumedhsakdeo Feb 15, 2026
2e044ea
fix: remove extraneous f-string prefix in benchmark
sumedhsakdeo Feb 15, 2026
8dcd240
fix: properly reset mock call_count in test_hive_wait_for_lock
sumedhsakdeo Feb 15, 2026
4a0a430
feat: add default-4threads benchmark and time-to-first-record metric
sumedhsakdeo Feb 15, 2026
2efdcba
chore: remove default-4threads benchmark configuration
sumedhsakdeo Feb 15, 2026
09aad7a
docs: add configuration guidance table to streaming API docs
sumedhsakdeo Feb 17, 2026
b2ae725
chore: remove benchmark marker so tests run in CI
sumedhsakdeo Feb 17, 2026
afb244c
refactor: replace streaming param with order=ScanOrder in benchmarks …
sumedhsakdeo Feb 17, 2026
03bda3d
refactor: Replace ScanOrder enum with class hierarchy
sumedhsakdeo Feb 18, 2026
19841dc
test: Update tests for new ScanOrder class hierarchy
sumedhsakdeo Feb 18, 2026
e06c01a
test: Refactor benchmark tests for new ScanOrder API
sumedhsakdeo Feb 18, 2026
c38bc76
docs: Update API documentation for ScanOrder refactoring
sumedhsakdeo Feb 18, 2026
2d4a67a
Fix ScanOrder class and remove unused import
sumedhsakdeo Feb 19, 2026
de9f3c2
Fix long line and B008 error in ArrowScan
sumedhsakdeo Feb 19, 2026
ac8add8
Fix mypy errors: change concurrent_files to concurrent_streams
sumedhsakdeo Feb 19, 2026
b5cfb78
Fix import ordering in test files
sumedhsakdeo Feb 19, 2026
d93526e
Move batch_size parameter to ArrivalOrder for better semantic design
sumedhsakdeo Feb 19, 2026
432cd81
Update tests for new ArrivalOrder batch_size API
sumedhsakdeo Feb 19, 2026
84adcfa
Update API documentation for new ArrivalOrder batch_size parameter
sumedhsakdeo Feb 19, 2026
1c73ea4
Fix to_record_batches default order and add TaskOrder import
sumedhsakdeo Feb 19, 2026
caa079e
Fix ruff B008: use module-level singleton for default ScanOrder
sumedhsakdeo Feb 19, 2026
a882dd2
fix: drain until sentinel to prevent deadlock on early generator close
sumedhsakdeo Feb 20, 2026
039e91b
fix: validate ArrivalOrder params and clarify ordering docs
sumedhsakdeo Feb 28, 2026
75ba28b
fix: remove isinstance guard that caused CI failures
sumedhsakdeo Feb 28, 2026
79 changes: 79 additions & 0 deletions mkdocs/docs/api.md
@@ -355,6 +355,52 @@
for buf in tbl.scan().to_arrow_batch_reader():
print(f"Buffer contains {len(buf)} rows")
```

You can control the number of rows per batch using the `batch_size` parameter:

```python
for buf in tbl.scan().to_arrow_batch_reader(batch_size=1000):
print(f"Buffer contains {len(buf)} rows")
```

By default, each file's batches are materialized in memory before being yielded (`TaskOrder()`). For large files that may exceed available memory, use `ArrivalOrder()` to yield batches as they are produced without materializing entire files:

```python
from pyiceberg.table import ArrivalOrder

for buf in tbl.scan().to_arrow_batch_reader(order=ArrivalOrder(), batch_size=1000):
print(f"Buffer contains {len(buf)} rows")
```

For maximum throughput, use `concurrent_streams` to read multiple files in parallel with arrival order. Batches are yielded as they arrive from any file — ordering across files is not guaranteed:

```python
from pyiceberg.table import ArrivalOrder

for buf in tbl.scan().to_arrow_batch_reader(order=ArrivalOrder(concurrent_streams=4), batch_size=1000):
print(f"Buffer contains {len(buf)} rows")
```

**Ordering semantics:**

| Configuration | File ordering | Within-file ordering |
|---|---|---|
| `TaskOrder()` (default) | Batches grouped by file, in task submission order | Row order |
| `ArrivalOrder()` | Interleaved across files (no grouping guarantee) | Row order within each file |

Within each file, batch ordering always follows row order. The `limit` parameter is enforced correctly regardless of configuration.
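The slicing behavior can be sketched with plain Python lists standing in for RecordBatches (a simplified model of how this PR applies the limit internally; `apply_limit` is an illustrative name, not PyIceberg API):

```python
def apply_limit(batches, limit):
    """Yield batches until `limit` total rows, slicing the batch that crosses it."""
    total = 0
    for batch in batches:
        remaining = limit - total
        if remaining <= 0:
            return
        if len(batch) > remaining:
            yield batch[:remaining]  # truncate the final partial batch
            return
        yield batch
        total += len(batch)


print(list(apply_limit([[1, 2, 3], [4, 5, 6], [7, 8, 9]], limit=5)))
# [[1, 2, 3], [4, 5]]
```

Because truncation happens on the consumer side, the limit holds no matter how batches arrive or interleave.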

**Which configuration should I use?**

| Use case | Recommended config |
|---|---|
| Small tables, simple queries | Default — no extra args needed |
| Large tables, memory-constrained | `order=ArrivalOrder()` — one file at a time, minimal memory |
| Maximum throughput with bounded memory | `order=ArrivalOrder(concurrent_streams=N)` — tune N to balance throughput vs memory |
| Fine-grained memory control | `order=ArrivalOrder(concurrent_streams=N, max_buffered_batches=M)` — tune both parameters |
| Fine-grained batch control | Add `batch_size=N` to any of the above |

**Note:** `ArrivalOrder()` yields batches in arrival order (interleaved across files when `concurrent_streams > 1`). For deterministic file ordering, use the default `TaskOrder()` mode. `batch_size` is usually an advanced tuning knob — the PyArrow default of 131,072 rows works well for most workloads.
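As a rough rule of thumb (an estimate for tuning, not a bound documented by PyIceberg), the rows buffered at any moment under `ArrivalOrder` are limited by the queue capacity plus one in-flight batch per stream:

```python
def approx_max_buffered_rows(batch_size: int, concurrent_streams: int, max_buffered_batches: int = 16) -> int:
    """Rough worst case: queued batches plus one batch in flight per stream."""
    return batch_size * (max_buffered_batches + concurrent_streams)


# e.g. batch_size=10_000, 4 streams, default 16-batch queue:
print(approx_max_buffered_rows(10_000, 4))  # 200000
```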

To avoid any type inconsistencies during writing, you can convert the Iceberg table schema to Arrow:

@@ -1619,6 +1665,39 @@

```python
table.scan(
).to_arrow_batch_reader()
```

The `batch_size` parameter controls the maximum number of rows per RecordBatch (default is PyArrow's 131,072 rows):

```python
table.scan(
row_filter=GreaterThanOrEqual("trip_distance", 10.0),
selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_arrow_batch_reader(batch_size=1000)
```

Use `order=ArrivalOrder()` to avoid materializing entire files in memory. This yields batches as they are produced by PyArrow, one file at a time:

```python
from pyiceberg.table import ArrivalOrder

table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_arrow_batch_reader(order=ArrivalOrder())
```

For concurrent file reads with arrival order, use `concurrent_streams`. Note that batch ordering across files is not guaranteed:

```python
from pyiceberg.table import ArrivalOrder

table.scan(
    row_filter=GreaterThanOrEqual("trip_distance", 10.0),
    selected_fields=("VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime"),
).to_arrow_batch_reader(order=ArrivalOrder(concurrent_streams=4))
```

When using `concurrent_streams > 1`, batches from different files may be interleaved. Within each file, batches are always in row order. See the ordering semantics table in the [Apache Arrow section](#apache-arrow) above for details.

### Pandas

<!-- prettier-ignore-start -->
196 changes: 165 additions & 31 deletions pyiceberg/io/pyarrow.py
@@ -33,11 +33,14 @@
import logging
import operator
import os
import queue
import re
import threading
import uuid
import warnings
from abc import ABC, abstractmethod
-from collections.abc import Callable, Iterable, Iterator
from collections.abc import Callable, Generator, Iterable, Iterator
from concurrent.futures import ThreadPoolExecutor
from copy import copy
from dataclasses import dataclass
from enum import Enum
@@ -141,7 +144,7 @@
visit,
visit_with_partner,
)
-from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, TableProperties
from pyiceberg.table import DOWNCAST_NS_TIMESTAMP_TO_US_ON_WRITE, ScanOrder, TaskOrder, ArrivalOrder, TableProperties
from pyiceberg.table.locations import load_location_provider
from pyiceberg.table.metadata import TableMetadata
from pyiceberg.table.name_mapping import NameMapping, apply_name_mapping
@@ -1581,6 +1584,7 @@ def _task_to_record_batches(
    partition_spec: PartitionSpec | None = None,
    format_version: TableVersion = TableProperties.DEFAULT_FORMAT_VERSION,
    downcast_ns_timestamp_to_us: bool | None = None,
    batch_size: int | None = None,
) -> Iterator[pa.RecordBatch]:
    arrow_format = _get_file_format(task.file.file_format, pre_buffer=True, buffer_size=(ONE_MEGABYTE * 8))
    with io.new_input(task.file.file_path).open() as fin:
@@ -1612,14 +1616,18 @@

        file_project_schema = prune_columns(file_schema, projected_field_ids, select_full_types=False)

-        fragment_scanner = ds.Scanner.from_fragment(
-            fragment=fragment,
-            schema=physical_schema,
        scanner_kwargs: dict[str, Any] = {
            "fragment": fragment,
            "schema": physical_schema,
            # This will push down the query to Arrow.
            # But in case there are positional deletes, we have to apply them first
-            filter=pyarrow_filter if not positional_deletes else None,
-            columns=[col.name for col in file_project_schema.columns],
-        )
            "filter": pyarrow_filter if not positional_deletes else None,
            "columns": [col.name for col in file_project_schema.columns],
        }
        if batch_size is not None:
            scanner_kwargs["batch_size"] = batch_size

        fragment_scanner = ds.Scanner.from_fragment(**scanner_kwargs)

        next_index = 0
        batches = fragment_scanner.to_batches()
@@ -1677,6 +1685,76 @@ def _read_all_delete_files(io: FileIO, tasks: Iterable[FileScanTask]) -> dict[str, list[ChunkedArray]]:
    return deletes_per_file


_QUEUE_SENTINEL = object()


def _bounded_concurrent_batches(
    tasks: list[FileScanTask],
    batch_fn: Callable[[FileScanTask], Iterator[pa.RecordBatch]],
    concurrent_streams: int,
    max_buffered_batches: int = 16,
) -> Generator[pa.RecordBatch, None, None]:
    """Read batches from multiple files concurrently with bounded memory.

    Uses a per-scan ThreadPoolExecutor(max_workers=concurrent_streams) to naturally
    bound concurrency. Workers push batches into a bounded queue which provides
    backpressure when the consumer is slower than the producers.

    Args:
        tasks: The file scan tasks to process.
        batch_fn: A callable that takes a FileScanTask and returns an iterator of RecordBatches.
        concurrent_streams: Maximum number of concurrent read streams.
        max_buffered_batches: Maximum number of batches to buffer in the queue.
    """
    if not tasks:
        return

    batch_queue: queue.Queue[pa.RecordBatch | BaseException | object] = queue.Queue(maxsize=max_buffered_batches)
    cancel = threading.Event()
    remaining = len(tasks)
    remaining_lock = threading.Lock()

    def worker(task: FileScanTask) -> None:
        nonlocal remaining
        try:
            for batch in batch_fn(task):
                if cancel.is_set():
                    return
                batch_queue.put(batch)
        except BaseException as e:
            if not cancel.is_set():
                batch_queue.put(e)
        finally:
            with remaining_lock:
                remaining -= 1
                if remaining == 0:
                    batch_queue.put(_QUEUE_SENTINEL)

    with ThreadPoolExecutor(max_workers=concurrent_streams) as executor:
        for task in tasks:
            executor.submit(worker, task)

        try:
            while True:
                item = batch_queue.get()

                if item is _QUEUE_SENTINEL:
                    break

                if isinstance(item, BaseException):
                    raise item

                yield item
        finally:
            cancel.set()
            # Drain the queue to unblock any workers stuck on put()
            while not batch_queue.empty():
**Review comment:** This could cause a worker to hang when `concurrent_files > 1` and `max_buffered_batches=1`. Here is an example.

Starting state: `max_buffered_batches=1`, `concurrent_files=3`. The queue is full (1 item). Workers A, B, and C are all blocked on `batch_queue.put()`.

| Step | Main thread | Workers A, B, C |
|---|---|---|
| 1 | `cancel.set()` | |
| 2 | `get_nowait()` removes 1 item. Queue: 0. Internally notifies Worker A | Worker A: woken but hasn't run yet |
| 3 | `empty()` returns `True` (queue IS empty because A hasn't put yet). Exits drain loop. | |
| 4 | `executor.__exit__()` calls `shutdown(wait=True)`, joins all threads... | Worker A runs, `put()` completes. Queue: 1. Checks cancel, returns. ✓ |
| 5 | DEADLOCK: waiting for B and C to finish | Workers B, C: still blocked on `put()`. Queue is full, nobody will ever drain. |

Fix: in the worker, use `put` with a timeout so it can periodically check whether the thread is canceled.

**Author (sumedhsakdeo):** Good catch. I did have a timeout in the previous version of the code, but it was causing a performance regression. Exploring a few other alternatives, like condition variables: more complex, but they do not result in the bug.

**Author (sumedhsakdeo):** We have a 15% regression if the timeout is added. Will need to find another way.

  │            Config            │ Throughput (rows/s) │ Time  │  TTFR  │ Peak Memory │
  ├──────────────────────────────┼─────────────────────┼───────┼────────┼─────────────┤
  │ default (TASK, all-parallel) │ 199M                │ 0.08s │ 60.9ms │ 609.5 MB    │
  ├──────────────────────────────┼─────────────────────┼───────┼────────┼─────────────┤
  │ arrival-cf1                  │ 56.6M               │ 0.28s │ 29.9ms │ 10.3 MB     │
  ├──────────────────────────────┼─────────────────────┼───────┼────────┼─────────────┤
  │ arrival-cf2                  │ 93.3M               │ 0.17s │ 32.6ms │ 42.6 MB     │
  ├──────────────────────────────┼─────────────────────┼───────┼────────┼─────────────┤
  │ arrival-cf4                  │ 159.7M              │ 0.10s │ 32.9ms │ 114.7 MB    │
  ├──────────────────────────────┼─────────────────────┼───────┼────────┼─────────────┤
  │ arrival-cf8                  │ 186.4M              │ 0.09s │ 37.8ms │ 276.2 MB    │
  ├──────────────────────────────┼─────────────────────┼───────┼────────┼─────────────┤
  │ arrival-cf16                 │ 188.9M              │ 0.09s │ 53.4ms │ 453.2 MB    │
  └──────────────────────────────┴─────────────────────┴───────┴────────┴─────────────┘

**Author (sumedhsakdeo):** Fixed, PTAL.

**Review comment:** I don't see any issues with the logic, but am still a little worried about leaving an indefinite block in case of bugs. What put timeout did you use for testing? I'm surprised it would cause such a large regression.

**Author (sumedhsakdeo, Feb 28, 2026):** I used timeout=0.1s (100ms). It caused a consistent ~10-15% regression even in the normal (non-cancel) path:

| Config | With timeout | Without timeout | Delta |
|---|---|---|---|
| arrival, cs=1 | 56.6M rows/s | 59.3M rows/s | +5% |
| arrival, cs=2 | 93.3M rows/s | 105.4M rows/s | +13% |
| arrival, cs=4 | 159.7M rows/s | 175.8M rows/s | +10% |
| arrival, cs=8 | 186.4M rows/s | 211.9M rows/s | +14% |
| arrival, cs=16 | 188.9M rows/s | 209.0M rows/s | +11% |

The root cause: `put(timeout=0.1)` uses a timed `threading.Condition.wait()`, which is heavier than a pure blocking wait even when it returns promptly on `notify()`.

Instead I went with a drain-until-sentinel approach: the `finally` block calls blocking `batch_queue.get()` in a loop until it sees the sentinel. Each `get()` naturally calls `not_full.notify()`, which wakes one blocked worker; that worker checks cancel and returns; eventually the last worker puts the sentinel and the drain loop exits.

**Review comment (robreeves, Mar 2, 2026):** I'm assuming when the timeout is hit it looped and called `put` again? I think the short timeout is the cause of the performance regression. Each loop is blocking other threads due to the GIL only executing one thread at a time. I don't expect a timeout like 10 sec to have the same issue.
                try:
                    batch_queue.get_nowait()
                except queue.Empty:
                    break
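The bounded-queue pattern discussed in the review thread above can be sketched in isolation. This is a simplified, self-contained model of the drain-until-sentinel variant the author describes (blocking `get()` until the sentinel); `bounded_merge` and `SENTINEL` are illustrative names, not the PR's actual code:

```python
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

SENTINEL = object()


def bounded_merge(producers, max_buffered=2):
    """Merge items from several producer iterables through a bounded queue.

    The last worker to finish enqueues SENTINEL, so the consumer's drain
    loop always terminates, even if the generator is closed early.
    """
    q = queue.Queue(maxsize=max_buffered)
    cancel = threading.Event()
    remaining = len(producers)
    lock = threading.Lock()

    def worker(items):
        nonlocal remaining
        try:
            for item in items:
                if cancel.is_set():
                    return
                q.put(item)  # blocks when the queue is full: backpressure
        finally:
            with lock:
                remaining -= 1
                if remaining == 0:
                    q.put(SENTINEL)  # exactly one sentinel, from the last worker

    with ThreadPoolExecutor(max_workers=len(producers)) as ex:
        for p in producers:
            ex.submit(worker, p)
        saw_sentinel = False
        try:
            while True:
                item = q.get()
                if item is SENTINEL:
                    saw_sentinel = True
                    break
                yield item
        finally:
            cancel.set()
            # Drain until the sentinel: each get() frees a queue slot and
            # wakes one blocked worker, which sees cancel and returns.
            while not saw_sentinel:
                if q.get() is SENTINEL:
                    saw_sentinel = True


merged = sorted(bounded_merge([range(3), range(10, 13)]))
print(merged)  # [0, 1, 2, 10, 11, 12]
```

Closing the generator early exercises the drain path: the blocking `get()` keeps freeing slots until the last worker's sentinel arrives, so no worker stays stuck on `put()`.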


class ArrowScan:
    _table_metadata: TableMetadata
    _io: FileIO

@@ -1756,54 +1834,109 @@ def to_table(self, tasks: Iterable[FileScanTask]) -> pa.Table:

        return result

-    def to_record_batches(self, tasks: Iterable[FileScanTask]) -> Iterator[pa.RecordBatch]:
    def to_record_batches(
        self,
        tasks: Iterable[FileScanTask],
        batch_size: int | None = None,
        order: ScanOrder = TaskOrder(),
    ) -> Iterator[pa.RecordBatch]:
        """Scan the Iceberg table and return an Iterator[pa.RecordBatch].

        Returns an Iterator of pa.RecordBatch with data from the Iceberg table
        by resolving the right columns that match the current table schema.
        Only data that matches the provided row_filter expression is returned.

        Ordering semantics:
        - TaskOrder() (default): Yields batches one file at a time in task submission order.
        - ArrivalOrder(): Batches may be interleaved across files as they arrive.
          Within each file, batch ordering follows row order.

        Args:
            tasks: FileScanTasks representing the data files and delete files to read from.
            batch_size: The number of rows per batch. If None, PyArrow's default is used.
            order: Controls the order in which record batches are returned.
                TaskOrder() (default) yields batches one file at a time in task order.
                ArrivalOrder(concurrent_streams=N, max_buffered_batches=M) yields batches
                as they are produced without materializing entire files into memory.

        Returns:
            An Iterator of PyArrow RecordBatches.
            Total number of rows will be capped if specified.

        Raises:
            ResolveError: When a required field cannot be found in the file
-            ValueError: When a field type in the file cannot be projected to the schema type
            ValueError: When a field type in the file cannot be projected to the schema type,
                or when an invalid order value is provided, or when concurrent_streams < 1.
        """
-        deletes_per_file = _read_all_delete_files(self._io, tasks)
        if not isinstance(order, ScanOrder):
            raise ValueError(f"Invalid order: {order!r}. Must be a ScanOrder instance (TaskOrder() or ArrivalOrder()).")

-        total_row_count = 0
        task_list, deletes_per_file = self._prepare_tasks_and_deletes(tasks)

        if isinstance(order, ArrivalOrder):
            if order.concurrent_streams < 1:
                raise ValueError(f"concurrent_streams must be >= 1, got {order.concurrent_streams}")
            return self._apply_limit(self._iter_batches_arrival(task_list, deletes_per_file, batch_size, order.concurrent_streams, order.max_buffered_batches))

        return self._apply_limit(self._iter_batches_materialized(task_list, deletes_per_file, batch_size))

    def _prepare_tasks_and_deletes(
        self, tasks: Iterable[FileScanTask]
    ) -> tuple[list[FileScanTask], dict[str, list[ChunkedArray]]]:
        """Resolve delete files and return tasks as a list."""
        task_list = list(tasks)
        deletes_per_file = _read_all_delete_files(self._io, task_list)
        return task_list, deletes_per_file

    def _iter_batches_arrival(
        self,
        task_list: list[FileScanTask],
        deletes_per_file: dict[str, list[ChunkedArray]],
        batch_size: int | None,
        concurrent_streams: int,
        max_buffered_batches: int = 16,
    ) -> Iterator[pa.RecordBatch]:
        """Yield batches using bounded concurrent streaming in arrival order."""

        def batch_fn(task: FileScanTask) -> Iterator[pa.RecordBatch]:
            return self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file, batch_size)

        yield from _bounded_concurrent_batches(task_list, batch_fn, concurrent_streams, max_buffered_batches)

    def _iter_batches_materialized(
        self,
        task_list: list[FileScanTask],
        deletes_per_file: dict[str, list[ChunkedArray]],
        batch_size: int | None,
    ) -> Iterator[pa.RecordBatch]:
        """Yield batches using executor.map with full file materialization."""
        executor = ExecutorFactory.get_or_create()

        def batches_for_task(task: FileScanTask) -> list[pa.RecordBatch]:
            # Materialize the iterator here to ensure execution happens within the executor.
            # Otherwise, the iterator would be lazily consumed later (in the main thread),
            # defeating the purpose of using executor.map.
-            return list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file))
            return list(self._record_batches_from_scan_tasks_and_deletes([task], deletes_per_file, batch_size))

-        limit_reached = False
-        for batches in executor.map(batches_for_task, tasks):
-            for batch in batches:
-                current_batch_size = len(batch)
-                if self._limit is not None and total_row_count + current_batch_size >= self._limit:
-                    yield batch.slice(0, self._limit - total_row_count)
        for batches in executor.map(batches_for_task, task_list):
            yield from batches

-                    limit_reached = True
-                    break
-                else:
-                    yield batch
-                total_row_count += current_batch_size
    def _apply_limit(self, batches: Iterator[pa.RecordBatch]) -> Iterator[pa.RecordBatch]:
        """Apply row limit across batches."""
        if self._limit is None:
            yield from batches
            return

-            if limit_reached:
-                # This break will also cancel all running tasks in the executor
-                break
        total_row_count = 0
        for batch in batches:
            remaining = self._limit - total_row_count
            if remaining <= 0:
                return
            if len(batch) > remaining:
                yield batch.slice(0, remaining)
                return
            yield batch
            total_row_count += len(batch)

    def _record_batches_from_scan_tasks_and_deletes(
-        self, tasks: Iterable[FileScanTask], deletes_per_file: dict[str, list[ChunkedArray]]
        self, tasks: Iterable[FileScanTask], deletes_per_file: dict[str, list[ChunkedArray]], batch_size: int | None = None
    ) -> Iterator[pa.RecordBatch]:
        total_row_count = 0
        for task in tasks:

@@ -1822,6 +1955,7 @@ def _record_batches_from_scan_tasks_and_deletes(

                self._table_metadata.specs().get(task.file.spec_id),
                self._table_metadata.format_version,
                self._downcast_ns_timestamp_to_us,
                batch_size,
            )
            for batch in batches:
                if self._limit is not None: