4 changes: 2 additions & 2 deletions packages/agent/package.json
@@ -129,8 +129,8 @@
},
"dependencies": {
"@agentclientprotocol/sdk": "0.25.0",
-"@anthropic-ai/claude-agent-sdk": "0.3.165",
-"@anthropic-ai/sdk": "0.100.1",
+"@anthropic-ai/claude-agent-sdk": "0.3.170",
+"@anthropic-ai/sdk": "0.104.1",
"@hono/node-server": "^1.19.9",
"@opentelemetry/api-logs": "^0.208.0",
"@opentelemetry/exporter-logs-otlp-http": "^0.208.0",
46 changes: 43 additions & 3 deletions packages/agent/src/adapters/claude/UPSTREAM.md
@@ -5,8 +5,8 @@ Fork of `@anthropic-ai/claude-agent-acp`. Upstream repo: https://github.com/anth
## Fork Point

- **Forked**: v0.10.9, commit `5411e0f4`, Dec 2 2025
-- **Last sync**: v0.42.0, commit `0dbccf5`, Jun 5 2026
-- **SDK**: `@anthropic-ai/claude-agent-sdk` 0.3.165, `@agentclientprotocol/sdk` 0.25.0, `@anthropic-ai/sdk` 0.100.1
+- **Last sync**: v0.44.0, commit `7de5e4b`, Jun 11 2026
+- **SDK**: `@anthropic-ai/claude-agent-sdk` 0.3.170, `@agentclientprotocol/sdk` 0.25.0, `@anthropic-ai/sdk` 0.104.1

## File Mapping

@@ -54,6 +54,46 @@ Fork of `@anthropic-ai/claude-agent-acp`. Upstream repo: https://github.com/anth
| Session fingerprinting | Implicit teardown on cwd/mcp change | Explicit `refreshSession()` | Caller-initiated is more predictable |
| Shutdown on ACP close | Process exits | No standalone process | Agent is embedded in server |
| Unsupported slash commands | Loops silently on early idle | Emits "Unsupported slash command" chunk, gated on `initializationResult().commands` so plugin/skill commands (e.g. `/skills-store`) whose echoes use a fresh uuid are not false-flagged | The SDK consumes some slash commands without producing output (e.g. `/plugin` in non-interactive mode); without this we hang. The known-commands gate avoids racing plugin/skill loads where idle can arrive before the transformed user-message echo. |
| Prompt-loop cancel race | `Promise.race([query.next(), cancelWake])` each iteration (#742) | `withAbort(query.next(), cancelController.signal)` helper in `utils/common.ts`, also guarding the `compact_boundary` `getContextUsage` fetch | The classic `Promise.race` leak (nodejs/node#17469): each race call parks a reaction on the turn-lived `cancelWake` promise that retains that iteration's settled value, so every yielded message (and every stream event, since `includePartialMessages` is on) stays reachable until the turn ends. Long high-reasoning turns could pin tens of MB. `withAbort` removes its abort listener as soon as `next()` settles, so nothing accumulates. Cancel semantics are unchanged, including the force-cancel backstop. |
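
The `withAbort` approach in the last row can be illustrated with a minimal TypeScript sketch. This is an assumption-based reconstruction, not the helper that lives in `utils/common.ts`: only the `{ result: "aborted" }` / `{ result: "success", value }` result shape is taken from how `claude-agent.ts` consumes it in this PR, while the `AbortOutcome` type name and the function body are hypothetical.

```typescript
// Hypothetical result type; the shape mirrors the `.result` / `.value`
// accesses in claude-agent.ts, but the name is an assumption.
type AbortOutcome<T> =
  | { result: "success"; value: T }
  | { result: "aborted" };

// Sketch only: waits for `promise` unless `signal` aborts first.
function withAbort<T>(
  promise: Promise<T>,
  signal: AbortSignal,
): Promise<AbortOutcome<T>> {
  if (signal.aborted) {
    // Already cancelled: the caller stays responsible for handling a later
    // rejection of `promise` (the prompt loop attaches a `.catch` for this).
    return Promise.resolve({ result: "aborted" });
  }
  return new Promise<AbortOutcome<T>>((resolve, reject) => {
    const onAbort = () => resolve({ result: "aborted" });
    signal.addEventListener("abort", onAbort, { once: true });
    promise.then(
      (value) => {
        // Detach the listener as soon as the promise settles, so the
        // turn-lived signal does not retain this iteration's value.
        signal.removeEventListener("abort", onAbort);
        resolve({ result: "success", value });
      },
      (err) => {
        signal.removeEventListener("abort", onAbort);
        reject(err);
      },
    );
  });
}
```

Because the abort listener is detached once the wrapped promise settles, nothing long-lived keeps a reference to each iteration's value, which is the difference from racing against a turn-lived promise.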

## Changes Ported in v0.44.0 Sync

- **SDK bumps**: claude-agent-sdk 0.3.165 -> 0.3.170 (the 0.3.169 bump #754 was version-only),
anthropic SDK 0.100.1 -> 0.104.1 (upstream now carries it as a dev dependency; 0.104.1 matches
upstream HEAD), ACP SDK unchanged at 0.25.0.
- **Forward unstreamed assistant text blocks** (#757, 7ff6b7f): The consolidated assistant
message's `text`/`thinking` blocks were always dropped as duplicates of the streamed chunks,
which loses the whole answer behind gateways that return a turn as a single non-streamed block
(common with OpenAI-compatible proxies). Added a per-turn `StreamedAssistantBlocks` tracker on
`MessageHandlerContext` (populated in `handleStreamEvent` from `message_start` ids +
`content_block_delta` types, top-level streams only); `filterAssistantContent` (replaces
`filterMessageContent`) now drops a block only if its exact (message id, block type) pair
streamed live or the block is empty. Subagent assistant text stays always-dropped, and the
replay path (no tracker) keeps the legacy drop-all filter — upstream's replay never filtered, so
this divergence is contained to `replaySessionHistory`. Covered by new unit tests in
`conversion/sdk-to-acp.test.ts`.
- **`fallback` content block no-op** (#761, d8af943): New @anthropic-ai/sdk block type added to
the `processContentChunk` no-op group so it doesn't trip the `unreachable` default (same
treatment as `advisor_tool_result` / `mid_conv_system` in the v0.38 sync).
- **Test mock**: added `usage_EXPERIMENTAL_MAY_CHANGE_DO_NOT_RELY_ON_THIS_API_YET` to the SDK
`MockQuery` (new method on the SDK `Query` interface in 0.3.170).
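
The streamed-block filter described in the list above could look roughly like the following. This is a hedged sketch: the `textIds` / `thinkingIds` sets match the tracker initialised in `claude-agent.ts` in this PR, but `recordStreamedDelta`, `filterAssistantContentSketch`, the `Block` type, and all signatures are hypothetical stand-ins for `handleStreamEvent` and `filterAssistantContent`, whose real code is not shown here.

```typescript
// Field names taken from the tracker literal in claude-agent.ts.
interface StreamedAssistantBlocks {
  textIds: Set<string>;
  thinkingIds: Set<string>;
}

// Hypothetical, simplified content block union for illustration.
type Block =
  | { type: "text"; text: string }
  | { type: "thinking"; thinking: string }
  | { type: "tool_use"; id: string; name: string; input: unknown };

// Hypothetical helper: remember which (message id, block type) pairs
// actually streamed live as deltas during this turn.
function recordStreamedDelta(
  tracker: StreamedAssistantBlocks,
  messageId: string | null,
  deltaType: string,
): void {
  if (!messageId) return;
  if (deltaType === "text_delta") tracker.textIds.add(messageId);
  else if (deltaType === "thinking_delta") tracker.thinkingIds.add(messageId);
}

// Hypothetical filter: drop a text/thinking block only if it is empty or
// the same message id already streamed that block type; keep everything else.
function filterAssistantContentSketch(
  messageId: string,
  content: Block[],
  tracker: StreamedAssistantBlocks,
): Block[] {
  return content.filter((block) => {
    switch (block.type) {
      case "text":
        return block.text.length > 0 && !tracker.textIds.has(messageId);
      case "thinking":
        return (
          block.thinking.length > 0 && !tracker.thinkingIds.has(messageId)
        );
      default:
        return true;
    }
  });
}
```

Under these assumptions, a message whose text arrived as deltas is dropped from the consolidated message, while a gateway response that never streamed is forwarded, which mirrors the two cases exercised in `claude-agent.streamed-text.test.ts`.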

## Skipped in v0.44.0 Sync

- **Experimental elicitation support** (#756, 12bd276): Upstream re-enables AskUserQuestion by
rendering it through ACP's unstable elicitation API (`unstable_createElicitation`, gated on
`clientCapabilities.elicitation`) and forwards MCP-server elicitations the same way. Conflicts
with our AskUserQuestion divergence (own `questions/` machinery behind
`CLAUDE_CODE_ENABLE_ASK_USER_QUESTION_TOOL`, plus existing AskUserQuestion rendering in
`conversion/tool-use-to-acp.ts`), and our renderer does not advertise elicitation capabilities.
Revisit if the renderer adopts ACP elicitation; the `elicitation_complete` system subtype also
stays unhandled (we never create elicitations, and `handleSystemMessage` defaults to no-op).
- **`model_refusal_fallback` system subtype** (#761, d8af943): Upstream adds it to their
exhaustive status-TODO case group. Our `handleSystemMessage` ends in `default: break`, so the
new subtype already no-ops harmlessly (same precedent as the v0.38 `thinking_tokens` skip).
- **Release / dep-group / dev-dep bumps** (#752, #758, #759, #763): No fork-relevant code beyond
the SDK versions captured above. (#751 `validateCwd` appears in upstream's v0.43.0 changelog but
predates our v0.42.0 sync point and is already in the fork at `claude-agent.ts`.)

## Changes Ported in v0.42.0 Sync

@@ -229,7 +269,7 @@ Fork of `@anthropic-ai/claude-agent-acp`. Upstream repo: https://github.com/anth

## Next Sync

-1. Check upstream changelog since v0.42.0
+1. Check upstream changelog since v0.44.0
2. Diff upstream source against PostHog Code using the file mapping above
3. Port in phases: bug fixes first, then features
4. After each phase: `pnpm --filter agent typecheck && pnpm --filter agent build && pnpm lint`
219 changes: 219 additions & 0 deletions packages/agent/src/adapters/claude/claude-agent.streamed-text.test.ts
@@ -0,0 +1,219 @@
import type { AgentSideConnection } from "@agentclientprotocol/sdk";
import type {
SDKMessage,
SDKUserMessage,
} from "@anthropic-ai/claude-agent-sdk";
import { beforeEach, describe, expect, it, vi } from "vitest";
import { createMockQuery, type MockQuery } from "../../test/mocks/claude-sdk";
import { Pushable } from "../../utils/streams";

vi.mock("@anthropic-ai/claude-agent-sdk", () => ({
query: vi.fn(),
}));

vi.mock("./mcp/tool-metadata", () => ({
fetchMcpToolMetadata: vi.fn().mockResolvedValue(undefined),
getConnectedMcpServerNames: vi.fn().mockReturnValue([]),
setMcpToolApprovalStates: vi.fn(),
isMcpToolReadOnly: vi.fn().mockReturnValue(false),
getMcpToolMetadata: vi.fn().mockReturnValue(undefined),
getMcpToolApprovalState: vi.fn().mockReturnValue(undefined),
}));

const { ClaudeAcpAgent } = await import("./claude-agent");
type Agent = InstanceType<typeof ClaudeAcpAgent>;

interface ClientMocks {
sessionUpdate: ReturnType<typeof vi.fn>;
extNotification: ReturnType<typeof vi.fn>;
}

function makeAgent(): { agent: Agent; client: ClientMocks } {
const client: ClientMocks = {
sessionUpdate: vi.fn().mockResolvedValue(undefined),
extNotification: vi.fn().mockResolvedValue(undefined),
};
const agent = new ClaudeAcpAgent(client as unknown as AgentSideConnection);
return { agent, client };
}

function installFakeSession(
agent: Agent,
sessionId: string,
): { query: MockQuery; input: Pushable<SDKUserMessage> } {
const query = createMockQuery();
const input = new Pushable<SDKUserMessage>();
const abortController = new AbortController();

const session = {
query,
queryOptions: { sessionId, cwd: "/tmp/repo", abortController },
input,
cancelled: false,
interruptReason: undefined,
settingsManager: { dispose: vi.fn(), getRepoRoot: () => "/tmp/repo" },
permissionMode: "default" as const,
abortController,
accumulatedUsage: {
inputTokens: 0,
outputTokens: 0,
cachedReadTokens: 0,
cachedWriteTokens: 0,
},
sessionResources: new Set(),
configOptions: [],
promptRunning: false,
pendingMessages: new Map(),
nextPendingOrder: 0,
cwd: "/tmp/repo",
notificationHistory: [] as unknown[],
taskRunId: "run-1",
lastContextWindowSize: 200_000,
modelId: "claude-sonnet-4-6",
taskState: new Map(),
};

(agent as unknown as { session: typeof session }).session = session;
(agent as unknown as { sessionId: string }).sessionId = sessionId;

return { query, input };
}

function tick(): Promise<void> {
return new Promise((resolve) => setImmediate(resolve));
}

async function send(query: MockQuery, message: unknown): Promise<void> {
query._mockHelpers.sendMessage(message as SDKMessage);
await tick();
}

// Replays the prompt's own user message back through the query so
// `promptReplayed` flips and the terminal `result` message is not skipped as a
// background-task result.
async function echoUserMessage(
query: MockQuery,
input: Pushable<SDKUserMessage>,
): Promise<void> {
const { value: pushed } = await input[Symbol.asyncIterator]().next();
await send(query, pushed);
}

function messageStart(sessionId: string, apiId: string) {
return {
type: "stream_event",
parent_tool_use_id: null,
session_id: sessionId,
uuid: `start-${apiId}`,
event: { type: "message_start", message: { id: apiId, usage: {} } },
};
}

function textDelta(sessionId: string, text: string) {
return {
type: "stream_event",
parent_tool_use_id: null,
session_id: sessionId,
uuid: `delta-${text}`,
event: {
type: "content_block_delta",
index: 0,
delta: { type: "text_delta", text },
},
};
}

function assistantMessage(sessionId: string, apiId: string, text: string) {
return {
type: "assistant",
parent_tool_use_id: null,
session_id: sessionId,
uuid: `assistant-${apiId}`,
message: {
id: apiId,
role: "assistant",
content: [{ type: "text", text }],
},
};
}

function resultSuccess(sessionId: string) {
return {
type: "result",
subtype: "success",
session_id: sessionId,
uuid: "result-1",
result: "",
is_error: false,
usage: {},
modelUsage: {},
};
}

function messageChunkTexts(
calls: ClientMocks["sessionUpdate"]["mock"]["calls"],
): string[] {
return calls
.map(
([call]) =>
(
call as {
update?: { sessionUpdate?: string; content?: { text?: string } };
}
).update,
)
.filter((update) => update?.sessionUpdate === "agent_message_chunk")
.map((update) => update?.content?.text ?? "");
}

describe("ClaudeAcpAgent.prompt — streamed assistant text wiring", () => {
beforeEach(() => {
vi.clearAllMocks();
});

it("emits streamed text once and drops the assembled duplicate", async () => {
const { agent, client } = makeAgent();
const sessionId = "s-streamed";
const { query, input } = installFakeSession(agent, sessionId);

const promptPromise = agent.prompt({
sessionId,
prompt: [{ type: "text", text: "hi" }],
});
await tick();

await echoUserMessage(query, input);
await send(query, messageStart(sessionId, "msg_1"));
await send(query, textDelta(sessionId, "hello"));
await send(query, assistantMessage(sessionId, "msg_1", "hello"));
await send(query, resultSuccess(sessionId));

const result = await promptPromise;
expect(result.stopReason).toBe("end_turn");
expect(messageChunkTexts(client.sessionUpdate.mock.calls)).toEqual([
"hello",
]);
});

it("forwards assembled text when no deltas streamed (gateway path)", async () => {
const { agent, client } = makeAgent();
const sessionId = "s-gateway";
const { query, input } = installFakeSession(agent, sessionId);

const promptPromise = agent.prompt({
sessionId,
prompt: [{ type: "text", text: "hi" }],
});
await tick();

await echoUserMessage(query, input);
await send(query, assistantMessage(sessionId, "msg_2", "gateway answer"));
await send(query, resultSuccess(sessionId));

const result = await promptPromise;
expect(result.stopReason).toBe("end_turn");
expect(messageChunkTexts(client.sessionUpdate.mock.calls)).toEqual([
"gateway answer",
]);
});
});
35 changes: 18 additions & 17 deletions packages/agent/src/adapters/claude/claude-agent.ts
@@ -38,7 +38,6 @@ import {
type Options,
type Query,
query,
-type SDKMessage,
type SDKUserMessage,
type SlashCommand,
} from "@anthropic-ai/claude-agent-sdk";
@@ -60,7 +59,12 @@ import {
type PostHogProductId,
} from "../../posthog-products";
import type { PostHogAPIConfig } from "../../types";
-import { isCloudRun, unreachable, withTimeout } from "../../utils/common";
+import {
+isCloudRun,
+unreachable,
+withAbort,
+withTimeout,
+} from "../../utils/common";
import { resolveGithubToken } from "../../utils/github-token";
import { Logger } from "../../utils/logger";
import { Pushable } from "../../utils/streams";
@@ -462,11 +466,6 @@ export class ClaudeAcpAgent extends BaseAcpAgent {
this.session.promptRunning = true;
const cancelController = new AbortController();
this.session.cancelController = cancelController;
-const cancelWake = new Promise<void>((resolve) => {
-cancelController.signal.addEventListener("abort", () => resolve(), {
-once: true,
-});
-});
let handedOff = false;
let errored = false;
let lastAssistantTotalUsage: number | null = null;
@@ -512,13 +511,17 @@ export class ClaudeAcpAgent extends BaseAcpAgent {
enrichedReadCache: this.enrichedReadCache,
logger: this.logger,
supportsTerminalOutput,
+streamedAssistantBlocks: {
+textIds: new Set<string>(),
+thinkingIds: new Set<string>(),
+},
};

try {
while (true) {
const nextMessage = this.session.query.next();
-const next = await Promise.race([nextMessage, cancelWake]);
-if (cancelController.signal.aborted) {
+const next = await withAbort(nextMessage, cancelController.signal);
+if (next.result === "aborted" || cancelController.signal.aborted) {
void nextMessage.catch((err) =>
this.logger.warn("in-flight query.next() rejected after cancel", {
sessionId: params.sessionId,
@@ -532,10 +535,7 @@ export class ClaudeAcpAgent extends BaseAcpAgent {
: undefined,
};
}
-const { value: message, done } = next as IteratorResult<
-SDKMessage,
-void
->;
+const { value: message, done } = next.value;

if (done || !message) {
if (this.session.cancelled) {
@@ -562,11 +562,12 @@ export class ClaudeAcpAgent extends BaseAcpAgent {
switch (message.type) {
case "system":
if (message.subtype === "compact_boundary") {
-const usedTokens = await Promise.race([
+const usedTokens = await withAbort(
fetchContextUsedTokens(this.session.query, this.logger),
-cancelWake.then(() => null),
-]);
-lastAssistantTotalUsage = usedTokens ?? 0;
+cancelController.signal,
+);
+lastAssistantTotalUsage =
+usedTokens.result === "success" ? (usedTokens.value ?? 0) : 0;
promptReplayed = true;
await this.client.sessionUpdate({
sessionId: params.sessionId,