
Commit fa8fb2c

Author: Mateusz (committed)
perf(openai-codex): defer HTTP mid-stream continuation fingerprints
ResponseExecutor._persist_observed_continuation now accepts include_fingerprint_snapshot, so HTTP full-replay streaming can record response ids mid-stream without running record_turn on large payloads; fingerprints are still captured at stream completion.

The stream-loop compatibility timing now initializes its monotonic anchor before the try block, so pyright no longer reports a possibly unbound variable.

Extended test_codex_http_full_replay_proxy_regression with coverage for the skip path.

Made-with: Cursor
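Before the diff, a minimal self-contained sketch of the deferral contract described above. FakeCoordinator, persist_observed_continuation, and the payload shape here are simplified stand-ins for illustration, not the repository's actual classes or signatures; only the include_fingerprint_snapshot keyword and the record_response_id / record_turn split mirror the change.

```python
import asyncio
from typing import Any


class FakeCoordinator:
    """Stand-in for the continuation coordinator; it only counts calls."""

    def __init__(self) -> None:
        self.response_ids: list[str] = []
        self.turns_recorded = 0

    async def record_response_id(self, response_id: str) -> None:
        # Cheap bookkeeping: remember the latest upstream response id.
        self.response_ids.append(response_id)

    async def record_turn(self, payload: dict[str, Any]) -> None:
        # Expensive for large http_full_replay payloads (fingerprints every input item).
        self.turns_recorded += 1


async def persist_observed_continuation(
    coord: FakeCoordinator,
    response_id: str,
    payload: dict[str, Any],
    *,
    include_fingerprint_snapshot: bool = True,
) -> None:
    await coord.record_response_id(response_id)
    if include_fingerprint_snapshot:
        # Only snapshot input fingerprints when the caller asks for them.
        await coord.record_turn(payload)


async def main() -> None:
    coord = FakeCoordinator()
    payload = {"input": [{"x": i} for i in range(10_000)]}  # stands in for a large replay

    # Mid-stream over HTTP: record the response id, defer the fingerprint work.
    await persist_observed_continuation(
        coord, "resp-1", payload, include_fingerprint_snapshot=False
    )
    assert coord.turns_recorded == 0

    # Stream completion: flush the deferred fingerprints exactly once.
    await coord.record_turn(payload)
    assert coord.turns_recorded == 1


asyncio.run(main())
```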
1 parent 5393eb2 commit fa8fb2c

2 files changed

Lines changed: 183 additions & 6 deletions


src/connectors/openai_codex/executor.py

Lines changed: 143 additions & 4 deletions
@@ -17,6 +17,7 @@
 import logging
 import time
 from collections.abc import AsyncIterator, Awaitable, Callable, Mapping
+from pathlib import Path
 from typing import TYPE_CHECKING, Any, cast
 
 from fastapi import HTTPException
@@ -98,6 +99,29 @@ def _map_codex_instruction_error(status_code: int, detail: Any) -> Any:
 }
 
 
+# region agent log
+def _agent_dbg_codex_log_path() -> Path:
+    for parent in Path(__file__).resolve().parents:
+        if (parent / "pyproject.toml").is_file():
+            return parent / "debug-119480.log"
+    return Path.cwd() / "debug-119480.log"
+
+
+def _agent_dbg_codex_executor(payload: dict[str, Any]) -> None:
+    try:
+        line = json.dumps(
+            {"sessionId": "119480", "timestamp": int(time.time() * 1000), **payload},
+            default=str,
+        )
+        with _agent_dbg_codex_log_path().open("a", encoding="utf-8") as f:
+            f.write(line + "\n")
+    except OSError:
+        pass
+
+
+# endregion
+
+
 def _codex_initiate_streaming_error_view(
     exc: HTTPException | LLMProxyError,
 ) -> tuple[int, Any]:
@@ -747,9 +771,23 @@ async def _streaming_iterator() -> AsyncIterator[ProcessedResponse]:
             observed_response_id: str | None = None
             observed_response_id_persisted = False
             terminal_response_id: str | None = None
+            http_midstream_fingerprints_deferred = False
+            last_stream_chunk_mono: float | None = None
             try:
                 with OverrideRenderer(renderer_key):
+                    stream_dbg_idx = 0
                     async for processed_chunk in stream_handle.iterator:
+                        stream_dbg_idx += 1
+                        _iter_enter = time.monotonic()
+                        _gap_ms = (
+                            int((_iter_enter - last_stream_chunk_mono) * 1000)
+                            if last_stream_chunk_mono is not None
+                            else -1
+                        )
+                        _norm_ms = 0
+                        _det_ms = 0
+                        _cmp_ms = 0
+                        _vis_ms = 0
                         candidate_observed_response_id = (
                             self._extract_response_id(processed_chunk)
                         )
@@ -761,11 +799,18 @@ async def _streaming_iterator() -> AsyncIterator[ProcessedResponse]:
                                 candidate_observed_response_id
                             )
                             if not use_websocket_transport:
+                                # HTTP Codex strips ``previous_response_id``; full input
+                                # fingerprints are only needed for websocket delta slicing.
+                                # Skip ``record_turn`` here so huge ``http_full_replay`` payloads
+                                # do not block the session mid-stream (fingerprints are flushed
+                                # at stream completion or on the next persist call).
                                 await self._persist_observed_continuation(
                                     continuation_context,
                                     response_id=observed_response_id,
                                     payload_dict=replay_payload_dict,
+                                    include_fingerprint_snapshot=False,
                                 )
+                                http_midstream_fingerprints_deferred = True
                                 observed_response_id_persisted = True
                             elif self._codex_ws_lineage is not None:
                                 await self._persist_observed_ws_lineage(
@@ -817,17 +862,21 @@ async def _streaming_iterator() -> AsyncIterator[ProcessedResponse]:
                                     payload_dict=replay_payload_dict,
                                     items_added=ws_output_items,
                                 )
+                        _t_norm = time.monotonic()
                         processed_chunk = (
                             self._normalize_processed_stream_chunk(
                                 processed_chunk
                             )
                         )
+                        _norm_ms = int((time.monotonic() - _t_norm) * 1000)
+                        _t_det = time.monotonic()
                         incompatible_tools = (
                             self._detect_incompatible_tool_calls(
                                 processed_chunk.content,
                                 context,
                             )
                         )
+                        _det_ms = int((time.monotonic() - _t_det) * 1000)
                         if incompatible_tools:
                             if (
                                 incompatible_tool_retries
@@ -873,6 +922,7 @@ async def _streaming_iterator() -> AsyncIterator[ProcessedResponse]:
                                     },
                                 )
                                 yield processed_chunk
+                                last_stream_chunk_mono = time.monotonic()
                                 continue
                             restart_stream = True
                             logger.info(
@@ -915,6 +965,7 @@ async def _streaming_iterator() -> AsyncIterator[ProcessedResponse]:
 
                         # Apply compatibility layer translation if available
                         if self._compatibility_layer and compatibility_state:
+                            _t_cmp = time.monotonic()
                             try:
                                 pre_translation_metadata = dict(
                                     processed_chunk.metadata or {}
@@ -946,7 +997,13 @@ async def _streaming_iterator() -> AsyncIterator[ProcessedResponse]:
                                     usage=processed_chunk.usage,
                                     metadata=merged_metadata,
                                 )
+                                _cmp_ms = int(
+                                    (time.monotonic() - _t_cmp) * 1000
+                                )
                             except Exception as e:
+                                _cmp_ms = int(
+                                    (time.monotonic() - _t_cmp) * 1000
+                                )
                                 if logger.isEnabledFor(TRACE_LEVEL):
                                     logger.log(
                                         TRACE_LEVEL,
@@ -960,12 +1017,43 @@ async def _streaming_iterator() -> AsyncIterator[ProcessedResponse]:
                                         },
                                     )
                                 # Continue with original chunk on translation failure
+                        else:
+                            _cmp_ms = 0
 
+                        _t_vis = time.monotonic()
                         if self._chunk_has_client_visible_output(
                             processed_chunk
                         ):
                             visible_output_emitted = True
+                        _vis_ms = int((time.monotonic() - _t_vis) * 1000)
+                        if (
+                            stream_dbg_idx <= 120
+                            or _gap_ms >= 500
+                            or _norm_ms + _det_ms + _cmp_ms + _vis_ms >= 30
+                        ):
+                            _meta_ev = processed_chunk.metadata or {}
+                            _agent_dbg_codex_executor(
+                                {
+                                    "hypothesisId": "H2",
+                                    "location": "executor.streaming_loop",
+                                    "message": "codex_stream_chunk_timing",
+                                    "data": {
+                                        "session_id": context.session_id,
+                                        "idx": stream_dbg_idx,
+                                        "gap_ms_since_prev_yield": _gap_ms,
+                                        "event_type": _meta_ev.get(
+                                            "event_type"
+                                        ),
+                                        "norm_ms": _norm_ms,
+                                        "det_ms": _det_ms,
+                                        "cmp_ms": _cmp_ms,
+                                        "vis_ms": _vis_ms,
+                                        "mode": current_request_mode,
+                                    },
+                                }
+                            )
                         yield processed_chunk
+                        last_stream_chunk_mono = time.monotonic()
             except Exception as exc:
                 if self._is_previous_response_not_found_error(exc):
                     await self._continuation_coordinator.invalidate(
@@ -1074,13 +1162,33 @@ async def _streaming_iterator() -> AsyncIterator[ProcessedResponse]:
                        continuation_context,
                        response_id=terminal_response_id,
                        payload_dict=replay_payload_dict,
+                        include_fingerprint_snapshot=True,
+                    )
+                elif (
+                    not use_websocket_transport
+                    and http_midstream_fingerprints_deferred
+                ):
+                    await self._record_continuation_turn(
+                        continuation_context,
+                        terminal_response_id,
+                        replay_payload_dict,
                     )
                 elif observed_response_id:
                     if not observed_response_id_persisted:
                         await self._persist_observed_continuation(
                             continuation_context,
                             response_id=observed_response_id,
                             payload_dict=replay_payload_dict,
+                            include_fingerprint_snapshot=True,
+                        )
+                    elif (
+                        not use_websocket_transport
+                        and http_midstream_fingerprints_deferred
+                    ):
+                        await self._record_continuation_turn(
+                            continuation_context,
+                            observed_response_id,
+                            replay_payload_dict,
                         )
                     logger.info(
                         "Codex stream ended before terminal completion; observed response id remains available for continuation (response_id=%s, mode=%s, reason=%s).",
@@ -1426,16 +1534,47 @@ async def _persist_observed_continuation(
         *,
         response_id: str,
         payload_dict: dict[str, Any],
+        include_fingerprint_snapshot: bool = True,
     ) -> None:
+        # region agent log
+        _t_p0 = time.monotonic()
+        # endregion
        await self._continuation_coordinator.record_response_id(
            context,
            response_id,
        )
-        await self._record_continuation_turn(
-            context,
-            response_id,
-            payload_dict,
+        # region agent log
+        _rid_ms = int((time.monotonic() - _t_p0) * 1000)
+        _t_p1 = time.monotonic()
+        # endregion
+        _turn_ms = 0
+        if include_fingerprint_snapshot:
+            await self._record_continuation_turn(
+                context,
+                response_id,
+                payload_dict,
+            )
+            # region agent log
+            _turn_ms = int((time.monotonic() - _t_p1) * 1000)
+            # endregion
+        # region agent log
+        _raw_inp = payload_dict.get("input")
+        _inp_n = len(_raw_inp) if isinstance(_raw_inp, list) else -1
+        _agent_dbg_codex_executor(
+            {
+                "hypothesisId": "H1",
+                "location": "executor._persist_observed_continuation",
+                "message": "persist_continuation_timing",
+                "data": {
+                    "session_id": context.session_id,
+                    "record_response_id_ms": _rid_ms,
+                    "record_turn_ms": _turn_ms,
+                    "input_item_count": _inp_n,
+                    "include_fingerprint_snapshot": include_fingerprint_snapshot,
+                },
+            }
        )
+        # endregion
 
     async def _persist_observed_ws_lineage(
         self,
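The pyright note in the commit message comes down to where the monotonic anchor is first assigned. A minimal synchronous sketch of that pattern follows; the loop body is a placeholder, whereas the real code uses the async chunk iterator shown in the diff above.

```python
import time


def stream_loop_sketch() -> None:
    # Assign the anchor before the try block; if the first assignment lived inside
    # the loop, a later read could be flagged by pyright as possibly unbound.
    last_stream_chunk_mono: float | None = None
    try:
        for _chunk in ("a", "b", "c"):  # placeholder for the async chunk stream
            gap_ms = (
                int((time.monotonic() - last_stream_chunk_mono) * 1000)
                if last_stream_chunk_mono is not None
                else -1
            )
            print(gap_ms)
            last_stream_chunk_mono = time.monotonic()
    except Exception:
        pass


stream_loop_sketch()
```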

tests/unit/connectors/openai_codex/test_codex_http_full_replay_proxy_regression.py

Lines changed: 40 additions & 2 deletions
@@ -7,8 +7,12 @@
    loop (``json.dumps`` per item), freezing the process under large replays.
 
 2. ``ResponseExecutor._log_request_attempt`` computed ``input_bytes`` / ``tools_bytes``
-via a full ``json.dumps`` of the entire payload for INFO logs before each upstream
-request, which could take multiple seconds for hundreds of messages.
+   via a full ``json.dumps`` of the entire payload for INFO logs before each upstream
+   request, which could take multiple seconds for hundreds of messages.
+
+3. HTTP mid-stream ``_persist_observed_continuation`` used to call ``record_turn`` on the
+   full ``http_full_replay`` payload even though HTTP Codex does not use input
+   fingerprints for continuation; that work is deferred to stream completion.
 
 Upstream Codex can still pause between SSE events; these tests only guard the proxy.
 """
@@ -91,6 +95,40 @@ async def _spy_to_thread(func: Any, /, *args: Any, **kwargs: Any) -> Any:
     assert len(snapshot.input_fingerprints) == 250
 
 
+@pytest.mark.asyncio
+async def test_persist_observed_continuation_skips_record_turn_when_disabled(
+    mock_base_connector: Any,
+    mock_credential_manager: Any,
+) -> None:
+    from unittest.mock import AsyncMock, MagicMock
+
+    coord = MagicMock()
+    coord.record_response_id = AsyncMock()
+    coord.record_turn = AsyncMock()
+    executor = ResponseExecutor(
+        mock_base_connector,
+        mock_credential_manager,
+        continuation_coordinator=coord,
+    )
+    ctx = _codex_context("session-persist-skip")
+    await executor._persist_observed_continuation(
+        ctx,
+        response_id="rid-1",
+        payload_dict={"input": [{"x": 1}]},
+        include_fingerprint_snapshot=False,
+    )
+    coord.record_response_id.assert_awaited_once()
+    coord.record_turn.assert_not_called()
+
+    await executor._persist_observed_continuation(
+        ctx,
+        response_id="rid-2",
+        payload_dict={"input": [{"x": 2}]},
+        include_fingerprint_snapshot=True,
+    )
+    assert coord.record_turn.await_count == 1
+
+
 def test_measure_json_bytes_for_log_skips_long_lists(
     executor: ResponseExecutor,
 ) -> None:
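To exercise the new skip-path coverage locally, something along the lines of `pytest tests/unit/connectors/openai_codex/test_codex_http_full_replay_proxy_regression.py -k persist_observed_continuation_skips_record_turn` should select just the added test, assuming the repository's standard pytest configuration.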
