
Commit 781192a

Author: Mateusz
fix(codex): repair usage reporting for openai-codex connector
- Use Responses-format translator to preserve input/output_tokens usage
- Remove unsupported stream_options injection to avoid 400 errors
- Add streaming usage fallback when provider omits usage metadata
- Add regression tests for non-stream and streaming usage paths
- Add end-to-end demo script proving non-zero usage for both flows
Parent: 56dece2
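
The fallback in the third bullet is the subtle part: some providers stream a final chunk with no usage block at all, and without a fallback the connector would report zero tokens. A minimal sketch of the idea, assuming a character-count heuristic; the function names and the estimator are illustrative, not the connector's actual internals:

    # Illustrative sketch only: the helper names and the 4-chars-per-token
    # heuristic are assumptions, not the connector's real implementation.
    from typing import Any


    def _estimate_tokens(text: str) -> int:
        # Crude approximation: roughly 4 characters per token of English text.
        return max(1, len(text) // 4)


    def attach_fallback_usage(
        final_chunk: dict[str, Any], prompt_text: str, completion_text: str
    ) -> dict[str, Any]:
        """Synthesize a usage block when the provider's final chunk omits one."""
        if not final_chunk.get("usage"):
            input_tokens = _estimate_tokens(prompt_text)
            output_tokens = _estimate_tokens(completion_text)
            final_chunk["usage"] = {
                "input_tokens": input_tokens,
                "output_tokens": output_tokens,
                "total_tokens": input_tokens + output_tokens,
            }
        return final_chunk

A real implementation would likely swap the heuristic for a proper tokenizer, but the shape is the same: estimate, attach, and never emit a final chunk with zero usage.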

3 files changed: 546 additions & 17 deletions

New file (228 additions, 0 deletions): the end-to-end usage demo script
@@ -0,0 +1,228 @@
#!/usr/bin/env python
"""Demo script proving OpenAI Codex usage reporting is non-zero.

This script creates a real OpenAICodexConnector instance, routes both
non-streaming and streaming chat flows through it, and prints/asserts usage.
"""

from __future__ import annotations

import asyncio
import json
import sys
import tempfile
from collections.abc import AsyncIterator
from pathlib import Path
from typing import Any
from unittest.mock import AsyncMock, patch

import httpx

# Make the repository importable when the script is run directly.
PROJECT_ROOT = Path(__file__).resolve().parents[2]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))
from src.connectors.openai_codex import OpenAICodexConnector
from src.core.config.app_config import AppConfig
from src.core.domain.chat import ChatMessage, ChatRequest
from src.core.domain.responses import (
    ResponseEnvelope,
    StreamingResponseEnvelope,
    StreamingResponseHandle,
)
from src.core.interfaces.response_processor_interface import ProcessedResponse
from src.core.services.translation_service import TranslationService


class _FakeTransportWithProviderUsage:
    """Transport stub whose single chunk carries provider usage metadata."""

    async def initiate_streaming_request(
        self,
        url: str,
        payload: dict[str, Any],
        headers: dict[str, str],
        session_id: str,
    ) -> StreamingResponseHandle:
        async def _iterator() -> AsyncIterator[ProcessedResponse]:
            yield ProcessedResponse(
                content={
                    "choices": [
                        {
                            "delta": {"content": "Non-streaming Codex response"},
                            "finish_reason": "stop",
                        }
                    ],
                    "usage": {
                        "input_tokens": 17,
                        "output_tokens": 9,
                        "total_tokens": 26,
                    },
                }
            )

        async def _cancel() -> None:
            return None

        return StreamingResponseHandle(
            iterator=_iterator(),
            headers={"x-demo": "codex-usage"},
            cancel_callback=_cancel,
        )


class _FakeTransportWithoutProviderUsage:
    """Transport stub that omits usage, exercising the streaming fallback."""

    async def initiate_streaming_request(
        self,
        url: str,
        payload: dict[str, Any],
        headers: dict[str, str],
        session_id: str,
    ) -> StreamingResponseHandle:
        async def _iterator() -> AsyncIterator[ProcessedResponse]:
            yield ProcessedResponse(
                content={
                    "choices": [
                        {
                            "delta": {"content": "This should carry token usage."},
                            "finish_reason": "stop",
                        }
                    ]
                }
            )

        async def _cancel() -> None:
            return None

        return StreamingResponseHandle(
            iterator=_iterator(),
            headers={"x-demo": "codex-usage"},
            cancel_callback=_cancel,
        )


async def _run_demo() -> None:
    with tempfile.TemporaryDirectory() as tmp_dir:
        # Write a minimal auth.json so connector initialization succeeds.
        auth_dir = Path(tmp_dir)
        auth_payload = {"tokens": {"access_token": "chatgpt_token"}}
        (auth_dir / "auth.json").write_text(json.dumps(auth_payload), encoding="utf-8")

        async with httpx.AsyncClient() as client:
            cfg = AppConfig()
            ts = TranslationService()
            backend = OpenAICodexConnector(client, cfg, translation_service=ts)

            # Patch out credential validation and file watching; the demo
            # never talks to the real provider.
            with (
                patch.object(
                    backend,
                    "_validate_credentials_file_exists",
                    return_value=(True, []),
                ),
                patch.object(
                    backend, "_validate_credentials_structure", return_value=(True, [])
                ),
                patch.object(backend, "_start_file_watching"),
            ):
                await backend.initialize(openai_codex_path=str(auth_dir))

            backend._credential_manager._managed_current_account = None  # type: ignore[attr-defined]

            try:
                with patch.object(
                    backend,
                    "_validate_runtime_credentials",
                    AsyncMock(return_value=True),
                ):
                    # Non-streaming flow: the provider reports usage and the
                    # connector must preserve it on the response envelope.
                    non_stream_request = ChatRequest(
                        model="openai-codex:gpt-5-codex",
                        messages=[ChatMessage(role="user", content="Count my tokens")],
                        stream=False,
                    )

                    response_executor = getattr(backend, "_response_executor", None)
                    if response_executor is None:
                        raise RuntimeError("Response executor is not initialized")
                    response_executor._transport = _FakeTransportWithProviderUsage()  # type: ignore[attr-defined]

                    non_stream_result = await backend.chat_completions(
                        request_data=non_stream_request,
                        processed_messages=list(non_stream_request.messages),
                        effective_model="gpt-5-codex",
                    )

                    if not isinstance(non_stream_result, ResponseEnvelope):
                        raise RuntimeError(
                            "Expected non-streaming call to return ResponseEnvelope"
                        )
                    if non_stream_result.usage is None:
                        raise RuntimeError("Non-streaming usage is missing")

                    non_stream_total = non_stream_result.usage.total_tokens or 0
                    print("[non-stream] usage:", non_stream_result.usage.model_dump())
                    if non_stream_total <= 0:
                        raise RuntimeError(
                            "Non-streaming total_tokens is zero; expected > 0"
                        )

                    # Streaming flow: the provider omits usage, so the
                    # connector's fallback must synthesize it.
                    streaming_request = ChatRequest(
                        model="openai-codex:gpt-5-codex",
                        messages=[
                            ChatMessage(
                                role="user",
                                content="Give me a short answer and report usage",
                            )
                        ],
                        stream=True,
                    )

                    response_executor._transport = _FakeTransportWithoutProviderUsage()  # type: ignore[attr-defined]

                    stream_result = await backend.chat_completions(
                        request_data=streaming_request,
                        processed_messages=list(streaming_request.messages),
                        effective_model="gpt-5-codex",
                    )

                    if not isinstance(stream_result, StreamingResponseEnvelope):
                        raise RuntimeError(
                            "Expected streaming call to return StreamingResponseEnvelope"
                        )
                    stream_content_any = stream_result.content
                    if stream_content_any is None or not hasattr(
                        stream_content_any, "__aiter__"
                    ):
                        raise RuntimeError(
                            "Streaming response content iterator is missing"
                        )
                    stream_content: AsyncIterator[ProcessedResponse] = (
                        stream_content_any
                    )

                    # Collect usage from both channels: chunk attributes and
                    # chunk metadata; either counts as reported usage.
                    usage_payloads: list[dict[str, Any]] = []
                    async for chunk in stream_content:
                        if chunk.usage is not None:
                            usage_payloads.append(chunk.usage.model_dump())
                        usage_metadata = chunk.metadata.get("usage")
                        if isinstance(usage_metadata, dict):
                            usage_payloads.append(dict(usage_metadata))

                    print("[stream] usage payloads:", usage_payloads)
                    if not usage_payloads:
                        raise RuntimeError("Streaming usage is missing")

                    max_total = max(
                        int(p.get("total_tokens", 0)) for p in usage_payloads
                    )
                    if max_total <= 0:
                        raise RuntimeError(
                            "Streaming total_tokens is zero; expected > 0"
                        )

                    print("SUCCESS: Codex usage reporting is non-zero for both flows.")
            finally:
                await backend.shutdown()


def main() -> None:
    asyncio.run(_run_demo())


if __name__ == "__main__":
    main()
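
Because the transports, credential checks, and file watching are all patched, the demo runs entirely offline: invoking the script with python from the repository root prints the usage payloads for both flows and raises a RuntimeError (exiting non-zero) if either flow reports zero tokens.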
