Skip to content

Commit ba07bf8

Browse files
author
Mateusz
committed
feat: classify HTTP 429 as RateLimitExceededError with retry-after metadata and improve weighted routing resilience
- Anthropic connector: extract Retry-After headers, raise RateLimitExceededError for 429 responses
- Streaming error mapper: promote bare BackendError(429) to RateLimitExceededError
- Composite failure recovery: recycle candidates on transient failures instead of exhausting
- Add tests for rate limiting, streaming error propagation, and CBOR timestamp stability
- Remove obsolete test files for deleted domain modules
1 parent 93b8105 commit ba07bf8

9 files changed

Lines changed: 225 additions & 716 deletions

src/connectors/anthropic.py

Lines changed: 69 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,10 @@
2525
)
2626
from src.core.common.exceptions import (
2727
AuthenticationError,
28+
BackendError,
2829
ConfigurationError,
2930
InvalidRequestError,
31+
RateLimitExceededError,
3032
ServiceUnavailableError,
3133
)
3234
from src.core.config.app_config import AppConfig
@@ -72,6 +74,37 @@
7274
_LLM_PROXY_CLIENT_HOST_KEY = "_llm_proxy_client_host"
7375

7476

77+
def _retry_after_metadata_from_httpx_headers(
78+
headers: Any,
79+
) -> tuple[dict[str, Any], int | None]:
80+
"""Extract Retry-After for resilience (same ``details['headers']`` shape as OpenAI).
81+
82+
``RateLimitErrorHandler`` reads ``details['headers']['retry-after']`` when
83+
``reset_at`` is not a usable wall-clock hint, so we populate that structure here.
84+
"""
85+
86+
details: dict[str, Any] = {}
87+
reset_hint: int | None = None
88+
retry_after: str | None = None
89+
try:
90+
if hasattr(headers, "get"):
91+
got = headers.get("retry-after")
92+
if got is not None:
93+
retry_after = str(got).strip()
94+
if not retry_after:
95+
for key, value in headers.items():
96+
if str(key).lower() == "retry-after":
97+
retry_after = str(value).strip()
98+
break
99+
if retry_after:
100+
details["headers"] = {"retry-after": retry_after}
101+
with contextlib.suppress(ValueError, TypeError):
102+
reset_hint = int(retry_after.split(",")[0].strip())
103+
except Exception:
104+
return {}, None
105+
return details, reset_hint
106+
107+
75108
def _message_tool_calls(msg: Any) -> list[Any] | None:
76109
raw = (
77110
msg.get("tool_calls")
@@ -552,7 +585,7 @@ async def chat_completions( # type: ignore[override]
552585
),
553586
details={"connector": "anthropic"},
554587
)
555-
return await self._chat_completions_canonical(request)
588+
return await self._chat_completions_canonical(request)
556589

