Skip to content

Commit 505d049

Browse files
author
Mateusz
committed
fix: canonicalize routing error details and propagate streaming errors to failover
1 parent c9812b4 commit 505d049

5 files changed

Lines changed: 330 additions & 29 deletions

File tree

.gitignore

Lines changed: 6 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -133,3 +133,9 @@ leveldb_test/
133133

134134
# Cache directory
135135
var/cache/
136+
137+
# Local test configs
138+
config/configtest1.yaml
139+
140+
# Debug transcripts
141+
dev/transcript.md

src/core/services/backend_completion_flow/service.py

Lines changed: 55 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -232,6 +232,55 @@ def _normalize_backend_exception(
232232

233233
return candidate
234234

235+
@staticmethod
236+
def _exception_from_streaming_error_envelope(
237+
result: StreamingResponseEnvelope,
238+
) -> Exception | None:
239+
"""Convert terminal streaming error envelopes back into domain errors."""
240+
status_code = getattr(result, "status_code", None)
241+
if not isinstance(status_code, int) or status_code < 400:
242+
return None
243+
244+
metadata = getattr(result, "metadata", None)
245+
error_message = f"Backend returned {status_code} error"
246+
error_type = ""
247+
error_code = ""
248+
error_details: dict[str, Any] = {}
249+
250+
if isinstance(metadata, dict):
251+
raw_message = metadata.get("error_message")
252+
if isinstance(raw_message, str) and raw_message.strip():
253+
error_message = raw_message
254+
255+
raw_type = metadata.get("error_type")
256+
if isinstance(raw_type, str):
257+
error_type = raw_type
258+
259+
raw_code = metadata.get("error_code")
260+
if isinstance(raw_code, str):
261+
error_code = raw_code
262+
263+
raw_details = metadata.get("error_details")
264+
if isinstance(raw_details, dict):
265+
error_details = dict(raw_details)
266+
267+
if status_code == 401:
268+
return AuthenticationError(error_message)
269+
270+
if status_code == 404 or error_type == "RoutingError":
271+
return RoutingError(
272+
message=error_message,
273+
details=error_details,
274+
code=error_details.get("code") or error_code or "unknown_model",
275+
)
276+
277+
return BackendError(
278+
message=error_message,
279+
status_code=status_code,
280+
details=error_details,
281+
code=error_code or None,
282+
)
283+
235284
async def _enforce_non_forwardable_content(
236285
self,
237286
session_id: str,
@@ -1086,6 +1135,12 @@ def cancel(self) -> None:
10861135

10871136
# Step 10: Handle streaming response (wire capture + session ID injection)
10881137
if isinstance(result, StreamingResponseEnvelope):
1138+
if allow_failover:
1139+
streaming_error = self._exception_from_streaming_error_envelope(
1140+
result
1141+
)
1142+
if streaming_error is not None:
1143+
raise streaming_error
10891144
return await self._handle_streaming_response(
10901145
result=result,
10911146
backend_type=backend_type,

src/core/services/request_processor_service.py

Lines changed: 35 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -48,6 +48,29 @@
4848
logger = logging.getLogger(__name__)
4949

5050
MAX_QUALITY_VERIFIER_TURN_STATES = 10_000
51+
_ROUTING_CODES_BY_STATUS: dict[int, tuple[str, str, bool]] = {
52+
400: ("unsupported_on_instance", "availability", False),
53+
403: ("policy_rejected", "policy", False),
54+
404: ("unknown_model", "validation", False),
55+
503: ("temporarily_unavailable", "availability", True),
56+
}
57+
58+
59+
def _canonicalize_routing_error_details(
60+
status_code: int,
61+
details: dict[str, Any] | None,
62+
) -> dict[str, Any]:
63+
"""Normalize routing metadata so status and canonical code do not disagree."""
64+
normalized = dict(details) if isinstance(details, dict) else {}
65+
canonical = _ROUTING_CODES_BY_STATUS.get(status_code)
66+
if canonical is None:
67+
return normalized
68+
69+
code, category, retryable = canonical
70+
normalized["code"] = code
71+
normalized["category"] = category
72+
normalized["retryable"] = retryable
73+
return normalized
5174

5275

5376
class RequestProcessor(IRequestProcessor):
@@ -832,10 +855,14 @@ def _prepare_quality_verifier_extensions_for_backend_call(
832855
error_message or "Backend returned 401 error"
833856
)
834857
elif result.status_code == 404 or orig_type == "RoutingError":
858+
error_details = _canonicalize_routing_error_details(
859+
int(result.status_code),
860+
error_details,
861+
)
835862
raise RoutingError(
836863
message=error_message,
837864
details=error_details,
838-
code=orig_code or "unknown_model",
865+
code=error_details.get("code") or orig_code or "unknown_model",
839866
)
840867
else:
841868
raise BackendError(
@@ -975,10 +1002,16 @@ def _prepare_quality_verifier_extensions_for_backend_call(
9751002
fallback_result.status_code == 404
9761003
or fallback_type == "RoutingError"
9771004
):
1005+
error_details = _canonicalize_routing_error_details(
1006+
int(fallback_result.status_code),
1007+
error_details,
1008+
)
9781009
raise RoutingError(
9791010
message=error_message,
9801011
details=error_details,
981-
code=fallback_code or "unknown_model",
1012+
code=error_details.get("code")
1013+
or fallback_code
1014+
or "unknown_model",
9821015
)
9831016
else:
9841017
raise BackendError(

tests/unit/core/services/test_backend_completion_flow_streaming_error_envelope.py

Lines changed: 116 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -5,8 +5,9 @@
55

66
import pytest
77
from fastapi import HTTPException
8+
from src.core.common.exceptions import RoutingError
89
from src.core.domain.chat import ChatMessage, ChatRequest
9-
from src.core.domain.responses import StreamingResponseEnvelope
10+
from src.core.domain.responses import ResponseEnvelope, StreamingResponseEnvelope
1011
from src.core.services.backend_completion_flow.service import BackendCompletionFlow
1112

1213

@@ -70,6 +71,89 @@ def _build_flow_with_erroring_backend() -> BackendCompletionFlow:
7071
return BackendCompletionFlow(**deps)
7172

7273

74+
def _build_flow_with_streaming_error_envelope() -> (
    tuple[BackendCompletionFlow, dict[str, Any]]
):
    """Assemble a flow whose connector yields a 404 streaming error envelope.

    Every collaborator handed to ``BackendCompletionFlow`` is a ``MagicMock``.
    The connector invoker is stubbed to return a ``StreamingResponseEnvelope``
    with status 404 and ``RoutingError`` metadata, and the failover executor's
    ``apply_failure_recovery`` is stubbed to return a 200 ``ResponseEnvelope``
    with content ``{"ok": True}``.

    Returns:
        The constructed flow together with the dict of mocked collaborators
        that was passed to its constructor, so tests can inspect the mocks.
    """
    collaborator_names = (
        "availability_checker",
        "request_preparer",
        "session_resolver",
        "backend_invoker",
        "failover_executor",
        "wire_capture_orchestrator",
        "usage_accounting_orchestrator",
        "exception_normalizer",
        "stream_formatting_service",
        "connector_invoker",
    )
    deps: dict[str, Any] = {name: MagicMock() for name in collaborator_names}

    # Exceptions pass through normalization unchanged.
    deps["exception_normalizer"].normalize.side_effect = lambda exc, _backend: exc

    # Request preparation: fixed target, requests handed back untouched.
    preparer = deps["request_preparer"]
    resolved_target = MagicMock(
        backend="ollama",
        model="minimax-m2.7-cloud",
        uri_params={},
    )
    preparer.prepare_request = AsyncMock(return_value=resolved_target)
    preparer.synchronize_request_with_target.side_effect = (
        lambda req, _target: req
    )
    preparer.prepare_backend_request = AsyncMock(
        side_effect=lambda request, *_args, **_kwargs: request
    )
    preparer.prepare_backend_kwargs = MagicMock(return_value={})

    # Failover: no complex failover; recovery produces a successful envelope.
    failover = deps["failover_executor"]
    failover.check_complex_failover = AsyncMock(return_value=False)
    recovered_response = ResponseEnvelope(content={"ok": True}, status_code=200)
    failover.apply_failure_recovery = AsyncMock(return_value=recovered_response)

    deps["availability_checker"].check_backend_availability = AsyncMock()
    deps["session_resolver"].resolve_session = AsyncMock(
        return_value=(MagicMock(), "session-1")
    )
    deps["backend_invoker"].acquire_backend = AsyncMock(return_value=MagicMock())

    wire_capture = deps["wire_capture_orchestrator"]
    wire_capture.prepare_wire_capture_context = AsyncMock(return_value=None)
    wire_capture.capture_wire_outbound = AsyncMock()
    wire_capture.capture_inbound_response = AsyncMock()
    wire_capture.detect_key_name.return_value = "test-key"

    usage = deps["usage_accounting_orchestrator"]
    usage.calculate_and_record_usage = AsyncMock(return_value=(0, None, None))
    usage.wrap_response_for_usage = AsyncMock(
        side_effect=lambda **kwargs: kwargs["result"]
    )
    usage.handle_backend_error = AsyncMock()

    # The connector answers with an empty stream flagged as a routing error.
    routing_metadata = {
        "error_message": "Backend returned 404 error",
        "error_type": "RoutingError",
        "error_code": "unknown_model",
        "error_details": {
            "code": "unknown_model",
            "category": "validation",
            "retryable": False,
        },
    }
    error_envelope = StreamingResponseEnvelope(
        content=async_iter(()),
        status_code=404,
        metadata=routing_metadata,
    )
    deps["connector_invoker"].invoke = AsyncMock(return_value=error_envelope)

    return BackendCompletionFlow(**deps), deps
150+
151+
152+
async def async_iter(items):
    """Yield each element of *items*, in order, from an async generator."""
    for element in items:
        yield element
155+
156+
73157
@pytest.mark.asyncio
74158
async def test_streaming_call_returns_terminal_error_envelope_on_http_exception() -> (
75159
None
@@ -115,3 +199,34 @@ async def test_streaming_call_returns_terminal_error_envelope_on_http_exception(
115199
error_payload = metadata.get("error")
116200
assert isinstance(error_payload, dict)
117201
assert error_payload.get("status_code") == 413
202+
203+
204+
@pytest.mark.asyncio
async def test_streaming_error_envelope_enters_failover_recovery_when_enabled() -> None:
    """A 404 streaming envelope is handed to failover recovery as a RoutingError.

    With ``allow_failover=True`` the call must return the envelope produced
    by the mocked ``apply_failure_recovery``, and that mock must have been
    awaited exactly once with a ``RoutingError`` carrying the routing details
    from the envelope metadata.
    """
    flow, deps = _build_flow_with_streaming_error_envelope()
    recovery = deps["failover_executor"].apply_failure_recovery

    outcome = await flow.call_completion(
        request=ChatRequest(
            model="alias:minimax",
            messages=[ChatMessage(role="user", content="continue")],
        ),
        stream=True,
        allow_failover=True,
        context=None,
    )

    # The caller sees the recovered response, not the streaming error.
    assert isinstance(outcome, ResponseEnvelope)
    assert outcome.content == {"ok": True}

    recovery.assert_awaited_once()
    forwarded_error = recovery.await_args.kwargs["error"]
    assert isinstance(forwarded_error, RoutingError)

    expected_details = {
        "code": "unknown_model",
        "category": "validation",
        "retryable": False,
    }
    assert forwarded_error.details == expected_details

0 commit comments

Comments
 (0)