Skip to content

Commit 3bf7231

Browse files
author
Mateusz
committed
Sync local changes: compaction telemetry, CBOR disk-full handling, routing and tests
- Add compaction telemetry domain types and metrics recorder; extend history compaction, config, and interfaces. - Harden CBOR wire capture on ENOSPC/I/O (disable capture, throttle logs, avoid close chaining). - Propagate BackendError/RateLimitExceededError from ZAI Coding Plan without generic wrapper. - Update Anthropic connector, backend request preparation, routing compliance script, and related unit/integration tests. Made-with: Cursor
1 parent 9f838e9 commit 3bf7231

22 files changed

Lines changed: 3555 additions & 678 deletions

dev/scripts/check_routing_unification_compliance.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
APPROVED_INTERNAL_DELEGATE_CALL_CHAINS = {
2222
"self.call_completion",
2323
"self._backend_completion_flow.call_completion",
24+
"self._completion_flow.call_completion",
2425
}
2526

2627
OUTBOUND_CALL_METHOD_SUFFIXES = (

src/connectors/anthropic.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
from src.core.common.exceptions import (
2727
AuthenticationError,
2828
ConfigurationError,
29+
InvalidRequestError,
2930
ServiceUnavailableError,
3031
)
3132
from src.core.config.app_config import AppConfig
@@ -543,6 +544,14 @@ async def chat_completions( # type: ignore[override]
543544
through :class:`ConnectorChatCompletionsRequest`; legacy positional call shapes
544545
are not supported at this boundary.
545546
"""
547+
if not isinstance(request, ConnectorChatCompletionsRequest): # type: ignore[unreachable]
548+
raise InvalidRequestError(
549+
message=(
550+
f"chat_completions requires ConnectorChatCompletionsRequest, "
551+
f"got {type(request).__name__}"
552+
),
553+
details={"connector": "anthropic"},
554+
)
546555
return await self._chat_completions_canonical(request)
547556

548557
# -----------------------------------------------------------

src/connectors/zai_coding_plan.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ async def initialize(self, **kwargs: Any) -> None:
110110
code="missing_api_key",
111111
)
112112

113-
# Log masked API key for verification (show first 4 and last 4 chars)
113+
# Log masked API key for verification (show first 4 and last 4 chars)
114114
masked_key = self._mask_api_key(self.api_key)
115115
if logger.isEnabledFor(logging.INFO):
116116
logger.info(
@@ -573,6 +573,10 @@ async def chat_completions( # type: ignore[override]
573573
status_code=getattr(exc, "status_code", 502),
574574
code=f"zai_error_{getattr(exc, 'status_code', 'unknown')}",
575575
) from exc
576+
except BackendError:
577+
# RateLimitExceededError and other mapped domain errors must propagate
578+
# to resilience / HTTP mapping; do not wrap as zai_error_unexpected.
579+
raise
576580
except Exception as exc:
577581
if logger.isEnabledFor(logging.ERROR):
578582
logger.error(
@@ -1030,8 +1034,13 @@ async def _prepare_payload(
10301034
payload["tools"] = request_data.tools
10311035
logger.debug("Including tools: %d tools", len(request_data.tools))
10321036
if hasattr(request_data, "tool_choice") and request_data.tool_choice:
1033-
payload["tool_choice"] = request_data.tool_choice
1034-
logger.debug("Including tool_choice: %s", request_data.tool_choice)
1037+
tool_choice = request_data.tool_choice
1038+
if isinstance(tool_choice, dict) and tool_choice.get("type") == "function":
1039+
# ZAI coding-plan rejects function-enforced tool_choice objects;
1040+
# use OpenAI-compatible automatic tool selection instead.
1041+
tool_choice = "auto"
1042+
payload["tool_choice"] = tool_choice
1043+
logger.debug("Including tool_choice: %s", tool_choice)
10351044

10361045
# ZAI coding-plan gateway is strict about accepted payload shape.
10371046
# Only forward a minimal set of known-safe keys to avoid WAF/429 triggers.

src/core/config/env/from_env_part3.py

Lines changed: 46 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,50 @@ def _load_replacement_rules_from_env(
101101
return []
102102

103103

104+
def _apply_gemini_backend(
105+
config_backends: dict[str, Any],
106+
env: Mapping[str, str],
107+
gemini_key: str,
108+
resolution: ParameterResolution | None,
109+
) -> None:
110+
if logger.isEnabledFor(logging.INFO):
111+
_, gemini_key_source = get_env_value_with_windows_persistent_fallback(
112+
"GEMINI_API_KEY", environ=env
113+
)
114+
logger.info(
115+
"Gemini key diagnostics [from_env_part3]: env_type=%s source=%s",
116+
type(env).__name__,
117+
gemini_key_source,
118+
)
119+
120+
config_backends["gemini"] = config_backends.get("gemini", {})
121+
config_backends["gemini"]["api_key"] = gemini_key
122+
config_backends["gemini"]["api_url"] = _get_env_value(
123+
env,
124+
"GEMINI_API_BASE_URL",
125+
"https://generativelanguage.googleapis.com",
126+
path="backends.gemini.api_url",
127+
resolution=resolution,
128+
)
129+
gemini_timeout = _get_env_value(
130+
env,
131+
"GEMINI_TIMEOUT",
132+
None,
133+
path="backends.gemini.timeout",
134+
resolution=resolution,
135+
transform=lambda value: _to_int(value, 0),
136+
)
137+
if gemini_timeout:
138+
config_backends["gemini"]["timeout"] = gemini_timeout
139+
if resolution is not None:
140+
resolution.record(
141+
"backends.gemini.api_key",
142+
config_backends["gemini"]["api_key"],
143+
ParameterSource.ENVIRONMENT,
144+
origin="GEMINI_API_KEY",
145+
)
146+
147+
104148
def apply_config_part3(
105149
config: dict[str, Any],
106150
env: Mapping[str, str],
@@ -137,43 +181,11 @@ def apply_config_part3(
137181
origin="OPENROUTER_API_KEY",
138182
)
139183

140-
gemini_key, gemini_key_source = get_env_value_with_windows_persistent_fallback(
184+
gemini_key, _gemini_key_source = get_env_value_with_windows_persistent_fallback(
141185
"GEMINI_API_KEY", environ=env
142186
)
143187
if gemini_key and not _has_numbered_env_variants(env, "GEMINI_API_KEY"):
144-
if logger.isEnabledFor(logging.INFO):
145-
logger.info(
146-
"Gemini key diagnostics [from_env_part3]: env_type=%s source=%s",
147-
type(env).__name__,
148-
gemini_key_source,
149-
)
150-
151-
config_backends["gemini"] = config_backends.get("gemini", {})
152-
config_backends["gemini"]["api_key"] = gemini_key
153-
config_backends["gemini"]["api_url"] = _get_env_value(
154-
env,
155-
"GEMINI_API_BASE_URL",
156-
"https://generativelanguage.googleapis.com",
157-
path="backends.gemini.api_url",
158-
resolution=resolution,
159-
)
160-
gemini_timeout = _get_env_value(
161-
env,
162-
"GEMINI_TIMEOUT",
163-
None,
164-
path="backends.gemini.timeout",
165-
resolution=resolution,
166-
transform=lambda value: _to_int(value, 0),
167-
)
168-
if gemini_timeout:
169-
config_backends["gemini"]["timeout"] = gemini_timeout
170-
if resolution is not None:
171-
resolution.record(
172-
"backends.gemini.api_key",
173-
config_backends["gemini"]["api_key"],
174-
ParameterSource.ENVIRONMENT,
175-
origin="GEMINI_API_KEY",
176-
)
188+
_apply_gemini_backend(config_backends, env, gemini_key, resolution)
177189

178190
if env.get("ANTHROPIC_API_KEY"):
179191
config_backends["anthropic"] = config_backends.get("anthropic", {})
Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
"""Telemetry domain models for history compaction.
2+
3+
Mirrors dynamic compression telemetry shape while keeping compaction
4+
semantics honest — no fake method pipelines, no recovery handles unless
5+
intentionally added later.
6+
"""
7+
8+
from __future__ import annotations
9+
10+
from pydantic import Field
11+
12+
from src.core.interfaces.model_bases import DomainModel
13+
14+
15+
class CompactionEventRecord(DomainModel):
16+
"""Diagnostics for one compaction evaluation (per stale resource or no-op pass)."""
17+
18+
correlation_id: str | None = None
19+
tool_call_id: str | None = None
20+
tool_name: str = ""
21+
tool_category: str = ""
22+
resource_identity_hash: str = ""
23+
resource_identity_preview: str | None = None
24+
resource_preview_redacted: bool = False
25+
original_bytes: int = Field(default=0, ge=0)
26+
compacted_bytes: int = Field(default=0, ge=0)
27+
saved_bytes: int = Field(default=0, ge=0)
28+
original_tokens_estimate: int = Field(default=0, ge=0)
29+
saved_tokens_estimate: int = Field(default=0, ge=0)
30+
applied: bool = False
31+
decision_reason: str = "no_stale_results"
32+
failed_open: bool = False
33+
failure_reason: str | None = None
34+
elapsed_total_ms: float = Field(default=0.0, ge=0)
35+
original_sha256: str | None = None
36+
compacted_sha256: str | None = None
37+
warnings: list[str] = Field(default_factory=list)
38+
message_index: int | None = None
39+
40+
41+
class CompactionAggregateMetrics(DomainModel):
42+
"""Running aggregate stats for operator diagnostics."""
43+
44+
processed_evaluations: int = Field(default=0, ge=0)
45+
applied_evaluations: int = Field(default=0, ge=0)
46+
fail_open_count: int = Field(default=0, ge=0)
47+
total_original_bytes: int = Field(default=0, ge=0)
48+
total_compacted_bytes: int = Field(default=0, ge=0)
49+
total_saved_bytes: int = Field(default=0, ge=0)
50+
total_saved_tokens_estimate: int = Field(default=0, ge=0)
51+
by_category: dict[str, int] = Field(default_factory=dict)
52+
by_decision_reason: dict[str, int] = Field(default_factory=dict)
53+
54+
55+
class CompactionAlertRecord(DomainModel):
56+
"""Rate-limited alert emitted for frequent compaction issues."""
57+
58+
alert_type: str
59+
threshold: int = Field(ge=1)
60+
observed_count: int = Field(ge=0)
61+
window_seconds: int = Field(ge=1)
62+
warning: str
63+
category: str | None = None
64+
65+
66+
class EffectiveCompactionConfigDiagnostics(DomainModel):
67+
"""Redaction-safe effective-configuration diagnostics for operators."""
68+
69+
active_controls: list[str] = Field(default_factory=list)
70+
inactive_controls: list[str] = Field(default_factory=list)
71+
ignored_controls: list[str] = Field(default_factory=list)
72+
reasons: dict[str, str] = Field(default_factory=dict)
73+
fingerprint: str | None = None
74+
warnings: list[str] = Field(default_factory=list)

src/core/domain/configuration/compaction_config.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,33 @@
1515
from src.core.domain.compaction import ToolCategory
1616

1717

18+
@dataclass
19+
class CompactionAlertsConfig:
20+
"""Operator alert thresholds for compaction issues."""
21+
22+
enabled: bool = False
23+
window_seconds: int = 300
24+
cooldown_seconds: int = 600
25+
fail_open_threshold: int = 3
26+
overflow_risk_threshold: int = 3
27+
no_op_above_threshold_threshold: int = 5
28+
29+
@classmethod
30+
def from_dict(cls, data: dict[str, Any] | None) -> "CompactionAlertsConfig":
31+
"""Create alert config from dictionary payload."""
32+
payload = data or {}
33+
return cls(
34+
enabled=bool(payload.get("enabled", False)),
35+
window_seconds=int(payload.get("window_seconds", 300)),
36+
cooldown_seconds=int(payload.get("cooldown_seconds", 600)),
37+
fail_open_threshold=int(payload.get("fail_open_threshold", 3)),
38+
overflow_risk_threshold=int(payload.get("overflow_risk_threshold", 3)),
39+
no_op_above_threshold_threshold=int(
40+
payload.get("no_op_above_threshold_threshold", 5)
41+
),
42+
)
43+
44+
1845
@dataclass
1946
class CompactionConfig:
2047
"""Configuration for context compaction feature.
@@ -67,6 +94,9 @@ class CompactionConfig:
6794
"because a newer result for this resource exists later in the conversation."
6895
)
6996

97+
# Telemetry / alerting
98+
alerts: CompactionAlertsConfig = field(default_factory=CompactionAlertsConfig)
99+
70100
def is_tool_category_allowed(self, category: ToolCategory) -> bool:
71101
"""Check if a tool category is eligible for compaction.
72102
@@ -97,6 +127,7 @@ def is_tool_category_allowed(self, category: ToolCategory) -> bool:
97127
@classmethod
98128
def from_dict(cls, data: dict[str, Any]) -> "CompactionConfig":
99129
"""Create configuration from a dictionary."""
130+
alerts = CompactionAlertsConfig.from_dict(data.get("alerts", {}))
100131
return cls(
101132
enabled=data.get("enabled", False),
102133
token_threshold=data.get("token_threshold", 100_000),
@@ -110,12 +141,13 @@ def from_dict(cls, data: dict[str, Any]) -> "CompactionConfig":
110141
preserve_last_n_results=data.get("preserve_last_n_results", 1),
111142
stub_template=data.get("stub_template", cls.stub_template),
112143
redact_resource_identifiers=data.get("redact_resource_identifiers", False),
144+
alerts=alerts,
113145
)
114146

115147
@classmethod
116148
def disabled(cls) -> "CompactionConfig":
117149
"""Create a disabled configuration."""
118-
return cls(enabled=False)
150+
return cls(enabled=False, alerts=CompactionAlertsConfig(enabled=False))
119151

120152
@classmethod
121153
def default(cls) -> "CompactionConfig":
@@ -138,6 +170,7 @@ def default(cls) -> "CompactionConfig":
138170
],
139171
max_stubs_per_resource=1,
140172
preserve_last_n_results=1,
173+
alerts=CompactionAlertsConfig(enabled=False),
141174
)
142175

143176

src/core/interfaces/history_compaction_interface.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@
1717
from pydantic import BaseModel, Field
1818

1919
from src.core.domain.chat import ChatMessage
20+
from src.core.domain.compaction_telemetry import (
21+
CompactionAggregateMetrics,
22+
CompactionAlertRecord,
23+
CompactionEventRecord,
24+
EffectiveCompactionConfigDiagnostics,
25+
)
2026
from src.core.domain.configuration.compaction_config import (
2127
CompactionConfig,
2228
CompactionPolicies,
@@ -104,6 +110,10 @@ class CompactionResult:
104110
original_message_count: int = 0
105111
stale_resources: set[str] = field(default_factory=set)
106112
error: str | None = None
113+
event_records: list[CompactionEventRecord] = field(default_factory=list)
114+
aggregate_metrics: CompactionAggregateMetrics | None = None
115+
alerts: list[CompactionAlertRecord] = field(default_factory=list)
116+
effective_config_diagnostics: EffectiveCompactionConfigDiagnostics | None = None
107117

108118
@property
109119
def was_compacted(self) -> bool:

0 commit comments

Comments
 (0)