Skip to content

Commit 92dae1c

Browse files
author
Mateusz
committed
fix(openai): map upstream read disconnects to 502 in streaming paths
Handle httpx.ReadError as a backend read failure instead of a generic connection outage across all streaming code paths (send-time, the SSE buffer loop, and stream_completion). Previously these errors surfaced as a misleading ServiceUnavailableError (503) with a 'Could not connect to backend' message and ERROR-level tracebacks; they now map to BackendError (502) with reason='read_error' and WARNING-level logs. Also fixes LSP errors in unrelated files touched along the way: removes an unnecessary runtime type guard in the Anthropic connector and tightens dict typing in the Codex retry-standardization test. Tests: new unit tests for the _raise_for_httpx_request_error mapping; a mid-stream read-error regression test in the connector streaming path; and an adapter-boundary FastAPI test asserting 502 status propagation.
1 parent 81a4577 commit 92dae1c

6 files changed

Lines changed: 555 additions and 259 deletions

File tree

src/connectors/anthropic.py

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626
from src.core.common.exceptions import (
2727
AuthenticationError,
2828
ConfigurationError,
29-
InvalidRequestError,
3029
ServiceUnavailableError,
3130
)
3231
from src.core.config.app_config import AppConfig
@@ -544,18 +543,7 @@ async def chat_completions( # type: ignore[override]
544543
through :class:`ConnectorChatCompletionsRequest`; legacy positional call shapes
545544
are not supported at this boundary.
546545
"""
547-
if not isinstance(request, ConnectorChatCompletionsRequest):
548-
raise InvalidRequestError(
549-
message=(
550-
"AnthropicBackend.chat_completions requires ConnectorChatCompletionsRequest. "
551-
"Legacy request_data/processed_messages/effective_model invocation is not supported."
552-
),
553-
details={
554-
"received_type": type(request).__name__,
555-
"connector": "anthropic",
556-
},
557-
)
558-
return await self._chat_completions_canonical(request)
546+
return await self._chat_completions_canonical(request)
559547

560548
# -----------------------------------------------------------
561549
# Payload helpers

src/connectors/openai.py

Lines changed: 167 additions & 123 deletions
Original file line numberDiff line numberDiff line change
@@ -192,18 +192,21 @@ def _build_quota_exhaustion_stream_chunk(
192192
message = _extract_insufficient_quota_message(body) or (
193193
"Upstream quota was exhausted."
194194
)
195-
error_payload = {
195+
error_payload: dict[str, Any] = {
196196
"id": f"chatcmpl-error-{int(time.time())}",
197197
"object": "chat.completion.chunk",
198198
"created": int(time.time()),
199199
"model": model,
200200
"choices": [{"index": 0, "delta": {}, "finish_reason": "error"}],
201-
"error": {
202-
"message": message,
203-
"type": "quota_exceeded",
204-
"code": 503,
205-
"status_code": 503,
206-
},
201+
"error": cast(
202+
dict[str, Any],
203+
{
204+
"message": message,
205+
"type": "quota_exceeded",
206+
"code": 503,
207+
"status_code": 503,
208+
},
209+
),
207210
}
208211
error_body = error_payload["error"]
209212
if error_details and isinstance(error_body, dict):
@@ -237,11 +240,11 @@ def _attach_http_error_details(
237240
}
238241

239242

240-
def _raise_for_httpx_request_error(
241-
exc: httpx.RequestError,
242-
*,
243-
url: str,
244-
log_extra: dict[str, str] | None,
243+
def _raise_for_httpx_request_error(
244+
exc: httpx.RequestError,
245+
*,
246+
url: str,
247+
log_extra: dict[str, str] | None,
245248
) -> NoReturn:
246249
"""Map httpx transport errors to domain errors (read timeout vs connect vs other)."""
247250

@@ -288,27 +291,41 @@ def _raise_for_httpx_request_error(
288291
status_code=504,
289292
) from exc
290293

294+
if isinstance(exc, httpx.ReadError):
295+
if logger.isEnabledFor(logging.WARNING):
296+
logger.warning(
297+
"Upstream read error (connection lost mid-stream): %s: %s",
298+
url,
299+
exc,
300+
extra=log_extra,
301+
)
302+
raise BackendError(
303+
message=f"Upstream read error: connection lost during read ({exc!s})",
304+
details={"url": url, "reason": "read_error"},
305+
status_code=502,
306+
) from exc
307+
291308
logger.error(
292309
"Request failed to %s: %s",
293310
url,
294311
exc,
295312
exc_info=True,
296313
extra=log_extra if log_extra else None,
297314
)
298-
raise ServiceUnavailableError(
299-
message=f"Could not connect to backend ({exc!s})",
300-
details={"url": url},
301-
) from exc
302-
303-
304-
def _is_retryable_http2_stream_termination(exc: httpx.RequestError) -> bool:
305-
if not isinstance(exc, httpx.RemoteProtocolError):
306-
return False
307-
message = str(exc)
308-
return "ConnectionTerminated" in message and "ErrorCodes.NO_ERROR" in message
309-
310-
311-
class OpenAIConnector(LLMBackend):
315+
raise ServiceUnavailableError(
316+
message=f"Could not connect to backend ({exc!s})",
317+
details={"url": url},
318+
) from exc
319+
320+
321+
def _is_retryable_http2_stream_termination(exc: httpx.RequestError) -> bool:
322+
if not isinstance(exc, httpx.RemoteProtocolError):
323+
return False
324+
message = str(exc)
325+
return "ConnectionTerminated" in message and "ErrorCodes.NO_ERROR" in message
326+
327+
328+
class OpenAIConnector(LLMBackend):
312329
"""Minimal OpenAI-compatible connector used by OpenRouterBackend in tests.
313330
314331
It supports an optional `headers_override` kwarg and treats streaming
@@ -363,51 +380,51 @@ def __init__(
363380
getattr(self.config, "disable_health_checks", False)
364381
)
365382

366-
# Enable health checks only when neither config nor env disable them
367-
self._health_check_enabled = not (
368-
disable_health_checks_env or disable_health_checks_config
369-
)
370-
371-
async def _send_request_with_retry(
372-
self,
373-
*,
374-
build_request: Callable[[], httpx.Request],
375-
stream: bool,
376-
capture: HttpxBoundaryCaptureContext,
377-
url: str,
378-
log_extra: dict[str, str] | None,
379-
) -> httpx.Response:
380-
request = build_request()
381-
try:
382-
return await self._capture_http_client.send(
383-
request,
384-
stream=stream,
385-
capture=capture,
386-
)
387-
except httpx.RequestError as exc:
388-
if _is_retryable_http2_stream_termination(exc):
389-
logger.warning(
390-
"Transient upstream HTTP/2 termination for %s; retrying once",
391-
url,
392-
extra=log_extra if log_extra else None,
393-
)
394-
retry_request = build_request()
395-
try:
396-
return await self._capture_http_client.send(
397-
retry_request,
398-
stream=stream,
399-
capture=capture,
400-
)
401-
except httpx.RequestError as retry_exc:
402-
_raise_for_httpx_request_error(
403-
retry_exc,
404-
url=url,
405-
log_extra=log_extra,
406-
)
407-
_raise_for_httpx_request_error(exc, url=url, log_extra=log_extra)
408-
409-
@property
410-
def api_base_url(self) -> str:
383+
# Enable health checks only when neither config nor env disable them
384+
self._health_check_enabled = not (
385+
disable_health_checks_env or disable_health_checks_config
386+
)
387+
388+
async def _send_request_with_retry(
389+
self,
390+
*,
391+
build_request: Callable[[], httpx.Request],
392+
stream: bool,
393+
capture: HttpxBoundaryCaptureContext,
394+
url: str,
395+
log_extra: dict[str, str] | None,
396+
) -> httpx.Response:
397+
request = build_request()
398+
try:
399+
return await self._capture_http_client.send(
400+
request,
401+
stream=stream,
402+
capture=capture,
403+
)
404+
except httpx.RequestError as exc:
405+
if _is_retryable_http2_stream_termination(exc):
406+
logger.warning(
407+
"Transient upstream HTTP/2 termination for %s; retrying once",
408+
url,
409+
extra=log_extra if log_extra else None,
410+
)
411+
retry_request = build_request()
412+
try:
413+
return await self._capture_http_client.send(
414+
retry_request,
415+
stream=stream,
416+
capture=capture,
417+
)
418+
except httpx.RequestError as retry_exc:
419+
_raise_for_httpx_request_error(
420+
retry_exc,
421+
url=url,
422+
log_extra=log_extra,
423+
)
424+
_raise_for_httpx_request_error(exc, url=url, log_extra=log_extra)
425+
426+
@property
427+
def api_base_url(self) -> str:
411428
"""Return the API base URL."""
412429
return self._api_base_url
413430

@@ -1181,19 +1198,19 @@ async def _handle_non_streaming_response(
11811198

11821199
guarded_headers = self._apply_loop_guard_to_outbound_headers(headers)
11831200
log_extra = self._get_log_extra(context)
1184-
response = await self._send_request_with_retry(
1185-
build_request=lambda: self.client.build_request(
1186-
"POST", url, json=payload, headers=guarded_headers
1187-
),
1188-
stream=False,
1189-
capture=self._http_boundary_capture(
1190-
model=str(payload.get("model") or "unknown"),
1191-
context=context,
1192-
),
1193-
url=url,
1194-
log_extra=log_extra if log_extra else None,
1195-
)
1196-
self.update_quota_headers(response.headers)
1201+
response = await self._send_request_with_retry(
1202+
build_request=lambda: self.client.build_request(
1203+
"POST", url, json=payload, headers=guarded_headers
1204+
),
1205+
stream=False,
1206+
capture=self._http_boundary_capture(
1207+
model=str(payload.get("model") or "unknown"),
1208+
context=context,
1209+
),
1210+
url=url,
1211+
log_extra=log_extra if log_extra else None,
1212+
)
1213+
self.update_quota_headers(response.headers)
11971214

11981215
if int(response.status_code) >= 400:
11991216
# For backwards compatibility with existing error handlers, still use HTTPException here.
@@ -1305,19 +1322,19 @@ async def _handle_streaming_response(
13051322

13061323
guarded_headers = self._apply_loop_guard_to_outbound_headers(headers)
13071324

1308-
response = await self._send_request_with_retry(
1309-
build_request=lambda: self.client.build_request(
1310-
"POST", url, json=payload, headers=guarded_headers
1311-
),
1312-
stream=True,
1313-
capture=self._http_boundary_capture(
1314-
model=str(payload.get("model") or "unknown"),
1315-
context=context,
1316-
),
1317-
url=url,
1318-
log_extra=log_extra if log_extra else None,
1319-
)
1320-
self.update_quota_headers(response.headers)
1325+
response = await self._send_request_with_retry(
1326+
build_request=lambda: self.client.build_request(
1327+
"POST", url, json=payload, headers=guarded_headers
1328+
),
1329+
stream=True,
1330+
capture=self._http_boundary_capture(
1331+
model=str(payload.get("model") or "unknown"),
1332+
context=context,
1333+
),
1334+
url=url,
1335+
log_extra=log_extra if log_extra else None,
1336+
)
1337+
self.update_quota_headers(response.headers)
13211338

13221339
status_code = (
13231340
int(response.status_code) if hasattr(response, "status_code") else 200
@@ -1607,6 +1624,23 @@ async def iter_sse_messages() -> AsyncGenerator[str, None]:
16071624
details={"url": url, "reason": "read_timeout"},
16081625
status_code=504,
16091626
) from exc
1627+
except httpx.ReadError as exc:
1628+
if buffer:
1629+
yield buffer
1630+
buffer = ""
1631+
if logger.isEnabledFor(logging.WARNING):
1632+
logger.warning(
1633+
"Streaming read error during SSE for %s",
1634+
url,
1635+
extra=log_extra if log_extra else None,
1636+
)
1637+
raise BackendError(
1638+
message=(
1639+
f"Upstream read error: connection lost during streaming ({exc!s})"
1640+
),
1641+
details={"url": url, "reason": "read_error"},
1642+
status_code=502,
1643+
) from exc
16101644
except httpx.RequestError as exc:
16111645
if buffer:
16121646
yield buffer
@@ -2042,19 +2076,19 @@ async def _handle_responses_non_streaming_response(
20422076

20432077
guarded_headers = self._apply_loop_guard_to_outbound_headers(headers)
20442078

2045-
response = await self._send_request_with_retry(
2046-
build_request=lambda: self.client.build_request(
2047-
"POST", url, json=payload, headers=guarded_headers
2048-
),
2049-
stream=False,
2050-
capture=self._http_boundary_capture(
2051-
model=str(payload.get("model") or "unknown"),
2052-
context=context,
2053-
),
2054-
url=url,
2055-
log_extra=None,
2056-
)
2057-
self.update_quota_headers(response.headers)
2079+
response = await self._send_request_with_retry(
2080+
build_request=lambda: self.client.build_request(
2081+
"POST", url, json=payload, headers=guarded_headers
2082+
),
2083+
stream=False,
2084+
capture=self._http_boundary_capture(
2085+
model=str(payload.get("model") or "unknown"),
2086+
context=context,
2087+
),
2088+
url=url,
2089+
log_extra=None,
2090+
)
2091+
self.update_quota_headers(response.headers)
20582092

20592093
if int(response.status_code) >= 400:
20602094
try:
@@ -2251,19 +2285,19 @@ async def stream_completion(
22512285
payload["stream"] = True
22522286

22532287
# Build and send request
2254-
response = await self._send_request_with_retry(
2255-
build_request=lambda: self.client.build_request(
2256-
"POST", url, json=payload, headers=guarded_headers
2257-
),
2258-
stream=True,
2259-
capture=self._http_boundary_capture(
2260-
model=str(effective_model),
2261-
context=connector_context,
2262-
),
2263-
url=url,
2264-
log_extra=None,
2265-
)
2266-
self.update_quota_headers(response.headers)
2288+
response = await self._send_request_with_retry(
2289+
build_request=lambda: self.client.build_request(
2290+
"POST", url, json=payload, headers=guarded_headers
2291+
),
2292+
stream=True,
2293+
capture=self._http_boundary_capture(
2294+
model=str(effective_model),
2295+
context=connector_context,
2296+
),
2297+
url=url,
2298+
log_extra=None,
2299+
)
2300+
self.update_quota_headers(response.headers)
22672301

22682302
status_code = (
22692303
int(response.status_code) if hasattr(response, "status_code") else 200
@@ -2402,6 +2436,16 @@ async def stream_completion(
24022436
details={"url": url, "reason": "read_timeout"},
24032437
status_code=504,
24042438
) from exc
2439+
except httpx.ReadError as exc:
2440+
if logger.isEnabledFor(logging.WARNING):
2441+
logger.warning("Streaming read error for %s", url)
2442+
raise BackendError(
2443+
message=(
2444+
f"Upstream read error: connection lost during streaming ({exc!s})"
2445+
),
2446+
details={"url": url, "reason": "read_error"},
2447+
status_code=502,
2448+
) from exc
24052449
except httpx.RequestError as exc:
24062450
raise ServiceUnavailableError(
24072451
message=f"Streaming connection interrupted ({exc!s})"

0 commit comments

Comments
 (0)