Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions src/benchflow/_utils/scoring.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,17 @@
INFRA_ERROR = "infra_failure"
SANDBOX_SETUP = "sandbox_setup"
PROVIDER_AUTH = "provider_auth"
# A provider rate-limit/quota failure that surfaced as a *raised*
# ``AgentProtocolError -32603`` and was sanitized at the ACP boundary
# (``_classify_acp_error``). This is the manifestation of a Bedrock daily token
# cap ("Too many tokens per day"), which crashes the agent and is futile to
# retry in-batch — so it is non-retryable (no retry branch + listed in
# ``RetryConfig.exclude_categories``). A 429 that instead surfaces *silently*
# (zero tokens, no raise) is handled by the post-rollout ``_maybe_classify_api_error``
# path as ``api_error[rate_limit/transient]`` and stays retryable, because a
# self-surfacing throttle self-heals on backoff. The two paths track genuinely
# different failure shapes, so the differing retry verdicts are intentional.
PROVIDER_RATE_LIMIT = "provider_rate_limit"
TIMED_OUT = "timeout"
# Provider API failures detected post-rollout (rate limit, quota, rejected
# request, 5xx). "api_error" is proxy-proven (every captured provider request
Expand All @@ -36,6 +47,19 @@
"http 401",
"http 403",
)
# Sanitized markers appended by ``_classify_acp_error`` when a raised ACP error
# hides a provider rate-limit (429) or outage (503). Matched case-insensitively
# against the lowercased error, same as the auth markers above.
_PROVIDER_RATE_LIMIT_MARKERS = (
"provider rate limited",
"rate limit",
"too many requests",
"http 429",
)
_PROVIDER_UNAVAILABLE_MARKERS = (
"provider unavailable",
"http 503",
)

# Verifier error category constants
VERIFIER_FAILED = "verifier_failure"
Expand Down Expand Up @@ -89,6 +113,10 @@ def classify_error(error: str | None) -> str | None:
if "ACP error" in error or "was rejected as invalid" in error:
if any(m in lower for m in _PROVIDER_AUTH_MARKERS):
return PROVIDER_AUTH
if any(m in lower for m in _PROVIDER_RATE_LIMIT_MARKERS):
return PROVIDER_RATE_LIMIT
if any(m in lower for m in _PROVIDER_UNAVAILABLE_MARKERS):
return INFRA_ERROR
return ACP_ERROR
if "sandbox startup" in lower or "sandbox creation" in lower:
return SANDBOX_SETUP
Expand Down
3 changes: 2 additions & 1 deletion src/benchflow/evaluation.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
INSTALL_FAILED,
PIPE_CLOSED,
PROVIDER_AUTH,
PROVIDER_RATE_LIMIT,
SUSPECTED_API_ERROR,
VERIFIER_DEP_INSTALL,
VERIFIER_INFRA,
Expand Down Expand Up @@ -167,7 +168,7 @@ class RetryConfig:
min_wait_sec: float = 1.0
max_wait_sec: float = 30.0
exclude_categories: set[str] = field(
default_factory=lambda: {"timeout", PROVIDER_AUTH}
default_factory=lambda: {"timeout", PROVIDER_AUTH, PROVIDER_RATE_LIMIT}
)

@classmethod
Expand Down
67 changes: 49 additions & 18 deletions src/benchflow/providers/litellm_logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from benchflow.usage_tracking import usage_unavailable

_PROVIDER_AUTH_STATUS_CODES = (401, 403)
_PROVIDER_FAILURE_STATUS_CODES = (*_PROVIDER_AUTH_STATUS_CODES, 429, 503)
_STATUS_KEYS = {
"httpstatus",
"httpstatuscode",
Expand All @@ -28,7 +29,7 @@
"response_status",
"response_status_code",
}
_PROVIDER_AUTH_STATUS_RE = re.compile(r"\b(401|403)\b")
_PROVIDER_FAILURE_STATUS_RE = re.compile(r"\b(401|403|429|503)\b")
_PROVIDER_AUTH_HINT_RE = re.compile(
r"\b("
r"auth(?:entication|orization)?|"
Expand All @@ -41,6 +42,25 @@
r")\b",
re.IGNORECASE,
)
_PROVIDER_RATE_LIMIT_HINT_RE = re.compile(
r"\b("
r"rate[-_ ]?limit(?:ed)?|"
r"too many requests|"
r"too many tokens|"
r"tokens per day|"
r"quota"
r")\b",
re.IGNORECASE,
)
_PROVIDER_UNAVAILABLE_HINT_RE = re.compile(
r"\b("
r"service unavailable|"
r"temporarily unavailable|"
r"overloaded|"
r"upstream unavailable"
r")\b",
re.IGNORECASE,
)


