Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions .changeset/quiet-stream-abort.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
"@voltagent/core": patch
---

fix: prevent unhandled rejections when aborting `Agent.streamText()` streams

## The Problem

`Agent.streamText()` eagerly read the AI SDK result getters for `text`, `usage`, and `finishReason` while constructing VoltAgent's wrapped result. In AI SDK v6 these fields are lazy promises, so reading them early could materialize promises that the caller never consumes.

When a caller only consumed the UI/full stream and aborted the run, those unconsumed promises could reject globally as `unhandledRejection` events.

## The Solution

VoltAgent now preserves the lazy getter behavior for `text`, `usage`, and `finishReason`. The sanitized text promise is also created only when `result.text` is accessed.

## Impact

- Aborting a consumed `streamText()` stream no longer emits unhandled rejections for unconsumed result fields
- Callers using only `toUIMessageStream()`, `toUIMessageStreamResponse()`, `fullStream`, or `textStream` do not need to attach defensive `.catch()` handlers to `text`, `usage`, or `finishReason`
- Matches AI SDK v6's lazy stream result contract more closely
76 changes: 76 additions & 0 deletions packages/core/src/agent/agent.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1008,6 +1008,82 @@ Use pandas and summarize findings.`.split("\n"),
expect(text).toBe("Streamed response");
});

it("does not eagerly materialize lazy stream result promises", async () => {
const agent = new Agent({
name: "TestAgent",
instructions: "You are a helpful assistant",
model: mockModel as any,
});

const getterAccesses = {
text: 0,
usage: 0,
finishReason: 0,
};

const mockStream = {
get text() {
getterAccesses.text += 1;
return Promise.resolve("Streamed response");
},
textStream: (async function* () {
yield "Streamed response";
})(),
fullStream: toAsyncIterableStream(
convertArrayToReadableStream([
{
type: "text-delta" as const,
id: "text-1",
delta: "Streamed response",
text: "Streamed response",
},
]),
),
get usage() {
getterAccesses.usage += 1;
return Promise.resolve({
inputTokens: 10,
outputTokens: 5,
totalTokens: 15,
});
},
get finishReason() {
getterAccesses.finishReason += 1;
return Promise.resolve("stop");
},
warnings: [],
toUIMessageStream: vi.fn(),
toUIMessageStreamResponse: vi.fn(),
pipeUIMessageStreamToResponse: vi.fn(),
pipeTextStreamToResponse: vi.fn(),
toTextStreamResponse: vi.fn(),
partialOutputStream: undefined,
};

vi.mocked(ai.streamText).mockReturnValue(mockStream as any);

const result = await agent.streamText("Stream this");

expect(getterAccesses).toEqual({
text: 0,
usage: 0,
finishReason: 0,
});

await expect(result.text).resolves.toBe("Streamed response");
await expect(result.usage).resolves.toEqual({
inputTokens: 10,
outputTokens: 5,
totalTokens: 15,
});
await expect(result.finishReason).resolves.toBe("stop");
expect(getterAccesses).toEqual({
text: 1,
usage: 1,
finishReason: 1,
});
});

it("pre-creates streaming message ids and forwards them to UI streams", async () => {
const agent = new Agent({
name: "TestAgent",
Expand Down
72 changes: 46 additions & 26 deletions packages/core/src/agent/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1961,7 +1961,7 @@ export class Agent {
const guardrailStreamingEnabled = guardrailSet.output.length > 0;

let guardrailPipeline: GuardrailPipeline | null = null;
let sanitizedTextPromise!: PromiseLike<string>;
let sanitizedTextPromise: Promise<string> | undefined;
const { result, modelName: effectiveModelName } = await this.executeWithModelFallback({
oc,
operation: "streamText",
Expand Down Expand Up @@ -2190,7 +2190,7 @@ export class Agent {
finalText = bailedResult.response;
}
} else if (guardrailPipeline) {
finalText = await sanitizedTextPromise;
finalText = await getSanitizedTextPromise();
} else if (guardrailSet.output.length > 0) {
finalText = await executeOutputGuardrails({
output: finalResult.text,
Expand Down Expand Up @@ -2512,31 +2512,28 @@ export class Agent {
? createBaseFullStream()
: undefined;

if (guardrailStreamingEnabled) {
guardrailPipeline = createGuardrailPipeline(
baseFullStreamForPipeline as AsyncIterable<VoltAgentTextStreamPart>,
result.textStream,
guardrailContext,
);
sanitizedTextPromise = guardrailPipeline.finalizePromise.then(async () => {
const sanitized = guardrailPipeline?.runner?.getSanitizedText();
if (typeof sanitized === "string" && sanitized.length > 0) {
return sanitized;
}
// Wait for AI SDK text first (stream must complete)
const aiSdkText = await result.text;
const createSanitizedTextPromise = (): Promise<string> => {
if (guardrailPipeline) {
return guardrailPipeline.finalizePromise.then(async () => {
const sanitized = guardrailPipeline?.runner?.getSanitizedText();
if (typeof sanitized === "string" && sanitized.length > 0) {
return sanitized;
}
// Wait for AI SDK text first (stream must complete)
const aiSdkText = await result.text;

// NOW check for bailed result (set during stream processing)
const bailedResult = oc.systemContext.get("bailedResult") as
| { agentName: string; response: string }
| undefined;
return bailedResult?.response || aiSdkText;
});
}

// NOW check for bailed result (set during stream processing)
const bailedResult = oc.systemContext.get("bailedResult") as
| { agentName: string; response: string }
| undefined;
return bailedResult?.response || aiSdkText;
});
} else {
// Wrap result.text with a bail check
// IMPORTANT: Wait for AI SDK text first (stream must complete/abort)
// This ensures createStepHandler has processed tool results and set bailedResult
sanitizedTextPromise = result.text.then((aiSdkText) => {
return Promise.resolve(result.text).then((aiSdkText) => {
// NOW check if bailed (set by createStepHandler during stream processing)
const bailedResult = oc.systemContext.get("bailedResult") as
| { agentName: string; response: string }
Expand All @@ -2545,6 +2542,23 @@ export class Agent {
// Return bailed subagent's result instead of supervisor's (if bailed)
return bailedResult?.response || aiSdkText;
});
};

const getSanitizedTextPromise = (): Promise<string> => {
sanitizedTextPromise ??= createSanitizedTextPromise();
return sanitizedTextPromise;
};

if (guardrailStreamingEnabled) {
guardrailPipeline = createGuardrailPipeline(
baseFullStreamForPipeline as AsyncIterable<VoltAgentTextStreamPart>,
result.textStream,
guardrailContext,
);
void guardrailPipeline.finalizePromise.catch(() => {
// The guarded streams surface this error to their consumers. Keep the
// internal finalizer promise from leaking when text is never requested.
});
}

const getGuardrailAwareFullStream = (): AsyncIterable<VoltAgentTextStreamPart> => {
Expand Down Expand Up @@ -2676,15 +2690,21 @@ export class Agent {

// Create a wrapper that includes context and delegates to the original result
const resultWithContext: StreamTextResultWithContext = {
text: sanitizedTextPromise,
get text() {
return getSanitizedTextPromise();
},
get textStream() {
return getGuardrailAwareTextStream();
},
get fullStream() {
return getGuardrailAwareFullStream();
},
usage: result.usage,
finishReason: result.finishReason,
get usage() {
return result.usage;
},
get finishReason() {
return result.finishReason;
},
get partialOutputStream() {
return result.partialOutputStream;
},
Expand Down
Loading