diff --git a/.changeset/cold-socks-listen.md b/.changeset/cold-socks-listen.md new file mode 100644 index 000000000..a7d2b425d --- /dev/null +++ b/.changeset/cold-socks-listen.md @@ -0,0 +1,5 @@ +--- +"@voltagent/core": patch +--- + +Strip provider metadata from direct UI stream tool output chunks before they reach console clients. diff --git a/packages/core/src/agent/agent.spec.ts b/packages/core/src/agent/agent.spec.ts index e78773075..ed788cb97 100644 --- a/packages/core/src/agent/agent.spec.ts +++ b/packages/core/src/agent/agent.spec.ts @@ -1160,6 +1160,178 @@ Use pandas and summarize findings.`.split("\n"), expect(parts[1]).toEqual(expect.objectContaining({ type: "text-delta", id: "text-1" })); }); + it("strips providerMetadata from direct UI stream tool output chunks", async () => { + const agent = new Agent({ + name: "TestAgent", + instructions: "You are a helpful assistant", + model: mockModel as any, + }); + + const providerMetadata = { + anthropic: { + caller: { + type: "direct", + }, + }, + }; + + const mockStream = { + text: Promise.resolve("Streamed response"), + textStream: (async function* () { + yield "Streamed response"; + })(), + fullStream: (async function* () { + yield { + type: "text-delta" as const, + id: "text-1", + delta: "Streamed response", + text: "Streamed response", + }; + })(), + usage: Promise.resolve({ + inputTokens: 10, + outputTokens: 5, + totalTokens: 15, + }), + finishReason: Promise.resolve("stop"), + warnings: [], + toUIMessageStream: vi.fn().mockReturnValue( + toAsyncIterableStream( + convertArrayToReadableStream([ + { + type: "tool-input-available", + toolCallId: "tool-1", + toolName: "lookup", + input: { query: "weather" }, + providerMetadata, + }, + { + type: "tool-output-available", + toolCallId: "tool-1", + output: { ok: true }, + providerMetadata, + }, + { + type: "tool-output-error", + toolCallId: "tool-2", + errorText: "failed", + providerMetadata, + }, + { + type: "text-delta", + id: "text-1", + delta: "Done", + 
providerMetadata, + }, + ]), + ), + ), + toUIMessageStreamResponse: vi.fn(), + pipeUIMessageStreamToResponse: vi.fn(), + pipeTextStreamToResponse: vi.fn(), + toTextStreamResponse: vi.fn(), + partialOutputStream: undefined, + }; + + vi.mocked(ai.streamText).mockReturnValue(mockStream as any); + + const result = await agent.streamText("Stream this"); + const uiChunks: any[] = []; + for await (const chunk of result.toUIMessageStream() as AsyncIterable) { + uiChunks.push(chunk); + } + + /* Expected shape: chunks 1 and 2 (tool-output-available, tool-output-error) are compared with exact equality and must no longer contain providerMetadata; chunks 0 and 3 (tool-input-available, text-delta) must still carry it. */ expect(uiChunks[0]).toEqual( + expect.objectContaining({ + type: "tool-input-available", + providerMetadata, + }), + ); + expect(uiChunks[1]).toEqual({ + type: "tool-output-available", + toolCallId: "tool-1", + output: { ok: true }, + }); + expect(uiChunks[2]).toEqual({ + type: "tool-output-error", + toolCallId: "tool-2", + errorText: "failed", + }); + expect(uiChunks[3]).toEqual( + expect.objectContaining({ + type: "text-delta", + providerMetadata, + }), + ); + }); + + /* Checks that cancelling a reader obtained from result.toUIMessageStream() calls the cancel callback of the stream the mock returned, with the same reason argument. */ it("propagates cancellation when stripping providerMetadata from direct UI stream chunks", async () => { + const agent = new Agent({ + name: "TestAgent", + instructions: "You are a helpful assistant", + model: mockModel as any, + }); + + const cancelSpy = vi.fn(); + /* Source stream: enqueues a single tool-output-available chunk carrying providerMetadata, is never closed, and routes cancel() to cancelSpy. */ const baseStream = new ReadableStream({ + start(controller) { + controller.enqueue({ + type: "tool-output-available", + toolCallId: "tool-1", + output: { ok: true }, + providerMetadata: { anthropic: { caller: { type: "direct" } } }, + }); + }, + cancel: cancelSpy, + }); + + const mockStream = { + text: Promise.resolve("Streamed response"), + textStream: (async function* () { + yield "Streamed response"; + })(), + fullStream: (async function* () { + yield { + type: "text-delta" as const, + id: "text-1", + delta: "Streamed response", + text: "Streamed response", + }; + })(), + usage: Promise.resolve({ + inputTokens: 10, + outputTokens: 5, + totalTokens: 15, + }), + finishReason: Promise.resolve("stop"), + warnings: [], + toUIMessageStream: 
vi.fn().mockReturnValue(toAsyncIterableStream(baseStream)), + toUIMessageStreamResponse: vi.fn(), + pipeUIMessageStreamToResponse: vi.fn(), + pipeTextStreamToResponse: vi.fn(), + toTextStreamResponse: vi.fn(), + partialOutputStream: undefined, + }; + + vi.mocked(ai.streamText).mockReturnValue(mockStream as any); + + const result = await agent.streamText("Stream this"); + const reader = (result.toUIMessageStream() as ReadableStream).getReader(); + + /* The first chunk read must already have providerMetadata removed (exact equality on the value). */ await expect(reader.read()).resolves.toEqual({ + done: false, + value: { + type: "tool-output-available", + toolCallId: "tool-1", + output: { ok: true }, + }, + }); + + /* Cancel from the consumer side; the assertion below requires the same reason to reach cancelSpy. */ await reader.cancel("client disconnected"); + + expect(cancelSpy).toHaveBeenCalledWith("client disconnected"); + }); + it("uses last-step usage for finish events when provider is anthropic", async () => { const agent = new Agent({ name: "TestAgent", diff --git a/packages/core/src/agent/agent.ts b/packages/core/src/agent/agent.ts index df3f7a64c..479264d65 100644 --- a/packages/core/src/agent/agent.ts +++ b/packages/core/src/agent/agent.ts @@ -2575,11 +2575,44 @@ export class Agent { return result.textStream; }; + /** Returns the chunk as-is unless it is a non-null object whose type is "tool-output-available" or "tool-output-error" and that has a providerMetadata key. In that case returns a shallow copy built by rest-destructuring with providerMetadata left out; the input chunk is not modified. */ const stripProviderMetadataFromToolOutputChunk = (chunk: UIStreamChunk): UIStreamChunk => { + if (chunk === null || typeof chunk !== "object") { + return chunk; + } + + const type = (chunk as { type?: unknown }).type; + if (type !== "tool-output-available" && type !== "tool-output-error") { + return chunk; + } + + if (!("providerMetadata" in chunk)) { + return chunk; + } + + const { providerMetadata: _providerMetadata, ...sanitizedChunk } = chunk as Record< + string, + unknown + >; + return sanitizedChunk as UIStreamChunk; + }; + + /** Pipes baseStream through a TransformStream whose transform step enqueues stripProviderMetadataFromToolOutputChunk(chunk) for every chunk. NOTE(review): baseStream is cast to ReadableStream and the pipeThrough() result is cast to ToUIMessageStreamReturn; confirm that return type requires nothing beyond what a piped ReadableStream provides in the targeted runtimes. */ const stripProviderMetadataFromDirectUIStream = ( + baseStream: ToUIMessageStreamReturn, + ): ToUIMessageStreamReturn => { + return (baseStream as ReadableStream).pipeThrough( + new TransformStream({ + transform(chunk, controller) { + controller.enqueue(stripProviderMetadataFromToolOutputChunk(chunk)); + }, + }), + ) 
as ToUIMessageStreamReturn; + }; + + /** Picks the UI message stream for the given options: when guardrailPipeline is falsy, result.toUIMessageStream(streamOptions) is returned (after this change, wrapped by stripProviderMetadataFromDirectUIStream); otherwise guardrailPipeline.createUIStream(streamOptions) is returned without that wrapper. */ const getGuardrailAwareUIStream = ( streamOptions?: ToUIMessageStreamOptions, ): ToUIMessageStreamReturn => { if (!guardrailPipeline) { - return result.toUIMessageStream(streamOptions); + return stripProviderMetadataFromDirectUIStream(result.toUIMessageStream(streamOptions)); } return guardrailPipeline.createUIStream(streamOptions) as ToUIMessageStreamReturn; };