Skip to content

Commit 30aa517

Browse files
author
Mateusz
committed
Responses API: wire-only streaming, no legacy response.chunk
Route /v1/responses SSE through ResponsesWireStreamEmitter for all canonical chunks, including tool calls (function_call output items and argument deltas). Remove legacy SSE formatting from the controller. Extend coercion for bytes, strings, and ProcessedResponse JSON content. Update integration and unit tests and document wire compliance in README. Made-with: Cursor
1 parent 3a6dec6 commit 30aa517

8 files changed

Lines changed: 1013 additions & 195 deletions

File tree

README.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ It is a compatibility layer, a security layer, a traffic control plane, a debugg
3434

3535
Beyond basic forwarding, the proxy adds cross-protocol translation, tool safety, routing and failover, session-oriented features (including B2BUA-style handling), boundary-level CBOR captures, usage tracking, and built-in token-saving controls. Longer narratives, use-case lists, and feature tours live in the [User Guide](docs/user_guide/index.md).
3636

37-
- **One endpoint, many clients** - Keep existing OpenAI-, Anthropic-, and Gemini-style clients while changing routing behind the proxy.
37+
- **One endpoint, many clients** - Keep existing OpenAI-, Anthropic-, and Gemini-style clients while changing routing behind the proxy. `/v1/responses` streaming emits official OpenAI Responses events (no legacy `response.chunk` fallback shape).
3838
- **Token-saving that actually matters** - Shrink bloated sessions with stale-history compaction and content-aware tool-output compression.
3939
- **Production-minded resilience** - Use retries, failover, health tracking, and safeguards that respect streaming semantics.
4040
- **Operational visibility** - Inspect wire captures, diagnostics, and usage data instead of debugging blind.

src/core/app/controllers/responses_controller.py

Lines changed: 51 additions & 172 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from fastapi.responses import StreamingResponse
1717
from pydantic import ValidationError
1818

19+
from src.core.app.controllers.responses_stream_legacy import coerce_stream_chunk_payload
1920
from src.core.common.exceptions import (
2021
InitializationError,
2122
LLMProxyError,
@@ -33,6 +34,9 @@
3334
ResponsesRequest,
3435
enforce_json_schema_limits,
3536
)
37+
from src.core.domain.translators.responses.wire_stream_emitter import (
38+
ResponsesWireStreamEmitter,
39+
)
3640
from src.core.interfaces.client_end_of_session_service_interface import (
3741
IClientEndOfSessionService,
3842
)
@@ -965,8 +969,10 @@ async def _generator() -> AsyncIterator[str]:
965969

966970
response_id = f"resp_{int(time.time())}_{id(response)}"
967971
created_timestamp = int(time.time())
968-
last_chunk_model = getattr(domain_request, "model", "unknown")
969972
stream_terminated = False
973+
wire_emitter: ResponsesWireStreamEmitter | None = None
974+
native_wire_passthrough = False
975+
last_domain_chunk: dict[str, Any] | None = None
970976

971977
cancel_lock = asyncio.Lock()
972978
cancel_state = {"called": False}
@@ -1113,171 +1119,40 @@ async def is_disconnected() -> bool:
11131119
break
11141120

