From b2bd90f9c61c5a70d59c5114cb6e78ed5bd069f4 Mon Sep 17 00:00:00 2001 From: Manoj Prabhakar Paidiparthy Date: Mon, 22 Jun 2026 23:25:10 -0700 Subject: [PATCH] feat(reconciler): repair broken client transcripts before reconcile MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirror CLI face's _repair_loaded_transcript_if_needed pattern in the HTTP chat-completions reconciler. Catches orphaned tool_use, ordering violations, and incomplete assistant turns before they reach the provider — without this, Anthropic returns HTTP 400. Healthy transcripts pass through unchanged (Layer 1 diagnostic is <10ms). The repair runs as Step 1 of reconcile_client_history, before the store.save persist. 🤖 Generated with [Amplifier](https://github.com/microsoft/amplifier) Co-Authored-By: Amplifier <240397093+microsoft-amplifier@users.noreply.github.com> --- CHANGELOG.md | 2 + src/amplifier_agent_http/_reconciler.py | 57 ++++++++-- tests/http/test_reconciler.py | 141 ++++++++++++++++++++++++ 3 files changed, 190 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index dd66442..e8f6c09 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -30,6 +30,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed +- **`reconcile_client_history` now runs foundation's transcript-repair pass before persisting** the client's view. Catches broken chat-completions clients (orphaned `tool_use` without paired `tool_result`, ordering violations, incomplete assistant turns) that would otherwise cause Anthropic to reject the next LLM call with HTTP 400. Mirrors `_runtime.py:_repair_loaded_transcript_if_needed` from the CLI face. Healthy transcripts pass through unchanged with negligible overhead (<10ms diagnostic). + - **`X-Session-Id` header is now recognized as a fallback** for the existing `X-Client-Session-Id` correlation mechanism (PR #71). opencode and other Vercel AI SDK-based clients send `X-Session-Id` by default; amplifier-agent now picks it up automatically, so session-resume + client-authoritative reconciliation works for opencode with zero config. `X-Client-Session-Id` remains authoritative when both headers are present. - **Workspace name is no longer suffixed with the client session id.** Previously, `X-Client-Session-Id: abc` would route requests into `workspaces/-abc/`. Now the workspace stays at `` and per-client distinction is purely at the session_id level (`workspaces//sessions/http-abc/`). This keeps workspace-level hook state (context-intelligence, etc.) shared across all sessions of the same server process, where it belongs. diff --git a/src/amplifier_agent_http/_reconciler.py b/src/amplifier_agent_http/_reconciler.py index e394987..a757a46 100644 --- a/src/amplifier_agent_http/_reconciler.py +++ b/src/amplifier_agent_http/_reconciler.py @@ -14,6 +14,8 @@ import logging from typing import TYPE_CHECKING, Any +from amplifier_foundation.session import diagnose_transcript, repair_transcript + if TYPE_CHECKING: from amplifier_agent_lib.session_store import SessionStore @@ -26,16 +28,25 @@ def reconcile_client_history( session_id: str, store: SessionStore, ) -> list[dict[str, Any]]: - """Persist the client's view as authoritative and return it for replay. + """Repair the client-sent transcript (if broken), then persist as authoritative. + + Step 1 — Layer-1 repair: run foundation's ``diagnose_transcript`` / + ``repair_transcript`` against the incoming messages. Catches orphaned + ``tool_use`` blocks (no paired ``tool_result``), ordering violations, and + incomplete assistant turns that would otherwise cause Anthropic to reject + the next LLM call with HTTP 400. Healthy transcripts pass through + unchanged. + + Step 2 — Persist the (now repaired) client view to the session store so + the kernel's resume path loads from a clean state. - Chat-completions is client-authoritative: opencode (and any conforming - OpenAI-compatible client) sends the full conversation every turn. Whatever - we have stored locally is at best a copy. On any divergence the client's - view wins. + Mirrors the CLI face's ``_runtime.py:_repair_loaded_transcript_if_needed`` + pattern, but for the HTTP wire's client-authoritative model: we trust the + client's view of the conversation, while still defending Anthropic's + API contract before replay. - We persist over the stored copy (idempotent on healthy resumes — same - content) so the next turn's load is consistent, then return for replay. - No divergence detection, no special events, no ceremony. + The repair runs every turn but is essentially free on healthy + transcripts (Layer 1, pure, <10ms — annotate + diagnose). Parameters ---------- @@ -51,10 +62,36 @@ def reconcile_client_history( Returns ------- list[dict] - The client's messages, unchanged. Returned for caller's convenience - so the caller doesn't have to re-reference ``client_messages`` + The client's messages, repaired if broken and stripped of any + ``line_num`` annotations. Returned for caller's convenience so + the caller doesn't have to re-reference ``client_messages`` downstream. """ + if client_messages: + # Foundation's diagnose_transcript prefers line_num annotations for + # the incomplete-turns fallback path. SessionStore doesn't annotate + # them, so add them to shallow copies before diagnosing. repair_transcript's + # output strips line_num itself; we strip again here defensively in case + # the healthy path is hit (no repair invocation). + annotated = [{**m, "line_num": i + 1} for i, m in enumerate(client_messages)] + diagnosis = diagnose_transcript(annotated) + + if diagnosis["status"] != "healthy": + repaired = repair_transcript(annotated, diagnosis) + client_messages = [{k: v for k, v in m.items() if k != "line_num"} for m in repaired] + logger.warning( + "Client-sent transcript was broken — repaired before reconcile. " + "failure_modes=%s orphaned_tool_ids=%s misplaced_tool_ids=%s " + "incomplete_turns=%d entries_before=%d entries_after=%d session=%s", + diagnosis["failure_modes"], + diagnosis["orphaned_tool_ids"], + diagnosis["misplaced_tool_ids"], + len(diagnosis["incomplete_turns"]), + len(annotated), + len(client_messages), + session_id, + ) + store.save( session_id, client_messages, diff --git a/tests/http/test_reconciler.py b/tests/http/test_reconciler.py index 9330a26..ac24c24 100644 --- a/tests/http/test_reconciler.py +++ b/tests/http/test_reconciler.py @@ -333,3 +333,144 @@ async def _fake_run(**kwargs: Any) -> str: assert any(msg.get("content") == "edited message" for msg in transcript if isinstance(msg.get("content"), str)), ( f"Expected 'edited message' in transcript, got: {transcript}" ) + + +# --------------------------------------------------------------------------- +# Unit tests: repair step in reconcile_client_history +# --------------------------------------------------------------------------- + + +def test_reconciler_repairs_orphaned_tool_use_before_persist(tmp_path: Path) -> None: + """Client sends a transcript with an orphaned tool_call (no matching tool result). + + The foundation's diagnose_transcript / repair_transcript operates on the + OpenAI wire format: ``tool_calls`` on the assistant message and ``role: + "tool"`` response messages with ``tool_call_id``. + + reconcile_client_history runs the foundation repair pass, synthesises a + synthetic ``role: "tool"`` result, persists the cleaned version to the + store, and returns the repaired list — not the original broken one. + """ + store = SessionStore(tmp_path) + sid = "http-repair-orphan" + + broken: list[dict[str, Any]] = [ + {"role": "user", "content": "do the thing"}, + { + "role": "assistant", + "content": None, + "tool_calls": [ + { + "id": "call-123", + "type": "function", + "function": {"name": "bash", "arguments": '{"command": "ls"}'}, + } + ], + }, + # missing role:"tool" tool_call_id:"call-123" result — that's the break + {"role": "user", "content": "follow up"}, + ] + + result = reconcile_client_history( + client_messages=broken, + session_id=sid, + store=store, + ) + + # The returned transcript must no longer have an orphaned tool_call. + # After repair the stored and returned transcripts should be consistent. + loaded = store.load(sid) + assert loaded is not None + transcript, metadata = loaded + assert metadata == {"last_turn": "client_reconciled"} + + # Returned value must be the repaired (stored) version. + assert result == transcript + + # Every tool_call id in the repaired transcript must have a paired + # role:"tool" result message. + tool_call_ids: set[str] = set() + tool_result_ids: set[str] = set() + for msg in result: + if msg.get("role") == "assistant": + for tc in msg.get("tool_calls") or []: + tool_call_ids.add(tc["id"]) + elif msg.get("role") == "tool": + tool_result_ids.add(msg.get("tool_call_id", "")) + unmatched = tool_call_ids - tool_result_ids + assert not unmatched, f"Orphaned tool_call ids remain after repair: {unmatched}" + + +def test_reconciler_healthy_transcript_passes_through_unchanged(tmp_path: Path) -> None: + """Client sends a well-formed transcript. + + diagnose_transcript reports healthy — no repair is invoked — and + store.save sees a transcript identity-equal to the original input. + """ + store = SessionStore(tmp_path) + sid = "http-healthy-passthrough" + + healthy: list[dict[str, Any]] = [ + {"role": "user", "content": "hello"}, + {"role": "assistant", "content": "hi there"}, + {"role": "user", "content": "goodbye"}, + ] + + result = reconcile_client_history( + client_messages=healthy, + session_id=sid, + store=store, + ) + + # Healthy transcript: returned value is the same object (identity check). + assert result is healthy + + # Persisted view matches input. + loaded = store.load(sid) + assert loaded is not None + transcript, _ = loaded + assert transcript == healthy + + +def test_reconciler_logs_warning_with_failure_modes_on_repair(tmp_path: Path, caplog: Any) -> None: + """When a repair happens, a warning is logged containing 'failure_modes=' + and the session_id — operator visibility for production debugging. + + Uses the OpenAI wire format (``tool_calls`` / ``role: "tool"``) which is + what foundation's diagnose_transcript understands. + """ + import logging + + store = SessionStore(tmp_path) + sid = "http-warn-on-repair" + + broken: list[dict[str, Any]] = [ + {"role": "user", "content": "do the thing"}, + { + "role": "assistant", + "content": None, + "tool_calls": [ + { + "id": "call-456", + "type": "function", + "function": {"name": "bash", "arguments": '{"command": "pwd"}'}, + } + ], + }, + # missing role:"tool" tool_call_id:"call-456" result + {"role": "user", "content": "next step"}, + ] + + with caplog.at_level(logging.WARNING, logger="amplifier_agent_http._reconciler"): + reconcile_client_history( + client_messages=broken, + session_id=sid, + store=store, + ) + + # At least one WARNING record must contain the expected fields. + warning_texts = [r.getMessage() for r in caplog.records if r.levelno == logging.WARNING] + assert warning_texts, "Expected at least one WARNING log entry from the reconciler" + combined = " ".join(warning_texts) + assert "failure_modes=" in combined, f"'failure_modes=' not found in: {combined}" + assert sid in combined, f"session_id '{sid}' not found in warning: {combined}"