From 06544d6c79213e895a63041956060d2febcb704a Mon Sep 17 00:00:00 2001 From: Sheldon Date: Mon, 8 Jun 2026 15:52:49 +0800 Subject: [PATCH 1/3] fix(core): strip tool output provider metadata from direct UI stream --- .changeset/cold-socks-listen.md | 5 ++ packages/core/src/agent/agent.spec.ts | 105 ++++++++++++++++++++++++++ packages/core/src/agent/agent.ts | 45 ++++++++++- 3 files changed, 154 insertions(+), 1 deletion(-) create mode 100644 .changeset/cold-socks-listen.md diff --git a/.changeset/cold-socks-listen.md b/.changeset/cold-socks-listen.md new file mode 100644 index 000000000..a7d2b425d --- /dev/null +++ b/.changeset/cold-socks-listen.md @@ -0,0 +1,5 @@ +--- +"@voltagent/core": patch +--- + +Strip provider metadata from direct UI stream tool output chunks before they reach console clients. diff --git a/packages/core/src/agent/agent.spec.ts b/packages/core/src/agent/agent.spec.ts index e78773075..00cb90609 100644 --- a/packages/core/src/agent/agent.spec.ts +++ b/packages/core/src/agent/agent.spec.ts @@ -1160,6 +1160,111 @@ Use pandas and summarize findings.`.split("\n"), expect(parts[1]).toEqual(expect.objectContaining({ type: "text-delta", id: "text-1" })); }); + it("strips providerMetadata from direct UI stream tool output chunks", async () => { + const agent = new Agent({ + name: "TestAgent", + instructions: "You are a helpful assistant", + model: mockModel as any, + }); + + const providerMetadata = { + anthropic: { + caller: { + type: "direct", + }, + }, + }; + + const mockStream = { + text: Promise.resolve("Streamed response"), + textStream: (async function* () { + yield "Streamed response"; + })(), + fullStream: (async function* () { + yield { + type: "text-delta" as const, + id: "text-1", + delta: "Streamed response", + text: "Streamed response", + }; + })(), + usage: Promise.resolve({ + inputTokens: 10, + outputTokens: 5, + totalTokens: 15, + }), + finishReason: Promise.resolve("stop"), + warnings: [], + toUIMessageStream: vi.fn().mockReturnValue( + toAsyncIterableStream( + convertArrayToReadableStream([ + { + type: "tool-input-available", + toolCallId: "tool-1", + toolName: "lookup", + input: { query: "weather" }, + providerMetadata, + }, + { + type: "tool-output-available", + toolCallId: "tool-1", + output: { ok: true }, + providerMetadata, + }, + { + type: "tool-output-error", + toolCallId: "tool-2", + errorText: "failed", + providerMetadata, + }, + { + type: "text-delta", + id: "text-1", + delta: "Done", + providerMetadata, + }, + ]), + ), + ), + toUIMessageStreamResponse: vi.fn(), + pipeUIMessageStreamToResponse: vi.fn(), + pipeTextStreamToResponse: vi.fn(), + toTextStreamResponse: vi.fn(), + partialOutputStream: undefined, + }; + + vi.mocked(ai.streamText).mockReturnValue(mockStream as any); + + const result = await agent.streamText("Stream this"); + const uiChunks: any[] = []; + for await (const chunk of result.toUIMessageStream() as AsyncIterable) { + uiChunks.push(chunk); + } + + expect(uiChunks[0]).toEqual( + expect.objectContaining({ + type: "tool-input-available", + providerMetadata, + }), + ); + expect(uiChunks[1]).toEqual({ + type: "tool-output-available", + toolCallId: "tool-1", + output: { ok: true }, + }); + expect(uiChunks[2]).toEqual({ + type: "tool-output-error", + toolCallId: "tool-2", + errorText: "failed", + }); + expect(uiChunks[3]).toEqual( + expect.objectContaining({ + type: "text-delta", + providerMetadata, + }), + ); + }); + it("uses last-step usage for finish events when provider is anthropic", async () => { const agent = new Agent({ name: "TestAgent", diff --git a/packages/core/src/agent/agent.ts b/packages/core/src/agent/agent.ts index df3f7a64c..399ebc665 100644 --- a/packages/core/src/agent/agent.ts +++ b/packages/core/src/agent/agent.ts @@ -2575,11 +2575,54 @@ export class Agent { return result.textStream; }; + const stripProviderMetadataFromToolOutputChunk = (chunk: UIStreamChunk): UIStreamChunk => { + if (chunk === null || typeof chunk !== "object") { + return chunk; + } + + const type = (chunk as { type?: unknown }).type; + if (type !== "tool-output-available" && type !== "tool-output-error") { + return chunk; + } + + if (!("providerMetadata" in chunk)) { + return chunk; + } + + const { providerMetadata: _providerMetadata, ...sanitizedChunk } = chunk as Record< + string, + unknown + >; + return sanitizedChunk as UIStreamChunk; + }; + + const stripProviderMetadataFromDirectUIStream = ( + baseStream: ToUIMessageStreamReturn, + ): ToUIMessageStreamReturn => { + return createAsyncIterableReadable(async (controller) => { + const reader = (baseStream as ReadableStream).getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + if (value !== undefined) { + controller.enqueue(stripProviderMetadataFromToolOutputChunk(value)); + } + } + controller.close(); + } catch (error) { + controller.error(error); + } finally { + reader.releaseLock(); + } + }); + }; + const getGuardrailAwareUIStream = ( streamOptions?: ToUIMessageStreamOptions, ): ToUIMessageStreamReturn => { if (!guardrailPipeline) { - return result.toUIMessageStream(streamOptions); + return stripProviderMetadataFromDirectUIStream(result.toUIMessageStream(streamOptions)); } return guardrailPipeline.createUIStream(streamOptions) as ToUIMessageStreamReturn; }; From 0f1da974686cf2c4f2dd6cec5eb009c63ae920d7 Mon Sep 17 00:00:00 2001 From: Sheldon Date: Thu, 11 Jun 2026 14:19:04 +0800 Subject: [PATCH 2/3] fix(core): preserve UI stream cancellation --- packages/core/src/agent/agent.ts | 24 +++++++----------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/packages/core/src/agent/agent.ts b/packages/core/src/agent/agent.ts index 399ebc665..479264d65 100644 --- a/packages/core/src/agent/agent.ts +++ b/packages/core/src/agent/agent.ts @@ -2599,23 +2599,13 @@ export class Agent { const stripProviderMetadataFromDirectUIStream = ( baseStream: ToUIMessageStreamReturn, ): ToUIMessageStreamReturn => { - return createAsyncIterableReadable(async (controller) => { - const reader = (baseStream as ReadableStream).getReader(); - try { - while (true) { - const { done, value } = await reader.read(); - if (done) break; - if (value !== undefined) { - controller.enqueue(stripProviderMetadataFromToolOutputChunk(value)); - } - } - controller.close(); - } catch (error) { - controller.error(error); - } finally { - reader.releaseLock(); - } - }); + return (baseStream as ReadableStream).pipeThrough( + new TransformStream({ + transform(chunk, controller) { + controller.enqueue(stripProviderMetadataFromToolOutputChunk(chunk)); + }, + }), + ) as ToUIMessageStreamReturn; }; const getGuardrailAwareUIStream = ( From 8e02def4059809800e38c7a5f10b04d1356566ed Mon Sep 17 00:00:00 2001 From: Sheldon Date: Thu, 11 Jun 2026 14:19:15 +0800 Subject: [PATCH 3/3] test(core): cover direct UI stream cancellation --- packages/core/src/agent/agent.spec.ts | 67 +++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) diff --git a/packages/core/src/agent/agent.spec.ts b/packages/core/src/agent/agent.spec.ts index 00cb90609..ed788cb97 100644 --- a/packages/core/src/agent/agent.spec.ts +++ b/packages/core/src/agent/agent.spec.ts @@ -1265,6 +1265,73 @@ Use pandas and summarize findings.`.split("\n"), ); }); + it("propagates cancellation when stripping providerMetadata from direct UI stream chunks", async () => { + const agent = new Agent({ + name: "TestAgent", + instructions: "You are a helpful assistant", + model: mockModel as any, + }); + + const cancelSpy = vi.fn(); + const baseStream = new ReadableStream({ + start(controller) { + controller.enqueue({ + type: "tool-output-available", + toolCallId: "tool-1", + output: { ok: true }, + providerMetadata: { anthropic: { caller: { type: "direct" } } }, + }); + }, + cancel: cancelSpy, + }); + + const mockStream = { + text: Promise.resolve("Streamed response"), + textStream: (async function* () { + yield "Streamed response"; + })(), + fullStream: (async function* () { + yield { + type: "text-delta" as const, + id: "text-1", + delta: "Streamed response", + text: "Streamed response", + }; + })(), + usage: Promise.resolve({ + inputTokens: 10, + outputTokens: 5, + totalTokens: 15, + }), + finishReason: Promise.resolve("stop"), + warnings: [], + toUIMessageStream: vi.fn().mockReturnValue(toAsyncIterableStream(baseStream)), + toUIMessageStreamResponse: vi.fn(), + pipeUIMessageStreamToResponse: vi.fn(), + pipeTextStreamToResponse: vi.fn(), + toTextStreamResponse: vi.fn(), + partialOutputStream: undefined, + }; + + vi.mocked(ai.streamText).mockReturnValue(mockStream as any); + + const result = await agent.streamText("Stream this"); + const reader = (result.toUIMessageStream() as ReadableStream).getReader(); + + await expect(reader.read()).resolves.toEqual({ + done: false, + value: { + type: "tool-output-available", + toolCallId: "tool-1", + output: { ok: true }, + }, + }); + + await reader.cancel("client disconnected"); + + expect(cancelSpy).toHaveBeenCalledWith("client disconnected"); + }); + it("uses last-step usage for finish events when provider is anthropic", async () => { const agent = new Agent({ name: "TestAgent",