Skip to content

Commit edeaf68

Browse files
authored
fix(arrow)!: in-memory streaming (#348)
- Fix `AsyncArrowBatchIterator` to use a sentinel pattern instead of catching `StopIteration`, which cannot propagate through asyncio Futures
- Make the `async_()` limiter truly optional (`None` = no semaphore)
- Remove `storage_limiter` usage from all backends to avoid event loop binding issues with pytest-asyncio
- Rename all sync methods to the `*_sync` pattern (breaking change)
1 parent 6aab462 commit edeaf68

19 files changed

Lines changed: 805 additions & 724 deletions

sqlspec/loader.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -278,8 +278,8 @@ def _read_file_content(self, path: str | Path) -> str:
278278
if file_path and len(file_path) > 2 and file_path[2] == ":": # noqa: PLR2004
279279
file_path = file_path[1:]
280280
filename = Path(file_path).name
281-
return backend.read_text(filename, encoding=self.encoding)
282-
return backend.read_text(path_str, encoding=self.encoding)
281+
return backend.read_text_sync(filename, encoding=self.encoding)
282+
return backend.read_text_sync(path_str, encoding=self.encoding)
283283
except KeyError as e:
284284
raise SQLFileNotFoundError(path_str) from e
285285
except FileNotFoundInStorageError as e:

sqlspec/protocols.py

Lines changed: 38 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -476,83 +476,86 @@ def get_data(self) -> Any: ...
476476

477477
@runtime_checkable
478478
class ObjectStoreProtocol(Protocol):
479-
"""Protocol for object storage operations."""
479+
"""Protocol for object storage operations.
480+
481+
All synchronous methods use the `*_sync` suffix for consistency with async methods.
482+
"""
480483

481484
protocol: str
482485
backend_type: str
483486

484487
def __init__(self, uri: str, **kwargs: Any) -> None:
485488
return
486489

487-
def read_bytes(self, path: "str | Path", **kwargs: Any) -> bytes:
488-
"""Read bytes from an object."""
490+
def read_bytes_sync(self, path: "str | Path", **kwargs: Any) -> bytes:
491+
"""Read bytes from an object synchronously."""
489492
return b""
490493

491-
def write_bytes(self, path: "str | Path", data: bytes, **kwargs: Any) -> None:
492-
"""Write bytes to an object."""
494+
def write_bytes_sync(self, path: "str | Path", data: bytes, **kwargs: Any) -> None:
495+
"""Write bytes to an object synchronously."""
493496
return
494497

495-
def read_text(self, path: "str | Path", encoding: str = "utf-8", **kwargs: Any) -> str:
496-
"""Read text from an object."""
498+
def read_text_sync(self, path: "str | Path", encoding: str = "utf-8", **kwargs: Any) -> str:
499+
"""Read text from an object synchronously."""
497500
return ""
498501

499-
def write_text(self, path: "str | Path", data: str, encoding: str = "utf-8", **kwargs: Any) -> None:
500-
"""Write text to an object."""
502+
def write_text_sync(self, path: "str | Path", data: str, encoding: str = "utf-8", **kwargs: Any) -> None:
503+
"""Write text to an object synchronously."""
501504
return
502505

503-
def exists(self, path: "str | Path", **kwargs: Any) -> bool:
504-
"""Check if an object exists."""
506+
def exists_sync(self, path: "str | Path", **kwargs: Any) -> bool:
507+
"""Check if an object exists synchronously."""
505508
return False
506509

507-
def delete(self, path: "str | Path", **kwargs: Any) -> None:
508-
"""Delete an object."""
510+
def delete_sync(self, path: "str | Path", **kwargs: Any) -> None:
511+
"""Delete an object synchronously."""
509512
return
510513

511-
def copy(self, source: "str | Path", destination: "str | Path", **kwargs: Any) -> None:
512-
"""Copy an object."""
514+
def copy_sync(self, source: "str | Path", destination: "str | Path", **kwargs: Any) -> None:
515+
"""Copy an object synchronously."""
513516
return
514517

515-
def move(self, source: "str | Path", destination: "str | Path", **kwargs: Any) -> None:
516-
"""Move an object."""
518+
def move_sync(self, source: "str | Path", destination: "str | Path", **kwargs: Any) -> None:
519+
"""Move an object synchronously."""
517520
return
518521

519-
def list_objects(self, prefix: str = "", recursive: bool = True, **kwargs: Any) -> list[str]:
520-
"""List objects with optional prefix."""
522+
def list_objects_sync(self, prefix: str = "", recursive: bool = True, **kwargs: Any) -> list[str]:
523+
"""List objects with optional prefix synchronously."""
521524
return []
522525

523-
def glob(self, pattern: str, **kwargs: Any) -> list[str]:
524-
"""Find objects matching a glob pattern."""
526+
def glob_sync(self, pattern: str, **kwargs: Any) -> list[str]:
527+
"""Find objects matching a glob pattern synchronously."""
525528
return []
526529

527-
def is_object(self, path: "str | Path") -> bool:
528-
"""Check if path points to an object."""
530+
def is_object_sync(self, path: "str | Path") -> bool:
531+
"""Check if path points to an object synchronously."""
529532
return False
530533

531-
def is_path(self, path: "str | Path") -> bool:
532-
"""Check if path points to a prefix (directory-like)."""
534+
def is_path_sync(self, path: "str | Path") -> bool:
535+
"""Check if path points to a prefix (directory-like) synchronously."""
533536
return False
534537

535-
def get_metadata(self, path: "str | Path", **kwargs: Any) -> dict[str, object]:
536-
"""Get object metadata."""
538+
def get_metadata_sync(self, path: "str | Path", **kwargs: Any) -> dict[str, object]:
539+
"""Get object metadata synchronously."""
537540
return {}
538541

539-
def read_arrow(self, path: "str | Path", **kwargs: Any) -> "ArrowTable":
540-
"""Read an Arrow table from storage."""
542+
def read_arrow_sync(self, path: "str | Path", **kwargs: Any) -> "ArrowTable":
543+
"""Read an Arrow table from storage synchronously."""
541544
msg = "Arrow reading not implemented"
542545
raise NotImplementedError(msg)
543546

544-
def write_arrow(self, path: "str | Path", table: "ArrowTable", **kwargs: Any) -> None:
545-
"""Write an Arrow table to storage."""
547+
def write_arrow_sync(self, path: "str | Path", table: "ArrowTable", **kwargs: Any) -> None:
548+
"""Write an Arrow table to storage synchronously."""
546549
msg = "Arrow writing not implemented"
547550
raise NotImplementedError(msg)
548551

549-
def stream_arrow(self, pattern: str, **kwargs: Any) -> "Iterator[ArrowRecordBatch]":
550-
"""Stream Arrow record batches from matching objects."""
552+
def stream_arrow_sync(self, pattern: str, **kwargs: Any) -> "Iterator[ArrowRecordBatch]":
553+
"""Stream Arrow record batches from matching objects synchronously."""
551554
msg = "Arrow streaming not implemented"
552555
raise NotImplementedError(msg)
553556

554-
def stream_read(self, path: "str | Path", chunk_size: "int | None" = None, **kwargs: Any) -> "Iterator[bytes]":
555-
"""Stream bytes from an object."""
557+
def stream_read_sync(self, path: "str | Path", chunk_size: "int | None" = None, **kwargs: Any) -> "Iterator[bytes]":
558+
"""Stream bytes from an object synchronously."""
556559
msg = "Stream reading not implemented"
557560
raise NotImplementedError(msg)
558561

sqlspec/storage/backends/base.py

Lines changed: 79 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from typing_extensions import Self
1010

1111
from sqlspec.typing import ArrowRecordBatch, ArrowTable
12+
from sqlspec.utils.sync_tools import CapacityLimiter
1213

1314
__all__ = (
1415
"AsyncArrowBatchIterator",
@@ -17,8 +18,38 @@
1718
"AsyncObStoreStreamIterator",
1819
"AsyncThreadedBytesIterator",
1920
"ObjectStoreBase",
21+
"storage_limiter",
2022
)
2123

24+
# Dedicated capacity limiter for storage I/O operations (100 concurrent ops)
25+
# This is shared across all storage backends to prevent overwhelming the system
26+
storage_limiter = CapacityLimiter(100)
27+
28+
29+
class _ExhaustedSentinel:
30+
"""Sentinel value to signal iterator exhaustion across thread boundaries.
31+
32+
StopIteration cannot be raised into asyncio Futures, so we use this sentinel
33+
to signal iterator exhaustion from the thread pool back to the async context.
34+
"""
35+
36+
__slots__ = ()
37+
38+
39+
_EXHAUSTED = _ExhaustedSentinel()
40+
41+
42+
def _next_or_sentinel(iterator: "Iterator[Any]") -> "Any":
43+
"""Get next item or return sentinel if exhausted.
44+
45+
This helper wraps next() to catch StopIteration in the thread,
46+
since StopIteration cannot propagate through asyncio Futures.
47+
"""
48+
try:
49+
return next(iterator)
50+
except StopIteration:
51+
return _EXHAUSTED
52+
2253

2354
class AsyncArrowBatchIterator:
2455
"""Async iterator wrapper for sync Arrow batch iterators.
@@ -47,16 +78,19 @@ def __aiter__(self) -> "AsyncArrowBatchIterator":
4778
async def __anext__(self) -> "ArrowRecordBatch":
4879
"""Get the next item from the iterator asynchronously.
4980
81+
Uses asyncio.to_thread to offload the blocking next() call
82+
to a thread pool, preventing event loop blocking.
83+
5084
Returns:
5185
The next Arrow record batch.
5286
5387
Raises:
5488
StopAsyncIteration: When the iterator is exhausted.
5589
"""
56-
try:
57-
return next(self._sync_iter)
58-
except StopIteration:
59-
raise StopAsyncIteration from None
90+
result = await asyncio.to_thread(_next_or_sentinel, self._sync_iter)
91+
if result is _EXHAUSTED:
92+
raise StopAsyncIteration
93+
return cast("ArrowRecordBatch", result)
6094

6195

6296
class AsyncBytesIterator:
@@ -309,93 +343,97 @@ async def __anext__(self) -> bytes:
309343

310344
@mypyc_attr(allow_interpreted_subclasses=True)
311345
class ObjectStoreBase(ABC):
312-
"""Base class for storage backends."""
346+
"""Base class for storage backends.
347+
348+
All synchronous methods follow the *_sync naming convention for consistency
349+
with their async counterparts.
350+
"""
313351

314352
__slots__ = ()
315353

316354
@abstractmethod
317-
def read_bytes(self, path: str, **kwargs: Any) -> bytes:
318-
"""Read bytes from storage."""
355+
def read_bytes_sync(self, path: str, **kwargs: Any) -> bytes:
356+
"""Read bytes from storage synchronously."""
319357
raise NotImplementedError
320358

321359
@abstractmethod
322-
def write_bytes(self, path: str, data: bytes, **kwargs: Any) -> None:
323-
"""Write bytes to storage."""
360+
def write_bytes_sync(self, path: str, data: bytes, **kwargs: Any) -> None:
361+
"""Write bytes to storage synchronously."""
324362
raise NotImplementedError
325363

326364
@abstractmethod
327-
def stream_read(self, path: str, chunk_size: "int | None" = None, **kwargs: Any) -> Iterator[bytes]:
328-
"""Stream bytes from storage."""
365+
def stream_read_sync(self, path: str, chunk_size: "int | None" = None, **kwargs: Any) -> Iterator[bytes]:
366+
"""Stream bytes from storage synchronously."""
329367
raise NotImplementedError
330368

331369
@abstractmethod
332-
def read_text(self, path: str, encoding: str = "utf-8", **kwargs: Any) -> str:
333-
"""Read text from storage."""
370+
def read_text_sync(self, path: str, encoding: str = "utf-8", **kwargs: Any) -> str:
371+
"""Read text from storage synchronously."""
334372
raise NotImplementedError
335373

336374
@abstractmethod
337-
def write_text(self, path: str, data: str, encoding: str = "utf-8", **kwargs: Any) -> None:
338-
"""Write text to storage."""
375+
def write_text_sync(self, path: str, data: str, encoding: str = "utf-8", **kwargs: Any) -> None:
376+
"""Write text to storage synchronously."""
339377
raise NotImplementedError
340378

341379
@abstractmethod
342-
def list_objects(self, prefix: str = "", recursive: bool = True, **kwargs: Any) -> "list[str]":
343-
"""List objects in storage."""
380+
def list_objects_sync(self, prefix: str = "", recursive: bool = True, **kwargs: Any) -> "list[str]":
381+
"""List objects in storage synchronously."""
344382
raise NotImplementedError
345383

346384
@abstractmethod
347-
def exists(self, path: str, **kwargs: Any) -> bool:
348-
"""Check if object exists in storage."""
385+
def exists_sync(self, path: str, **kwargs: Any) -> bool:
386+
"""Check if object exists in storage synchronously."""
349387
raise NotImplementedError
350388

351389
@abstractmethod
352-
def delete(self, path: str, **kwargs: Any) -> None:
353-
"""Delete object from storage."""
390+
def delete_sync(self, path: str, **kwargs: Any) -> None:
391+
"""Delete object from storage synchronously."""
354392
raise NotImplementedError
355393

356394
@abstractmethod
357-
def copy(self, source: str, destination: str, **kwargs: Any) -> None:
358-
"""Copy object within storage."""
395+
def copy_sync(self, source: str, destination: str, **kwargs: Any) -> None:
396+
"""Copy object within storage synchronously."""
359397
raise NotImplementedError
360398

361399
@abstractmethod
362-
def move(self, source: str, destination: str, **kwargs: Any) -> None:
363-
"""Move object within storage."""
400+
def move_sync(self, source: str, destination: str, **kwargs: Any) -> None:
401+
"""Move object within storage synchronously."""
364402
raise NotImplementedError
365403

366404
@abstractmethod
367-
def glob(self, pattern: str, **kwargs: Any) -> "list[str]":
368-
"""Find objects matching pattern."""
405+
def glob_sync(self, pattern: str, **kwargs: Any) -> "list[str]":
406+
"""Find objects matching pattern synchronously."""
369407
raise NotImplementedError
370408

371409
@abstractmethod
372-
def get_metadata(self, path: str, **kwargs: Any) -> "dict[str, object]":
373-
"""Get object metadata from storage."""
410+
def get_metadata_sync(self, path: str, **kwargs: Any) -> "dict[str, object]":
411+
"""Get object metadata from storage synchronously."""
374412
raise NotImplementedError
375413

376414
@abstractmethod
377-
def is_object(self, path: str) -> bool:
378-
"""Check if path points to an object."""
415+
def is_object_sync(self, path: str) -> bool:
416+
"""Check if path points to an object synchronously."""
379417
raise NotImplementedError
380418

381419
@abstractmethod
382-
def is_path(self, path: str) -> bool:
383-
"""Check if path points to a directory."""
420+
def is_path_sync(self, path: str) -> bool:
421+
"""Check if path points to a directory synchronously."""
384422
raise NotImplementedError
385423

386424
@abstractmethod
387-
def read_arrow(self, path: str, **kwargs: Any) -> ArrowTable:
388-
"""Read Arrow table from storage."""
425+
def read_arrow_sync(self, path: str, **kwargs: Any) -> ArrowTable:
426+
"""Read Arrow table from storage synchronously."""
389427
raise NotImplementedError
390428

391429
@abstractmethod
392-
def write_arrow(self, path: str, table: ArrowTable, **kwargs: Any) -> None:
393-
"""Write Arrow table to storage."""
430+
def write_arrow_sync(self, path: str, table: ArrowTable, **kwargs: Any) -> None:
431+
"""Write Arrow table to storage synchronously."""
394432
raise NotImplementedError
395433

396434
@abstractmethod
397-
def stream_arrow(self, pattern: str, **kwargs: Any) -> Iterator[ArrowRecordBatch]:
398-
"""Stream Arrow record batches from storage."""
435+
def stream_arrow_sync(self, pattern: str, **kwargs: Any) -> Iterator[ArrowRecordBatch]:
436+
"""Stream Arrow record batches from storage synchronously."""
399437
raise NotImplementedError
400438

401439
@abstractmethod
@@ -426,7 +464,7 @@ async def stream_read_async(
426464
raise NotImplementedError
427465

428466
@abstractmethod
429-
def list_objects_async(self, prefix: str = "", recursive: bool = True, **kwargs: Any) -> "list[str]":
467+
async def list_objects_async(self, prefix: str = "", recursive: bool = True, **kwargs: Any) -> "list[str]":
430468
"""List objects in storage asynchronously."""
431469
raise NotImplementedError
432470

@@ -451,7 +489,7 @@ async def move_async(self, source: str, destination: str, **kwargs: Any) -> None
451489
raise NotImplementedError
452490

453491
@abstractmethod
454-
def get_metadata_async(self, path: str, **kwargs: Any) -> "dict[str, object]":
492+
async def get_metadata_async(self, path: str, **kwargs: Any) -> "dict[str, object]":
455493
"""Get object metadata from storage asynchronously."""
456494
raise NotImplementedError
457495

0 commit comments

Comments
 (0)