diff --git a/src/benchflow/_utils/scoring.py b/src/benchflow/_utils/scoring.py index d3ed3b7e..627231cc 100644 --- a/src/benchflow/_utils/scoring.py +++ b/src/benchflow/_utils/scoring.py @@ -11,6 +11,17 @@ INFRA_ERROR = "infra_failure" SANDBOX_SETUP = "sandbox_setup" PROVIDER_AUTH = "provider_auth" +# A provider rate-limit/quota failure that surfaced as a *raised* +# ``AgentProtocolError -32603`` and was sanitized at the ACP boundary +# (``_classify_acp_error``). This is the manifestation of a Bedrock daily token +# cap ("Too many tokens per day"), which crashes the agent and is futile to +# retry in-batch — so it is non-retryable (no retry branch + listed in +# ``RetryConfig.exclude_categories``). A 429 that instead surfaces *silently* +# (zero tokens, no raise) is handled by the post-rollout ``_maybe_classify_api_error`` +# path as ``api_error[rate_limit/transient]`` and stays retryable, because a +# self-surfacing throttle self-heals on backoff. The two paths track genuinely +# different failure shapes, so the differing retry verdicts are intentional. +PROVIDER_RATE_LIMIT = "provider_rate_limit" TIMED_OUT = "timeout" # Provider API failures detected post-rollout (rate limit, quota, rejected # request, 5xx). "api_error" is proxy-proven (every captured provider request @@ -36,6 +47,19 @@ "http 401", "http 403", ) +# Sanitized markers appended by ``_classify_acp_error`` when a raised ACP error +# hides a provider rate-limit (429) or outage (503). Matched case-insensitively +# against the lowercased error, same as the auth markers above. +_PROVIDER_RATE_LIMIT_MARKERS = ( + "provider rate limited", + "rate limit", + "too many requests", + "http 429", +) +_PROVIDER_UNAVAILABLE_MARKERS = ( + "provider unavailable", + "http 503", +) # Verifier error category constants VERIFIER_FAILED = "verifier_failure" @@ -89,6 +113,10 @@ def classify_error(error: str | None) -> str | None: if "ACP error" in error or "was rejected as invalid" in error: if any(m in lower for m in _PROVIDER_AUTH_MARKERS): return PROVIDER_AUTH + if any(m in lower for m in _PROVIDER_RATE_LIMIT_MARKERS): + return PROVIDER_RATE_LIMIT + if any(m in lower for m in _PROVIDER_UNAVAILABLE_MARKERS): + return INFRA_ERROR return ACP_ERROR if "sandbox startup" in lower or "sandbox creation" in lower: return SANDBOX_SETUP diff --git a/src/benchflow/evaluation.py b/src/benchflow/evaluation.py index 703f00c2..3d22b39b 100644 --- a/src/benchflow/evaluation.py +++ b/src/benchflow/evaluation.py @@ -50,6 +50,7 @@ INSTALL_FAILED, PIPE_CLOSED, PROVIDER_AUTH, + PROVIDER_RATE_LIMIT, SUSPECTED_API_ERROR, VERIFIER_DEP_INSTALL, VERIFIER_INFRA, @@ -167,7 +168,7 @@ class RetryConfig: min_wait_sec: float = 1.0 max_wait_sec: float = 30.0 exclude_categories: set[str] = field( - default_factory=lambda: {"timeout", PROVIDER_AUTH} + default_factory=lambda: {"timeout", PROVIDER_AUTH, PROVIDER_RATE_LIMIT} ) @classmethod diff --git a/src/benchflow/providers/litellm_logging.py b/src/benchflow/providers/litellm_logging.py index 3336b7c3..776bedf5 100644 --- a/src/benchflow/providers/litellm_logging.py +++ b/src/benchflow/providers/litellm_logging.py @@ -16,6 +16,7 @@ from benchflow.usage_tracking import usage_unavailable _PROVIDER_AUTH_STATUS_CODES = (401, 403) +_PROVIDER_FAILURE_STATUS_CODES = (*_PROVIDER_AUTH_STATUS_CODES, 429, 503) _STATUS_KEYS = { "httpstatus", "httpstatuscode", @@ -28,7 +29,7 @@ "response_status", "response_status_code", } -_PROVIDER_AUTH_STATUS_RE = re.compile(r"\b(401|403)\b") +_PROVIDER_FAILURE_STATUS_RE = re.compile(r"\b(401|403|429|503)\b") _PROVIDER_AUTH_HINT_RE = re.compile( r"\b(" r"auth(?:entication|orization)?|" @@ -41,6 +42,25 @@ r")\b", re.IGNORECASE, ) +_PROVIDER_RATE_LIMIT_HINT_RE = re.compile( + r"\b(" + r"rate[-_ ]?limit(?:ed)?|" + r"too many requests|" + r"too many tokens|" + r"tokens per day|" + r"quota" + r")\b", + re.IGNORECASE, +) +_PROVIDER_UNAVAILABLE_HINT_RE = re.compile( + r"\b(" + r"service unavailable|" + r"temporarily unavailable|" + r"overloaded|" + r"upstream unavailable" + r")\b", + re.IGNORECASE, +) def callback_module_source() -> str: @@ -233,32 +253,32 @@ def _record_response_body(record: dict[str, Any]) -> dict[str, Any]: return body -def _coerce_auth_status(value: Any) -> int | None: - if isinstance(value, int) and value in _PROVIDER_AUTH_STATUS_CODES: +def _coerce_provider_failure_status(value: Any) -> int | None: + if isinstance(value, int) and value in _PROVIDER_FAILURE_STATUS_CODES: return value if isinstance(value, str): stripped = value.strip() if stripped.isdigit(): status = int(stripped) - if status in _PROVIDER_AUTH_STATUS_CODES: + if status in _PROVIDER_FAILURE_STATUS_CODES: return status return None -def _explicit_auth_status(value: Any) -> int | None: +def _explicit_provider_failure_status(value: Any) -> int | None: if isinstance(value, dict): for key, nested in value.items(): if str(key).lower() in _STATUS_KEYS: - status = _coerce_auth_status(nested) + status = _coerce_provider_failure_status(nested) if status is not None: return status for nested in value.values(): - status = _explicit_auth_status(nested) + status = _explicit_provider_failure_status(nested) if status is not None: return status elif isinstance(value, list | tuple): for nested in value: - status = _explicit_auth_status(nested) + status = _explicit_provider_failure_status(nested) if status is not None: return status return None @@ -280,25 +300,36 @@ def _flatten_failure_text(value: Any) -> str: return str(value) -def _provider_auth_status_from_failure_record(record: dict[str, Any]) -> int | None: - """Return a sanitized provider auth status from a LiteLLM failure record.""" +def _text_provider_failure_status(text: str) -> int | None: + match = _PROVIDER_FAILURE_STATUS_RE.search(text) + status = int(match.group(1)) if match is not None else None + if status in _PROVIDER_AUTH_STATUS_CODES: + return status if _PROVIDER_AUTH_HINT_RE.search(text) else None + if status == 429: + return status if _PROVIDER_RATE_LIMIT_HINT_RE.search(text) else None + if status == 503: + return status if _PROVIDER_UNAVAILABLE_HINT_RE.search(text) else None + if status is None and _PROVIDER_RATE_LIMIT_HINT_RE.search(text): + return 429 + if status is None and _PROVIDER_UNAVAILABLE_HINT_RE.search(text): + return 503 + return None + + +def _provider_failure_status_from_failure_record(record: dict[str, Any]) -> int | None: + """Return a sanitized provider failure status from a LiteLLM failure record.""" if record.get("event") != "failure": return None failure_payload = { "error": record.get("error"), "response": record.get("response"), } - status = _explicit_auth_status(failure_payload) + status = _explicit_provider_failure_status(failure_payload) if status is not None: return status text = _flatten_failure_text(failure_payload) - if not _PROVIDER_AUTH_HINT_RE.search(text): - return None - match = _PROVIDER_AUTH_STATUS_RE.search(text) - if match is None: - return None - return int(match.group(1)) + return _text_provider_failure_status(text) def trajectory_from_litellm_callback_log( @@ -327,7 +358,7 @@ def trajectory_from_litellm_callback_log( if record.get("event") == "success": status = 200 else: - status = _provider_auth_status_from_failure_record(record) or 500 + status = _provider_failure_status_from_failure_record(record) or 500 trajectory.exchanges.append( LLMExchange( request=LLMRequest( diff --git a/src/benchflow/rollout/__init__.py b/src/benchflow/rollout/__init__.py index 1154ae66..afa2eb84 100644 --- a/src/benchflow/rollout/__init__.py +++ b/src/benchflow/rollout/__init__.py @@ -158,6 +158,9 @@ from benchflow.rollout._usage import ( _NATIVE_ACP_USAGE_SNAPSHOT_TO_RESULT as _NATIVE_ACP_USAGE_SNAPSHOT_TO_RESULT, ) +from benchflow.rollout._usage import ( + ProviderFailure as ProviderFailure, +) from benchflow.rollout._usage import _as_nonnegative_int as _as_nonnegative_int from benchflow.rollout._usage import _native_acp_usage_delta as _native_acp_usage_delta from benchflow.rollout._usage import ( @@ -166,6 +169,12 @@ from benchflow.rollout._usage import ( _provider_auth_status_from_runtime as _provider_auth_status_from_runtime, ) +from benchflow.rollout._usage import ( + _provider_failure_from_runtime as _provider_failure_from_runtime, +) +from benchflow.rollout._usage import ( + _provider_failure_from_status as _provider_failure_from_status, +) from benchflow.rollout._usage import ( _zero_native_acp_usage_metrics as _zero_native_acp_usage_metrics, ) @@ -289,10 +298,13 @@ def __init__(self, config: RolloutConfig) -> None: self._usage_metrics: dict[str, Any] = self._planes.extract_usage(None) self._native_usage_metrics: dict[str, Any] = _zero_native_acp_usage_metrics() self._native_usage_checkpoint: dict[str, int | None] | None = None - # Provider 401/403 status snapshotted during cleanup, after the usage - # proxy imports its captures (Daytona's SandboxUsageProxy only fills - # trajectory on stop()). Read by _provider_auth_status() so ACP-error - # classification can fail fast on auth failures (#546/#564). + # Provider failure snapshotted during cleanup, after the usage proxy + # imports its captures (Daytona's SandboxUsageProxy only fills trajectory + # on stop()). Read by _provider_failure() so ACP-error classification can + # expose a provider auth/rate-limit/outage failure (status code only) + # instead of a generic ACP internal error (#546/#564). + self._provider_failure_cached: ProviderFailure | None = None + # Auth-only (401/403) view kept for callers added by PR #564. self._provider_auth_status_cached: int | None = None # Provider API failure summary (all statuses >= 400), snapshotted in # cleanup() alongside the auth status — consumed by the post-rollout @@ -1327,11 +1339,11 @@ async def cleanup(self) -> None: except Exception as e: logger.warning(f"Usage telemetry runtime stop failed: {e}") self._usage_metrics = self._planes.extract_usage(None) - # Snapshot any provider 401/403 now that captures are imported - # (stop() populated the trajectory). This must happen before we drop - # the runtime reference below, and is read later by ACP-error - # classification — for Daytona the trajectory is empty until here - # (#546/#564). + # Snapshot any provider failure (401/403/429/503) now that captures + # are imported (stop() populated the trajectory). This must happen + # before we drop the runtime reference below, and is read later by + # ACP-error classification — for Daytona the trajectory is empty + # until here (#546/#564). # # Coverage gap: only `self._usage_runtime` is scanned here. Bedrock # auth failures flow through `self._provider_runtime`, whose server @@ -1339,9 +1351,15 @@ async def cleanup(self) -> None: # fallback scan of it would always return None — useless, so it's # not implemented. The direct-AWS-Bedrock case (remote sandbox, # runtime=None) bypasses both proxies entirely and is out of scope. - self._provider_auth_status_cached = _provider_auth_status_from_runtime( + self._provider_failure_cached = _provider_failure_from_runtime( usage_runtime ) + self._provider_auth_status_cached = ( + self._provider_failure_cached.status + if self._provider_failure_cached is not None + and self._provider_failure_cached.marker == "provider auth failed" + else None + ) self._api_failure_summary_cached = ( _provider_api_failure_summary_from_runtime(usage_runtime) ) @@ -1734,17 +1752,29 @@ def _classify_acp_error(self, e: AgentProtocolError) -> str: f"Subscription auth credentials exist — unset the env var " f"to use them: env -u {key} " ) - # A real invalid-key failure often surfaces only as a generic + # A real provider failure often surfaces only as a generic # "ACP error -32603: Internal error" at this layer — the provider's - # actual 401/403 is visible only in the proxy-captured trajectory - # (#546/#564). Surface a sanitized auth marker (status code only — never - # the response body or headers) so RetryConfig.should_retry classifies - # it as provider_auth and fails fast instead of burning retries. - auth_status = self._provider_auth_status() - if auth_status is not None: - return f"{e} | provider auth failed (HTTP {auth_status})" + # actual 401/403/429/503 is visible only in the proxy-captured + # trajectory (#546/#564). Surface a sanitized marker (status code only — + # never the response body or headers) so RetryConfig.should_retry can + # classify it (auth/rate-limit fail fast; 503 stays retryable infra) + # instead of burning retries on a generic ACP error. + provider_failure = self._provider_failure() + if provider_failure is not None: + return f"{e} | {provider_failure.error_suffix}" return str(e) + def _provider_failure(self) -> ProviderFailure | None: + """Return the provider failure snapshotted during cleanup. + + Falls back to the auth-only status cache for partial Rollout doubles in + tests that set ``_provider_auth_status_cached`` directly (#564). + """ + failure = getattr(self, "_provider_failure_cached", None) + if failure is not None: + return failure + return _provider_failure_from_status(self._provider_auth_status()) + def _provider_auth_status(self) -> int | None: """Return the provider 401/403 status snapshotted during cleanup. diff --git a/src/benchflow/rollout/_usage.py b/src/benchflow/rollout/_usage.py index 4b360eb8..f8609371 100644 --- a/src/benchflow/rollout/_usage.py +++ b/src/benchflow/rollout/_usage.py @@ -9,26 +9,68 @@ from __future__ import annotations +from dataclasses import dataclass from typing import Any from benchflow.usage_tracking import usage_unavailable -def _provider_auth_status_from_runtime(runtime: Any) -> int | None: - """Return a provider 401/403 status from a usage runtime's trajectory. +@dataclass(frozen=True) +class ProviderFailure: + """Sanitized provider failure recovered from captured LLM exchanges.""" + + status: int + marker: str + + @property + def error_suffix(self) -> str: + return f"{self.marker} (HTTP {self.status})" + + +_PROVIDER_FAILURES: dict[int, ProviderFailure] = { + 401: ProviderFailure(401, "provider auth failed"), + 403: ProviderFailure(403, "provider auth failed"), + 429: ProviderFailure(429, "provider rate limited"), + 503: ProviderFailure(503, "provider unavailable"), +} + + +def _provider_failure_from_status(status: Any) -> ProviderFailure | None: + try: + return _PROVIDER_FAILURES.get(int(status)) + except (TypeError, ValueError): + return None + + +def _provider_failure_from_runtime(runtime: Any) -> ProviderFailure | None: + """Return the latest provider failure from a usage runtime trajectory. - Scans the captured provider HTTP exchanges for an auth-failure status. - Only the integer status code is read — never response bodies or headers — - so no credential material can leak into ``result.error`` (#546/#564). - Returns ``None`` when there is no runtime, no trajectory, or no 401/403. + Scans the captured provider HTTP exchanges for provider-owned failures that + ACP agents often wrap as a generic ``-32603 Internal error``. Only the + sanitized status code and category marker are surfaced, never response + bodies or headers, so no credential material reaches ``result.error`` + (#546/#564). Returns ``None`` when there is no runtime, no trajectory, or no + recognized provider-failure status. """ server = getattr(runtime, "server", None) trajectory = getattr(server, "trajectory", None) exchanges = getattr(trajectory, "exchanges", None) or [] for exchange in reversed(exchanges): status = getattr(getattr(exchange, "response", None), "status_code", None) - if status in (401, 403): - return status + failure = _provider_failure_from_status(status) + if failure is not None: + return failure + return None + + +def _provider_auth_status_from_runtime(runtime: Any) -> int | None: + """Backward-compatible auth-only view of provider failure extraction. + + Kept for callers added by PR #564 that only care about the 401/403 status. + """ + failure = _provider_failure_from_runtime(runtime) + if failure is not None and failure.marker == "provider auth failed": + return failure.status return None diff --git a/tests/test_eval_worker_retry.py b/tests/test_eval_worker_retry.py index f3233c35..9d7ea2ff 100644 --- a/tests/test_eval_worker_retry.py +++ b/tests/test_eval_worker_retry.py @@ -14,18 +14,23 @@ "ACP error -32603: Internal error: Failed to authenticate. " "API Error: 401 Invalid bearer token" ) +_PROVIDER_RATE_LIMIT_ERROR = ( + "ACP error -32603: Internal error | provider rate limited (HTTP 429)" +) def test_from_mapping_omitted_exclude_keeps_provider_auth(): - """A payload with no exclude_categories must still exclude provider_auth.""" + """Guards PR #653: omitted excludes must still exclude provider caps.""" cfg = RetryConfig.from_mapping({"max_retries": 3}) assert not cfg.should_retry(_PROVIDER_AUTH_ERROR) + assert not cfg.should_retry(_PROVIDER_RATE_LIMIT_ERROR) assert cfg.max_retries == 3 # other fields still parsed def test_from_mapping_none_uses_defaults(): cfg = RetryConfig.from_mapping(None) assert not cfg.should_retry(_PROVIDER_AUTH_ERROR) + assert not cfg.should_retry(_PROVIDER_RATE_LIMIT_ERROR) def test_from_mapping_explicit_exclude_respected(): @@ -43,11 +48,13 @@ def test_worker_retry_config_omitting_exclude_does_not_retry_provider_auth(): """Worker payload without retry.exclude_categories must not retry auth.""" cfg = _retry_config({"retry": {"max_retries": 2}}) assert not cfg.should_retry(_PROVIDER_AUTH_ERROR) + assert not cfg.should_retry(_PROVIDER_RATE_LIMIT_ERROR) def test_worker_retry_config_no_retry_key_at_all(): cfg = _retry_config({}) assert not cfg.should_retry(_PROVIDER_AUTH_ERROR) + assert not cfg.should_retry(_PROVIDER_RATE_LIMIT_ERROR) def test_evaluation_config_retry_excludes_provider_auth_by_default(): @@ -55,3 +62,4 @@ def test_evaluation_config_retry_excludes_provider_auth_by_default(): EvaluationConfig whose retry refuses to retry provider_auth (#564).""" eval_cfg = _evaluation_config({"tasks_dir": "/tmp/tasks", "agent": "oracle"}) assert not eval_cfg.retry.should_retry(_PROVIDER_AUTH_ERROR) + assert not eval_cfg.retry.should_retry(_PROVIDER_RATE_LIMIT_ERROR) diff --git a/tests/test_job.py b/tests/test_job.py index f2e69356..5d570cc2 100644 --- a/tests/test_job.py +++ b/tests/test_job.py @@ -59,6 +59,20 @@ def test_should_not_retry_sanitized_proxy_auth_marker(self): "ACP error -32603: Internal error | provider auth failed (HTTP 401)" ) + def test_should_not_retry_provider_rate_limit_sanitized_marker(self): + """Guards PR #653: Bedrock daily caps must not burn retries.""" + cfg = RetryConfig() + assert not cfg.should_retry( + "ACP error -32603: Internal error | provider rate limited (HTTP 429)" + ) + + def test_should_retry_provider_unavailable_sanitized_marker(self): + """Provider 503s are transient infra and remain retryable.""" + cfg = RetryConfig() + assert cfg.should_retry( + "ACP error -32603: Internal error | provider unavailable (HTTP 503)" + ) + def test_should_not_retry_timeout(self): cfg = RetryConfig() assert not cfg.should_retry("Agent timed out after 900.0s") diff --git a/tests/test_provider_auth_detection.py b/tests/test_provider_auth_detection.py index 6c2e062e..99d5914c 100644 --- a/tests/test_provider_auth_detection.py +++ b/tests/test_provider_auth_detection.py @@ -20,7 +20,10 @@ import pytest from benchflow.providers.litellm_logging import trajectory_from_litellm_callback_log -from benchflow.rollout import _provider_auth_status_from_runtime +from benchflow.rollout import ( + _provider_auth_status_from_runtime, + _provider_failure_from_runtime, +) def _runtime_with_statuses(statuses): @@ -142,6 +145,99 @@ def test_litellm_non_auth_failure_import_remains_generic_500(): ) +def test_litellm_daily_cap_failure_import_drives_sanitized_provider_marker( + tmp_path, +): + """Guards PR #653: surface Bedrock daily-token caps hidden behind + ACP ``-32603 Internal error`` as provider_rate_limit instead of acp_error.""" + from benchflow.agents.errors import AgentProtocolError + + record = { + "event": "failure", + "request_model": "benchflow-opus", + "provider_model": "aws-bedrock/us.anthropic.claude-opus-4-8", + "request": {"method": "POST", "path": "/v1/messages", "body": {}}, + "response": {}, + "error": { + "type": "RateLimitError", + "message": ( + "litellm.RateLimitError: BedrockException - " + '{"message":"Too many tokens per day, please wait before trying again."}' + ), + "traceback": ( + "MaskedHTTPStatusError: Client error '429 Too Many Requests' " + "for url 'https://bedrock-runtime.us-east-2.amazonaws.com/model/x/converse'" + ), + }, + "start_time": "2026-06-09T22:00:00", + "end_time": "2026-06-09T22:00:00", + } + + trajectory = trajectory_from_litellm_callback_log( + json.dumps(record), + session_id="session", + agent_name="openhands", + ) + failure = _provider_failure_from_runtime(_runtime_with_trajectory(trajectory)) + + assert trajectory.exchanges[0].response.status_code == 429 + assert failure is not None + assert failure.status == 429 + assert failure.error_suffix == "provider rate limited (HTTP 429)" + + rollout = _auth_rollout(tmp_path) + rollout._provider_failure_cached = failure + + classified = rollout._classify_acp_error( + AgentProtocolError("ACP error -32603: Internal error") + ) + + assert classified == ( + "ACP error -32603: Internal error | provider rate limited (HTTP 429)" + ) + assert "Too many tokens per day" not in classified + + +def test_litellm_503_failure_import_drives_sanitized_provider_marker(tmp_path): + """Guards PR #653: surface provider 503s hidden behind ACP errors.""" + from benchflow.agents.errors import AgentProtocolError + + record = { + "event": "failure", + "request_model": "benchflow-opus", + "provider_model": "aws-bedrock/us.anthropic.claude-opus-4-8", + "request": {"method": "POST", "path": "/v1/messages", "body": {}}, + "response": {}, + "error": { + "type": "APIError", + "message": "BedrockException - 503 Service Unavailable", + }, + "start_time": "2026-06-09T22:00:00", + "end_time": "2026-06-09T22:00:00", + } + + trajectory = trajectory_from_litellm_callback_log( + json.dumps(record), + session_id="session", + agent_name="openhands", + ) + failure = _provider_failure_from_runtime(_runtime_with_trajectory(trajectory)) + + assert trajectory.exchanges[0].response.status_code == 503 + assert failure is not None + assert failure.error_suffix == "provider unavailable (HTTP 503)" + + rollout = _auth_rollout(tmp_path) + rollout._provider_failure_cached = failure + + assert ( + rollout._classify_acp_error( + AgentProtocolError("ACP error -32603: Internal error") + ) + == "ACP error -32603: Internal error | provider unavailable (HTTP 503)" + ) + + def test_snapshot_after_late_capture_import(): """Guards PR #564 finding 1: a proxy that only populates its trajectory on stop() (Daytona's SandboxUsageProxy) must still yield the 401 once captures diff --git a/tests/test_scoring.py b/tests/test_scoring.py index 2c0ce61b..a757ab66 100644 --- a/tests/test_scoring.py +++ b/tests/test_scoring.py @@ -120,6 +120,24 @@ def test_provider_auth_lowercase_and_status_forms(self): ): assert classify_error(err) == "provider_auth", err + def test_provider_rate_limit_marker(self): + """Guards PR #653: Bedrock daily caps surface as provider_rate_limit.""" + assert ( + classify_error( + "ACP error -32603: Internal error | provider rate limited (HTTP 429)" + ) + == "provider_rate_limit" + ) + + def test_provider_unavailable_marker_is_infra(self): + """Provider 503s are transient infra, not generic ACP errors.""" + assert ( + classify_error( + "ACP error -32603: Internal error | provider unavailable (HTTP 503)" + ) + == "infra_failure" + ) + def test_generic_acp_internal_error_still_retryable(self): """Guards PR #564: a bare ACP internal error with no auth signal stays acp_error — only a real surfaced 401/403 should flip it to provider_auth."""