Skip to content

Commit 9f28d8c

Browse files
author
Mateusz
committed
fix(codex-websocket): normalize Responses-native tool events into canonical chunks
Preserve full response.output_item.done payloads in OpenAIWebSocketClient and add normalization in ResponseExecutor so websocket-originated Codex events flow through the Responses translator before retry/visibility logic. Also normalize response.done into a synthetic response.completed event so websocket streams end with consistent canonical stop chunks. Adds failing-first tests that prove websocket tool-call completion and response.done normalization end-to-end.
1 parent ac76f3a commit 9f28d8c

30 files changed

Lines changed: 758 additions & 1231 deletions

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ The proxy includes built-in resilience features for production use:
5959
- **Streaming protection** - Avoids retry after output has started, preventing corruption
6060
- **Health monitoring** - Tracks backend availability and performance
6161

62-
Configure via the `resilience` section in `config.yaml` or see the [Failure Handling Guide](docs/user_guide/features/failure-handling.md). Request processing now runs through a single canonical manager path with no legacy split-handler fallback. `request_processing_unification` remains for operational controls such as per-backend `connector_stream_first` hints, optional `emit_path_selection_metadata`, and `promotion_requirements` reporting. When path diagnostics are enabled, `promotion_guardrails` uses strict missing-evidence semantics (absent measurements do not read as promotion-ready).
62+
Configure via the `resilience` section in `config.yaml` or see the [Failure Handling Guide](docs/user_guide/features/failure-handling.md). Request processing now runs through a single canonical manager path with no legacy split-handler fallback. `canonical_request_processing` provides the remaining runtime controls such as empty-stream recovery tuning.
6363

6464
## Quick Start
6565

config/config.example.yaml

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -263,20 +263,10 @@ dynamic_compression:
263263
diff_max_lines_per_hunk: 100
264264
diff_max_total_lines: 500
265265

266-
# Request-processing unification (canonical path is always used at runtime)
267-
request_processing_unification:
268-
enable_core_canonical_path: true
269-
enable_canonical_features: false
270-
connector_stream_first: {}
271-
retire_legacy_dual_path: false
272-
emit_path_selection_metadata: false
273-
promotion_requirements:
274-
require_characterization_tests: true
275-
require_equivalence_tests: true
276-
max_non_stream_p95_latency_delta_pct: 10.0
277-
max_stream_ttft_delta_pct: 10.0
278-
max_memory_delta_pct: 10.0
279-
require_cleanup_checks: true
266+
# Canonical request-processing runtime controls
267+
canonical_request_processing:
268+
empty_stream_recovery_prompt: "The previous response was empty, please try again."
269+
max_empty_stream_retries: 1
280270

281271
# Logging
282272
logging:

config/schemas/app_config.schema.yaml

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -634,18 +634,10 @@ properties:
634634
capability_refresh_backoff_seconds:
635635
type: number
636636
minimum: 0
637-
request_processing_unification:
637+
canonical_request_processing:
638638
type: object
639639
additionalProperties: false
640640
properties:
641-
enable_core_canonical_path: { type: boolean }
642-
enable_canonical_features: { type: boolean }
643-
connector_stream_first:
644-
type: object
645-
additionalProperties: { type: boolean }
646-
retire_legacy_dual_path: { type: boolean }
647-
emit_path_selection_metadata: { type: boolean }
648-
legacy_streaming_client_blocking_envelope: { type: boolean }
649641
empty_stream_recovery_prompt:
650642
type: string
651643
description: "Recovery prompt appended to retry requests when stream produces no content"
@@ -654,9 +646,6 @@ properties:
654646
minimum: 0
655647
maximum: 5
656648
description: "Maximum number of empty stream retry attempts before failing"
657-
promotion_requirements:
658-
type: object
659-
additionalProperties: false
660649
properties:
661650
require_characterization_tests: { type: boolean }
662651
require_equivalence_tests: { type: boolean }

