
Commit bda5d4f

fix(sdk): raise asyncio StreamReader buffer in Python AsyncHostTransport (#2760)
* fix(sdk): raise asyncio StreamReader buffer in Python AsyncHostTransport

The Python async transport spawned the host CLI without passing a `limit=` to `asyncio.create_subprocess_exec`, so its stdout `StreamReader` inherited asyncio's default 64 KiB buffer. Every host response is written as a single newline-delimited JSON line, so any `cli.invoke` whose serialized result exceeds 64 KiB (e.g. `superdoc_get_content` on larger documents) caused `readline()` to raise `ValueError: Separator is not found, and chunk exceed the limit` inside `_reader_loop`. The exception was caught by the generic reader-loop handler and pending requests were rejected with the misleading `HOST_DISCONNECTED` error — even though the host process was still alive and healthy.

Pass `limit=` to `create_subprocess_exec` and expose it as a new `stdout_buffer_limit_bytes` constructor option on `AsyncHostTransport`, threaded through `SuperDocAsyncRuntime` and `AsyncSuperDocClient`. The default of 64 MiB safely covers the host's own 32 MiB `DEFAULT_MAX_STDIN_BYTES` input cap with room for ~2x JSON expansion. `SyncHostTransport` is unaffected — it uses raw blocking `subprocess.Popen`, which has no asyncio buffer limit.

Adds a `TestAsyncLargeResponse` regression suite that:

1. Round-trips a 200 KB response through the default-configured transport.
2. Pins that an explicitly tightened `stdout_buffer_limit_bytes` still reproduces the original failure mode, guaranteeing the option is wired through to `create_subprocess_exec`.
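The failure mode described above reproduces outside the SDK. A minimal standalone sketch (hypothetical child process and sizes, not SDK code) of asyncio's default `StreamReader` limit rejecting one long line from a subprocess, and of `limit=` lifting it:

```python
import asyncio
import sys

LINE_BYTES = 100_000  # one line, comfortably above asyncio's 64 KiB default

# Hypothetical child process: emits a single newline-terminated line.
CHILD = f"import sys; sys.stdout.write('x' * {LINE_BYTES} + '\\n')"

async def read_one_line(**spawn_kwargs) -> None:
    proc = await asyncio.create_subprocess_exec(
        sys.executable, '-c', CHILD,
        stdout=asyncio.subprocess.PIPE,
        **spawn_kwargs,
    )
    try:
        line = await proc.stdout.readline()
        print(f'ok: read {len(line)} bytes')
    except ValueError as exc:
        # With the default limit this prints:
        # "Separator is not found, and chunk exceed the limit"
        print(f'readline failed: {exc}')
    finally:
        await proc.wait()

async def main() -> None:
    await read_one_line()                   # default 64 KiB -> ValueError
    await read_one_line(limit=1024 * 1024)  # explicit limit -> succeeds

asyncio.run(main())
```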
* fix(sdk): tear down host process on async reader-loop failure

AsyncHostTransport._reader_loop caught reader exceptions by rejecting pending futures and flipping state to DISCONNECTED, but never killed self._process. Because dispose() early-returns on DISCONNECTED, any reader-loop failure left an orphaned host subprocess running with no public API to reap it. This is a pre-existing bug, but the previous commit made it easier to trip by exposing stdout_buffer_limit_bytes: any caller who sets it below their real response size hits the orphan path.

Route both the buffer-overflow and generic-error branches through a new _schedule_cleanup helper that fires _cleanup() as a separate task (it can't be awaited inline — _cleanup cancels and awaits the reader task itself). _cleanup kills the process, waits on it, rejects pending, and only then transitions to DISCONNECTED, so a subsequent dispose() is a safe no-op instead of leaking the host.

Also catch asyncio.LimitOverrunError / ValueError separately and surface HOST_PROTOCOL_ERROR with a "raise stdout_buffer_limit_bytes" hint plus the current limit in details. The previous HOST_DISCONNECTED code pointed users at the wrong problem, since the host was still alive.

Extends TestAsyncLargeResponse to assert HOST_PROTOCOL_ERROR, verify the hint is in the message, confirm the subprocess is actually reaped (returncode set, _process cleared), and that dispose() after an overflow is a safe no-op.

* refactor(sdk): dedupe stdout_buffer_limit default and add wiring test

Address review follow-ups on the async transport buffer-limit option.

- Hoist DEFAULT_STDOUT_BUFFER_LIMIT_BYTES (64 MiB) to module scope in transport.py and reference it from AsyncHostTransport, the async runtime, and AsyncSuperDocClient so the default lives in one place instead of three copies of 64 * 1024 * 1024.
- Add a short "raise if a single host response can exceed this size" comment on the client.py parameter so callers see the guidance at the public API boundary, not buried in transport.py.
- Rename test_response_above_default_64kb_buffer to test_response_above_asyncio_default_streamreader_limit. 64 KiB is asyncio's default, not the SDK's (which is now 64 MiB), so the old name read backwards after this PR.
- Add test_client_threads_stdout_buffer_limit_to_transport: builds AsyncSuperDocClient with a custom limit and asserts the value reaches AsyncHostTransport. Without this, a silent drop of the arg in client.py or runtime.py would leave the existing overflow test passing while the public API quietly reverted to the asyncio 64 KiB default.

* fix(sdk): mark transport DISPOSING synchronously on reader teardown

Round-2 review follow-ups:

- _schedule_cleanup now flips state to DISPOSING before scheduling the cleanup task. Previously, between the reader returning and the async _cleanup running, _ensure_connected's CONNECTED fast path would still accept invoke() calls; they then blocked on a future the dead reader could never resolve until watchdog_timeout_ms (default 30s).
- Narrow the buffer-overflow catch to readline() only and drop asyncio.LimitOverrunError from the tuple. readline() re-raises LimitOverrunError as ValueError (it is not a ValueError subclass on any supported CPython), so the previous broad except could reclassify unrelated ValueErrors from dispatch as a buffer-limit error with a misleading remediation hint. Comment corrected to match. (See the sketch after this list.)
- Re-export DEFAULT_STDOUT_BUFFER_LIMIT_BYTES from superdoc/__init__.py so consumers tuning the option don't import from the implementation module.
- Tighten test_host_crash to assert HOST_DISCONNECTED specifically and verify process teardown via the new _schedule_cleanup path.
- Strengthen the dispose-after-overflow assertion to actually verify the no-op claim (state stays DISCONNECTED, _process stays None, a second dispose is also safe). Replace the timing-sensitive process.returncode read with await process.wait().
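A minimal sketch of the exception relationship the narrowed catch relies on, using plain asyncio primitives (no SDK code); the tiny `limit=16` and payload are illustrative only:

```python
import asyncio

# Not a ValueError subclass on any supported CPython:
assert not issubclass(asyncio.LimitOverrunError, ValueError)

def overfull_reader() -> asyncio.StreamReader:
    reader = asyncio.StreamReader(limit=16)  # tiny limit for the demo
    reader.feed_data(b'x' * 64)              # one long "line", no newline yet
    return reader

async def main() -> None:
    try:
        await overfull_reader().readuntil(b'\n')  # raw API
    except asyncio.LimitOverrunError as exc:
        print('readuntil raised', type(exc).__name__)

    try:
        await overfull_reader().readline()        # re-raised as ValueError
    except ValueError as exc:
        print('readline raised', type(exc).__name__, '-', exc)

asyncio.run(main())
```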
* fix(sdk): serialize teardown across reader, _kill_and_reset, and dispose

Round-2 follow-up — addresses the residual race that the synchronous DISPOSING flip didn't cover.

Before: `_kill_and_reset()` (called from `_send_request` on stdin write failure or watchdog timeout) `await`ed `_cleanup` directly. If a reader-triggered `_schedule_cleanup` was in flight, both ran concurrently and raced on `_reject_all_pending`'s read-then-clear of `self._pending` (futures added between snapshot and clear were leaked) and on `process.kill()` / `reader_task.cancel()`. `dispose()` similarly short-circuited on DISPOSING without waiting for the in-flight cleanup to finish — the caller saw "disposed" before the host was fully torn down.

Now:

- `_kill_and_reset` and `dispose` both check the cleanup-task slot and `await` an in-flight cleanup rather than starting a parallel one. Single-flight teardown across all three entry points.
- `_cleanup` clears `self._cleanup_task` in `finally` when it owns the slot, so introspection doesn't surface a stale done handle and the next teardown gets a fresh slot.
- `dispose()` after a reader-triggered cleanup now blocks until that cleanup finishes, restoring the "host fully torn down on return" contract.

Tests:

- `test_schedule_cleanup_dedupe_guard_drops_reentrant_call` — second `_schedule_cleanup` does not replace the in-flight task slot.
- `test_overflow_during_dispose_does_not_schedule_cleanup` — `_stopping` suppression is honored.
- `test_kill_and_reset_awaits_in_flight_cleanup` — `_kill_and_reset` observes the existing task instead of running a parallel `_cleanup`.
- `test_dispose_waits_for_in_flight_cleanup` — `dispose()` blocks until reader-triggered cleanup completes before returning.

95 transport tests pass; 5 consecutive runs with PYTHONASYNCIODEBUG=1 show no flakes.

* fix(sdk): close residual races in async transport teardown

Two correctness regressions and three test gaps surfaced in the final-pass review of the cleanup-task lifecycle.

**1. _ensure_connected race (HIGH).** The synchronous DISPOSING flip in _schedule_cleanup did not gate _ensure_connected, so a concurrent connect()/invoke() reaching _start_host during the DISPOSING window would reassign self._process and self._reader_task. The pending cleanup task then read those slots after its first await and killed the freshly-spawned process. Fix: drain self._cleanup_task at the top of _ensure_connected via asyncio.shield (so a cancelled caller doesn't abort the in-flight cleanup).

**2. Cancellation propagation race (HIGH).** _kill_and_reset and dispose() awaited the cleanup task without asyncio.shield. When the caller (e.g. an invoke task at the watchdog branch) was cancelled, asyncio cancelled the awaited cleanup task too — _cleanup did not catch CancelledError around process.wait(), so teardown stopped before clearing _process / setting state. dispose() then saw DISPOSING with _cleanup_task=None and returned without finishing teardown, leaking the host process. Fix: wrap the awaited cleanup in asyncio.shield in both call sites; restructure _cleanup so it captures handles and sets state synchronously up-front, before any awaits, so observable state is always consistent.

**3. Move _stopping guard into _schedule_cleanup.** The previous test_overflow_during_dispose_does_not_schedule_cleanup was tautological — it set _stopping=True and then re-checked the same condition in the test body before calling _schedule_cleanup, so the call never ran and the assertion passed trivially. Move the guard into _schedule_cleanup itself (it's the correct authoritative location anyway), remove the now-redundant call-site checks in _reader_loop, and rewrite the test to call _schedule_cleanup unconditionally with _stopping=True. The test now actually exercises the production guard.

**4. Multi-pending-invoke overflow test.** Codex round-2 gap that remained open. Locks down that _reject_all_pending fails ALL pending futures with HOST_PROTOCOL_ERROR plus the actionable hint, not just the one whose response overflowed.

**5. Async reconnect-after-buffer-overflow test.** Sync transport already had test_reconnect_after_failure; async only covered reconnect after explicit dispose. Validates that reader-triggered cleanup leaves the transport reusable for a fresh invoke without wedging _cleanup_task / _connecting / _process.

Plus: replaced asyncio.sleep(0) with asyncio.Event-based synchronization in lifecycle tests (Codex/Opus medium — sleep(0) is implementation-defined under uvloop / Python scheduling changes); two new tests directly cover the round-3 races (test_ensure_connected_drains_in_flight_cleanup_before_spawn, test_kill_and_reset_caller_cancellation_does_not_cancel_cleanup).

99 transport tests pass; 5 consecutive runs with PYTHONASYNCIODEBUG=1 show no flakes; new tests pass under -W error::ResourceWarning.

---------

Co-authored-by: Caio Pizzol <97641911+caio-pizzol@users.noreply.github.com>
Co-authored-by: Caio Pizzol <caio@harbourshare.com>
1 parent 68dcff2 commit bda5d4f

5 files changed

Lines changed: 643 additions & 38 deletions


packages/sdk/langs/python/superdoc/__init__.py

Lines changed: 2 additions & 0 deletions
```diff
@@ -10,13 +10,15 @@
     get_tool_catalog,
     list_tools,
 )
+from .transport import DEFAULT_STDOUT_BUFFER_LIMIT_BYTES

 __all__ = [
     "SuperDocClient",
     "AsyncSuperDocClient",
     "SuperDocDocument",
     "AsyncSuperDocDocument",
     "SuperDocError",
+    "DEFAULT_STDOUT_BUFFER_LIMIT_BYTES",
     "get_skill",
     "install_skill",
     "list_skills",
```

packages/sdk/langs/python/superdoc/client.py

Lines changed: 5 additions & 0 deletions
```diff
@@ -22,6 +22,7 @@
     DocOpenResult as GeneratedDocOpenResult,
 )
 from .runtime import SuperDocAsyncRuntime, SuperDocSyncRuntime
+from .transport import DEFAULT_STDOUT_BUFFER_LIMIT_BYTES

 UserIdentity = Dict[str, str]

@@ -340,6 +341,9 @@ def __init__(
         request_timeout_ms: int | None = None,
         watchdog_timeout_ms: int = 30_000,
         max_queue_depth: int = 100,
+        # Raise if a single host response can exceed this size (e.g. reading
+        # very large documents); otherwise the default is safe.
+        stdout_buffer_limit_bytes: int = DEFAULT_STDOUT_BUFFER_LIMIT_BYTES,
         default_change_mode: Literal['direct', 'tracked'] | None = None,
         user: UserIdentity | None = None,
     ) -> None:
@@ -350,6 +354,7 @@ def __init__(
             request_timeout_ms=request_timeout_ms,
             watchdog_timeout_ms=watchdog_timeout_ms,
             max_queue_depth=max_queue_depth,
+            stdout_buffer_limit_bytes=stdout_buffer_limit_bytes,
             default_change_mode=default_change_mode,
             user=user,
         )
```
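A hedged usage sketch of the new parameter at the public boundary; only `stdout_buffer_limit_bytes` is taken from this diff, and the remaining constructor arguments are assumed to keep their defaults:

```python
from superdoc import AsyncSuperDocClient, DEFAULT_STDOUT_BUFFER_LIMIT_BYTES

# Workloads that read very large documents back as one JSON line can
# double the 64 MiB default; other constructor arguments are left at
# their defaults here.
client = AsyncSuperDocClient(
    stdout_buffer_limit_bytes=2 * DEFAULT_STDOUT_BUFFER_LIMIT_BYTES,
)
```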

packages/sdk/langs/python/superdoc/runtime.py

Lines changed: 7 additions & 1 deletion
```diff
@@ -14,7 +14,11 @@
 from .embedded_cli import resolve_embedded_cli_path
 from .generated.contract import OPERATION_INDEX
 from .protocol import normalize_default_change_mode
-from .transport import AsyncHostTransport, SyncHostTransport
+from .transport import (
+    DEFAULT_STDOUT_BUFFER_LIMIT_BYTES,
+    AsyncHostTransport,
+    SyncHostTransport,
+)


 class SuperDocSyncRuntime:
@@ -79,6 +83,7 @@ def __init__(
         request_timeout_ms: Optional[int] = None,
         watchdog_timeout_ms: int = 30_000,
         max_queue_depth: int = 100,
+        stdout_buffer_limit_bytes: int = DEFAULT_STDOUT_BUFFER_LIMIT_BYTES,
         default_change_mode: Optional[str] = None,
         user: Optional[Dict[str, str]] = None,
     ) -> None:
@@ -93,6 +98,7 @@ def __init__(
             request_timeout_ms=request_timeout_ms,
             watchdog_timeout_ms=watchdog_timeout_ms,
             max_queue_depth=max_queue_depth,
+            stdout_buffer_limit_bytes=stdout_buffer_limit_bytes,
             default_change_mode=self._default_change_mode,
             user=user,
         )
```

packages/sdk/langs/python/superdoc/transport.py

Lines changed: 179 additions & 35 deletions
```diff
@@ -43,6 +43,12 @@

 logger = logging.getLogger('superdoc.transport')

+# Default stdout StreamReader buffer for the async transport. Host responses
+# are single newline-delimited JSON lines, so this caps the largest individual
+# response a caller can receive. Raise it if your workload routinely produces
+# responses above this size (e.g. whole-document reads on very large docs).
+DEFAULT_STDOUT_BUFFER_LIMIT_BYTES = 64 * 1024 * 1024
+
 # Opt-in debug logging via SUPERDOC_DEBUG=1 or SUPERDOC_LOG_LEVEL=debug.
 # Only configures the named logger — never mutates root logging config.
 _log_level = os.environ.get('SUPERDOC_LOG_LEVEL', '').lower()
@@ -399,6 +405,7 @@ def __init__(
         request_timeout_ms: Optional[int] = None,
         watchdog_timeout_ms: int = 30_000,
         max_queue_depth: int = 100,
+        stdout_buffer_limit_bytes: int = DEFAULT_STDOUT_BUFFER_LIMIT_BYTES,
         default_change_mode: Optional[ChangeMode] = None,
         user: Optional[Dict[str, str]] = None,
     ) -> None:
@@ -409,11 +416,13 @@
         self._request_timeout_ms = request_timeout_ms
         self._watchdog_timeout_ms = watchdog_timeout_ms
         self._max_queue_depth = max_queue_depth
+        self._stdout_buffer_limit_bytes = stdout_buffer_limit_bytes
         self._default_change_mode = default_change_mode
         self._user = user

         self._process: Optional[asyncio.subprocess.Process] = None
         self._reader_task: Optional[asyncio.Task] = None
+        self._cleanup_task: Optional[asyncio.Task] = None
         self._pending: Dict[int, asyncio.Future] = {}
         self._state = _State.DISCONNECTED
         self._next_request_id = 1
@@ -428,7 +437,22 @@ async def connect(self) -> None:

     async def dispose(self) -> None:
         """Gracefully shut down the host process."""
-        if self._state == _State.DISCONNECTED or self._state == _State.DISPOSING:
+        if self._state == _State.DISCONNECTED:
+            return
+        if self._state == _State.DISPOSING:
+            # A reader-triggered cleanup is in flight (or an earlier teardown
+            # left state in DISPOSING briefly). Wait for it so the caller
+            # observes "host fully torn down" by the time dispose() returns.
+            # shield() so a cancelled dispose() doesn't interrupt _cleanup
+            # mid-flight and leak the host process.
+            existing = self._cleanup_task
+            if existing and not existing.done():
+                try:
+                    await asyncio.shield(existing)
+                except asyncio.CancelledError:
+                    raise
+                except Exception:
+                    pass
             return

         self._stopping = True
@@ -507,6 +531,20 @@ async def invoke(

     async def _ensure_connected(self) -> None:
         """Lazy connect: spawn and handshake if not already connected."""
+        # Drain any in-flight teardown before spawning a new host. Without
+        # this, a concurrent reader-triggered cleanup would still be running
+        # when _start_host reassigns self._process / self._reader_task; the
+        # cleanup task would then cancel the fresh reader and kill the fresh
+        # process. shield() so we don't cancel the cleanup if our caller is.
+        cleanup = self._cleanup_task
+        if cleanup and not cleanup.done():
+            try:
+                await asyncio.shield(cleanup)
+            except asyncio.CancelledError:
+                raise
+            except Exception:
+                pass
+
         if self._state == _State.CONNECTED and self._process and self._process.returncode is None:
             return

@@ -531,12 +569,15 @@ async def _start_host(self) -> None:
         args = [*prefix_args, 'host', '--stdio']

         try:
+            # ``limit`` raises asyncio's StreamReader buffer above its 64 KiB
+            # default; host responses are single JSON lines and can exceed it.
             self._process = await asyncio.create_subprocess_exec(
                 command, *args,
                 stdin=asyncio.subprocess.PIPE,
                 stdout=asyncio.subprocess.PIPE,
                 stderr=asyncio.subprocess.DEVNULL,
                 env={**os.environ, **self._env},
+                limit=self._stdout_buffer_limit_bytes,
             )
             logger.debug('Host spawned (pid=%s, bin=%s).', self._process.pid, self._cli_bin)
         except Exception as exc:
@@ -582,7 +623,29 @@ async def _reader_loop(self) -> None:

         try:
             while True:
-                raw = await process.stdout.readline()
+                try:
+                    raw = await process.stdout.readline()
+                except ValueError as exc:
+                    # asyncio.StreamReader.readline() re-raises LimitOverrunError
+                    # from readuntil() as ValueError when a single line exceeds
+                    # `limit` (see CPython asyncio/streams.py). The host is still
+                    # alive — schedule cleanup so a later dispose() doesn't
+                    # short-circuit on DISCONNECTED state. Scoped to readline()
+                    # only so unrelated ValueErrors from dispatch aren't
+                    # reclassified as a buffer-limit error. _schedule_cleanup
+                    # is a no-op when _stopping is set (graceful dispose path).
+                    logger.debug('Reader loop buffer overflow: %s', exc)
+                    self._schedule_cleanup(SuperDocError(
+                        'Host response exceeded stdout buffer limit. '
+                        'Raise stdout_buffer_limit_bytes to accommodate larger responses.',
+                        code=HOST_PROTOCOL_ERROR,
+                        details={
+                            'message': str(exc),
+                            'stdout_buffer_limit_bytes': self._stdout_buffer_limit_bytes,
+                        },
+                    ))
+                    return
+
                 if not raw:
                     # EOF — process died.
                     break
@@ -614,16 +677,16 @@
         except Exception as exc:
             logger.debug('Reader loop error: %s', exc)

-        # Reader exited (EOF or error) — reject all pending futures.
-        if not self._stopping:
-            exit_code = process.returncode
-            error = SuperDocError(
-                'Host process disconnected.',
-                code=HOST_DISCONNECTED,
-                details={'exit_code': exit_code, 'signal': None},
-            )
-            self._reject_all_pending(error)
-            self._state = _State.DISCONNECTED
+        # Reader exited (EOF or unexpected error) — tear down the process so
+        # no orphaned host is left running, then reject pending futures.
+        # _schedule_cleanup is a no-op when _stopping is set (graceful
+        # dispose path) so we don't race the dispose teardown.
+        exit_code = process.returncode
+        self._schedule_cleanup(SuperDocError(
+            'Host process disconnected.',
+            code=HOST_DISCONNECTED,
+            details={'exit_code': exit_code, 'signal': None},
+        ))

     async def _send_request(self, method: str, params: Any, watchdog_ms: int) -> Any:
         """Send a JSON-RPC request and await the matching response future."""
@@ -687,39 +750,120 @@ def _reject_all_pending(self, error: SuperDocError) -> None:
             future.set_exception(error)

     async def _kill_and_reset(self) -> None:
-        """Kill the host process and reset to DISCONNECTED."""
-        await self._cleanup(
-            SuperDocError('Host process disconnected.', code=HOST_DISCONNECTED),
-        )
-
-    async def _cleanup(self, error: Optional[SuperDocError]) -> None:
-        """Cancel reader, kill process, reject pending, reset state."""
-        if self._reader_task and not self._reader_task.done():
-            self._reader_task.cancel()
+        """Kill the host process and reset to DISCONNECTED.
+
+        Coordinates with `_schedule_cleanup` so callers (e.g. `_send_request`
+        on watchdog timeout or stdin write failure) don't run a parallel
+        `_cleanup` that races a reader-triggered cleanup on
+        `_reject_all_pending` and `process.kill`. If a cleanup is already in
+        flight, await it; otherwise own a fresh task in the same slot so a
+        later concurrent caller sees us instead of starting its own.
+
+        shield() the await so caller cancellation (e.g. an `invoke()` task
+        that times out and is then cancelled by the user) does NOT propagate
+        into `_cleanup` — interrupting cleanup mid-flight would leak the
+        subprocess and wedge state in DISPOSING.
+        """
+        existing = self._cleanup_task
+        if existing and not existing.done():
             try:
-                await self._reader_task
-            except (asyncio.CancelledError, Exception):
+                await asyncio.shield(existing)
+            except asyncio.CancelledError:
+                raise
+            except Exception:
                 pass
-        self._reader_task = None
+            return
+        self._state = _State.DISPOSING
+        task = asyncio.create_task(self._cleanup(
+            SuperDocError('Host process disconnected.', code=HOST_DISCONNECTED),
+        ))
+        self._cleanup_task = task
+        try:
+            await asyncio.shield(task)
+        except asyncio.CancelledError:
+            raise
+        except Exception:
+            pass

+    def _schedule_cleanup(self, error: SuperDocError) -> None:
+        """Fire-and-forget teardown from inside the reader task.
+
+        Why a separate task: `_cleanup` cancels and awaits `self._reader_task`.
+        Awaiting it from inside the reader itself would deadlock — so we punt
+        to a fresh task, and by the time it runs the reader has already
+        returned (so cancel+await is a no-op).
+
+        Synchronously flips state to DISPOSING so concurrent `invoke()` callers
+        observe the failed transport immediately rather than passing the
+        CONNECTED fast path and blocking on a future the dead reader can never
+        resolve until `watchdog_timeout_ms`.
+
+        Skips when `_stopping` is set: a graceful `dispose()` is already
+        tearing down, and a parallel cleanup task would race on
+        `_reject_all_pending` and `process.kill`.
+
+        Idempotent: if a cleanup is already in flight, subsequent errors are
+        dropped — the first one wins. Callers may observe completion via
+        `self._cleanup_task`.
+        """
+        if self._stopping:
+            return
+        if self._cleanup_task and not self._cleanup_task.done():
+            return
+        self._state = _State.DISPOSING
+        self._cleanup_task = asyncio.create_task(self._cleanup(error))
+
+    async def _cleanup(self, error: Optional[SuperDocError]) -> None:
+        """Cancel reader, kill process, reject pending, reset state.
+
+        Capture handles and flip user-visible state SYNCHRONOUSLY at the top
+        before any awaits. That way, even if cancellation arrives during
+        `process.wait()`, observers see a consistent "torn down" transport
+        (state DISCONNECTED, _process None, pending futures rejected) rather
+        than a half-disposed one. The async work below is best-effort
+        process reaping.
+        """
+        # Snapshot and clear before any await so concurrent callers see a
+        # fully torn-down transport from this point on.
+        reader_task = self._reader_task
         process = self._process
-        if process:
-            try:
-                process.kill()
-            except Exception:
-                pass
-            try:
-                await asyncio.wait_for(process.wait(), timeout=2)
-            except (asyncio.TimeoutError, Exception):
-                pass
+        self._reader_task = None
         self._process = None
+        self._state = _State.DISCONNECTED

-        if error:
+        if error is not None:
             self._reject_all_pending(error)
         else:
             # Dispose path — reject remaining with generic disconnect.
             self._reject_all_pending(
                 SuperDocError('Host process was disposed.', code=HOST_DISCONNECTED),
             )

-        self._state = _State.DISCONNECTED
+        try:
+            if reader_task and not reader_task.done():
+                reader_task.cancel()
+                try:
+                    await reader_task
+                except (asyncio.CancelledError, Exception):
+                    pass
+
+            if process:
+                try:
+                    process.kill()
+                except Exception:
+                    pass
+                try:
+                    await asyncio.wait_for(process.wait(), timeout=2)
+                except (asyncio.TimeoutError, asyncio.CancelledError, Exception):
+                    pass
+        finally:
+            # Release the task handle if we are the in-flight cleanup task,
+            # so introspection doesn't surface a stale done handle and the
+            # next teardown gets a fresh slot. Skip when called inline (e.g.
+            # from dispose) — that current task is not our cleanup task.
+            try:
+                current = asyncio.current_task()
+            except RuntimeError:
+                current = None
+            if current is not None and self._cleanup_task is current:
+                self._cleanup_task = None
```
0 commit comments

Comments
 (0)