def callback_module_source() -> str:
Expand Down Expand Up @@ -233,32 +253,32 @@ def _record_response_body(record: dict[str, Any]) -> dict[str, Any]:
return body


def _coerce_auth_status(value: Any) -> int | None:
if isinstance(value, int) and value in _PROVIDER_AUTH_STATUS_CODES:
def _coerce_provider_failure_status(value: Any) -> int | None:
if isinstance(value, int) and value in _PROVIDER_FAILURE_STATUS_CODES:
return value
if isinstance(value, str):
stripped = value.strip()
if stripped.isdigit():
status = int(stripped)
if status in _PROVIDER_AUTH_STATUS_CODES:
if status in _PROVIDER_FAILURE_STATUS_CODES:
return status
return None


def _explicit_auth_status(value: Any) -> int | None:
def _explicit_provider_failure_status(value: Any) -> int | None:
if isinstance(value, dict):
for key, nested in value.items():
if str(key).lower() in _STATUS_KEYS:
status = _coerce_auth_status(nested)
status = _coerce_provider_failure_status(nested)
if status is not None:
return status
for nested in value.values():
status = _explicit_auth_status(nested)
status = _explicit_provider_failure_status(nested)
if status is not None:
return status
elif isinstance(value, list | tuple):
for nested in value:
status = _explicit_auth_status(nested)
status = _explicit_provider_failure_status(nested)
if status is not None:
return status
return None
Expand All @@ -280,25 +300,36 @@ def _flatten_failure_text(value: Any) -> str:
return str(value)


def _provider_auth_status_from_failure_record(record: dict[str, Any]) -> int | None:
"""Return a sanitized provider auth status from a LiteLLM failure record."""
def _text_provider_failure_status(text: str) -> int | None:
match = _PROVIDER_FAILURE_STATUS_RE.search(text)
status = int(match.group(1)) if match is not None else None
if status in _PROVIDER_AUTH_STATUS_CODES:
return status if _PROVIDER_AUTH_HINT_RE.search(text) else None
if status == 429:
return status if _PROVIDER_RATE_LIMIT_HINT_RE.search(text) else None
if status == 503:
return status if _PROVIDER_UNAVAILABLE_HINT_RE.search(text) else None
if status is None and _PROVIDER_RATE_LIMIT_HINT_RE.search(text):
return 429
if status is None and _PROVIDER_UNAVAILABLE_HINT_RE.search(text):
return 503
return None


def _provider_failure_status_from_failure_record(record: dict[str, Any]) -> int | None:
"""Return a sanitized provider failure status from a LiteLLM failure record."""
if record.get("event") != "failure":
return None
failure_payload = {
"error": record.get("error"),
"response": record.get("response"),
}
status = _explicit_auth_status(failure_payload)
status = _explicit_provider_failure_status(failure_payload)
if status is not None:
return status

text = _flatten_failure_text(failure_payload)
if not _PROVIDER_AUTH_HINT_RE.search(text):
return None
match = _PROVIDER_AUTH_STATUS_RE.search(text)
if match is None:
return None
return int(match.group(1))
return _text_provider_failure_status(text)


