Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/cold-socks-listen.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@voltagent/core": patch
---

Strip provider metadata from direct UI stream tool output chunks before they reach console clients.
172 changes: 172 additions & 0 deletions packages/core/src/agent/agent.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1160,6 +1160,178 @@ Use pandas and summarize findings.`.split("\n"),
expect(parts[1]).toEqual(expect.objectContaining({ type: "text-delta", id: "text-1" }));
});

it("strips providerMetadata from direct UI stream tool output chunks", async () => {
const agent = new Agent({
name: "TestAgent",
instructions: "You are a helpful assistant",
model: mockModel as any,
});

const providerMetadata = {
anthropic: {
caller: {
type: "direct",
},
},
};

const mockStream = {
text: Promise.resolve("Streamed response"),
textStream: (async function* () {
yield "Streamed response";
})(),
fullStream: (async function* () {
yield {
type: "text-delta" as const,
id: "text-1",
delta: "Streamed response",
text: "Streamed response",
};
})(),
usage: Promise.resolve({
inputTokens: 10,
outputTokens: 5,
totalTokens: 15,
}),
finishReason: Promise.resolve("stop"),
warnings: [],
toUIMessageStream: vi.fn().mockReturnValue(
toAsyncIterableStream(
convertArrayToReadableStream([
{
type: "tool-input-available",
toolCallId: "tool-1",
toolName: "lookup",
input: { query: "weather" },
providerMetadata,
},
{
type: "tool-output-available",
toolCallId: "tool-1",
output: { ok: true },
providerMetadata,
},
{
type: "tool-output-error",
toolCallId: "tool-2",
errorText: "failed",
providerMetadata,
},
{
type: "text-delta",
id: "text-1",
delta: "Done",
providerMetadata,
},
]),
),
),
toUIMessageStreamResponse: vi.fn(),
pipeUIMessageStreamToResponse: vi.fn(),
pipeTextStreamToResponse: vi.fn(),
toTextStreamResponse: vi.fn(),
partialOutputStream: undefined,
};

vi.mocked(ai.streamText).mockReturnValue(mockStream as any);

const result = await agent.streamText("Stream this");
const uiChunks: any[] = [];
for await (const chunk of result.toUIMessageStream() as AsyncIterable<any>) {
uiChunks.push(chunk);
}

expect(uiChunks[0]).toEqual(
expect.objectContaining({
type: "tool-input-available",
providerMetadata,
}),
);
expect(uiChunks[1]).toEqual({
type: "tool-output-available",
toolCallId: "tool-1",
output: { ok: true },
});
expect(uiChunks[2]).toEqual({
type: "tool-output-error",
toolCallId: "tool-2",
errorText: "failed",
});
expect(uiChunks[3]).toEqual(
expect.objectContaining({
type: "text-delta",
providerMetadata,
}),
);
});

it("propagates cancellation when stripping providerMetadata from direct UI stream chunks", async () => {
const agent = new Agent({
name: "TestAgent",
instructions: "You are a helpful assistant",
model: mockModel as any,
});

const cancelSpy = vi.fn();
const baseStream = new ReadableStream({
start(controller) {
controller.enqueue({
type: "tool-output-available",
toolCallId: "tool-1",
output: { ok: true },
providerMetadata: { anthropic: { caller: { type: "direct" } } },
});
},
cancel: cancelSpy,
});

const mockStream = {
text: Promise.resolve("Streamed response"),
textStream: (async function* () {
yield "Streamed response";
})(),
fullStream: (async function* () {
yield {
type: "text-delta" as const,
id: "text-1",
delta: "Streamed response",
text: "Streamed response",
};
})(),
usage: Promise.resolve({
inputTokens: 10,
outputTokens: 5,
totalTokens: 15,
}),
finishReason: Promise.resolve("stop"),
warnings: [],
toUIMessageStream: vi.fn().mockReturnValue(toAsyncIterableStream(baseStream)),
toUIMessageStreamResponse: vi.fn(),
pipeUIMessageStreamToResponse: vi.fn(),
pipeTextStreamToResponse: vi.fn(),
toTextStreamResponse: vi.fn(),
partialOutputStream: undefined,
};

vi.mocked(ai.streamText).mockReturnValue(mockStream as any);

const result = await agent.streamText("Stream this");
const reader = (result.toUIMessageStream() as ReadableStream<any>).getReader();

await expect(reader.read()).resolves.toEqual({
done: false,
value: {
type: "tool-output-available",
toolCallId: "tool-1",
output: { ok: true },
},
});

await reader.cancel("client disconnected");

expect(cancelSpy).toHaveBeenCalledWith("client disconnected");
});

it("uses last-step usage for finish events when provider is anthropic", async () => {
const agent = new Agent({
name: "TestAgent",
Expand Down
35 changes: 34 additions & 1 deletion packages/core/src/agent/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2575,11 +2575,44 @@ export class Agent {
return result.textStream;
};

const stripProviderMetadataFromToolOutputChunk = (chunk: UIStreamChunk): UIStreamChunk => {
if (chunk === null || typeof chunk !== "object") {
return chunk;
}

const type = (chunk as { type?: unknown }).type;
if (type !== "tool-output-available" && type !== "tool-output-error") {
return chunk;
}

if (!("providerMetadata" in chunk)) {
return chunk;
}

const { providerMetadata: _providerMetadata, ...sanitizedChunk } = chunk as Record<
string,
unknown
>;
return sanitizedChunk as UIStreamChunk;
};

const stripProviderMetadataFromDirectUIStream = (
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
baseStream: ToUIMessageStreamReturn,
): ToUIMessageStreamReturn => {
return (baseStream as ReadableStream<UIStreamChunk>).pipeThrough(
new TransformStream<UIStreamChunk, UIStreamChunk>({
transform(chunk, controller) {
controller.enqueue(stripProviderMetadataFromToolOutputChunk(chunk));
},
}),
) as ToUIMessageStreamReturn;
};

const getGuardrailAwareUIStream = (
streamOptions?: ToUIMessageStreamOptions,
): ToUIMessageStreamReturn => {
if (!guardrailPipeline) {
return result.toUIMessageStream(streamOptions);
return stripProviderMetadataFromDirectUIStream(result.toUIMessageStream(streamOptions));
}
return guardrailPipeline.createUIStream(streamOptions) as ToUIMessageStreamReturn;
};
Expand Down