diff --git a/packages/agent/package.json b/packages/agent/package.json index 1680eb629..b6f4326c6 100644 --- a/packages/agent/package.json +++ b/packages/agent/package.json @@ -129,8 +129,8 @@ }, "dependencies": { "@agentclientprotocol/sdk": "0.25.0", - "@anthropic-ai/claude-agent-sdk": "0.3.165", - "@anthropic-ai/sdk": "0.100.1", + "@anthropic-ai/claude-agent-sdk": "0.3.170", + "@anthropic-ai/sdk": "0.104.1", "@hono/node-server": "^1.19.9", "@opentelemetry/api-logs": "^0.208.0", "@opentelemetry/exporter-logs-otlp-http": "^0.208.0", diff --git a/packages/agent/src/adapters/claude/UPSTREAM.md b/packages/agent/src/adapters/claude/UPSTREAM.md index 2c694f9b3..3c3584c51 100644 --- a/packages/agent/src/adapters/claude/UPSTREAM.md +++ b/packages/agent/src/adapters/claude/UPSTREAM.md @@ -5,8 +5,8 @@ Fork of `@anthropic-ai/claude-agent-acp`. Upstream repo: https://github.com/anth ## Fork Point - **Forked**: v0.10.9, commit `5411e0f4`, Dec 2 2025 -- **Last sync**: v0.42.0, commit `0dbccf5`, Jun 5 2026 -- **SDK**: `@anthropic-ai/claude-agent-sdk` 0.3.165, `@agentclientprotocol/sdk` 0.25.0, `@anthropic-ai/sdk` 0.100.1 +- **Last sync**: v0.44.0, commit `7de5e4b`, Jun 11 2026 +- **SDK**: `@anthropic-ai/claude-agent-sdk` 0.3.170, `@agentclientprotocol/sdk` 0.25.0, `@anthropic-ai/sdk` 0.104.1 ## File Mapping @@ -54,6 +54,46 @@ Fork of `@anthropic-ai/claude-agent-acp`. Upstream repo: https://github.com/anth | Session fingerprinting | Implicit teardown on cwd/mcp change | Explicit `refreshSession()` | Caller-initiated is more predictable | | Shutdown on ACP close | Process exits | No standalone process | Agent is embedded in server | | Unsupported slash commands | Loops silently on early idle | Emits "Unsupported slash command" chunk, gated on `initializationResult().commands` so plugin/skill commands (e.g. `/skills-store`) whose echoes use a fresh uuid are not false-flagged | The SDK consumes some slash commands without producing output (e.g. `/plugin` in non-interactive mode); without this we hang. The known-commands gate avoids racing plugin/skill loads where idle can arrive before the transformed user-message echo. | +| Prompt-loop cancel race | `Promise.race([query.next(), cancelWake])` each iteration (#742) | `withAbort(query.next(), cancelController.signal)` helper in `utils/common.ts`, also guarding the `compact_boundary` `getContextUsage` fetch | The classic `Promise.race` leak (nodejs/node#17469): each race call parks a reaction on the turn-lived `cancelWake` promise that retains that iteration's settled value, so every yielded message (and every stream event, since `includePartialMessages` is on) stays reachable until the turn ends. Long high-reasoning turns could pin tens of MB. `withAbort` removes its abort listener as soon as `next()` settles, so nothing accumulates. Cancel semantics are unchanged, including the force-cancel backstop. | + +## Changes Ported in v0.44.0 Sync + +- **SDK bumps**: claude-agent-sdk 0.3.165 -> 0.3.170 (the 0.3.169 bump #754 was version-only), + anthropic SDK 0.100.1 -> 0.104.1 (upstream now carries it as a dev dependency; 0.104.1 matches + upstream HEAD), ACP SDK unchanged at 0.25.0. +- **Forward unstreamed assistant text blocks** (#757, 7ff6b7f): The consolidated assistant + message's `text`/`thinking` blocks were always dropped as duplicates of the streamed chunks, + which loses the whole answer behind gateways that return a turn as a single non-streamed block + (common with OpenAI-compatible proxies). Added a per-turn `StreamedAssistantBlocks` tracker on + `MessageHandlerContext` (populated in `handleStreamEvent` from `message_start` ids + + `content_block_delta` types, top-level streams only); `filterAssistantContent` (replaces + `filterMessageContent`) now drops a block only if its exact (message id, block type) pair + streamed live or the block is empty. Subagent assistant text stays always-dropped, and the + replay path (no tracker) keeps the legacy drop-all filter — upstream's replay never filtered, so + this divergence is contained to `replaySessionHistory`. Covered by new unit tests in + `conversion/sdk-to-acp.test.ts`. +- **`fallback` content block no-op** (#761, d8af943): New @anthropic-ai/sdk block type added to + the `processContentChunk` no-op group so it doesn't trip the `unreachable` default (same + treatment as `advisor_tool_result` / `mid_conv_system` in the v0.38 sync). +- **Test mock**: added `usage_EXPERIMENTAL_MAY_CHANGE_DO_NOT_RELY_ON_THIS_API_YET` to the SDK + `MockQuery` (new method on the SDK `Query` interface in 0.3.170). + +## Skipped in v0.44.0 Sync + +- **Experimental elicitation support** (#756, 12bd276): Upstream re-enables AskUserQuestion by + rendering it through ACP's unstable elicitation API (`unstable_createElicitation`, gated on + `clientCapabilities.elicitation`) and forwards MCP-server elicitations the same way. Conflicts + with our AskUserQuestion divergence (own `questions/` machinery behind + `CLAUDE_CODE_ENABLE_ASK_USER_QUESTION_TOOL`, plus existing AskUserQuestion rendering in + `conversion/tool-use-to-acp.ts`), and our renderer does not advertise elicitation capabilities. + Revisit if the renderer adopts ACP elicitation; the `elicitation_complete` system subtype also + stays unhandled (we never create elicitations, and `handleSystemMessage` defaults to no-op). +- **`model_refusal_fallback` system subtype** (#761, d8af943): Upstream adds it to their + exhaustive status-TODO case group. Our `handleSystemMessage` ends in `default: break`, so the + new subtype already no-ops harmlessly (same precedent as the v0.38 `thinking_tokens` skip). +- **Release / dep-group / dev-dep bumps** (#752, #758, #759, #763): No fork-relevant code beyond + the SDK versions captured above. (#751 `validateCwd` appears in upstream's v0.43.0 changelog but + predates our v0.42.0 sync point and is already in the fork at `claude-agent.ts`.) ## Changes Ported in v0.42.0 Sync @@ -229,7 +269,7 @@ Fork of `@anthropic-ai/claude-agent-acp`. Upstream repo: https://github.com/anth ## Next Sync -1. Check upstream changelog since v0.42.0 +1. Check upstream changelog since v0.44.0 2. Diff upstream source against PostHog Code using the file mapping above 3. Port in phases: bug fixes first, then features 4. After each phase: `pnpm --filter agent typecheck && pnpm --filter agent build && pnpm lint` diff --git a/packages/agent/src/adapters/claude/claude-agent.streamed-text.test.ts b/packages/agent/src/adapters/claude/claude-agent.streamed-text.test.ts new file mode 100644 index 000000000..da450eaca --- /dev/null +++ b/packages/agent/src/adapters/claude/claude-agent.streamed-text.test.ts @@ -0,0 +1,219 @@ +import type { AgentSideConnection } from "@agentclientprotocol/sdk"; +import type { + SDKMessage, + SDKUserMessage, +} from "@anthropic-ai/claude-agent-sdk"; +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { createMockQuery, type MockQuery } from "../../test/mocks/claude-sdk"; +import { Pushable } from "../../utils/streams"; + +vi.mock("@anthropic-ai/claude-agent-sdk", () => ({ + query: vi.fn(), +})); + +vi.mock("./mcp/tool-metadata", () => ({ + fetchMcpToolMetadata: vi.fn().mockResolvedValue(undefined), + getConnectedMcpServerNames: vi.fn().mockReturnValue([]), + setMcpToolApprovalStates: vi.fn(), + isMcpToolReadOnly: vi.fn().mockReturnValue(false), + getMcpToolMetadata: vi.fn().mockReturnValue(undefined), + getMcpToolApprovalState: vi.fn().mockReturnValue(undefined), +})); + +const { ClaudeAcpAgent } = await import("./claude-agent"); +type Agent = InstanceType; + +interface ClientMocks { + sessionUpdate: ReturnType; + extNotification: ReturnType; +} + +function makeAgent(): { agent: Agent; client: ClientMocks } { + const client: ClientMocks = { + sessionUpdate: vi.fn().mockResolvedValue(undefined), + extNotification: vi.fn().mockResolvedValue(undefined), + }; + const agent = new ClaudeAcpAgent(client as unknown as AgentSideConnection); + return { agent, client }; +} + +function installFakeSession( + agent: Agent, + sessionId: string, +): { query: MockQuery; input: Pushable } { + const query = createMockQuery(); + const input = new Pushable(); + const abortController = new AbortController(); + + const session = { + query, + queryOptions: { sessionId, cwd: "/tmp/repo", abortController }, + input, + cancelled: false, + interruptReason: undefined, + settingsManager: { dispose: vi.fn(), getRepoRoot: () => "/tmp/repo" }, + permissionMode: "default" as const, + abortController, + accumulatedUsage: { + inputTokens: 0, + outputTokens: 0, + cachedReadTokens: 0, + cachedWriteTokens: 0, + }, + sessionResources: new Set(), + configOptions: [], + promptRunning: false, + pendingMessages: new Map(), + nextPendingOrder: 0, + cwd: "/tmp/repo", + notificationHistory: [] as unknown[], + taskRunId: "run-1", + lastContextWindowSize: 200_000, + modelId: "claude-sonnet-4-6", + taskState: new Map(), + }; + + (agent as unknown as { session: typeof session }).session = session; + (agent as unknown as { sessionId: string }).sessionId = sessionId; + + return { query, input }; +} + +function tick(): Promise { + return new Promise((resolve) => setImmediate(resolve)); +} + +async function send(query: MockQuery, message: unknown): Promise { + query._mockHelpers.sendMessage(message as SDKMessage); + await tick(); +} + +// Replays the prompt's own user message back through the query so +// `promptReplayed` flips and the terminal `result` message is not skipped as a +// background-task result. +async function echoUserMessage( + query: MockQuery, + input: Pushable, +): Promise { + const { value: pushed } = await input[Symbol.asyncIterator]().next(); + await send(query, pushed); +} + +function messageStart(sessionId: string, apiId: string) { + return { + type: "stream_event", + parent_tool_use_id: null, + session_id: sessionId, + uuid: `start-${apiId}`, + event: { type: "message_start", message: { id: apiId, usage: {} } }, + }; +} + +function textDelta(sessionId: string, text: string) { + return { + type: "stream_event", + parent_tool_use_id: null, + session_id: sessionId, + uuid: `delta-${text}`, + event: { + type: "content_block_delta", + index: 0, + delta: { type: "text_delta", text }, + }, + }; +} + +function assistantMessage(sessionId: string, apiId: string, text: string) { + return { + type: "assistant", + parent_tool_use_id: null, + session_id: sessionId, + uuid: `assistant-${apiId}`, + message: { + id: apiId, + role: "assistant", + content: [{ type: "text", text }], + }, + }; +} + +function resultSuccess(sessionId: string) { + return { + type: "result", + subtype: "success", + session_id: sessionId, + uuid: "result-1", + result: "", + is_error: false, + usage: {}, + modelUsage: {}, + }; +} + +function messageChunkTexts( + calls: ClientMocks["sessionUpdate"]["mock"]["calls"], +): string[] { + return calls + .map( + ([call]) => + ( + call as { + update?: { sessionUpdate?: string; content?: { text?: string } }; + } + ).update, + ) + .filter((update) => update?.sessionUpdate === "agent_message_chunk") + .map((update) => update?.content?.text ?? ""); +} + +describe("ClaudeAcpAgent.prompt — streamed assistant text wiring", () => { + beforeEach(() => { + vi.clearAllMocks(); + }); + + it("emits streamed text once and drops the assembled duplicate", async () => { + const { agent, client } = makeAgent(); + const sessionId = "s-streamed"; + const { query, input } = installFakeSession(agent, sessionId); + + const promptPromise = agent.prompt({ + sessionId, + prompt: [{ type: "text", text: "hi" }], + }); + await tick(); + + await echoUserMessage(query, input); + await send(query, messageStart(sessionId, "msg_1")); + await send(query, textDelta(sessionId, "hello")); + await send(query, assistantMessage(sessionId, "msg_1", "hello")); + await send(query, resultSuccess(sessionId)); + + const result = await promptPromise; + expect(result.stopReason).toBe("end_turn"); + expect(messageChunkTexts(client.sessionUpdate.mock.calls)).toEqual([ + "hello", + ]); + }); + + it("forwards assembled text when no deltas streamed (gateway path)", async () => { + const { agent, client } = makeAgent(); + const sessionId = "s-gateway"; + const { query, input } = installFakeSession(agent, sessionId); + + const promptPromise = agent.prompt({ + sessionId, + prompt: [{ type: "text", text: "hi" }], + }); + await tick(); + + await echoUserMessage(query, input); + await send(query, assistantMessage(sessionId, "msg_2", "gateway answer")); + await send(query, resultSuccess(sessionId)); + + const result = await promptPromise; + expect(result.stopReason).toBe("end_turn"); + expect(messageChunkTexts(client.sessionUpdate.mock.calls)).toEqual([ + "gateway answer", + ]); + }); +}); diff --git a/packages/agent/src/adapters/claude/claude-agent.ts b/packages/agent/src/adapters/claude/claude-agent.ts index 30423188f..0068b74b0 100644 --- a/packages/agent/src/adapters/claude/claude-agent.ts +++ b/packages/agent/src/adapters/claude/claude-agent.ts @@ -38,7 +38,6 @@ import { type Options, type Query, query, - type SDKMessage, type SDKUserMessage, type SlashCommand, } from "@anthropic-ai/claude-agent-sdk"; @@ -60,7 +59,12 @@ import { type PostHogProductId, } from "../../posthog-products"; import type { PostHogAPIConfig } from "../../types"; -import { isCloudRun, unreachable, withTimeout } from "../../utils/common"; +import { + isCloudRun, + unreachable, + withAbort, + withTimeout, +} from "../../utils/common"; import { resolveGithubToken } from "../../utils/github-token"; import { Logger } from "../../utils/logger"; import { Pushable } from "../../utils/streams"; @@ -462,11 +466,6 @@ export class ClaudeAcpAgent extends BaseAcpAgent { this.session.promptRunning = true; const cancelController = new AbortController(); this.session.cancelController = cancelController; - const cancelWake = new Promise((resolve) => { - cancelController.signal.addEventListener("abort", () => resolve(), { - once: true, - }); - }); let handedOff = false; let errored = false; let lastAssistantTotalUsage: number | null = null; @@ -512,13 +511,17 @@ export class ClaudeAcpAgent extends BaseAcpAgent { enrichedReadCache: this.enrichedReadCache, logger: this.logger, supportsTerminalOutput, + streamedAssistantBlocks: { + textIds: new Set(), + thinkingIds: new Set(), + }, }; try { while (true) { const nextMessage = this.session.query.next(); - const next = await Promise.race([nextMessage, cancelWake]); - if (cancelController.signal.aborted) { + const next = await withAbort(nextMessage, cancelController.signal); + if (next.result === "aborted" || cancelController.signal.aborted) { void nextMessage.catch((err) => this.logger.warn("in-flight query.next() rejected after cancel", { sessionId: params.sessionId, @@ -532,10 +535,7 @@ export class ClaudeAcpAgent extends BaseAcpAgent { : undefined, }; } - const { value: message, done } = next as IteratorResult< - SDKMessage, - void - >; + const { value: message, done } = next.value; if (done || !message) { if (this.session.cancelled) { @@ -562,11 +562,12 @@ export class ClaudeAcpAgent extends BaseAcpAgent { switch (message.type) { case "system": if (message.subtype === "compact_boundary") { - const usedTokens = await Promise.race([ + const usedTokens = await withAbort( fetchContextUsedTokens(this.session.query, this.logger), - cancelWake.then(() => null), - ]); - lastAssistantTotalUsage = usedTokens ?? 0; + cancelController.signal, + ); + lastAssistantTotalUsage = + usedTokens.result === "success" ? (usedTokens.value ?? 0) : 0; promptReplayed = true; await this.client.sessionUpdate({ sessionId: params.sessionId, diff --git a/packages/agent/src/adapters/claude/conversion/sdk-to-acp.test.ts b/packages/agent/src/adapters/claude/conversion/sdk-to-acp.test.ts index 76b4d06b2..7e9d15709 100644 --- a/packages/agent/src/adapters/claude/conversion/sdk-to-acp.test.ts +++ b/packages/agent/src/adapters/claude/conversion/sdk-to-acp.test.ts @@ -1,5 +1,20 @@ +import type { + AgentSideConnection, + SessionNotification, +} from "@agentclientprotocol/sdk"; +import type { + SDKAssistantMessage, + SDKPartialAssistantMessage, +} from "@anthropic-ai/claude-agent-sdk"; import { describe, expect, it } from "vitest"; -import { stripMarkerTags } from "./sdk-to-acp"; +import { Logger } from "../../../utils/logger"; +import type { Session } from "../types"; +import { + handleStreamEvent, + handleUserAssistantMessage, + type MessageHandlerContext, + stripMarkerTags, +} from "./sdk-to-acp"; describe("stripMarkerTags", () => { it("strips a single marker and keeps surrounding prose", () => { @@ -45,3 +60,203 @@ describe("stripMarkerTags", () => { expect(stripMarkerTags(input)).toBe(input); }); }); + +function createHandlerContext() { + const updates: SessionNotification[] = []; + const client = { + sessionUpdate: async (notification: SessionNotification) => { + updates.push(notification); + }, + } as unknown as AgentSideConnection; + const context: MessageHandlerContext = { + session: { + cwd: "/test", + taskState: new Map(), + notificationHistory: [], + } as unknown as Session, + sessionId: "test-session", + client, + toolUseCache: {}, + toolUseStreamCache: new Map(), + fileContentCache: {}, + logger: new Logger({ debug: false }), + streamedAssistantBlocks: { + textIds: new Set(), + thinkingIds: new Set(), + }, + }; + return { context, updates }; +} + +function streamEvent( + event: Record, + parentToolUseId: string | null = null, +): SDKPartialAssistantMessage { + return { + type: "stream_event", + parent_tool_use_id: parentToolUseId, + uuid: "00000000-0000-0000-0000-000000000001", + session_id: "test-session", + event, + } as unknown as SDKPartialAssistantMessage; +} + +function assistantMessage( + apiId: string, + content: Array>, + parentToolUseId: string | null = null, +): SDKAssistantMessage { + return { + type: "assistant", + parent_tool_use_id: parentToolUseId, + uuid: "00000000-0000-0000-0000-000000000002", + session_id: "test-session", + message: { + id: apiId, + role: "assistant", + content, + }, + } as unknown as SDKAssistantMessage; +} + +function chunkTexts( + updates: SessionNotification[], + type: "agent_message_chunk" | "agent_thought_chunk", +): string[] { + return updates + .filter((u) => u.update.sessionUpdate === type) + .map((u) => (u.update as { content: { text: string } }).content.text); +} + +async function streamLiveText( + context: MessageHandlerContext, + apiId: string, + text: string, +): Promise { + await handleStreamEvent( + streamEvent({ type: "message_start", message: { id: apiId } }), + context, + ); + await handleStreamEvent( + streamEvent({ + type: "content_block_delta", + index: 0, + delta: { type: "text_delta", text }, + }), + context, + ); +} + +describe("assembled assistant text fallback", () => { + it("forwards assembled text that never streamed", async () => { + const { context, updates } = createHandlerContext(); + await handleUserAssistantMessage( + assistantMessage("msg_1", [{ type: "text", text: "full answer" }]), + context, + ); + expect(chunkTexts(updates, "agent_message_chunk")).toEqual(["full answer"]); + }); + + it("drops assembled text that already streamed live", async () => { + const { context, updates } = createHandlerContext(); + await streamLiveText(context, "msg_1", "streamed"); + updates.length = 0; + await handleUserAssistantMessage( + assistantMessage("msg_1", [{ type: "text", text: "streamed" }]), + context, + ); + expect(chunkTexts(updates, "agent_message_chunk")).toEqual([]); + }); + + it("forwards un-streamed thinking when only text streamed", async () => { + const { context, updates } = createHandlerContext(); + await streamLiveText(context, "msg_1", "streamed"); + updates.length = 0; + await handleUserAssistantMessage( + assistantMessage("msg_1", [ + { type: "thinking", thinking: "private reasoning" }, + { type: "text", text: "streamed" }, + ]), + context, + ); + expect(chunkTexts(updates, "agent_message_chunk")).toEqual([]); + expect(chunkTexts(updates, "agent_thought_chunk")).toEqual([ + "private reasoning", + ]); + }); + + it("tracks streamed ids per message so a later message still falls back", async () => { + const { context, updates } = createHandlerContext(); + await streamLiveText(context, "msg_1", "streamed"); + updates.length = 0; + await handleUserAssistantMessage( + assistantMessage("msg_2", [{ type: "text", text: "not streamed" }]), + context, + ); + expect(chunkTexts(updates, "agent_message_chunk")).toEqual([ + "not streamed", + ]); + }); + + it("drops empty assembled blocks", async () => { + const { context, updates } = createHandlerContext(); + await handleUserAssistantMessage( + assistantMessage("msg_1", [ + { type: "thinking", thinking: "" }, + { type: "text", text: "" }, + ]), + context, + ); + expect(updates).toEqual([]); + }); + + it("always drops subagent assistant text", async () => { + const { context, updates } = createHandlerContext(); + await handleUserAssistantMessage( + assistantMessage( + "msg_1", + [{ type: "text", text: "subagent prose" }], + "tool_1", + ), + context, + ); + expect(chunkTexts(updates, "agent_message_chunk")).toEqual([]); + }); + + it("does not record deltas from subagent streams", async () => { + const { context, updates } = createHandlerContext(); + await handleStreamEvent( + streamEvent({ type: "message_start", message: { id: "msg_1" } }), + context, + ); + await handleStreamEvent( + streamEvent( + { + type: "content_block_delta", + index: 0, + delta: { type: "text_delta", text: "subagent" }, + }, + "tool_1", + ), + context, + ); + updates.length = 0; + await handleUserAssistantMessage( + assistantMessage("msg_1", [{ type: "text", text: "top-level answer" }]), + context, + ); + expect(chunkTexts(updates, "agent_message_chunk")).toEqual([ + "top-level answer", + ]); + }); + + it("keeps the legacy drop-all filter without a tracker (replay)", async () => { + const { context, updates } = createHandlerContext(); + context.streamedAssistantBlocks = undefined; + await handleUserAssistantMessage( + assistantMessage("msg_1", [{ type: "text", text: "replayed" }]), + context, + ); + expect(chunkTexts(updates, "agent_message_chunk")).toEqual([]); + }); +}); diff --git a/packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts b/packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts index e17ab7ffe..3d5dc0e28 100644 --- a/packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts +++ b/packages/agent/src/adapters/claude/conversion/sdk-to-acp.ts @@ -79,6 +79,22 @@ type ChunkHandlerContext = { taskState?: TaskState; }; +/** + * Per-turn record of which top-level assistant message ids actually streamed + * text/thinking live via `stream_event` deltas. The consolidated assistant + * message normally has its text/thinking blocks dropped as duplicates of the + * streamed chunks, but gateways that return a turn as a single non-streamed + * block (common with OpenAI-compatible proxies) never fire deltas, so the + * assembled block is the only copy the client will ever see. Tracked per + * block type so a gateway that streams text but not thinking (or vice versa) + * doesn't lose the un-streamed block. + */ +export interface StreamedAssistantBlocks { + currentStreamMessageId?: string; + textIds: Set; + thinkingIds: Set; +} + export interface MessageHandlerContext { session: Session; sessionId: string; @@ -91,6 +107,8 @@ export interface MessageHandlerContext { logger: Logger; registerHooks?: boolean; supportsTerminalOutput?: boolean; + /** Absent on replay, where the legacy drop-all text/thinking filter applies. */ + streamedAssistantBlocks?: StreamedAssistantBlocks; } function messageUpdateType(role: Role) { @@ -495,6 +513,7 @@ function processContentChunk( case "compaction_delta": case "advisor_tool_result": case "mid_conv_system": + case "fallback": return []; default: @@ -930,6 +949,26 @@ export async function handleStreamEvent( } = context; const parentToolCallId = message.parent_tool_use_id ?? undefined; + const streamed = context.streamedAssistantBlocks; + if (streamed) { + if (message.event.type === "message_start") { + streamed.currentStreamMessageId = message.event.message.id || undefined; + } + // Only top-level streams are recorded — subagent text is never streamed + // and must stay filtered, as it is internal to the tool call. + if ( + streamed.currentStreamMessageId && + message.parent_tool_use_id === null && + message.event.type === "content_block_delta" + ) { + if (message.event.delta.type === "text_delta") { + streamed.textIds.add(streamed.currentStreamMessageId); + } else if (message.event.delta.type === "thinking_delta") { + streamed.thinkingIds.add(streamed.currentStreamMessageId); + } + } + } + for (const notification of streamEventToAcpNotifications( message, sessionId, @@ -1067,15 +1106,43 @@ function logSpecialMessages( } } -function filterMessageContent( - content: AnthropicMessageContent, -): AnthropicMessageContent { - if (!Array.isArray(content)) { - return content; +/** + * Drops assistant `text`/`thinking` blocks that already reached the client as + * streamed chunks, while forwarding (as a fallback) any non-empty block that + * never streamed — see `StreamedAssistantBlocks`. Subagent assistant content + * (`parent_tool_use_id !== null`) is never streamed and stays internal to its + * tool call, so it is always dropped, as is everything when no tracker is + * available (replay). + */ +function filterAssistantContent( + message: SDKAssistantMessage, + streamed: StreamedAssistantBlocks | undefined, +): SDKAssistantMessage["message"]["content"] { + const content = message.message.content; + const isTopLevel = + "parent_tool_use_id" in message && message.parent_tool_use_id === null; + if (!streamed || !isTopLevel) { + return content.filter( + (block) => block.type !== "text" && block.type !== "thinking", + ); } - return content.filter( - (block) => block.type !== "text" && block.type !== "thinking", - ); + + const id = message.message.id || undefined; + return content.filter((block) => { + if (block.type !== "text" && block.type !== "thinking") { + return true; + } + const streamedLive = + id !== undefined && + (block.type === "text" ? streamed.textIds : streamed.thinkingIds).has(id); + if (streamedLive) { + return false; + } + // Some gateways emit an empty `thinking` block before the real text — + // don't forward stray empty chunks. + const blockText = block.type === "text" ? block.text : block.thinking; + return blockText.length > 0; + }); } export async function handleUserAssistantMessage( @@ -1137,7 +1204,9 @@ export async function handleUserAssistantMessage( const content = message.message.content; const contentToProcess = - message.type === "assistant" ? filterMessageContent(content) : content; + message.type === "assistant" + ? filterAssistantContent(message, context.streamedAssistantBlocks) + : content; const parentToolCallId = "parent_tool_use_id" in message ? (message.parent_tool_use_id ?? undefined) diff --git a/packages/agent/src/test/mocks/claude-sdk.ts b/packages/agent/src/test/mocks/claude-sdk.ts index 8f6228aeb..4bb084931 100644 --- a/packages/agent/src/test/mocks/claude-sdk.ts +++ b/packages/agent/src/test/mocks/claude-sdk.ts @@ -103,6 +103,9 @@ export function createMockQuery( stopTask: vi.fn().mockResolvedValue(undefined), applyFlagSettings: vi.fn().mockResolvedValue(undefined), getContextUsage: vi.fn().mockResolvedValue({}), + usage_EXPERIMENTAL_MAY_CHANGE_DO_NOT_RELY_ON_THIS_API_YET: vi + .fn() + .mockResolvedValue({}), reloadPlugins: vi.fn().mockResolvedValue(undefined), reloadSkills: vi.fn().mockResolvedValue(undefined), seedReadState: vi.fn().mockResolvedValue(undefined), diff --git a/packages/agent/src/utils/common.test.ts b/packages/agent/src/utils/common.test.ts new file mode 100644 index 000000000..484e9af2f --- /dev/null +++ b/packages/agent/src/utils/common.test.ts @@ -0,0 +1,100 @@ +import { describe, expect, it, vi } from "vitest"; +import { withAbort } from "./common"; + +describe("withAbort", () => { + it("resolves success when the operation settles first", async () => { + const controller = new AbortController(); + + const result = await withAbort(Promise.resolve(42), controller.signal); + + expect(result).toEqual({ result: "success", value: 42 }); + }); + + it("resolves aborted when the signal fires while the operation is pending", async () => { + const controller = new AbortController(); + let resolveOperation!: (value: string) => void; + const operation = new Promise((resolve) => { + resolveOperation = resolve; + }); + + const raced = withAbort(operation, controller.signal); + controller.abort(); + + await expect(raced).resolves.toEqual({ result: "aborted" }); + resolveOperation("late settle is ignored"); + }); + + it("resolves aborted immediately when the signal is already aborted", async () => { + const controller = new AbortController(); + controller.abort(); + + const result = await withAbort( + new Promise(() => {}), + controller.signal, + ); + + expect(result).toEqual({ result: "aborted" }); + }); + + it("rejects when the operation rejects before abort", async () => { + const controller = new AbortController(); + + await expect( + withAbort(Promise.reject(new Error("boom")), controller.signal), + ).rejects.toThrow("boom"); + }); + + it("removes its abort listener as soon as the operation settles", async () => { + const controller = new AbortController(); + const addSpy = vi.spyOn(controller.signal, "addEventListener"); + const removeSpy = vi.spyOn(controller.signal, "removeEventListener"); + let resolveOperation!: (value: string) => void; + const operation = new Promise((resolve) => { + resolveOperation = resolve; + }); + + const raced = withAbort(operation, controller.signal); + expect(addSpy).toHaveBeenCalledTimes(1); + + resolveOperation("done"); + await raced; + + expect(removeSpy).toHaveBeenCalledWith("abort", addSpy.mock.calls[0]?.[1]); + }); + + it.each([ + { label: "signal already aborted when called", abortFirst: true }, + { label: "signal aborts while pending", abortFirst: false }, + ])( + "observes a late rejection after abort without leaving it unhandled ($label)", + async ({ abortFirst }) => { + const controller = new AbortController(); + if (abortFirst) { + controller.abort(); + } + let rejectOperation!: (error: Error) => void; + const operation = new Promise((_, reject) => { + rejectOperation = reject; + }); + const unhandled: unknown[] = []; + const onUnhandled = (reason: unknown) => unhandled.push(reason); + process.on("unhandledRejection", onUnhandled); + + try { + const raced = withAbort(operation, controller.signal); + if (!abortFirst) { + controller.abort(); + } + await expect(raced).resolves.toEqual({ result: "aborted" }); + + rejectOperation(new Error("late failure")); + await new Promise((resolve) => setImmediate(resolve)); + await new Promise((resolve) => setImmediate(resolve)); + + expect(unhandled).toEqual([]); + } finally { + process.off("unhandledRejection", onUnhandled); + } + }, + ); +}); diff --git a/packages/agent/src/utils/common.ts b/packages/agent/src/utils/common.ts index 0ca649e7f..867f6b2f6 100644 --- a/packages/agent/src/utils/common.ts +++ b/packages/agent/src/utils/common.ts @@ -19,6 +19,43 @@ export async function withTimeout( return Promise.race([operationPromise, timeoutPromise]); } +/** + * Races an operation against an AbortSignal. + * Returns success with the value if the operation settles before the signal + * aborts, or aborted otherwise. The operation itself is not cancelled: a + * settle after abort is ignored, and a rejection is always observed so it + * never surfaces as an unhandled rejection. + * + * Use this instead of `Promise.race([operation, abortPromise])` when racing + * in a loop: each race call parks a reaction on the long-lived abort promise + * that retains that iteration's settled value until the abort promise itself + * settles. The abort listener here is removed as soon as the operation + * settles, so per-call state is reclaimed immediately. + */ +export function withAbort( + operation: Promise, + signal: AbortSignal, +): Promise<{ result: "success"; value: T } | { result: "aborted" }> { + return new Promise((resolve, reject) => { + const onAbort = () => resolve({ result: "aborted" }); + if (signal.aborted) { + onAbort(); + } else { + signal.addEventListener("abort", onAbort, { once: true }); + } + operation.then( + (value) => { + signal.removeEventListener("abort", onAbort); + resolve({ result: "success", value }); + }, + (error) => { + signal.removeEventListener("abort", onAbort); + reject(error); + }, + ); + }); +} + export const IS_ROOT = typeof process !== "undefined" && (process.geteuid?.() ?? process.getuid?.()) === 0; diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index a0540d413..d81c0b41a 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -710,11 +710,11 @@ importers: specifier: 0.25.0 version: 0.25.0(zod@4.3.6) '@anthropic-ai/claude-agent-sdk': - specifier: 0.3.165 - version: 0.3.165(@anthropic-ai/sdk@0.100.1(zod@4.3.6))(@modelcontextprotocol/sdk@1.29.0(zod@4.3.6))(zod@4.3.6) + specifier: 0.3.170 + version: 0.3.170(@anthropic-ai/sdk@0.104.1(zod@4.3.6))(@modelcontextprotocol/sdk@1.29.0(zod@4.3.6))(zod@4.3.6) '@anthropic-ai/sdk': - specifier: 0.100.1 - version: 0.100.1(zod@4.3.6) + specifier: 0.104.1 + version: 0.104.1(zod@4.3.6) '@hono/node-server': specifier: ^1.19.9 version: 1.19.9(hono@4.11.7) @@ -1439,7 +1439,7 @@ importers: version: 0.22.1(zod@4.3.6) '@anthropic-ai/claude-agent-sdk': specifier: 0.3.156 - version: 0.3.156(@anthropic-ai/sdk@0.100.1(zod@4.3.6))(@modelcontextprotocol/sdk@1.29.0(zod@4.3.6))(zod@4.3.6) + version: 0.3.156(@anthropic-ai/sdk@0.104.1(zod@4.3.6))(@modelcontextprotocol/sdk@1.29.0(zod@4.3.6))(zod@4.3.6) '@hono/node-server': specifier: 'catalog:' version: 1.19.9(hono@4.11.7) @@ -1575,8 +1575,8 @@ packages: cpu: [arm64] os: [darwin] - '@anthropic-ai/claude-agent-sdk-darwin-arm64@0.3.165': - resolution: {integrity: sha512-obVodJmppNc6lgcM6Y5y3VCQLrYO2curOXrRaziKtjxYbuZP7kYsUhnonMvGoVAQh3uHKz2tivQDeztvWe3f9w==} + '@anthropic-ai/claude-agent-sdk-darwin-arm64@0.3.170': + resolution: {integrity: sha512-rwfgArIa5WI0QPNqFsRBgvtSI0mrtpynUm0oK6+l6/KX4hcgnYGEzciZR1bOeD9/7sSZlTdIgt+T9alKeZmXcg==} cpu: [arm64] os: [darwin] @@ -1585,8 +1585,8 @@ packages: cpu: [x64] os: [darwin] - '@anthropic-ai/claude-agent-sdk-darwin-x64@0.3.165': - resolution: {integrity: sha512-0jc1tlYLXzPvZIkHKGHzsEEKq2YqTS8oHSNFroqLgbhrIk1Zy05ZXbciI289VDAe1Fq2a+qcUhkXct8Parx1Rg==} + '@anthropic-ai/claude-agent-sdk-darwin-x64@0.3.170': + resolution: {integrity: sha512-0e58h8UQMtsQxLGIv9r4foxfBFWKZ7NeDtoplLhuD7EwQonehomw1sBXCch77t/IfUS+q5vQ5zv+fOGmap5nLQ==} cpu: [x64] os: [darwin] @@ -1596,8 +1596,8 @@ packages: os: [linux] libc: [musl] - '@anthropic-ai/claude-agent-sdk-linux-arm64-musl@0.3.165': - resolution: {integrity: sha512-Rccmr5chZdZJVRvoB0nildB5PTKX+amatUho9JIcNOf1iX/6ej39fwf8q9W1MRHYP7AEc4t9GrSAGLcn7/JO4w==} + '@anthropic-ai/claude-agent-sdk-linux-arm64-musl@0.3.170': + resolution: {integrity: sha512-SRYfQcsXlOq+CD/FqkQBTSHbaD++w73GnnO+NUV9adLYrca3kfetRwWT1iguY1cNS0l34dCR3rlzCPq78vg1Jg==} cpu: [arm64] os: [linux] libc: [musl] @@ -1608,8 +1608,8 @@ packages: os: [linux] libc: [glibc] - '@anthropic-ai/claude-agent-sdk-linux-arm64@0.3.165': - resolution: {integrity: sha512-t87HgDPPaRYMTTB5cqA0M36Fyq4DOny89yk71BMgA8hAzhOjV9bla8pMVZTuX3xYYPjsa/TOmxSzwI8GZLf4Aw==} + '@anthropic-ai/claude-agent-sdk-linux-arm64@0.3.170': + resolution: {integrity: sha512-gLbaFqcGppFJQd4DLNV4IXoeahejT/p2/M8bSSvRDbla9GOsBr1AxV5XLRyBn1e7xFGozZIAIQr3+1chp7NJgQ==} cpu: [arm64] os: [linux] libc: [glibc] @@ -1620,8 +1620,8 @@ packages: os: [linux] libc: [musl] - '@anthropic-ai/claude-agent-sdk-linux-x64-musl@0.3.165': - resolution: {integrity: sha512-Y9Acr1RmydfEX+t+3mFn0K9VOx6nfyo08QuQH9R6ap1YYZWuobze++pNUY/rzwbQjXqcbjORtPKbO/kLQtSr9w==} + '@anthropic-ai/claude-agent-sdk-linux-x64-musl@0.3.170': + resolution: {integrity: sha512-m4+I0qBEk7cxRKS+pL+eoWXbXTFOAo83fQ0tQvap4z/mDMm06IWJtEPoYTaMBwsp32GJWLkHWKbZSBCHZnp2DQ==} cpu: [x64] os: [linux] libc: [musl] @@ -1632,8 +1632,8 @@ packages: os: [linux] libc: [glibc] - '@anthropic-ai/claude-agent-sdk-linux-x64@0.3.165': - resolution: {integrity: sha512-Y8fEW0zKBn0XZI5AOQWHep0Srz0qsCauynTWkhsC6J2vSPxkTiOxv2hmb7qdfiNlFn0k1etCWVFoRkhhFJzGfg==} + '@anthropic-ai/claude-agent-sdk-linux-x64@0.3.170': + resolution: {integrity: sha512-Xl/m7TaSC3T5IDBdHrZQ9fCQYyDmPELN34CL+MoyPIf7uSmuZnjE9fUOqDh2Rv26JxWssi1M6X+BBvVuKd6Cpg==} cpu: [x64] os: [linux] libc: [glibc] @@ -1643,8 +1643,8 @@ packages: cpu: [arm64] os: [win32] - '@anthropic-ai/claude-agent-sdk-win32-arm64@0.3.165': - resolution: {integrity: sha512-4Q01L3xaDDCvlOhABf2MnO7v7yJxKwwDyiMr+DaneUSvuh1qH0YE7qErSYLf6D9VfH8TdRwKZXwQplVVwCoHWw==} + '@anthropic-ai/claude-agent-sdk-win32-arm64@0.3.170': + resolution: {integrity: sha512-IG+8isJNNJKbnnhO7m+PGhfVCg+XoQ/MDxGde5eigFI0WsEfitjuWSWwx82bT9ghxI1aa6qNvI+UPgPcZuo5Fg==} cpu: [arm64] os: [win32] @@ -1653,8 +1653,8 @@ packages: cpu: [x64] os: [win32] - '@anthropic-ai/claude-agent-sdk-win32-x64@0.3.165': - resolution: {integrity: sha512-Y0uOx7b7ZnkguvFFI5T5fSLnRA/e0uvMC++gSnyz6XMpNekgWc3+Mny7Dv2NO22nKbV2YiFsj6MkYYFEd51BDw==} + '@anthropic-ai/claude-agent-sdk-win32-x64@0.3.170': + resolution: {integrity: sha512-7cuqSKbHVItPGVwRbd3A0BEJwcNtc7Fhoh6qHN4C6yrmjSrvdYYx3MLvq/VI768/RoG7mAMDxb+j7WfEfoP9BA==} cpu: [x64] os: [win32] @@ -1666,16 +1666,16 @@ packages: '@modelcontextprotocol/sdk': ^1.29.0 zod: 4.3.6 - '@anthropic-ai/claude-agent-sdk@0.3.165': - resolution: {integrity: sha512-wEUJNTAWkE6KMV35abqGi30lwhZz+jQLMtLh4SuTN2Hllzsysq8kmQFgcWulza3FLHG/GHzGHPi0+Sp2fb8xlw==} + '@anthropic-ai/claude-agent-sdk@0.3.170': + resolution: {integrity: sha512-pAvhfk+iTodXZ6RF18Kz7BEUWFjL7EcR3tKuhUNdPpE1NAYCR3mSHGbafi72JsrNwKEDIs7FU31z3fqhwy8QzA==} engines: {node: '>=18.0.0'} peerDependencies: '@anthropic-ai/sdk': '>=0.93.0' '@modelcontextprotocol/sdk': ^1.29.0 zod: 4.3.6 - '@anthropic-ai/sdk@0.100.1': - resolution: {integrity: sha512-RANcEe7LpiLczkKGOwoXOTuFdPhuubS0i4xaAKOMpcqc55YO0mukgxppV7eygx3DXNjxWT6RYOLPyOy0aIAmwg==} + '@anthropic-ai/sdk@0.104.1': + resolution: {integrity: sha512-gGACa/+IaiXzRRmF96aOhamoBgapKRBiFWbmmTFP8aMkpaEcuStF+Q61bjo4vPxBM7gqWJNZqsngslRdnLHv0Q==} hasBin: true peerDependencies: zod: 4.3.6 @@ -13491,54 +13491,54 @@ snapshots: '@anthropic-ai/claude-agent-sdk-darwin-arm64@0.3.156': optional: true - '@anthropic-ai/claude-agent-sdk-darwin-arm64@0.3.165': + '@anthropic-ai/claude-agent-sdk-darwin-arm64@0.3.170': optional: true '@anthropic-ai/claude-agent-sdk-darwin-x64@0.3.156': optional: true - '@anthropic-ai/claude-agent-sdk-darwin-x64@0.3.165': + '@anthropic-ai/claude-agent-sdk-darwin-x64@0.3.170': optional: true '@anthropic-ai/claude-agent-sdk-linux-arm64-musl@0.3.156': optional: true - '@anthropic-ai/claude-agent-sdk-linux-arm64-musl@0.3.165': + '@anthropic-ai/claude-agent-sdk-linux-arm64-musl@0.3.170': optional: true '@anthropic-ai/claude-agent-sdk-linux-arm64@0.3.156': optional: true - '@anthropic-ai/claude-agent-sdk-linux-arm64@0.3.165': + '@anthropic-ai/claude-agent-sdk-linux-arm64@0.3.170': optional: true '@anthropic-ai/claude-agent-sdk-linux-x64-musl@0.3.156': optional: true - '@anthropic-ai/claude-agent-sdk-linux-x64-musl@0.3.165': + '@anthropic-ai/claude-agent-sdk-linux-x64-musl@0.3.170': optional: true '@anthropic-ai/claude-agent-sdk-linux-x64@0.3.156': optional: true - '@anthropic-ai/claude-agent-sdk-linux-x64@0.3.165': + '@anthropic-ai/claude-agent-sdk-linux-x64@0.3.170': optional: true '@anthropic-ai/claude-agent-sdk-win32-arm64@0.3.156': optional: true - '@anthropic-ai/claude-agent-sdk-win32-arm64@0.3.165': + '@anthropic-ai/claude-agent-sdk-win32-arm64@0.3.170': optional: true '@anthropic-ai/claude-agent-sdk-win32-x64@0.3.156': optional: true - '@anthropic-ai/claude-agent-sdk-win32-x64@0.3.165': + '@anthropic-ai/claude-agent-sdk-win32-x64@0.3.170': optional: true - '@anthropic-ai/claude-agent-sdk@0.3.156(@anthropic-ai/sdk@0.100.1(zod@4.3.6))(@modelcontextprotocol/sdk@1.29.0(zod@4.3.6))(zod@4.3.6)': + '@anthropic-ai/claude-agent-sdk@0.3.156(@anthropic-ai/sdk@0.104.1(zod@4.3.6))(@modelcontextprotocol/sdk@1.29.0(zod@4.3.6))(zod@4.3.6)': dependencies: - '@anthropic-ai/sdk': 0.100.1(zod@4.3.6) + '@anthropic-ai/sdk': 0.104.1(zod@4.3.6) '@modelcontextprotocol/sdk': 1.29.0(zod@4.3.6) zod: 4.3.6 optionalDependencies: @@ -13551,22 +13551,22 @@ snapshots: '@anthropic-ai/claude-agent-sdk-win32-arm64': 0.3.156 '@anthropic-ai/claude-agent-sdk-win32-x64': 0.3.156 - '@anthropic-ai/claude-agent-sdk@0.3.165(@anthropic-ai/sdk@0.100.1(zod@4.3.6))(@modelcontextprotocol/sdk@1.29.0(zod@4.3.6))(zod@4.3.6)': + '@anthropic-ai/claude-agent-sdk@0.3.170(@anthropic-ai/sdk@0.104.1(zod@4.3.6))(@modelcontextprotocol/sdk@1.29.0(zod@4.3.6))(zod@4.3.6)': dependencies: - '@anthropic-ai/sdk': 0.100.1(zod@4.3.6) + '@anthropic-ai/sdk': 0.104.1(zod@4.3.6) '@modelcontextprotocol/sdk': 1.29.0(zod@4.3.6) zod: 4.3.6 optionalDependencies: - '@anthropic-ai/claude-agent-sdk-darwin-arm64': 0.3.165 - '@anthropic-ai/claude-agent-sdk-darwin-x64': 0.3.165 - '@anthropic-ai/claude-agent-sdk-linux-arm64': 0.3.165 - '@anthropic-ai/claude-agent-sdk-linux-arm64-musl': 0.3.165 - '@anthropic-ai/claude-agent-sdk-linux-x64': 0.3.165 - '@anthropic-ai/claude-agent-sdk-linux-x64-musl': 0.3.165 - '@anthropic-ai/claude-agent-sdk-win32-arm64': 0.3.165 - '@anthropic-ai/claude-agent-sdk-win32-x64': 0.3.165 - - '@anthropic-ai/sdk@0.100.1(zod@4.3.6)': + '@anthropic-ai/claude-agent-sdk-darwin-arm64': 0.3.170 + '@anthropic-ai/claude-agent-sdk-darwin-x64': 0.3.170 + '@anthropic-ai/claude-agent-sdk-linux-arm64': 0.3.170 + '@anthropic-ai/claude-agent-sdk-linux-arm64-musl': 0.3.170 + '@anthropic-ai/claude-agent-sdk-linux-x64': 0.3.170 + '@anthropic-ai/claude-agent-sdk-linux-x64-musl': 0.3.170 + '@anthropic-ai/claude-agent-sdk-win32-arm64': 0.3.170 + '@anthropic-ai/claude-agent-sdk-win32-x64': 0.3.170 + + '@anthropic-ai/sdk@0.104.1(zod@4.3.6)': dependencies: json-schema-to-ts: 3.1.1 standardwebhooks: 1.0.0