def trajectory_from_litellm_callback_log(
Expand Down Expand Up @@ -327,7 +358,7 @@ def trajectory_from_litellm_callback_log(
if record.get("event") == "success":
status = 200
else:
status = _provider_auth_status_from_failure_record(record) or 500
status = _provider_failure_status_from_failure_record(record) or 500
trajectory.exchanges.append(
LLMExchange(
request=LLMRequest(
Expand Down
66 changes: 48 additions & 18 deletions src/benchflow/rollout/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,9 @@
from benchflow.rollout._usage import (
_NATIVE_ACP_USAGE_SNAPSHOT_TO_RESULT as _NATIVE_ACP_USAGE_SNAPSHOT_TO_RESULT,
)
from benchflow.rollout._usage import (
ProviderFailure as ProviderFailure,
)
from benchflow.rollout._usage import _as_nonnegative_int as _as_nonnegative_int
from benchflow.rollout._usage import _native_acp_usage_delta as _native_acp_usage_delta
from benchflow.rollout._usage import (
Expand All @@ -166,6 +169,12 @@
from benchflow.rollout._usage import (
_provider_auth_status_from_runtime as _provider_auth_status_from_runtime,
)
from benchflow.rollout._usage import (
_provider_failure_from_runtime as _provider_failure_from_runtime,
)
from benchflow.rollout._usage import (
_provider_failure_from_status as _provider_failure_from_status,
)
from benchflow.rollout._usage import (
_zero_native_acp_usage_metrics as _zero_native_acp_usage_metrics,
)
Expand Down Expand Up @@ -289,10 +298,13 @@ def __init__(self, config: RolloutConfig) -> None:
self._usage_metrics: dict[str, Any] = self._planes.extract_usage(None)
self._native_usage_metrics: dict[str, Any] = _zero_native_acp_usage_metrics()
self._native_usage_checkpoint: dict[str, int | None] | None = None
# Provider 401/403 status snapshotted during cleanup, after the usage
# proxy imports its captures (Daytona's SandboxUsageProxy only fills
# trajectory on stop()). Read by _provider_auth_status() so ACP-error
# classification can fail fast on auth failures (#546/#564).
# Provider failure snapshotted during cleanup, after the usage proxy
# imports its captures (Daytona's SandboxUsageProxy only fills trajectory
# on stop()). Read by _provider_failure() so ACP-error classification can
# expose a provider auth/rate-limit/outage failure (status code only)
# instead of a generic ACP internal error (#546/#564).
self._provider_failure_cached: ProviderFailure | None = None
# Auth-only (401/403) view kept for callers added by PR #564.
self._provider_auth_status_cached: int | None = None
# Provider API failure summary (all statuses >= 400), snapshotted in
# cleanup() alongside the auth status — consumed by the post-rollout
Expand Down Expand Up @@ -1327,21 +1339,27 @@ async def cleanup(self) -> None:
except Exception as e:
logger.warning(f"Usage telemetry runtime stop failed: {e}")
self._usage_metrics = self._planes.extract_usage(None)
# Snapshot any provider 401/403 now that captures are imported
# (stop() populated the trajectory). This must happen before we drop
# the runtime reference below, and is read later by ACP-error
# classification — for Daytona the trajectory is empty until here
# (#546/#564).
# Snapshot any provider failure (401/403/429/503) now that captures
# are imported (stop() populated the trajectory). This must happen
# before we drop the runtime reference below, and is read later by
# ACP-error classification — for Daytona the trajectory is empty
# until here (#546/#564).
#
# Coverage gap: only `self._usage_runtime` is scanned here. Bedrock
# auth failures flow through `self._provider_runtime`, whose server
# (BedrockProxyServer) exposes no `.trajectory`/`.exchanges`, so a
# fallback scan of it would always return None — useless, so it's
# not implemented. The direct-AWS-Bedrock case (remote sandbox,
# runtime=None) bypasses both proxies entirely and is out of scope.
self._provider_auth_status_cached = _provider_auth_status_from_runtime(
self._provider_failure_cached = _provider_failure_from_runtime(
usage_runtime
)
self._provider_auth_status_cached = (
self._provider_failure_cached.status
if self._provider_failure_cached is not None
and self._provider_failure_cached.marker == "provider auth failed"
else None
)
self._api_failure_summary_cached = (
_provider_api_failure_summary_from_runtime(usage_runtime)
)
Expand Down Expand Up @@ -1734,17 +1752,29 @@ def _classify_acp_error(self, e: AgentProtocolError) -> str:
f"Subscription auth credentials exist — unset the env var "
f"to use them: env -u {key} <command>"
)
# A real invalid-key failure often surfaces only as a generic
# A real provider failure often surfaces only as a generic
# "ACP error -32603: Internal error" at this layer — the provider's
# actual 401/403 is visible only in the proxy-captured trajectory
# (#546/#564). Surface a sanitized auth marker (status code only — never
# the response body or headers) so RetryConfig.should_retry classifies
# it as provider_auth and fails fast instead of burning retries.
auth_status = self._provider_auth_status()
if auth_status is not None:
return f"{e} | provider auth failed (HTTP {auth_status})"
# actual 401/403/429/503 is visible only in the proxy-captured
# trajectory (#546/#564). Surface a sanitized marker (status code only —
# never the response body or headers) so RetryConfig.should_retry can
# classify it (auth/rate-limit fail fast; 503 stays retryable infra)
# instead of burning retries on a generic ACP error.
provider_failure = self._provider_failure()
if provider_failure is not None:
return f"{e} | {provider_failure.error_suffix}"
return str(e)

def _provider_failure(self) -> ProviderFailure | None:
"""Return the provider failure snapshotted during cleanup.

Falls back to the auth-only status cache for partial Rollout doubles in
tests that set ``_provider_auth_status_cached`` directly (#564).
"""
failure = getattr(self, "_provider_failure_cached", None)
if failure is not None:
return failure
return _provider_failure_from_status(self._provider_auth_status())

def _provider_auth_status(self) -> int | None:
"""Return the provider 401/403 status snapshotted during cleanup.

Expand Down
58 changes: 50 additions & 8 deletions src/benchflow/rollout/_usage.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,26 +9,68 @@

from __future__ import annotations

from dataclasses import dataclass
from typing import Any

from benchflow.usage_tracking import usage_unavailable


def _provider_auth_status_from_runtime(runtime: Any) -> int | None:
"""Return a provider 401/403 status from a usage runtime's trajectory.
@dataclass(frozen=True)
class ProviderFailure:
"""Sanitized provider failure recovered from captured LLM exchanges."""

status: int
marker: str

@property
def error_suffix(self) -> str:
return f"{self.marker} (HTTP {self.status})"


_PROVIDER_FAILURES: dict[int, ProviderFailure] = {
401: ProviderFailure(401, "provider auth failed"),
403: ProviderFailure(403, "provider auth failed"),
429: ProviderFailure(429, "provider rate limited"),
503: ProviderFailure(503, "provider unavailable"),
}


def _provider_failure_from_status(status: Any) -> ProviderFailure | None:
try:
return _PROVIDER_FAILURES.get(int(status))
except (TypeError, ValueError):
return None


def _provider_failure_from_runtime(runtime: Any) -> ProviderFailure | None:
"""Return the latest provider failure from a usage runtime trajectory.

Scans the captured provider HTTP exchanges for an auth-failure status.
Only the integer status code is read — never response bodies or headers —
so no credential material can leak into ``result.error`` (#546/#564).
Returns ``None`` when there is no runtime, no trajectory, or no 401/403.
Scans the captured provider HTTP exchanges for provider-owned failures that
ACP agents often wrap as a generic ``-32603 Internal error``. Only the
sanitized status code and category marker are surfaced, never response
bodies or headers, so no credential material reaches ``result.error``
(#546/#564). Returns ``None`` when there is no runtime, no trajectory, or no
recognized provider-failure status.
"""
server = getattr(runtime, "server", None)
trajectory = getattr(server, "trajectory", None)
exchanges = getattr(trajectory, "exchanges", None) or []
for exchange in reversed(exchanges):
status = getattr(getattr(exchange, "response", None), "status_code", None)
if status in (401, 403):
return status
failure = _provider_failure_from_status(status)
if failure is not None:
return failure
return None


def _provider_auth_status_from_runtime(runtime: Any) -> int | None:
"""Backward-compatible auth-only view of provider failure extraction.

Kept for callers added by PR #564 that only care about the 401/403 status.
"""
failure = _provider_failure_from_runtime(runtime)
if failure is not None and failure.marker == "provider auth failed":
return failure.status
return None


Expand Down
Loading