Skip to content

Commit 3690470

Browse files
author
Mateusz
committed
fix(codex): keep large http_full_replay off the event loop
- Build continuation fingerprints in a worker thread (record_turn).
- Skip full json.dumps for oversized input/tools in INFO request logs.
- Add dedicated regression tests for the two stall paths.

Made-with: Cursor
1 parent 36775d4 commit 3690470

3 files changed

Lines changed: 146 additions & 16 deletions

File tree

src/connectors/openai_codex/continuation.py

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,20 @@ class _ContinuationEntry:
5555
expires_at: float
5656

5757

58+
def _build_codex_turn_snapshot(
    normalized: str, payload_dict: dict[str, Any]
) -> CodexContinuationSnapshot:
    """Assemble the continuation snapshot for one recorded Codex turn.

    Fingerprinting the ``input`` list performs a ``json.dumps`` per item, which
    is CPU-heavy for large replay sessions; callers (``record_turn``) may run
    this helper in a worker thread to keep it off the event loop.
    """
    # Evaluate the fingerprints in a fixed order: input, instructions, tools.
    input_prints = fingerprint_input_items(payload_dict.get("input"))
    instructions_print = fingerprint_component(payload_dict.get("instructions"))
    tools_print = fingerprint_component(payload_dict.get("tools"))
    return CodexContinuationSnapshot(
        response_id=normalized,
        input_fingerprints=input_prints,
        instructions_fingerprint=instructions_print,
        tools_fingerprint=tools_print,
    )
70+
71+
5872
class InMemoryCodexContinuationCoordinator(ICodexContinuationCoordinator):
5973
"""Ephemeral TTL/LRU-ish continuation store keyed by Codex request identity."""
6074