11151121
try:
1116-
chunk_content: Any = ""
1117-
chunk_metadata: dict[str, Any] = {}
1118-
chunk_payload: dict[str, Any] | None = None
1119-
1120-
if isinstance(chunk, ProcessedResponse):
1121-
chunk_content = chunk.content or ""
1122-
chunk_metadata = chunk.metadata or {}
1123-
if isinstance(chunk.content, dict):
1124-
chunk_payload = chunk.content
1125-
elif isinstance(chunk, dict):
1126-
chunk_content = str(chunk.get("content", ""))
1127-
chunk_metadata = chunk.get("metadata", {}) or {}
1128-
chunk_payload = chunk
1129-
elif hasattr(chunk, "content"):
1130-
chunk_content = getattr(chunk, "content", "") or ""
1131-
chunk_metadata = getattr(chunk, "metadata", {}) or {}
1132-
if isinstance(chunk_content, dict):
1133-
chunk_payload = chunk_content
1134-
elif isinstance(chunk, str):
1135-
chunk_content = chunk
1136-
elif isinstance(chunk, bytes):
1137-
chunk_content = chunk.decode("utf-8", errors="ignore")
1138-
else:
1139-
chunk_content = str(chunk)
1140-
1141-
chunk_id = chunk_metadata.get("id") or response_id
1142-
chunk_model = chunk_metadata.get("model") or getattr(
1143-
domain_request, "model", "unknown"
1144-
)
1145-
chunk_created = (
1146-
chunk_metadata.get("created") or created_timestamp
1122+
chunk_payload = coerce_stream_chunk_payload(
1123+
chunk, default_response_id=response_id
11471124
)
11481125

1149-
finish_reason = chunk_metadata.get("finish_reason")
1150-
delta: dict[str, Any] = {}
1151-
1152-
if chunk_payload:
1153-
chunk_id = chunk_payload.get("id", chunk_id)
1154-
chunk_model = chunk_payload.get("model", chunk_model)
1155-
chunk_created = chunk_payload.get("created", chunk_created)
1156-
1157-
choices = chunk_payload.get("choices")
1158-
if isinstance(choices, list) and choices:
1159-
primary_choice = choices[0] or {}
1160-
delta_payload = primary_choice.get("delta") or {}
1161-
if isinstance(delta_payload, dict):
1162-
delta = dict(delta_payload)
1163-
finish_reason = (
1164-
primary_choice.get("finish_reason") or finish_reason
1165-
)
1166-
1167-
if not delta and chunk_content:
1168-
delta["content"] = chunk_content
1169-
1170-
content_value = delta.get("content")
1171-
if content_value is not None and not isinstance(
1172-
content_value, str
1173-
):
1174-
# Use dict() for dict types to safely handle StopChunkWithUsage
1175-
safe_value = (
1176-
dict(content_value)
1177-
if isinstance(content_value, dict)
1178-
else content_value
1179-
)
1180-
delta["content"] = json.dumps(safe_value)
1181-
1182-
tool_calls = delta.get("tool_calls") or chunk_metadata.get(
1183-
"tool_calls"
1184-
)
1185-
if tool_calls:
1186-
normalized_calls: list[dict[str, Any]] = []
1187-
for tool_call in tool_calls:
1188-
if hasattr(tool_call, "model_dump"):
1189-
call_data = tool_call.model_dump()
1190-
elif isinstance(tool_call, dict):
1191-
call_data = dict(tool_call)
1192-
else:
1193-
function = getattr(tool_call, "function", None)
1194-
call_data = {
1195-
"id": getattr(tool_call, "id", ""),
1196-
"type": getattr(tool_call, "type", "function"),
1197-
"function": {
1198-
"name": getattr(function, "name", ""),
1199-
"arguments": getattr(
1200-
function, "arguments", "{}"
1201-
),
1202-
},
1203-
}
1204-
1205-
function_payload = call_data.get("function")
1206-
if isinstance(function_payload, dict):
1207-
arguments = function_payload.get("arguments")
1208-
if isinstance(arguments, dict | list):
1209-
function_payload["arguments"] = json.dumps(
1210-
arguments
1211-
)
1212-
elif arguments is None:
1213-
function_payload["arguments"] = "{}"
1214-
1215-
normalized_calls.append(call_data)
1216-
1217-
delta["tool_calls"] = normalized_calls
1126+
if not isinstance(chunk_payload, dict):
12181127
if logger.isEnabledFor(logging.DEBUG):
12191128
logger.debug(
1220-
"ResponsesController normalized streaming tool_calls: %s",
1221-
normalized_calls,
1129+
"Skipping non-object Responses stream chunk type=%s",
1130+
type(chunk).__name__,
1131+
extra={"request_id": request_id},
12221132
)
1133+
continue
12231134

1224-
if not delta:
1225-
delta["content"] = ""
1226-
1227-
choice_payload: dict[str, Any] = {
1228-
"index": 0,
1229-
"delta": delta,
1230-
}
1231-
if finish_reason:
1232-
choice_payload["finish_reason"] = finish_reason
1233-
1234-
streaming_chunk = {
1235-
"id": chunk_id,
1236-
"object": "response.chunk",
1237-
"created": chunk_created,
1238-
"model": chunk_model,
1239-
"choices": [choice_payload],
1240-
}
1135+
last_domain_chunk = chunk_payload
12411136

1242-
last_chunk_model = chunk_model
1243-
1244-
# OPTIMIZATION: Use fast path for simple content chunks to avoid expensive json.dumps
1245-
# This avoids serializing the full dict structure for every token in high-volume streams
1246-
choices_list = streaming_chunk.get("choices")
1247-
is_simple_chunk = (
1248-
len(streaming_chunk) == 5
1249-
and "choices" in streaming_chunk
1250-
and isinstance(choices_list, list)
1251-
and len(choices_list) == 1
1252-
and isinstance(choices_list[0], dict)
1253-
and "delta" in choices_list[0]
1254-
and len(choices_list[0]) == 2 # index and delta only
1255-
and isinstance(choices_list[0].get("delta"), dict)
1256-
and len(choices_list[0].get("delta", {})) == 1
1257-
and "content" in choices_list[0].get("delta", {})
1258-
and isinstance(
1259-
choices_list[0].get("delta", {}).get("content"), str
1260-
)
1261-
)
1262-
1263-
if is_simple_chunk:
1264-
# Fast string construction for simple chunks
1265-
# Use json.dumps only for the content string to ensure safe escaping
1266-
c = cast(dict[str, Any], streaming_chunk)
1267-
choices = cast(list[dict[str, Any]], c["choices"])
1268-
choice = choices[0]
1269-
delta = cast(dict[str, Any], choice["delta"])
1270-
1271-
content_json = json.dumps(delta["content"])
1272-
idx = choice["index"]
1273-
1274-
json_str = (
1275-
f'{{"id": "{c["id"]}", "object": "{c["object"]}", "created": {c["created"]}, '
1276-
f'"model": "{c["model"]}", "choices": [{{"index": {idx}, "delta": {{"content": {content_json}}}}}]}}'
1137+
ot_raw = chunk_payload.get("type")
1138+
if (
1139+
isinstance(ot_raw, str)
1140+
and ot_raw.startswith("response.")
1141+
and ot_raw != "response.chunk"
1142+
):
1143+
native_wire_passthrough = True
1144+
yield f"data: {json.dumps(chunk_payload)}\n\n"
1145+
continue
1146+
1147+
if wire_emitter is None:
1148+
wire_emitter = ResponsesWireStreamEmitter(
1149+
model=str(
1150+
getattr(domain_request, "model", None) or "unknown"
1151+
),
1152+
created_at=float(created_timestamp),
12771153
)
1278-
yield f"data: {json_str}\n\n"
1279-
else:
1280-
yield f"data: {json.dumps(streaming_chunk)}\n\n"
1154+
for wire_evt in wire_emitter.feed(chunk_payload):
1155+
yield f"data: {json.dumps(wire_evt)}\n\n"
12811156

12821157
except Exception as exc:
12831158
if logger.isEnabledFor(logging.WARNING):
@@ -1290,20 +1165,24 @@ async def is_disconnected() -> bool:
12901165
continue
12911166

12921167
if not stream_terminated:
1293-
final_chunk = {
1294-
"id": response_id,
1295-
"object": "response.chunk",
1296-
"created": created_timestamp,
1297-
"model": last_chunk_model,
1298-
"choices": [
1299-
{
1300-
"index": 0,
1301-
"finish_reason": "stop",
1302-
"delta": {},
1303-
}
1304-
],
1305-
}
1306-
yield f"data: {json.dumps(final_chunk)}\n\n"
1168+
if native_wire_passthrough:
1169+
pass
1170+
elif wire_emitter is not None and not wire_emitter.is_finished():
1171+
for wire_evt in wire_emitter.finalize(
1172+
tail_domain_chunk=last_domain_chunk
1173+
):
1174+
yield f"data: {json.dumps(wire_evt)}\n\n"
1175+
elif wire_emitter is None and not native_wire_passthrough:
1176+
empty_wire = ResponsesWireStreamEmitter(
1177+
model=str(
1178+
getattr(domain_request, "model", None) or "unknown"
1179+
),
1180+
created_at=float(created_timestamp),
1181+
)
1182+
for wire_evt in empty_wire.finalize(
1183+
tail_domain_chunk=last_domain_chunk
1184+
):
1185+
yield f"data: {json.dumps(wire_evt)}\n\n"
13071186
yield "data: [DONE]\n\n"
13081187

13091188
except GeneratorExit:
"""Helpers for coercing streaming chunks into canonical dict payloads."""

from __future__ import annotations

import json
from typing import Any, cast

from src.core.interfaces.response_processor_interface import ProcessedResponse


def _decode_json_object(raw: bytes | bytearray | str) -> dict[str, Any] | None:
    """Decode *raw* (UTF-8 when binary) and return it only if it parses to a JSON object."""
    if isinstance(raw, (bytes, bytearray)):
        try:
            text = bytes(raw).decode("utf-8")
        except UnicodeDecodeError:
            return None
    else:
        text = raw
    try:
        parsed = json.loads(text)
    except (TypeError, ValueError):  # ValueError also covers json.JSONDecodeError
        return None
    return parsed if isinstance(parsed, dict) else None


def _from_processed_response(
    resp: ProcessedResponse, default_response_id: str
) -> dict[str, Any] | None:
    """Build a chunk dict from a ProcessedResponse's content or, failing that, its metadata.

    Dict content passes through untouched; textual content is accepted only
    when it decodes to a JSON object. Otherwise a synthetic single-choice
    chunk is built from ``metadata`` for tool calls or stream termination.
    """
    if isinstance(resp.content, dict):
        return resp.content
    if isinstance(resp.content, (bytes, bytearray, str)):
        decoded = _decode_json_object(resp.content)
        if decoded is not None:
            return decoded

    meta = resp.metadata or {}
    tool_calls = meta.get("tool_calls")
    if tool_calls:
        # Synthesize a tool-call delta chunk from metadata alone.
        return {
            "id": meta.get("id", default_response_id),
            "model": meta.get("model", "unknown"),
            "created": meta.get("created"),
            "choices": [
                {
                    "index": 0,
                    "delta": {"tool_calls": tool_calls},
                    "finish_reason": meta.get("finish_reason"),
                }
            ],
        }
    if meta.get("is_done") or meta.get("finish_reason"):
        # Terminal marker: empty delta plus a finish reason (default "stop").
        return {
            "id": meta.get("id", default_response_id),
            "model": meta.get("model", "unknown"),
            "created": meta.get("created"),
            "choices": [
                {
                    "index": 0,
                    "delta": {},
                    "finish_reason": meta.get("finish_reason", "stop"),
                },
            ],
        }
    return None


def coerce_stream_chunk_payload(
    chunk: Any, *, default_response_id: str
) -> dict[str, Any] | None:
    """Normalize stream iterator items into a domain-style chunk dict.

    Handles, in order: raw ``bytes``/``bytearray``/``str`` carrying a JSON
    object, ``ProcessedResponse`` instances, plain dicts, and arbitrary
    objects exposing a ``content`` attribute. Returns ``None`` for items
    that carry no usable dict payload; callers are expected to skip those.

    NOTE(review): plain text that is not a JSON object yields ``None`` and
    is therefore dropped by the caller — presumably upstream always emits
    dict-shaped chunks on this path; confirm textual deltas cannot appear
    here.
    """
    if isinstance(chunk, (bytes, bytearray, str)):
        return _decode_json_object(chunk)
    if isinstance(chunk, ProcessedResponse):
        return _from_processed_response(chunk, default_response_id)
    if isinstance(chunk, dict):
        return chunk

    # Duck-typed fallback: anything with a ``content`` attribute.
    content = getattr(chunk, "content", None)
    if isinstance(content, dict):
        return cast(dict[str, Any], content)
    if isinstance(content, (bytes, bytearray, str)):
        return _decode_json_object(content)
    return None

0 commit comments

Comments (0)