diff --git a/.claude-plugin/marketplace.json b/.claude-plugin/marketplace.json index 8badd1c..36bd608 100644 --- a/.claude-plugin/marketplace.json +++ b/.claude-plugin/marketplace.json @@ -9,7 +9,7 @@ "source": { "source": "npm", "package": "@copilotkit/aimock", - "version": "^1.26.1" + "version": "^1.27.0" }, "description": "Fixture authoring skill for @copilotkit/aimock — LLM, multimedia (image/TTS/transcription/video), MCP, A2A, AG-UI, vector, embeddings, structured output, sequential responses, streaming physics, record/replay, agent loop patterns, and debugging" } diff --git a/.claude-plugin/plugin.json b/.claude-plugin/plugin.json index 2de28a4..c02577c 100644 --- a/.claude-plugin/plugin.json +++ b/.claude-plugin/plugin.json @@ -1,6 +1,6 @@ { "name": "aimock", - "version": "1.26.1", + "version": "1.27.0", "description": "Fixture authoring guidance for @copilotkit/aimock — LLM, multimedia, MCP, A2A, AG-UI, vector, and service mocking", "author": { "name": "CopilotKit" diff --git a/CHANGELOG.md b/CHANGELOG.md index 2ff3e52..3fce2ec 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,17 +2,16 @@ ## [Unreleased] -### Fixed +## [1.27.0] - 2026-05-20 -- **AG-UI recorder** — `extractLastUserMessage` now walks structured `content` - arrays (e.g. `[{ type: "text", text: "..." }, { type: "document", source: ... }]`) - and joins their text parts. Previously, structured content fell back to the - `__NO_USER_MESSAGE__` sentinel, producing fixtures that couldn't replay. +### Added -### Changed +- **HITL continuation recording/replay support** — `toolCallId` matching for continuation fixtures. New `toolCallId` field on `AGUIFixtureMatch` and `AGUIConfigFixture`. `getLastMessageIfToolResult` helper and `onToolResult` fluent API. Recorder uses tool-result-first priority for continuation fixtures. ([#233](https://github.com/CopilotKit/aimock/pull/233), closes [#232](https://github.com/CopilotKit/aimock/issues/232)) + +### Fixed -- **`AGUIMessage.content` type widened** to `string | AGUIMessageContentPart[]`. - New exported type `AGUIMessageContentPart` describes the per-part shape. +- **Walk structured content arrays in `extractLastUserMessage`** — handle multimodal user content (`AGUIMessageContentPart[]`) by joining text parts and skipping non-text. Export `NO_USER_MESSAGE_SENTINEL` constant and `AGUIMessageContentPart` type. ([#231](https://github.com/CopilotKit/aimock/pull/231)) +- **Harden recorder against error responses, double-settle, and broken sentinel persistence** — guard against recording fixtures from non-2xx upstream responses, add `settled` flag to prevent error+end race, skip disk write for predicate fixtures (sentinel was semantically broken on reload), include parse error reason in SSE warning log ## [1.26.1] - 2026-05-19 diff --git a/charts/aimock/Chart.yaml b/charts/aimock/Chart.yaml index 37a6b59..0de02f9 100644 --- a/charts/aimock/Chart.yaml +++ b/charts/aimock/Chart.yaml @@ -3,4 +3,4 @@ name: aimock description: Mock infrastructure for AI application testing (OpenAI, Anthropic, Gemini, MCP, A2A, vector) type: application version: 0.1.0 -appVersion: "1.26.1" +appVersion: "1.27.0" diff --git a/package.json b/package.json index 9d6f86a..ecf94fb 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@copilotkit/aimock", - "version": "1.26.1", + "version": "1.27.0", "description": "Mock infrastructure for AI application testing — LLM APIs, image generation, text-to-speech, transcription, audio generation, video generation, MCP tools, A2A agents, AG-UI event streams, vector databases, search, rerank, and moderation. One package, one port, zero dependencies.", "license": "MIT", "keywords": [ diff --git a/src/__tests__/agui-mock.test.ts b/src/__tests__/agui-mock.test.ts index 603445c..a78cb23 100644 --- a/src/__tests__/agui-mock.test.ts +++ b/src/__tests__/agui-mock.test.ts @@ -18,6 +18,8 @@ import { buildCompositeResponse, buildTextChunkResponse, extractLastUserMessage, + getLastMessageIfToolResult, + matchesFixture, } from "../agui-handler.js"; import { NO_USER_MESSAGE_SENTINEL } from "../agui-recorder.js"; import { LLMock } from "../llmock.js"; @@ -999,6 +1001,470 @@ describe("AGUIMock record & replay", () => { expect(types).toContain("TEXT_MESSAGE_CONTENT"); expect(types).toContain("RUN_FINISHED"); }); + + // ---- Continuation (HITL) recording tests (46-48) ---- + + it("46. continuation recording writes toolCallId fixture", async () => { + const upstreamUrl = await startUpstreamWithCounter("continuation reply"); + + agui = new AGUIMock({ port: 0 }); + agui.enableRecording({ upstream: upstreamUrl, proxyOnly: false, fixturePath: tmpDir }); + await agui.start(); + + const resp = await post(agui.url, { + messages: [{ id: "m1", role: "tool", toolCallId: "call_hitl_001", content: "approved" }], + threadId: "t1", + }); + expect(resp.status).toBe(200); + + const files = fs.readdirSync(tmpDir); + expect(files.length).toBe(1); + + const parsed = JSON.parse(fs.readFileSync(path.join(tmpDir, files[0]), "utf-8")); + expect(parsed.fixtures).toHaveLength(1); + expect(parsed.fixtures[0].match.toolCallId).toBe("call_hitl_001"); + expect(parsed.fixtures[0].match.message).toBeUndefined(); + }); + + it("47. continuation replay from recorded fixture", async () => { + const upstreamUrl = await startUpstreamWithCounter("replay me"); + + agui = new AGUIMock({ port: 0 }); + agui.enableRecording({ upstream: upstreamUrl, proxyOnly: false, fixturePath: tmpDir }); + await agui.start(); + + // First request — hits upstream + await post(agui.url, { + messages: [{ id: "m1", role: "tool", toolCallId: "call_hitl_001", content: "approved" }], + threadId: "t1", + }); + expect(requestCount).toBe(1); + + // Second identical request — should match in-memory fixture + const resp2 = await post(agui.url, { + messages: [{ id: "m1", role: "tool", toolCallId: "call_hitl_001", content: "approved" }], + threadId: "t1", + }); + expect(resp2.status).toBe(200); + expect(requestCount).toBe(1); // upstream NOT hit again + + const events = parseSSEEvents(resp2.body); + const content = events.find((e) => e.type === "TEXT_MESSAGE_CONTENT") as unknown as Record< + string, + unknown + >; + expect(content.delta).toBe("replay me"); + }); + + it("48. normal request still records message fixture", async () => { + const upstreamUrl = await startUpstreamWithCounter("normal reply"); + + agui = new AGUIMock({ port: 0 }); + agui.enableRecording({ upstream: upstreamUrl, proxyOnly: false, fixturePath: tmpDir }); + await agui.start(); + + const resp = await post(agui.url, aguiInput("normal user message")); + expect(resp.status).toBe(200); + + const files = fs.readdirSync(tmpDir); + expect(files.length).toBe(1); + + const parsed = JSON.parse(fs.readFileSync(path.join(tmpDir, files[0]), "utf-8")); + expect(parsed.fixtures).toHaveLength(1); + expect(parsed.fixtures[0].match.message).toBe("normal user message"); + expect(parsed.fixtures[0].match.toolCallId).toBeUndefined(); + }); + + it("49. fallback predicate fixture is in-memory only (no disk write)", async () => { + const upstreamUrl = await startUpstreamWithCounter("sentinel reply"); + + agui = new AGUIMock({ port: 0 }); + agui.enableRecording({ upstream: upstreamUrl, proxyOnly: false, fixturePath: tmpDir }); + await agui.start(); + + const resp = await post(agui.url, { + messages: [{ id: "m1", role: "tool", content: "no-id" }], + threadId: "t1", + }); + expect(resp.status).toBe(200); + + // Predicate fixtures should NOT be written to disk — the sentinel + // string becomes a literal match that never matches real requests + const files = fs.readdirSync(tmpDir); + expect(files.length).toBe(0); + + // But the fixture IS available in memory for same-session replay + const resp2 = await post(agui.url, { + messages: [{ id: "m2", role: "tool", content: "another-no-id" }], + threadId: "t2", + }); + expect(resp2.status).toBe(200); + }); + + // ---- Recorder priority test (50) ---- + + it("50. tool result wins over user message in history", async () => { + const upstreamUrl = await startUpstreamWithCounter("priority reply"); + + agui = new AGUIMock({ port: 0 }); + agui.enableRecording({ upstream: upstreamUrl, proxyOnly: false, fixturePath: tmpDir }); + await agui.start(); + + const resp = await post(agui.url, { + messages: [ + { id: "m1", role: "user", content: "approve the action" }, + { id: "m2", role: "tool", toolCallId: "call_789", content: "result" }, + ], + threadId: "t1", + }); + expect(resp.status).toBe(200); + + const files = fs.readdirSync(tmpDir); + expect(files.length).toBe(1); + + const parsed = JSON.parse(fs.readFileSync(path.join(tmpDir, files[0]), "utf-8")); + expect(parsed.fixtures).toHaveLength(1); + expect(parsed.fixtures[0].match.toolCallId).toBe("call_789"); + expect(parsed.fixtures[0].match.message).toBeUndefined(); + }); + + // ---- Full HITL round-trip integration test (51) ---- + + it("51. full round-trip: record two legs then replay without upstream", async () => { + // Build upstream with two fixtures routed by predicate. + // Continuation (leg 2) must be checked FIRST because leg 2 requests + // also contain the original user message in history. + upstream = new AGUIMock({ port: 0 }); + const leg1Events = buildToolCallResponse("confirm_action", '{"action":"delete"}'); + const leg2Events = buildTextResponse("Action confirmed"); + + // Leg 2: continuation — last message is tool result with toolCallId + upstream.onPredicate((input) => { + const msgs = input.messages ?? []; + const last = msgs[msgs.length - 1]; + if (last?.role === "tool" && last?.toolCallId === "call_rt_001") { + requestCount++; + return true; + } + return false; + }, leg2Events); + + // Leg 1: initial user message (only matches when last message is NOT a tool result) + upstream.onPredicate((input) => { + const msgs = input.messages ?? []; + const last = msgs[msgs.length - 1]; + if (last?.role === "user") { + requestCount++; + return true; + } + return false; + }, leg1Events); + + const upstreamUrl = await upstream.start(); + + // Create recording proxy + agui = new AGUIMock({ port: 0 }); + agui.enableRecording({ upstream: upstreamUrl, proxyOnly: false, fixturePath: tmpDir }); + await agui.start(); + + // RECORD PHASE — Leg 1: user message + const resp1 = await post(agui.url, { + messages: [{ id: "m1", role: "user", content: "What should I do?" }], + threadId: "t-rt", + }); + expect(resp1.status).toBe(200); + const events1 = parseSSEEvents(resp1.body); + const types1 = events1.map((e) => e.type); + expect(types1).toContain("TOOL_CALL_START"); + expect(types1).toContain("TOOL_CALL_ARGS"); + + // RECORD PHASE — Leg 2: continuation with tool result + // Note: only tool result in messages (no user message) so it doesn't + // match the leg 1 fixture keyed on user message content. + const resp2 = await post(agui.url, { + messages: [{ id: "m2", role: "tool", toolCallId: "call_rt_001", content: "confirmed" }], + threadId: "t-rt", + }); + expect(resp2.status).toBe(200); + const events2 = parseSSEEvents(resp2.body); + const content2 = events2.find((e) => e.type === "TEXT_MESSAGE_CONTENT") as unknown as Record< + string, + unknown + >; + expect(content2.delta).toBe("Action confirmed"); + + // Verify 2 fixture files on disk: one with match.message, one with match.toolCallId + const files = fs.readdirSync(tmpDir).sort(); + expect(files.length).toBe(2); + + const fixtures = files.map((f) => JSON.parse(fs.readFileSync(path.join(tmpDir, f), "utf-8"))); + const matchTypes = fixtures.map((f) => f.fixtures[0].match); + const hasMessage = matchTypes.some( + (m: Record) => m.message !== undefined && m.toolCallId === undefined, + ); + const hasToolCallId = matchTypes.some( + (m: Record) => m.toolCallId !== undefined, + ); + expect(hasMessage).toBe(true); + expect(hasToolCallId).toBe(true); + + // Track how many times upstream was hit during recording + const recordingHits = requestCount; + + // REPLAY PHASE — Stop upstream + await upstream.stop(); + upstream = null; + + // Replay leg 1 + const replay1 = await post(agui.url, { + messages: [{ id: "m1", role: "user", content: "What should I do?" }], + threadId: "t-rt", + }); + expect(replay1.status).toBe(200); + const replayEvents1 = parseSSEEvents(replay1.body); + const replayTypes1 = replayEvents1.map((e) => e.type); + expect(replayTypes1).toContain("TOOL_CALL_START"); + + // Replay leg 2 + const replay2 = await post(agui.url, { + messages: [{ id: "m2", role: "tool", toolCallId: "call_rt_001", content: "confirmed" }], + threadId: "t-rt", + }); + expect(replay2.status).toBe(200); + const replayEvents2 = parseSSEEvents(replay2.body); + const replayContent2 = replayEvents2.find( + (e) => e.type === "TEXT_MESSAGE_CONTENT", + ) as unknown as Record; + expect(replayContent2.delta).toBe("Action confirmed"); + + // Upstream was hit exactly twice during recording, zero during replay + expect(requestCount).toBe(recordingHits); + }); +}); + +// --------------------------------------------------------------------------- +// getLastMessageIfToolResult unit tests (33-35) +// --------------------------------------------------------------------------- + +describe("getLastMessageIfToolResult", () => { + it("33. returns the tool message when last message has role tool with toolCallId", () => { + const input: AGUIRunAgentInput = { + threadId: "t1", + runId: "r1", + messages: [ + { id: "m1", role: "user", content: "do something" }, + { id: "m2", role: "tool", toolCallId: "call_abc", content: "approved" }, + ], + }; + const result = getLastMessageIfToolResult(input); + expect(result).not.toBeNull(); + expect(result!.id).toBe("m2"); + expect(result!.role).toBe("tool"); + expect(result!.toolCallId).toBe("call_abc"); + }); + + it("34. returns null when last message is not role tool", () => { + const input: AGUIRunAgentInput = { + threadId: "t1", + runId: "r1", + messages: [ + { id: "m1", role: "tool", toolCallId: "call_abc", content: "result" }, + { id: "m2", role: "user", content: "follow up" }, + ], + }; + const result = getLastMessageIfToolResult(input); + expect(result).toBeNull(); + }); + + it("35. returns null for empty/undefined messages array", () => { + expect(getLastMessageIfToolResult({ threadId: "t1", runId: "r1", messages: [] })).toBeNull(); + expect( + getLastMessageIfToolResult({ threadId: "t1", runId: "r1" } as AGUIRunAgentInput), + ).toBeNull(); + }); +}); + +// --------------------------------------------------------------------------- +// matchesFixture toolCallId unit tests (36-41) +// --------------------------------------------------------------------------- + +describe("matchesFixture toolCallId", () => { + it("36. toolCallId matches when last message is role tool with matching toolCallId", () => { + const input: AGUIRunAgentInput = { + threadId: "t1", + runId: "r1", + messages: [ + { id: "m1", role: "user", content: "hello" }, + { id: "m2", role: "tool", toolCallId: "call_xyz", content: "done" }, + ], + }; + expect(matchesFixture(input, { toolCallId: "call_xyz" })).toBe(true); + }); + + it("37. toolCallId does not match wrong ID", () => { + const input: AGUIRunAgentInput = { + threadId: "t1", + runId: "r1", + messages: [{ id: "m1", role: "tool", toolCallId: "call_xyz", content: "done" }], + }; + expect(matchesFixture(input, { toolCallId: "call_wrong" })).toBe(false); + }); + + it("38. toolCallId does not match when last message is not role tool", () => { + const input: AGUIRunAgentInput = { + threadId: "t1", + runId: "r1", + messages: [ + { id: "m1", role: "tool", toolCallId: "call_xyz", content: "done" }, + { id: "m2", role: "user", content: "follow up" }, + ], + }; + expect(matchesFixture(input, { toolCallId: "call_xyz" })).toBe(false); + }); + + it("39. toolCallId does not match when toolCallId absent on message", () => { + const input: AGUIRunAgentInput = { + threadId: "t1", + runId: "r1", + messages: [{ id: "m1", role: "tool", content: "no toolCallId" }], + }; + expect(matchesFixture(input, { toolCallId: "call_xyz" })).toBe(false); + }); + + it("40. AND logic — fixture with both message and toolCallId must both match", () => { + const inputBothMatch: AGUIRunAgentInput = { + threadId: "t1", + runId: "r1", + messages: [ + { id: "m1", role: "user", content: "approve this" }, + { id: "m2", role: "tool", toolCallId: "call_abc", content: "approved" }, + ], + }; + // Both criteria match + expect(matchesFixture(inputBothMatch, { message: "approve", toolCallId: "call_abc" })).toBe( + true, + ); + + // Message matches but toolCallId does not + expect(matchesFixture(inputBothMatch, { message: "approve", toolCallId: "call_wrong" })).toBe( + false, + ); + + // toolCallId matches but message does not + expect(matchesFixture(inputBothMatch, { message: "reject", toolCallId: "call_abc" })).toBe( + false, + ); + }); + + it("41. AND logic — fixture with both toolCallId and stateKey must both match", () => { + const inputBothMatch: AGUIRunAgentInput = { + threadId: "t1", + runId: "r1", + messages: [{ id: "m1", role: "tool", toolCallId: "call_state", content: "result" }], + state: { counter: 10 }, + }; + // Both criteria match + expect(matchesFixture(inputBothMatch, { toolCallId: "call_state", stateKey: "counter" })).toBe( + true, + ); + + // toolCallId matches but stateKey does not + expect(matchesFixture(inputBothMatch, { toolCallId: "call_state", stateKey: "missing" })).toBe( + false, + ); + + // stateKey matches but toolCallId does not + expect(matchesFixture(inputBothMatch, { toolCallId: "call_wrong", stateKey: "counter" })).toBe( + false, + ); + }); +}); + +// --------------------------------------------------------------------------- +// onToolResult fluent API tests (42-43) +// --------------------------------------------------------------------------- + +describe("onToolResult fluent API", () => { + it("42. onToolResult registers fixture and matching request returns 200", async () => { + agui = new AGUIMock({ port: 0 }); + agui.onToolResult("call_abc", buildTextResponse("continuation response")); + await agui.start(); + + const resp = await post(agui.url, { + messages: [{ id: "m1", role: "tool", toolCallId: "call_abc", content: "approved" }], + threadId: "test", + }); + expect(resp.status).toBe(200); + + const events = parseSSEEvents(resp.body); + const content = events.find((e) => e.type === "TEXT_MESSAGE_CONTENT") as unknown as Record< + string, + unknown + >; + expect(content).toBeDefined(); + expect(content.delta).toBe("continuation response"); + }); + + it("43. onToolResult with delayMs applies delay", async () => { + agui = new AGUIMock({ port: 0 }); + agui.onToolResult("call_delayed", buildTextResponse("delayed continuation"), 50); + await agui.start(); + + const start = Date.now(); + const resp = await post(agui.url, { + messages: [{ id: "m1", role: "tool", toolCallId: "call_delayed", content: "result" }], + threadId: "test", + }); + const elapsed = Date.now() - start; + + expect(resp.status).toBe(200); + // 5 events * 50ms = 250ms minimum + expect(elapsed).toBeGreaterThanOrEqual(200); + }); +}); + +// --------------------------------------------------------------------------- +// Config loader toolCallId pass-through tests (44-45) +// --------------------------------------------------------------------------- + +describe("config loader toolCallId pass-through", () => { + it("44. addFixture with toolCallId match returns 200 for matching tool message", async () => { + agui = new AGUIMock({ port: 0 }); + agui.addFixture({ + match: { toolCallId: "call_config_123" }, + events: buildTextResponse("config response"), + }); + await agui.start(); + + const resp = await post(agui.url, { + messages: [{ id: "m1", role: "tool", toolCallId: "call_config_123", content: "tool output" }], + threadId: "test", + }); + expect(resp.status).toBe(200); + + const events = parseSSEEvents(resp.body); + const content = events.find((e) => e.type === "TEXT_MESSAGE_CONTENT") as unknown as Record< + string, + unknown + >; + expect(content).toBeDefined(); + expect(content.delta).toBe("config response"); + }); + + it("45. same fixture returns 404 for user message instead of tool result", async () => { + agui = new AGUIMock({ port: 0 }); + agui.addFixture({ + match: { toolCallId: "call_config_123" }, + events: buildTextResponse("config response"), + }); + await agui.start(); + + const resp = await post(agui.url, { + messages: [{ id: "m1", role: "user", content: "hello" }], + threadId: "test", + }); + expect(resp.status).toBe(404); + }); }); // --------------------------------------------------------------------------- @@ -1192,7 +1658,7 @@ describe("AGUIMock recorder — structured user content", () => { expect(parsed.fixtures[0].match.message).not.toBe(NO_USER_MESSAGE_SENTINEL); }); - it("still writes the sentinel when no user text is present (e.g. only file parts)", async () => { + it("skips disk write for no-user-text predicate fixtures (in-memory only)", async () => { upstream = new AGUIMock({ port: 0 }); upstream.onPredicate(() => true, buildTextResponse("ok")); const upstreamUrl = await upstream.start(); @@ -1218,9 +1684,23 @@ describe("AGUIMock recorder — structured user content", () => { expect(resp.status).toBe(200); const files = fs.readdirSync(tmpDir); - expect(files.length).toBe(1); - const parsed = JSON.parse(fs.readFileSync(path.join(tmpDir, files[0]), "utf-8")); - expect(parsed.fixtures[0].match.message).toBe(NO_USER_MESSAGE_SENTINEL); + expect(files.length).toBe(0); + + const resp2 = await post(agui.url, { + messages: [ + { + id: "u2", + role: "user", + content: [ + { + type: "document", + source: { type: "data", value: "BBB=", mimeType: "text/plain" }, + }, + ], + }, + ], + } as AGUIRunAgentInput); + expect(resp2.status).toBe(200); }); }); diff --git a/src/agui-handler.ts b/src/agui-handler.ts index 791a01a..acdc115 100644 --- a/src/agui-handler.ts +++ b/src/agui-handler.ts @@ -79,6 +79,15 @@ function extractTextFromContent(content: AGUIMessage["content"]): string { return parts.join(" ").trim(); } +/** + * Return the absolute last message if it has role "tool", otherwise null. + */ +export function getLastMessageIfToolResult(input: AGUIRunAgentInput): AGUIMessage | null { + if (!input.messages || input.messages.length === 0) return null; + const last = input.messages[input.messages.length - 1]; + return last.role === "tool" ? last : null; +} + /** * Check whether an input matches a fixture's match criteria. * All specified criteria must pass (AND logic). @@ -94,6 +103,13 @@ export function matchesFixture(input: AGUIRunAgentInput, match: AGUIFixtureMatch } } + if (match.toolCallId !== undefined) { + const lastMsg = input.messages?.[input.messages.length - 1]; + if (!lastMsg || lastMsg.role !== "tool" || lastMsg.toolCallId !== match.toolCallId) { + return false; + } + } + if (match.toolName !== undefined) { const tools = input.tools ?? []; if (!tools.some((t) => t.name === match.toolName)) return false; diff --git a/src/agui-mock.ts b/src/agui-mock.ts index 3c71a57..192b2f8 100644 --- a/src/agui-mock.ts +++ b/src/agui-mock.ts @@ -112,6 +112,15 @@ export class AGUIMock implements Mountable { return this; } + onToolResult(toolCallId: string, events: AGUIEvent[], delayMs?: number): this { + this.fixtures.push({ + match: { toolCallId }, + events, + delayMs, + }); + return this; + } + enableRecording(config: AGUIRecordConfig): this { this.recordConfig = config; return this; diff --git a/src/agui-recorder.ts b/src/agui-recorder.ts index 4e934e2..96f72d8 100644 --- a/src/agui-recorder.ts +++ b/src/agui-recorder.ts @@ -3,8 +3,14 @@ import * as https from "node:https"; import * as fs from "node:fs"; import * as path from "node:path"; import * as crypto from "node:crypto"; -import type { AGUIFixture, AGUIRecordConfig, AGUIEvent, AGUIRunAgentInput } from "./agui-types.js"; -import { extractLastUserMessage } from "./agui-handler.js"; +import type { + AGUIFixture, + AGUIFixtureMatch, + AGUIRecordConfig, + AGUIEvent, + AGUIRunAgentInput, +} from "./agui-types.js"; +import { extractLastUserMessage, getLastMessageIfToolResult } from "./agui-handler.js"; import type { Logger } from "./logger.js"; /** @@ -164,7 +170,11 @@ function teeUpstreamStream( chunks.push(chunk); }); + let settled = false; + upstreamRes.on("error", (err) => { + if (settled) return; + settled = true; try { if (!clientRes.headersSent) { clientRes.writeHead(502, { "Content-Type": "application/json" }); @@ -182,6 +192,22 @@ function teeUpstreamStream( }); upstreamRes.on("end", () => { + if (settled) return; + settled = true; + + // Don't record fixtures for non-2xx upstream responses + if (clientStatus !== 200) { + try { + if (!clientRes.writableEnded) clientRes.end(); + } catch (writeErr) { + logger.warn( + "Failed to end client response:", + writeErr instanceof Error ? writeErr.message : String(writeErr), + ); + } + resolve(clientStatus); + return; + } try { if (!clientRes.writableEnded) clientRes.end(); } catch (writeErr) { @@ -195,55 +221,68 @@ function teeUpstreamStream( const buffered = Buffer.concat(chunks).toString(); const events = parseSSEEvents(buffered, logger); - // Build fixture - const message = extractLastUserMessage(input); - const fixture: AGUIFixture = { - match: message - ? { message } - : { - predicate: (inp: AGUIRunAgentInput) => - !inp.messages?.length || !inp.messages.some((m) => m.role === "user"), - }, - events, - }; - if (!message) { - logger.warn( - `Recorded AG-UI fixture has no user message — will use ${NO_USER_MESSAGE_SENTINEL} sentinel on disk`, - ); + // Build fixture — three-way match priority: + // 1. Tool-result continuation (HITL): match by toolCallId + // 2. User message: match by last user message content + // 3. Fallback predicate: no user message present + let match: AGUIFixtureMatch; + const lastToolResult = getLastMessageIfToolResult(input); + if (lastToolResult?.toolCallId) { + match = { toolCallId: lastToolResult.toolCallId }; + logger.info(`Recorded AG-UI fixture keyed on toolCallId=${lastToolResult.toolCallId}`); + } else { + const message = extractLastUserMessage(input); + if (message) { + match = { message }; + } else { + match = { + predicate: (inp: AGUIRunAgentInput) => + !inp.messages?.length || !inp.messages.some((m) => m.role === "user"), + }; + logger.warn( + "Recorded AG-UI fixture has no user message — available in-memory only (predicate fixtures cannot be persisted to disk)", + ); + } } + const fixture: AGUIFixture = { match, events }; if (!config.proxyOnly) { // Register in memory first (always available even if disk write fails) fixtures.push(fixture); - // Write to disk — predicate functions are not serializable, - // so replace with a sentinel string that won't match real user messages. - const serializableFixture = { - match: fixture.match.predicate - ? { message: NO_USER_MESSAGE_SENTINEL } - : fixture.match, - events: fixture.events, - ...(fixture.delayMs !== undefined ? { delayMs: fixture.delayMs } : {}), - }; + // Predicate fixtures (no user message, no toolCallId) cannot be + // meaningfully serialized — the sentinel becomes a literal string + // match that never matches real requests. Keep in-memory only. + if (fixture.match.predicate) { + logger.warn( + "Skipping disk write for predicate fixture — in-memory only (cannot be persisted)", + ); + } else { + const serializableFixture = { + match: fixture.match, + events: fixture.events, + ...(fixture.delayMs !== undefined ? { delayMs: fixture.delayMs } : {}), + }; - const fixturePath = config.fixturePath ?? "./fixtures/agui-recorded"; - const timestamp = new Date().toISOString().replace(/[:.]/g, "-"); - const filename = `agui-${timestamp}-${crypto.randomUUID().slice(0, 8)}.json`; - const filepath = path.join(fixturePath, filename); + const fixturePath = config.fixturePath ?? "./fixtures/agui-recorded"; + const timestamp = new Date().toISOString().replace(/[:.]/g, "-"); + const filename = `agui-${timestamp}-${crypto.randomUUID().slice(0, 8)}.json`; + const filepath = path.join(fixturePath, filename); - try { - fs.mkdirSync(fixturePath, { recursive: true }); - fs.writeFileSync( - filepath, - JSON.stringify({ fixtures: [serializableFixture] }, null, 2), - "utf-8", - ); - logger.warn(`AG-UI response recorded → ${filepath}`); - } catch (err) { - const msg = err instanceof Error ? err.message : "Unknown filesystem error"; - logger.error( - `Failed to save AG-UI fixture to disk: ${msg} (fixture retained in memory)`, - ); + try { + fs.mkdirSync(fixturePath, { recursive: true }); + fs.writeFileSync( + filepath, + JSON.stringify({ fixtures: [serializableFixture] }, null, 2), + "utf-8", + ); + logger.warn(`AG-UI response recorded → ${filepath}`); + } catch (err) { + const msg = err instanceof Error ? err.message : "Unknown filesystem error"; + logger.error( + `Failed to save AG-UI fixture to disk: ${msg} (fixture retained in memory)`, + ); + } } } else { logger.info("Proxied AG-UI request (proxy-only mode)"); @@ -298,8 +337,9 @@ function parseSSEEvents(text: string, logger?: Logger): AGUIEvent[] { events.push(parsed); } catch (err) { const msg = err instanceof Error ? err.message : String(err); - if (logger) logger.warn(`Skipping unparseable SSE data line: ${payload.slice(0, 200)}`); - else console.warn(`Skipping unparseable SSE data line: ${msg}`); + const warning = `Skipping unparseable SSE data line (${msg}): ${payload.slice(0, 200)}`; + if (logger) logger.warn(warning); + else console.warn(warning); } } } diff --git a/src/agui-stub.ts b/src/agui-stub.ts index 158aec6..831b9f2 100644 --- a/src/agui-stub.ts +++ b/src/agui-stub.ts @@ -54,6 +54,7 @@ export type { } from "./agui-types.js"; export { extractLastUserMessage, + getLastMessageIfToolResult, matchesFixture, findFixture, buildTextResponse, diff --git a/src/agui-types.ts b/src/agui-types.ts index 25e4e58..4fb2253 100644 --- a/src/agui-types.ts +++ b/src/agui-types.ts @@ -389,6 +389,7 @@ export interface AGUIToolDefinition { export interface AGUIFixtureMatch { message?: string | RegExp; + toolCallId?: string; toolName?: string; stateKey?: string; predicate?: (input: AGUIRunAgentInput) => boolean; diff --git a/src/config-loader.ts b/src/config-loader.ts index 2127e7f..2c26f71 100644 --- a/src/config-loader.ts +++ b/src/config-loader.ts @@ -59,7 +59,7 @@ export interface A2AConfig { } export interface AGUIConfigFixture { - match: { message?: string; toolName?: string; stateKey?: string }; + match: { message?: string; toolCallId?: string; toolName?: string; stateKey?: string }; text?: string; // shorthand: uses buildTextResponse events?: AGUIEvent[]; // raw events delayMs?: number; @@ -220,12 +220,18 @@ export async function startFromConfig( if (aguiConfig.fixtures) { for (const f of aguiConfig.fixtures) { + if (f.match.toolCallId && f.text && !f.events) { + logger.warn( + `AG-UI fixture uses text shorthand with toolCallId — text shorthand ignores toolCallId matching; use events[] instead (match: ${JSON.stringify(f.match)})`, + ); + } if (f.text) { agui.onMessage(f.match.message ?? /.*/, f.text, { delayMs: f.delayMs }); } else if (f.events) { agui.addFixture({ match: { message: f.match.message, + toolCallId: f.match.toolCallId, toolName: f.match.toolName, stateKey: f.match.stateKey, }, diff --git a/src/index.ts b/src/index.ts index 8e887d3..bfdf8cf 100644 --- a/src/index.ts +++ b/src/index.ts @@ -284,6 +284,7 @@ export { buildReasoningChunk as buildAGUIReasoningChunk, buildReasoningEncryptedValue as buildAGUIReasoningEncryptedValue, extractLastUserMessage as extractAGUILastUserMessage, + getLastMessageIfToolResult as getAGUILastMessageIfToolResult, findFixture as findAGUIFixture, writeAGUIEventStream, } from "./agui-handler.js";