Skip to content

Commit 23cd45d

Browse files
author
Mateusz
committed
Detect rate limits in first SSE chunk for composite failover.
OpenAI-compatible streams sometimes return HTTP 200 with an error object that uses string codes or types (e.g. rate_limit_exceeded, usage_limit_reached) instead of a numeric status_code 429. The streaming envelope then stayed at 200, so BackendCompletionFlow skipped terminal-error handling and weighted composite rerolls never ran.

Extend first-chunk status inference to classify those payloads as 429, and add regression tests for the extractor and for integrate_streaming_pipeline.

Made-with: Cursor
1 parent 8631f51 commit 23cd45d

2 files changed

Lines changed: 116 additions & 2 deletions

File tree

src/core/ports/streaming_integration.py

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
import json
1111
import logging
1212
from collections.abc import AsyncIterator
13-
from typing import cast
13+
from typing import Any, cast
1414

1515
from src.core.common.exceptions import LLMProxyError, RateLimitExceededError
1616
from src.core.domain.responses import StreamingResponseEnvelope
@@ -45,6 +45,53 @@
4545

4646
logger = logging.getLogger(__name__)
4747

48+
# String / type tokens that indicate a rate-limited first SSE frame when HTTP status
49+
# was already 200 (some OpenAI-compatible gateways stream errors as SSE only).
50+
_RATE_LIMIT_ERROR_CODE_TOKENS: frozenset[str] = frozenset(
51+
{
52+
"rate_limit_exceeded",
53+
"too_many_requests",
54+
"requests_per_minute_exceeded",
55+
"rpm_limit_exceeded",
56+
"tpm_limit_exceeded",
57+
"tenant_rate_limited",
58+
"usage_limit_reached",
59+
}
60+
)
61+
62+
63+
def _error_payload_implies_rate_limit(payload: dict[str, Any]) -> bool:
64+
"""Return True when a JSON object (often ``error``) describes a rate limit."""
65+
err_type = payload.get("type")
66+
if isinstance(err_type, str) and err_type.strip():
67+
lowered = err_type.strip().lower()
68+
if lowered in _RATE_LIMIT_ERROR_CODE_TOKENS:
69+
return True
70+
if "rate" in lowered and "limit" in lowered:
71+
return True
72+
73+
code = payload.get("code")
74+
if isinstance(code, int) and code == 429:
75+
return True
76+
if isinstance(code, float) and code.is_integer() and int(code) == 429:
77+
return True
78+
if isinstance(code, str) and code.strip():
79+
lowered = code.strip().lower()
80+
if lowered in _RATE_LIMIT_ERROR_CODE_TOKENS or lowered == "429":
81+
return True
82+
83+
sc = payload.get("status_code")
84+
if isinstance(sc, int) and sc == 429:
85+
return True
86+
if isinstance(sc, float) and sc.is_integer() and int(sc) == 429:
87+
return True
88+
if isinstance(sc, str):
89+
stripped = sc.strip()
90+
if stripped.isdigit() and int(stripped) == 429:
91+
return True
92+
93+
return False
94+
4895

4996
def _try_extract_http_status_from_first_sse_chunk(first_chunk: bytes) -> int | None:
5097
"""Best-effort: extract HTTP-like status from an SSE error chunk.
@@ -56,6 +103,7 @@ def _try_extract_http_status_from_first_sse_chunk(first_chunk: bytes) -> int | N
56103
Expected formats:
57104
- data: {"choices": [...], "error": {"status_code": 404, ...}}
58105
- data: {"choices": [{"finish_reason": "error"}], ...}
106+
- data: {"error": {"code": "rate_limit_exceeded", ...}} (no numeric status)
59107
"""
60108
try:
61109
text = first_chunk.decode("utf-8", errors="ignore")
@@ -91,6 +139,14 @@ def _try_extract_http_status_from_first_sse_chunk(first_chunk: bytes) -> int | N
91139
return code
92140
if isinstance(code, float) and code.is_integer():
93141
return int(code)
142+
if _error_payload_implies_rate_limit(err):
143+
return 429
144+
elif isinstance(err, str) and "rate" in err.lower() and "limit" in err.lower():
145+
return 429
146+
147+
if _error_payload_implies_rate_limit(obj):
148+
return 429
149+
94150
# Fallback: if it looks like an OpenAI error chunk, treat as 500.
95151
choices = obj.get("choices")
96152
if isinstance(choices, list) and choices:

