fix: improve Responses API streaming event lifecycle and input handling #63

ivanopcode wants to merge 7 commits into teabranch:main
Conversation
Review Summary by Qodo

**Improve Responses API streaming lifecycle and input handling**

Walkthrough:

- Improved input handling for `function_call`, `function_call_output`, and developer message types in Responses API requests
- Enhanced the streaming event lifecycle with `response.output_item.added`, `response.output_item.done`, `response.output_text.done`, and `response.content_part.done` events
- Fixed `item_id` consistency across streamed tool-call events (changed from `id` to `item_id` in `ToolCallArgumentsDone`)
- Replaced the non-spec `ready` status with `in_progress`/`completed` for tool calls
- Added a `reasoning_content` caching and reinject mechanism across tool-call turns for CoT preservation

Diagram:

```mermaid
flowchart LR
    A["Input Processing"] -->|function_call| B["Convert to Assistant Message"]
    A -->|function_call_output| C["Convert to Tool Message"]
    A -->|developer| D["Convert to System Message"]
    B --> E["Cache reasoning_content"]
    C --> E
    E --> F["Stream Events"]
    F -->|output_item.added| G["Tool Call Created"]
    F -->|function_call_arguments.delta| H["Arguments Streaming"]
    H -->|function_call_arguments.done| I["Arguments Complete"]
    I -->|output_item.done| J["Tool Call Done"]
    F -->|output_text.delta| K["Text Streaming"]
    K -->|output_text.done| L["Text Complete"]
    L -->|output_item.done| M["Message Done"]
    E -->|Next Turn| N["Reinject reasoning_content"]
```
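The conversion branches in the diagram can be sketched roughly as follows. This is a simplified illustration, not the adapter's actual code; the function name and the exact field fallbacks are assumptions:

```python
import json

def convert_input_item(item: dict) -> dict:
    """Map one Responses API input item onto a chat-completions message.

    Simplified sketch; the real adapter handles more fields and edge cases.
    """
    item_type = item.get("type")
    if item_type == "function_call":
        # A prior tool call replays as an assistant message carrying tool_calls.
        return {
            "role": "assistant",
            "content": None,
            "tool_calls": [{
                "id": item.get("call_id") or item.get("id"),
                "type": "function",
                "function": {
                    "name": item.get("name", ""),
                    "arguments": item.get("arguments", ""),
                },
            }],
        }
    if item_type == "function_call_output":
        output = item.get("output")
        if not isinstance(output, str):
            # Non-string tool outputs are normalized to JSON text.
            output = json.dumps(output)
        return {
            "role": "tool",
            "tool_call_id": item.get("call_id") or item.get("id"),
            "content": output,
        }
    if item.get("role") == "developer":
        # Developer messages map onto system messages for the backend.
        return {"role": "system", "content": item.get("content", "")}
    raise ValueError(f"unhandled input item: {item_type!r}")

msg = convert_input_item(
    {"type": "function_call_output", "call_id": "call_1", "output": {"ok": True}}
)
```

The `call_id`-then-`id` fallback mirrors the "incl. `id` fallback" behavior the review summary mentions.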
File Changes:

1. `src/open_responses_server/models/responses_models.py`
Code Review by Qodo
Pull request overview
This PR improves the Responses adapter to better match the Responses API, especially for tool-calling clients and models that emit separate reasoning_content.
Changes:
- Expanded `input` item conversion to support `function_call`, `function_call_output` (incl. `id` fallback and non-string output normalization), and `developer` messages.
- Implemented a richer streaming event lifecycle for tool calls and message items (added/done events, `arguments.done`, text done events) and standardized status values (`in_progress`/`completed`).
- Added a bounded `reasoning_content` cache to reinject model reasoning across tool-call turns ("CoT passback").
Reviewed changes
Copilot reviewed 3 out of 3 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| `tests/test_responses_service.py` | Adds regression tests for reasoning cache eviction, tool output normalization, and stricter streaming lifecycle expectations. |
| `src/open_responses_server/responses_service.py` | Updates request input handling, adds CoT caching, and significantly expands streaming event lifecycle emissions. |
| `src/open_responses_server/models/responses_models.py` | Updates/extends streaming event models (e.g., `output_item.*`, `output_text.done`) and renames `ToolCallArgumentsDone.id` → `item_id`. |
```diff
 if index not in tool_calls:
+    tool_call_id = tool_delta.get("id", f"call_{uuid.uuid4().hex}")
     tool_calls[index] = {
-        "id": tool_delta.get("id", f"call_{uuid.uuid4().hex}"),
+        "id": tool_call_id,
         "type": tool_delta.get("type", "function"),
         "function": {
             "name": tool_delta.get("function", {}).get("name", ""),
-            "arguments": tool_delta.get("function", {}).get("arguments", ""),
+            "arguments": "",
         },
+        "item_id": f"tool_call_{uuid.uuid4().hex}",
-        "output_index": tool_call_counter
+        "output_index": tool_call_counter,
+        "added_emitted": False,
     }
```
```diff
-# If we got a tool name, emit the created event
-if "function" in tool_delta and "name" in tool_delta["function"]:
-    tool_call = tool_calls[index]
-    tool_call["function"]["name"] = tool_delta["function"]["name"]
-    # Log tool call creation
-    logger.info(f"Tool call created: {tool_call['function']['name']}")
-
-    # Check if this is an MCP tool or a user-defined tool
-    is_mcp = mcp_manager.is_mcp_tool(tool_call["function"]["name"])
-    tool_status = "in_progress" if is_mcp else "ready"
-
-    logger.info(f"[TOOL-CALL-CREATED] Tool '{tool_call['function']['name']}': is_mcp={is_mcp}, status={tool_status}")
-
-    # Add the tool call to the response output in Responses API format
-    response_obj.output.append({
-        "arguments": tool_call["function"]["arguments"],
-        "call_id": tool_call["id"],
-        "name": tool_call["function"]["name"],
-        "type": "function_call",
-        "id": tool_call["id"],
-        "status": tool_status
-    })
-
-    # Emit the in_progress event
-    in_progress_event = ResponseInProgress(
-        type="response.in_progress",
-        response=response_obj
-    )
-
-    logger.info(f"Emitting {in_progress_event}")
-    yield f"data: {json.dumps(in_progress_event.dict())}\n\n"
-    tool_call_counter += 1
+tool_call = tool_calls[index]
+
+tool_call_counter += 1
+if "function" in tool_delta and "name" in tool_delta["function"]:
+    tool_call["function"]["name"] = tool_delta["function"]["name"]
+    item_added_payload = ensure_tool_call_added(tool_call)
+    if item_added_payload:
+        yield item_added_payload

 # Process function arguments if present
 if "function" in tool_delta and "arguments" in tool_delta["function"]:
     arg_fragment = tool_delta["function"]["arguments"]
     tool_calls[index]["function"]["arguments"] += arg_fragment

     # Emit delta event
     args_event = ToolCallArgumentsDelta(
         type="response.function_call_arguments.delta",
-        item_id=tool_calls[index]["id"],
+        item_id=tool_calls[index]["item_id"],
         output_index=tool_calls[index]["output_index"],
         delta=arg_fragment
     )

     yield f"data: {json.dumps(args_event.dict())}\n\n"

 # Handle content (text)
 elif "content" in delta and delta["content"] is not None:
     content_delta = delta["content"]
     output_text_content += content_delta

-    # Create a new message if it doesn't exist
-    if not response_obj.output:
-        response_obj.output.append({
-            "content": [{"type": "output_text", "text": output_text_content or "(No update)"}]
-        })
+    # On first text chunk, emit output_item.added + content_part.added
+    if not response_obj.output or not any(
+        o.get("type") == "message" for o in response_obj.output
+    ):
+        msg_item = {
+            "id": message_id,
+            "type": "message",
+            "role": "assistant",
+            "status": "in_progress",
+            "content": []
+        }
+        response_obj.output.append(msg_item)
+        # output_item.added
+        yield f"data: {json.dumps({'type': 'response.output_item.added', 'output_index': 0, 'item': msg_item})}\n\n"
+        # content_part.added
+        yield f"data: {json.dumps({'type': 'response.content_part.added', 'item_id': message_id, 'output_index': 0, 'content_index': 0, 'part': {'type': 'output_text', 'text': '', 'annotations': []}})}\n\n"
```
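The new path defers the `response.output_item.added` emission to an `ensure_tool_call_added` helper guarded by the `added_emitted` flag. A plausible shape for that helper, inferred from the diff (the actual implementation in `responses_service.py` may differ), is:

```python
import json
from typing import Optional

def ensure_tool_call_added(tool_call: dict) -> Optional[str]:
    """Emit response.output_item.added at most once per tool call."""
    if tool_call["added_emitted"] or not tool_call["function"]["name"]:
        return None
    tool_call["added_emitted"] = True
    item = {
        "id": tool_call["item_id"],
        "type": "function_call",
        "call_id": tool_call["id"],
        "name": tool_call["function"]["name"],
        "arguments": "",
        "status": "in_progress",
    }
    payload = {
        "type": "response.output_item.added",
        "output_index": tool_call["output_index"],
        "item": item,
    }
    return f"data: {json.dumps(payload)}\n\n"

tc = {
    "id": "call_1",
    "item_id": "tool_call_abc",
    "output_index": 0,
    "function": {"name": "get_weather", "arguments": ""},
    "added_emitted": False,
}
first = ensure_tool_call_added(tc)
second = ensure_tool_call_added(tc)  # idempotent: no duplicate event
```

Returning the SSE payload string (or `None`) lets the caller decide whether to `yield`, which keeps the streaming generator free of duplicate-added bookkeeping.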
output_index values can collide and/or not match the actual position in response_obj.output. Tool calls use tool_call_counter starting at 0, while message events hardcode output_index: 0; if text is emitted before/alongside tool calls (or vice versa), multiple items can share output_index=0 and tool calls may get an output_index that doesn't reflect their order in the output array. Consider maintaining a single monotonically increasing output_index for all output items (messages + tool calls) and storing it per item when you append to response_obj.output so events and response.completed.response.output stay consistent.
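The fix the reviewer suggests can be sketched as follows. This is an illustrative design, not code from the PR; the class and function names are invented:

```python
class OutputIndexAllocator:
    """Hand out a single monotonically increasing output_index for all item kinds."""

    def __init__(self) -> None:
        self._next = 0

    def allocate(self) -> int:
        idx = self._next
        self._next += 1
        return idx

def append_output_item(output: list, allocator: OutputIndexAllocator, item: dict) -> dict:
    # Store the index on the item so later *.done events reuse the same value
    # and response.completed.response.output stays consistent with the stream.
    item["output_index"] = allocator.allocate()
    output.append(item)
    return item

output: list = []
alloc = OutputIndexAllocator()
msg_item = append_output_item(output, alloc, {"type": "message", "id": "msg_1"})
call_item = append_output_item(output, alloc, {"type": "function_call", "id": "fc_1"})
```

Because messages and tool calls draw from the same counter, interleaved text and tool-call items can never share an `output_index`.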
```diff
+# Cache reasoning_content (CoT) keyed by tool call_id for passback.
+# Keep a bounded insertion-ordered cache so recent tool-call chains can feed
+# reasoning back into the next request without unbounded growth.
+reasoning_content_cache: OrderedDict[str, str] = OrderedDict()
```
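A bounded insertion-ordered cache of this shape might evict like so (an illustrative sketch; the bound constant and helper name are assumptions, not the service's actual names):

```python
from collections import OrderedDict

REASONING_CACHE_MAX = 3  # hypothetical bound for illustration

reasoning_content_cache: OrderedDict[str, str] = OrderedDict()

def cache_reasoning(call_id: str, reasoning: str) -> None:
    # Re-inserting moves the key to the end, so eviction always drops the oldest.
    reasoning_content_cache.pop(call_id, None)
    reasoning_content_cache[call_id] = reasoning
    while len(reasoning_content_cache) > REASONING_CACHE_MAX:
        reasoning_content_cache.popitem(last=False)

for i in range(5):
    cache_reasoning(f"call_{i}", f"thought {i}")
```

After five insertions only the three most recent call IDs remain, which is the "bounded growth" property the comment describes.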
reasoning_content_cache is process-global and keyed only by client-provided call_id. If call_id is reused/collides across requests (or across different users in a shared deployment), reasoning content can be injected into an unrelated conversation, which is a potential cross-request data leak. Consider scoping by conversation/response identifier (e.g., (previous_response_id, call_id)) or storing reasoning alongside the per-response conversation_history instead of in a global cache.
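The scoped-key variant the reviewer proposes could look like this (a sketch under the stated assumption that a `previous_response_id` is available per request; names are invented):

```python
from collections import OrderedDict
from typing import Optional, Tuple

# Key by (previous_response_id, call_id) so call_id reuse in a different
# conversation cannot leak reasoning across requests or users.
CacheKey = Tuple[Optional[str], str]

scoped_cache: OrderedDict[CacheKey, str] = OrderedDict()

def cache_reasoning(previous_response_id: Optional[str], call_id: str, reasoning: str) -> None:
    scoped_cache[(previous_response_id, call_id)] = reasoning

def lookup_reasoning(previous_response_id: Optional[str], call_id: str) -> Optional[str]:
    return scoped_cache.get((previous_response_id, call_id))

cache_reasoning("resp_a", "call_1", "private reasoning for conversation A")
```

A lookup from a different conversation (`resp_b`) with the same `call_id` now misses instead of returning conversation A's reasoning.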
```diff
 # If we haven't already completed the response, do it now
 if response_obj.status != "completed":
-    # If no output, add empty message
-    if not response_obj.output:
-        response_obj.output.append({
-            "id": message_id,
-            "type": "message",
-            "role": "assistant",
-            "content": [{"type": "output_text", "text": f"{output_text_content}\n\n" or "Done"}]
-        })
+    final_text = output_text_content or ""
+
+    # Emit text closing events if we had text content
+    if final_text:
+        yield f"data: {json.dumps({'type': 'response.output_text.done', 'item_id': message_id, 'output_index': 0, 'content_index': 0, 'text': final_text})}\n\n"
+        yield f"data: {json.dumps({'type': 'response.content_part.done', 'item_id': message_id, 'output_index': 0, 'content_index': 0, 'part': {'type': 'output_text', 'text': final_text, 'annotations': []}})}\n\n"
+
+    final_msg_item = {
+        "id": message_id,
+        "type": "message",
+        "role": "assistant",
+        "status": "completed",
+        "content": [{"type": "output_text", "text": final_text, "annotations": []}]
+    }
+
+    # Emit output_item.done if we have text
+    if final_text:
+        yield f"data: {json.dumps({'type': 'response.output_item.done', 'output_index': 0, 'item': final_msg_item})}\n\n"
+
+    response_obj.output = [final_msg_item] if final_text else response_obj.output
     response_obj.status = "completed"
```
In the [DONE] handler, if final_text is empty you leave response_obj.output as-is (potentially empty) and skip emitting response.output_item.* lifecycle events. This can produce a response.completed event with no message output, which is inconsistent with the finish_reason == "stop" path (and with the goal of always having a valid message lifecycle even for empty text). Consider always creating an empty message item (and emitting output_item.added/done) when completing on [DONE] without any prior output.
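The reviewer's suggestion, always closing out a message item even with empty text, might be sketched like this (an illustration of the proposed behavior, not the PR's code; the helper name is invented):

```python
import json

def close_empty_response(message_id: str):
    """Yield the full output_item lifecycle even when no text was streamed."""
    empty_item = {
        "id": message_id,
        "type": "message",
        "role": "assistant",
        "status": "completed",
        "content": [{"type": "output_text", "text": "", "annotations": []}],
    }
    # Emit added first (with in_progress status), then done, so clients that
    # track item lifecycles never see a done event for an unknown item.
    yield f"data: {json.dumps({'type': 'response.output_item.added', 'output_index': 0, 'item': {**empty_item, 'status': 'in_progress'}})}\n\n"
    yield f"data: {json.dumps({'type': 'response.output_item.done', 'output_index': 0, 'item': empty_item})}\n\n"

events = list(close_empty_response("msg_1"))
```

Calling this from the `[DONE]` handler whenever `response_obj.output` is empty would make the empty-text path consistent with the `finish_reason == "stop"` path.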
Problem

ORS did not fully support some Responses API request and event patterns used
by tool-calling clients and open-weight reasoning models such as gpt-oss.
In practice this caused three classes of problems:

1. Input history was reconstructed incompletely.
   Clients send prior tool calls, tool results, and developer messages as
   `input` items on each turn. ORS only handled a subset of these items, so
   parts of the conversation history were dropped before reaching the backend.
2. The streamed Responses event lifecycle was incomplete.
   Several expected events and state transitions were missing or inconsistent,
   especially around tool calls and text output items.
3. `reasoning_content` was not preserved across tool-call turns.
   For models that emit reasoning separately from the final answer, losing that
   context degraded multi-step tool use. This is sometimes referred to in the
   community as "CoT passback".

These issues were reproduced with Codex CLI, but the fixes bring ORS closer
to the Responses API model more generally.
Changes

This MR updates the Responses adapter to:

- convert `function_call`, `function_call_output`, and `developer` input items
  into the corresponding chat-completions message structure
- emit a fuller streaming event lifecycle for tool-call and message output,
  including `response.output_item.added`, `response.function_call_arguments.done`,
  `response.output_text.done`, and `response.output_item.done`
- keep `item_id` values consistent across streamed tool-call events
- replace the non-spec `ready` status with `in_progress`/`completed`
- cache and reinject `reasoning_content` across tool-call turns when the model
  provides it
Testing

Tested with:

```shell
uv run pytest tests/test_responses_service.py
```

Also verified manually with Codex CLI against local llama.cpp-backed models,
including multi-turn tool-calling flows where prior tool calls and reasoning
need to survive across turns.