src/connectors/openai_codex/executor.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -846,6 +846,9 @@ async def _streaming_iterator() -> AsyncIterator[ProcessedResponse]:
846846
visible_output_emitted = False
847847
with OverrideRenderer(renderer_key):
848848
async for processed_chunk in stream_handle.iterator:
849+
processed_chunk = self._normalize_processed_stream_chunk(
850+
processed_chunk
851+
)
849852
incompatible_tools = self._detect_incompatible_tool_calls(
850853
processed_chunk.content,
851854
context,
@@ -1164,6 +1167,53 @@ def _append_incompatible_tool_retry_steering(
11641167
)
11651168
return dict(adapted)
11661169

1170+
@staticmethod
1171+
def _coerce_stream_chunk_content(content: object) -> dict[str, Any] | None:
1172+
model_dump = getattr(content, "model_dump", None)
1173+
if callable(model_dump):
1174+
dumped = model_dump(exclude_none=True)
1175+
if isinstance(dumped, dict):
1176+
return cast(dict[str, Any], dumped)
1177+
if isinstance(content, Mapping):
1178+
return dict(content)
1179+
return None
1180+
1181+
def _normalize_processed_stream_chunk(
1182+
self, chunk: ProcessedResponse
1183+
) -> ProcessedResponse:
1184+
metadata = chunk.metadata
1185+
event_type = metadata.get("event_type")
1186+
if not isinstance(event_type, str):
1187+
return chunk
1188+
1189+
content_dict = self._coerce_stream_chunk_content(chunk.content)
1190+
if not content_dict:
1191+
return chunk
1192+
1193+
if event_type == "response.done":
1194+
content_dict = {"type": "response.completed", "response": content_dict}
1195+
elif "choices" in content_dict or not str(
1196+
content_dict.get("type") or ""
1197+
).startswith("response."):
1198+
return chunk
1199+
1200+
translation_service = getattr(self._base_connector, "translation_service", None)
1201+
if translation_service is None:
1202+
return chunk
1203+
1204+
translated = translation_service.to_domain_stream_chunk(
1205+
content_dict, "responses"
1206+
)
1207+
translated_content = self._coerce_stream_chunk_content(translated)
1208+
if translated_content is None:
1209+
return chunk
1210+
1211+
return ProcessedResponse(
1212+
content=translated_content,
1213+
usage=chunk.usage,
1214+
metadata=dict(metadata),
1215+
)
1216+
11671217
@staticmethod
11681218
def _extract_tool_calls(response_like: object) -> list[dict[str, object]]:
11691219
if isinstance(response_like, Mapping):

src/connectors/openai_websocket_client.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -343,13 +343,14 @@ def _event_to_processed_response(
343343
metadata={"event_type": event_type},
344344
)
345345

346-
# Handle response.output_item.done
347-
if event_type == "response.output_item.done":
348-
output_item = event_data.get("item", {})
349-
return ProcessedResponse(
350-
content={"type": "output_item.done", "item": output_item},
351-
metadata={"event_type": event_type},
352-
)
346+
# Preserve full Responses-native payloads for tool-call completion events.
347+
# Downstream Codex translation needs fields like output_index and the exact
348+
# top-level event type to reconstruct canonical tool-call chunks.
349+
if event_type == "response.output_item.done":
350+
return ProcessedResponse(
351+
content=event_data,
352+
metadata={"event_type": event_type},
353+
)
353354

354355
# Handle response.done
355356
if event_type == "response.done":

src/core/config/app_config.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,15 +14,14 @@
1414
BackendConfig,
1515
BackendSettings,
1616
BruteForceProtectionConfig,
17+
CanonicalRequestProcessingConfig,
1718
CodebuffConfig,
1819
EditPrecisionConfig,
1920
EmptyResponseConfig,
2021
LoggingConfig,
2122
LogLevel,
2223
ModelAliasRule,
2324
PlanningPhaseConfig,
24-
RequestProcessingPromotionRequirementsConfig,
25-
RequestProcessingUnificationConfig,
2625
RewritingConfig,
2726
RoutingConfig,
2827
SessionConfig,
@@ -113,7 +112,7 @@ def save(self, path: str | Path) -> None:
113112
"failure_handling",
114113
"routing",
115114
"dynamic_compression",
116-
"request_processing_unification",
115+
"canonical_request_processing",
117116
"reasoning_model_token_floor",
118117
"memory",
119118
"database",
@@ -239,8 +238,7 @@ def load_config(
239238
"ParameterResolution",
240239
"ParameterSource",
241240
"PlanningPhaseConfig",
242-
"RequestProcessingPromotionRequirementsConfig",
243-
"RequestProcessingUnificationConfig",
241+
"CanonicalRequestProcessingConfig",
244242
"RewritingConfig",
245243
"RoutingConfig",
246244
"SessionConfig",

src/core/config/models/__init__.py

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,9 @@
88
BackendSettings,
99
get_openrouter_headers,
1010
)
11+
from src.core.config.models.canonical_request_processing import (
12+
CanonicalRequestProcessingConfig,
13+
)
1114
from src.core.config.models.end_of_session import EndOfSessionConfig
1215
from src.core.config.models.logging import LoggingConfig, LogLevel
1316
from src.core.config.models.misc import (
@@ -19,10 +22,6 @@
1922
)
2023
from src.core.config.models.non_forwardable_config import NonForwardableTaggingConfig
2124
from src.core.config.models.notification import NotificationConfig
22-
from src.core.config.models.request_processing_unification import (
23-
RequestProcessingPromotionRequirementsConfig,
24-
RequestProcessingUnificationConfig,
25-
)
2625
from src.core.config.models.rewriting import (
2726
EditPrecisionConfig,
2827
ModelAliasRule,
@@ -56,8 +55,7 @@
5655
"ModelAliasRule",
5756
"NotificationConfig",
5857
"PlanningPhaseConfig",
59-
"RequestProcessingPromotionRequirementsConfig",
60-
"RequestProcessingUnificationConfig",
58+
"CanonicalRequestProcessingConfig",
6159
"ReasoningModelTokenFloorConfig",
6260
"RewritingConfig",
6361
"ResilienceConfig",

src/core/config/models/app_config_model.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,9 @@
1111
from src.core.config.models.auth import AuthConfig
1212
from src.core.config.models.auxiliary_routing import AuxiliaryRoutingConfig
1313
from src.core.config.models.backends import BackendSettings
14+
from src.core.config.models.canonical_request_processing import (
15+
CanonicalRequestProcessingConfig,
16+
)
1417
from src.core.config.models.end_of_session import EndOfSessionConfig
1518
from src.core.config.models.logging import LoggingConfig
1619
from src.core.config.models.misc import (
@@ -24,9 +27,6 @@
2427
)
2528
from src.core.config.models.non_forwardable_config import NonForwardableTaggingConfig
2629
from src.core.config.models.notification import NotificationConfig
27-
from src.core.config.models.request_processing_unification import (
28-
RequestProcessingUnificationConfig,
29-
)
3030
from src.core.config.models.rewriting import (
3131
EditPrecisionConfig,
3232
ModelAliasRule,
@@ -120,8 +120,8 @@ class AppConfigModel(DomainModel, IConfig):
120120
auxiliary_routing: AuxiliaryRoutingConfig = Field(
121121
default_factory=AuxiliaryRoutingConfig
122122
)
123-
request_processing_unification: RequestProcessingUnificationConfig = Field(
124-
default_factory=RequestProcessingUnificationConfig
123+
canonical_request_processing: CanonicalRequestProcessingConfig = Field(
124+
default_factory=CanonicalRequestProcessingConfig
125125
)
126126
compaction: CompactionConfig = Field(default_factory=CompactionConfig)
127127
dynamic_compression: DynamicCompressionConfig = Field(
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
from __future__ import annotations
2+
3+
from pydantic import Field
4+
5+
from src.core.interfaces.model_bases import DomainModel
6+
7+
8+
class CanonicalRequestProcessingConfig(DomainModel):
    """Runtime settings for the canonical request-processing pipeline.

    Replaces the retired ``request_processing_unification`` config section;
    only the operational empty-stream-recovery knobs remain configurable.
    """

    # Empty stream recovery tuning (operational flexibility)
    # Prompt appended to the retried request when a stream yields no content.
    empty_stream_recovery_prompt: str = Field(
        default="The previous response was empty, please try again.",
        description="Recovery prompt appended to retry requests when stream produces no content",
    )
    # Bounded 0-5, mirroring config/schemas/app_config.schema.yaml.
    max_empty_stream_retries: int = Field(
        default=1,
        ge=0,
        le=5,
        description="Maximum number of empty stream retry attempts before failing",
    )

src/core/config/models/request_processing_unification.py

Lines changed: 0 additions & 50 deletions
This file was deleted.

0 commit comments

Comments
 (0)