tests/unit/core/ports/test_streaming_error_propagation.py

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,10 @@
1717
StreamingErrorMapper,
1818
handle_streaming_error,
1919
)
20-
from src.core.ports.streaming_integration import integrate_streaming_pipeline
20+
from src.core.ports.streaming_integration import (
21+
_try_extract_http_status_from_first_sse_chunk,
22+
integrate_streaming_pipeline,
23+
)
2124

2225

2326
class TestStreamingContentErrorChunks:
@@ -299,6 +302,61 @@ async def failing_raw_stream():
299302
headers = cast(dict[str, Any], detail["headers"])
300303
assert headers["retry-after"] == "7"
301304

305+
def test_extract_status_from_first_sse_string_rate_limit_code(self) -> None:
    """String error.code (no numeric status) must classify as HTTP 429 for failover."""
    frame = (
        'data: {"error":{"type":"rate_limit_exceeded",'
        '"code":"rate_limit_exceeded","message":"RPM"}}\n\n'
    ).encode()
    assert _try_extract_http_status_from_first_sse_chunk(frame) == 429
312+
313+
def test_extract_status_from_first_sse_usage_limit_reached_type(self) -> None:
    """Codex-style usage_limit_reached must map to 429 for downstream recovery."""
    frame = (
        'data: {"error":{"type":"usage_limit_reached",'
        '"message":"The usage limit has been reached"}}\n\n'
    ).encode()
    assert _try_extract_http_status_from_first_sse_chunk(frame) == 429
320+
321+
def test_extract_status_from_first_sse_string_status_code_429(self) -> None:
    """A digit-string status_code ("429") in the first SSE frame maps to 429."""
    raw = b'data: {"error":{"status_code":"429","message":"slow down"}}\n\n'
    assert _try_extract_http_status_from_first_sse_chunk(raw) == 429
324+
325+
@pytest.mark.asyncio
async def test_integrate_streaming_pipeline_first_sse_string_rate_limit_status_429(
    self, monkeypatch: pytest.MonkeyPatch
) -> None:
    """When the first SSE frame is a string-coded rate limit, envelope HTTP status is 429."""

    # First (and only) frame: the transport reported success, but the payload
    # carries a string error code with no numeric status at all.
    rate_chunk = (
        b'data: {"error":{"code":"rate_limit_exceeded","message":"RPM"}}\n\n'
    )

    # Minimal stand-in for the real processing pipeline: emits exactly one chunk.
    class _Pipeline:
        async def process_stream(self, *args, **kwargs):
            yield rate_chunk

    # Force integrate_streaming_pipeline to use the stub pipeline above.
    monkeypatch.setattr(
        "src.core.ports.streaming_integration.create_pipeline_for_provider",
        lambda *args, **kwargs: _Pipeline(),
    )

    # Empty async generator: the `if False` keeps the `yield`, making this an
    # async generator function that produces no items.
    async def raw_stream():
        if False:
            yield b""

    envelope = await integrate_streaming_pipeline(
        raw_stream(),
        provider="openai",
        stream_id="sse-string-rl",
        enable_loop_detection=False,
        enable_tool_call_repair=False,
        enable_think_tags=False,
    )

    # The status must be inferred from the frame contents, not the transport.
    assert envelope.status_code == 429
    assert envelope.content is not None
302360
@pytest.mark.asyncio
303361
async def test_integrate_streaming_pipeline_maps_early_429(
304362
self, monkeypatch

0 commit comments

Comments
 (0)