557590
# -----------------------------------------------------------
558591
# Payload helpers
@@ -1332,7 +1365,10 @@ async def stream_completion(
13321365

13331366
# Check for errors before streaming
13341367
if response.status_code >= 400:
1335-
from src.core.common.exceptions import BackendError
1368+
status_code = response.status_code
1369+
rate_limit_details, retry_after_seconds = (
1370+
_retry_after_metadata_from_httpx_headers(response.headers)
1371+
)
13361372

13371373
try:
13381374
# Read only first 10MB of error body to prevent DoS (consistent with other middleware)
@@ -1356,14 +1392,30 @@ async def stream_completion(
13561392

13571393
body_text = body_bytes.decode("utf-8", errors="ignore")
13581394

1359-
if logger.isEnabledFor(logging.ERROR):
1360-
# Note: stream_completion doesn't have context access (protocol method)
1361-
# Context correlation would require protocol change
1395+
# Operational HTTP errors: never use exc_info=True here — under concurrent
1396+
# asyncio work, sys.exc_info() can belong to another task and produces a
1397+
# misleading traceback on this log line.
1398+
preview = (
1399+
(body_text[:500] + "...") if len(body_text) > 500 else body_text
1400+
)
1401+
if status_code == 429:
1402+
if logger.isEnabledFor(logging.WARNING):
1403+
logger.warning(
1404+
"Anthropic API rate limited (HTTP 429): %s",
1405+
preview or "(empty body)",
1406+
)
1407+
elif 400 <= status_code < 500:
1408+
if logger.isEnabledFor(logging.WARNING):
1409+
logger.warning(
1410+
"Anthropic API client error %s: %s",
1411+
status_code,
1412+
preview or "(empty body)",
1413+
)
1414+
elif logger.isEnabledFor(logging.ERROR):
13621415
logger.error(
1363-
"Anthropic API error %s: %s",
1364-
response.status_code,
1365-
body_text,
1366-
exc_info=True,
1416+
"Anthropic API server error %s: %s",
1417+
status_code,
1418+
preview or "(empty body)",
13671419
)
13681420
except (UnicodeDecodeError, httpx.ReadError) as e:
13691421
if logger.isEnabledFor(logging.WARNING):
@@ -1375,10 +1427,17 @@ async def stream_completion(
13751427
body_text = ""
13761428
finally:
13771429
await response.aclose()
1430+
1431+
if status_code == 429:
1432+
raise RateLimitExceededError(
1433+
message=body_text or "Anthropic rate limit exceeded",
1434+
details=rate_limit_details,
1435+
reset_at=retry_after_seconds,
1436+
)
13781437
raise BackendError(
13791438
message=body_text,
13801439
code="anthropic_error",
1381-
status_code=response.status_code,
1440+
status_code=status_code,
13821441
)
13831442
# Stream SSE messages
13841443
try:

src/core/services/composite_failure_recovery_bridge.py

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -175,11 +175,21 @@ def _build_weighted_retry_request(
175175
remaining.append(item)
176176

177177
if not remaining:
178-
weighted["excluded_selectors"] = excluded
179-
context.extensions[COMPOSITE_ROUTING_STATE_KEY] = cast(
180-
JsonValue, self._serialize_weighted_state(weighted)
181-
)
182-
return None
178+
# All branches were exhausted once; recycle candidates by keeping only
179+
# the current failed selector excluded and retrying the rest.
180+
excluded = [selected]
181+
excluded_set = {selected}
182+
remaining = [
183+
item
184+
for item in weighted["branches"]
185+
if item["selector"] not in excluded_set
186+
]
187+
if not remaining:
188+
weighted["excluded_selectors"] = excluded
189+
context.extensions[COMPOSITE_ROUTING_STATE_KEY] = cast(
190+
JsonValue, self._serialize_weighted_state(weighted)
191+
)
192+
return None
183193

184194
if len(remaining) == 1:
185195
next_selector = remaining[0]["selector"]

src/core/services/streaming/error_mapping.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,9 +178,25 @@ def map_backend_error(
178178
status_code=status_code,
179179
)
180180

181-
# Map BackendError with quota_exceeded code
181+
# Map BackendError: promote bare HTTP 429 to RateLimitExceededError so early
182+
# streaming prefetch (integrate_streaming_pipeline) and terminal error chunks
183+
# agree with connectors that raise RateLimitExceededError directly.
182184
if isinstance(error, BackendError):
183-
# Preserve the BackendError as-is, including code and status_code
185+
if (
186+
not isinstance(error, RateLimitExceededError)
187+
and getattr(error, "status_code", None) == 429
188+
):
189+
merged_details = dict(error.details or {})
190+
if "provider" not in merged_details:
191+
merged_details["provider"] = provider
192+
if stream_id and "stream_id" not in merged_details:
193+
merged_details["stream_id"] = stream_id
194+
return RateLimitExceededError(
195+
message=error.message,
196+
details=merged_details,
197+
reset_at=getattr(error, "reset_at", None),
198+
)
199+
# Preserve other BackendError variants as-is, including code and status_code
184200
return error
185201

186202
# Map httpx connection errors

tests/integration/core/services/test_weighted_routing_recycles_on_transient_400.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
from __future__ import annotations
22

3-
from typing import Any, cast
3+
from typing import Any
44
from unittest.mock import AsyncMock, MagicMock
55

66
import pytest
@@ -9,6 +9,17 @@
99
from src.core.domain.chat import ChatMessage, ChatRequest
1010
from src.core.domain.request_context import RequestContext
1111
from src.core.domain.responses import ResponseEnvelope
12+
from src.core.interfaces.backend_model_resolver_interface import ResolvedTarget
13+
from src.core.interfaces.domain_entities_interface import ISession
14+
from src.core.services.backend_completion_flow.failure_recovery_executor import (
15+
FailureRecoveryExecutor,
16+
)
17+
from src.core.services.backend_completion_flow.service import BackendCompletionFlow
18+
from src.core.services.backend_model_resolver import BackendModelResolver
19+
from src.core.services.composite_failure_recovery_bridge import (
20+
CompositeFailureRecoveryBridge,
21+
)
22+
from src.core.services.weighted_branch_selector import WeightedBranchSelector
1223

1324

1425
def _new_request_context() -> RequestContext:
@@ -196,7 +207,7 @@ def synchronize_request_with_target(
196207
request=request,
197208
resolved=target,
198209
)
199-
return cast(ChatRequest, synchronized)
210+
return synchronized
200211

201212
async def prepare_backend_request(
202213
self,
@@ -366,7 +377,10 @@ def _rng() -> float:
366377
state = context.extensions["composite_routing_state"]
367378
assert isinstance(state, dict)
368379
assert state["selected_selector"] == "qwen-oauth:coder-model"
369-
assert "zai-coding-plan:glm-5.1" in state["excluded_selectors"]
380+
excluded_raw = state["excluded_selectors"]
381+
assert isinstance(excluded_raw, list)
382+
assert all(isinstance(x, str) for x in excluded_raw)
383+
assert "zai-coding-plan:glm-5.1" in excluded_raw
370384
assert state["hop_count"] == 1
371385

372386

@@ -437,7 +451,7 @@ def _rng() -> float:
437451
state = context.extensions["composite_routing_state"]
438452
assert isinstance(state, dict)
439453
assert state["selected_selector"] == "zai-coding-plan:glm-5.1"
440-
assert state["hop_count"] == 3
454+
assert state["hop_count"] == 2
441455

442456

443457
@pytest.mark.asyncio
@@ -508,4 +522,4 @@ def _rng() -> float:
508522
state = context.extensions["composite_routing_state"]
509523
assert isinstance(state, dict)
510524
assert state["selected_selector"] == "zai-coding-plan:glm-5.1"
511-
assert state["hop_count"] == 3
525+
assert state["hop_count"] == 2

tests/unit/connectors/test_anthropic_error_handling.py

Lines changed: 85 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import httpx
66
import pytest
7+
from src.core.domain.chat import CanonicalChatRequest, ChatMessage
78

89

910
@pytest.mark.asyncio
@@ -119,34 +120,87 @@ async def mock_aiter_text():
119120
assert exc_info.value.code == "anthropic_error_rate_limit"
120121

121122

122-
@pytest.mark.asyncio
123-
async def test_zai_coding_plan_uses_openai_connector():
124-
"""Test that zai-coding-plan now inherits from OpenAI connector."""
125-
from src.connectors.openai import OpenAIConnector
126-
from src.connectors.zai_coding_plan import ZaiCodingPlanBackend
127-
128-
# Use minimal mock setup to avoid heavy initialization
129-
client = MagicMock()
130-
config = MagicMock()
131-
translation_service = MagicMock()
132-
133-
backend = ZaiCodingPlanBackend(client, config, translation_service)
134-
135-
# Verify it's an OpenAI connector now
136-
assert isinstance(backend, OpenAIConnector)
137-
138-
# Mock _refresh_available_models to avoid network call entirely
139-
async def mock_refresh():
140-
backend.available_models = ["glm-4.6", "claude-sonnet-4-20250514"]
141-
backend._provider_models = {"glm-4.6", "claude-sonnet-4-20250514"}
142-
143-
# Patch _refresh_available_models and directly set attributes to avoid initialization overhead
144-
with patch.object(backend, "_refresh_available_models", new=mock_refresh):
145-
# Directly set attributes that would be set during initialize
146-
backend.api_key = "test-zai-key"
147-
backend.api_base_url = "https://api.z.ai/api/coding/paas/v4"
148-
backend._max_tokens_limit = 200000
149-
backend._default_max_tokens = 8192
150-
151-
# Verify OpenAI-style API URL
152-
assert "api.z.ai/api/coding/paas/v4" in backend.api_base_url
123+
@pytest.mark.asyncio
124+
async def test_stream_completion_http_429_raises_rate_limit_exceeded() -> None:
125+
"""HTTP 429 before the SSE body must map to RateLimitExceededError for resilience."""
126+
from src.connectors.anthropic import AnthropicBackend
127+
from src.core.common.exceptions import RateLimitExceededError
128+
from src.core.config.app_config import AppConfig
129+
from src.core.services.translation_service import TranslationService
130+
131+
client = httpx.AsyncClient()
132+
config = AppConfig()
133+
translation_service = TranslationService()
134+
135+
backend = AnthropicBackend(client, config, translation_service)
136+
await backend.initialize(
137+
anthropic_api_base_url="https://api.anthropic.com/v1",
138+
key_name="test_key",
139+
api_key="test-api-key-123",
140+
)
141+
142+
err_json = (
143+
'{"type":"error","error":{"type":"SubscriptionUsageLimitError",'
144+
'"message":"quota exceeded"}}'
145+
)
146+
mock_response = MagicMock()
147+
mock_response.status_code = 429
148+
mock_response.headers = httpx.Headers({"retry-after": "42"})
149+
150+
async def mock_aiter_bytes():
151+
yield err_json.encode()
152+
153+
mock_response.aiter_bytes = mock_aiter_bytes
154+
mock_response.aclose = AsyncMock()
155+
156+
req = CanonicalChatRequest(
157+
model="claude-3-5-sonnet-20241022",
158+
messages=[ChatMessage(role="user", content="hello")],
159+
stream=True,
160+
)
161+
162+
with (
163+
patch.object(backend.client, "build_request", return_value=MagicMock()),
164+
patch.object(backend, "_capture_http_client") as cap,
165+
):
166+
cap.send = AsyncMock(return_value=mock_response)
167+
with pytest.raises(RateLimitExceededError) as exc_info:
168+
async for _ in backend.stream_completion(req):
169+
pass
170+
171+
assert "quota exceeded" in str(exc_info.value).lower()
172+
assert exc_info.value.details.get("headers", {}).get("retry-after") == "42"
173+
assert getattr(exc_info.value, "reset_at", None) == 42
174+
175+
176+
@pytest.mark.asyncio
177+
async def test_zai_coding_plan_uses_openai_connector():
178+
"""Test that zai-coding-plan now inherits from OpenAI connector."""
179+
from src.connectors.openai import OpenAIConnector
180+
from src.connectors.zai_coding_plan import ZaiCodingPlanBackend
181+
182+
# Use minimal mock setup to avoid heavy initialization
183+
client = MagicMock()
184+
config = MagicMock()
185+
translation_service = MagicMock()
186+
187+
backend = ZaiCodingPlanBackend(client, config, translation_service)
188+
189+
# Verify it's an OpenAI connector now
190+
assert isinstance(backend, OpenAIConnector)
191+
192+
# Mock _refresh_available_models to avoid network call entirely
193+
async def mock_refresh():
194+
backend.available_models = ["glm-4.6", "claude-sonnet-4-20250514"]
195+
backend._provider_models = {"glm-4.6", "claude-sonnet-4-20250514"}
196+
197+
# Patch _refresh_available_models and directly set attributes to avoid initialization overhead
198+
with patch.object(backend, "_refresh_available_models", new=mock_refresh):
199+
# Directly set attributes that would be set during initialize
200+
backend.api_key = "test-zai-key"
201+
backend.api_base_url = "https://api.z.ai/api/coding/paas/v4"
202+
backend._max_tokens_limit = 200000
203+
backend._default_max_tokens = 8192
204+
205+
# Verify OpenAI-style API URL
206+
assert "api.z.ai/api/coding/paas/v4" in backend.api_base_url

tests/unit/core/domain/test_session_state.py renamed to tests/unit/core/domain/test_session_state_weighted_first_request.py

File renamed without changes.

tests/unit/core/ports/test_streaming_error_propagation.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,23 @@ async def test_error_chunk_includes_retryable_flag(self) -> None:
200200
# The retryable flag should be present
201201
assert "retryable" in chunk.metadata["error"]
202202

203+
def test_streaming_error_mapper_promotes_backend_error_429(self) -> None:
204+
"""Plain BackendError(429) should map like a native rate-limit error."""
205+
206+
mapped_error = StreamingErrorMapper.map_backend_error(
207+
BackendError(
208+
message="upstream throttled",
209+
status_code=429,
210+
details={"headers": {"retry-after": "33"}},
211+
),
212+
"anthropic",
213+
"s-1",
214+
)
215+
216+
assert isinstance(mapped_error, RateLimitExceededError)
217+
assert mapped_error.details.get("headers", {}).get("retry-after") == "33"
218+
assert mapped_error.details.get("stream_id") == "s-1"
219+
203220
def test_streaming_error_mapper_preserves_retry_after_headers(self) -> None:
204221
"""HTTP 429 detail headers should survive streaming error mapping."""
205222

0 commit comments

Comments
 (0)