@@ -152,14 +166,12 @@ async def record_turn(
152166
key = self._build_key(context)
153167
now = time.monotonic()
154168

155-
# M1: Uses shared fingerprinting utilities
156-
snapshot = CodexContinuationSnapshot(
157-
response_id=normalized,
158-
input_fingerprints=fingerprint_input_items(payload_dict.get("input")),
159-
instructions_fingerprint=fingerprint_component(
160-
payload_dict.get("instructions")
161-
),
162-
tools_fingerprint=fingerprint_component(payload_dict.get("tools")),
169+
# M1: Uses shared fingerprinting utilities. Heavy ``json.dumps`` per input item
170+
# can block the event loop for large http_full_replay sessions; run off-thread.
171+
snapshot = await asyncio.to_thread(
172+
_build_codex_turn_snapshot,
173+
normalized,
174+
payload_dict,
163175
)
164176
async with self._lock:
165177
self._purge_expired(now)

src/connectors/openai_codex/executor.py

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,11 @@ class ResponseExecutor(IResponseExecutor):
323323
- Credential refresh integration for streaming retries
324324
"""
325325

326+
# INFO logging calls ``json.dumps`` on the full Codex ``input`` list; skip that
327+
# work for very large histories to avoid multi-second event-loop stalls before
328+
# each ``http_full_replay`` upstream request.
329+
_LOG_JSON_MEASURE_MAX_INPUT_ITEMS: int = 120
330+
326331
def __init__(
327332
self,
328333
base_connector: OpenAIConnector,
@@ -823,12 +828,12 @@ async def _streaming_iterator() -> AsyncIterator[ProcessedResponse]:
823828
context,
824829
)
825830
)
826-
if incompatible_tools:
827-
if (
828-
incompatible_tool_retries
829-
< self._max_incompatible_tool_retries
830-
):
831-
retry_for_incompatible_tools = True
831+
if incompatible_tools:
832+
if (
833+
incompatible_tool_retries
834+
< self._max_incompatible_tool_retries
835+
):
836+
retry_for_incompatible_tools = True
832837
restart_stream = True
833838
incompatible_tool_retries += 1
834839
current_payload_dict = self._append_incompatible_tool_retry_steering(
@@ -1588,9 +1593,9 @@ def _log_request_attempt(
15881593
"input_item_count": (
15891594
len(input_items) if isinstance(input_items, list) else 0
15901595
),
1591-
"input_bytes": self._measure_json_bytes(input_items),
1596+
"input_bytes": self._measure_json_bytes_for_log(input_items),
15921597
"tools_count": len(tools) if isinstance(tools, list) else 0,
1593-
"tools_bytes": self._measure_json_bytes(tools),
1598+
"tools_bytes": self._measure_json_bytes_for_log(tools),
15941599
"instructions_bytes": (
15951600
len(instructions.encode("utf-8"))
15961601
if isinstance(instructions, str)
@@ -1599,6 +1604,15 @@ def _log_request_attempt(
15991604
},
16001605
)
16011606

1607+
def _measure_json_bytes_for_log(self, value: Any) -> int | None:
    """Measure ``value``'s JSON size for diagnostics, or ``None`` if too big.

    Like ``_measure_json_bytes`` but returns ``None`` for lists longer than
    ``_LOG_JSON_MEASURE_MAX_INPUT_ITEMS`` so INFO-level request logging never
    pays for a full serialization of a huge replay history.
    """
    skip_measurement = (
        isinstance(value, list)
        and len(value) > self._LOG_JSON_MEASURE_MAX_INPUT_ITEMS
    )
    if skip_measurement:
        return None
    return self._measure_json_bytes(value)
1615+
16021616
@staticmethod
16031617
def _measure_json_bytes(value: Any) -> int:
16041618
if value is None:
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
"""Regression: proxy-side stalls on long ``http_full_replay`` Codex sessions.
2+
3+
Historically, two issues showed up together with ``Observed Codex response id`` / long
4+
histories:
5+
6+
1. ``record_turn`` fingerprinted the entire ``input`` list synchronously on the event
7+
loop (``json.dumps`` per item), freezing the process under large replays.
8+
9+
2. ``ResponseExecutor._log_request_attempt`` computed ``input_bytes`` / ``tools_bytes``
10+
via a full ``json.dumps`` of the entire payload for INFO logs before each upstream
11+
request, which could take multiple seconds for hundreds of messages.
12+
13+
Upstream Codex can still pause between SSE events; these tests only guard the proxy.
14+
"""
15+
16+
from __future__ import annotations
17+
18+
import asyncio
19+
from typing import Any
20+
21+
import pytest
22+
from src.connectors._openai_codex_capabilities import CodexClientCapabilities
23+
from src.connectors.openai_codex.continuation import (
24+
InMemoryCodexContinuationCoordinator,
25+
_build_codex_turn_snapshot,
26+
)
27+
from src.connectors.openai_codex.contracts import (
28+
CodexRequestContext,
29+
ProcessedMessage,
30+
)
31+
from src.connectors.openai_codex.executor import ResponseExecutor
32+
from src.core.domain.chat import CanonicalChatRequest, ChatMessage
33+
34+
35+
def _codex_context(session_id: str) -> CodexRequestContext:
    """Build a minimal streaming Codex request context for ``session_id``."""
    chat_request = CanonicalChatRequest(
        model="gpt-5.1-codex",
        messages=[ChatMessage(role="user", content="hello")],
        stream=True,
    )
    # The coordinator keys continuation state on backend + prompt-cache key.
    continuation_metadata = {
        "continuation_backend": "openai-codex",
        "continuation_prompt_cache_key": "prompt-a",
    }
    return CodexRequestContext(
        request=chat_request,
        processed_messages=[ProcessedMessage(role="user", content="hello")],
        effective_model="gpt-5.1-codex",
        capabilities=CodexClientCapabilities(),
        session_id=session_id,
        metadata=continuation_metadata,
    )
52+
53+
54+
@pytest.mark.asyncio
async def test_record_turn_fingerprints_large_input_via_worker_thread(
    monkeypatch: pytest.MonkeyPatch,
) -> None:
    """``record_turn`` must fingerprint a large ``input`` list off the event loop.

    Regression guard: the snapshot build (one ``json.dumps`` per input item)
    previously ran synchronously on the event loop and froze the process for
    large ``http_full_replay`` sessions. The fix routes it through
    ``asyncio.to_thread``; this test spies on that call and then verifies the
    recorded snapshot is still complete and correct.
    """
    coordinator = InMemoryCodexContinuationCoordinator(ttl_seconds=60, max_entries=4)
    context = _codex_context("session-regression-large-input")
    # 250 items comfortably exceeds any "small payload" fast path, so the
    # worker-thread route is the one exercised.
    big_input = [
        {"type": "message", "role": "user", "content": f"line-{i}"} for i in range(250)
    ]

    # Records every callable handed to asyncio.to_thread by the coordinator.
    to_thread_calls: list[Any] = []

    real_to_thread = asyncio.to_thread

    async def _spy_to_thread(func: Any, /, *args: Any, **kwargs: Any) -> Any:
        # Delegate to the real to_thread so behavior is unchanged; only observe.
        to_thread_calls.append(func)
        return await real_to_thread(func, *args, **kwargs)

    # Patch the name as resolved inside the continuation module, not the global
    # asyncio attribute, so only the code under test is affected.
    monkeypatch.setattr(
        "src.connectors.openai_codex.continuation.asyncio.to_thread",
        _spy_to_thread,
    )

    await coordinator.record_turn(
        context,
        response_id="resp-regression",
        payload_dict={
            "input": big_input,
            "instructions": "inst",
            "tools": [{"type": "function", "name": "read", "parameters": {}}],
        },
    )

    # Exactly one off-thread hop, and it must be the snapshot builder itself.
    assert to_thread_calls == [_build_codex_turn_snapshot]
    snapshot = await coordinator.get_snapshot(context)
    assert snapshot is not None
    assert snapshot.response_id == "resp-regression"
    # Off-loading must not drop items: one fingerprint per input entry.
    assert len(snapshot.input_fingerprints) == 250
92+
93+
94+
def test_measure_json_bytes_for_log_skips_long_lists(
    executor: ResponseExecutor,
) -> None:
    """INFO-only size metrics must not full-serialize huge ``input`` lists."""
    cap = executor._LOG_JSON_MEASURE_MAX_INPUT_ITEMS

    # One item past the cap: measurement is skipped entirely.
    oversized = [{"i": n} for n in range(cap + 1)]
    assert executor._measure_json_bytes_for_log(oversized) is None

    # Well under the cap: a real (positive) byte count is returned.
    small = [{"i": n} for n in range(5)]
    measured = executor._measure_json_bytes_for_log(small)
    assert isinstance(measured, int)
    assert measured > 0

0 commit comments

Comments
 (0)