From 18c2b1133056a9c596def0a5e7a926909e1dbd7e Mon Sep 17 00:00:00 2001 From: Omer Date: Sun, 31 May 2026 07:17:03 -0700 Subject: [PATCH] fix(core): prevent streamText abort unhandled rejections --- .changeset/quiet-stream-abort.md | 21 ++++++++ packages/core/src/agent/agent.spec.ts | 76 +++++++++++++++++++++++++++ packages/core/src/agent/agent.ts | 72 ++++++++++++++++--------- 3 files changed, 143 insertions(+), 26 deletions(-) create mode 100644 .changeset/quiet-stream-abort.md diff --git a/.changeset/quiet-stream-abort.md b/.changeset/quiet-stream-abort.md new file mode 100644 index 000000000..c5bf8cdad --- /dev/null +++ b/.changeset/quiet-stream-abort.md @@ -0,0 +1,21 @@ +--- +"@voltagent/core": patch +--- + +fix: prevent unhandled rejections when aborting `Agent.streamText()` streams + +## The Problem + +`Agent.streamText()` eagerly read the AI SDK result getters for `text`, `usage`, and `finishReason` while constructing VoltAgent's wrapped result. In AI SDK v6 these fields are lazy promises, so reading them early could materialize promises that the caller never consumes. + +When a caller only consumed the UI/full stream and aborted the run, those unconsumed promises could reject globally as `unhandledRejection` events. + +## The Solution + +VoltAgent now preserves the lazy getter behavior for `text`, `usage`, and `finishReason`. The sanitized text promise is also created only when `result.text` is accessed. + +## Impact + +- Aborting a consumed `streamText()` stream no longer emits unhandled rejections for unconsumed result fields +- Callers using only `toUIMessageStream()`, `toUIMessageStreamResponse()`, `fullStream`, or `textStream` do not need to attach defensive `.catch()` handlers to `text`, `usage`, or `finishReason` +- Matches AI SDK v6's lazy stream result contract more closely diff --git a/packages/core/src/agent/agent.spec.ts b/packages/core/src/agent/agent.spec.ts index 8439f573f..e78773075 100644 --- a/packages/core/src/agent/agent.spec.ts +++ b/packages/core/src/agent/agent.spec.ts @@ -1008,6 +1008,82 @@ Use pandas and summarize findings.`.split("\n"), expect(text).toBe("Streamed response"); }); + it("does not eagerly materialize lazy stream result promises", async () => { + const agent = new Agent({ + name: "TestAgent", + instructions: "You are a helpful assistant", + model: mockModel as any, + }); + + const getterAccesses = { + text: 0, + usage: 0, + finishReason: 0, + }; + + const mockStream = { + get text() { + getterAccesses.text += 1; + return Promise.resolve("Streamed response"); + }, + textStream: (async function* () { + yield "Streamed response"; + })(), + fullStream: toAsyncIterableStream( + convertArrayToReadableStream([ + { + type: "text-delta" as const, + id: "text-1", + delta: "Streamed response", + text: "Streamed response", + }, + ]), + ), + get usage() { + getterAccesses.usage += 1; + return Promise.resolve({ + inputTokens: 10, + outputTokens: 5, + totalTokens: 15, + }); + }, + get finishReason() { + getterAccesses.finishReason += 1; + return Promise.resolve("stop"); + }, + warnings: [], + toUIMessageStream: vi.fn(), + toUIMessageStreamResponse: vi.fn(), + pipeUIMessageStreamToResponse: vi.fn(), + pipeTextStreamToResponse: vi.fn(), + toTextStreamResponse: vi.fn(), + partialOutputStream: undefined, + }; + + vi.mocked(ai.streamText).mockReturnValue(mockStream as any); + + const result = await agent.streamText("Stream this"); + + expect(getterAccesses).toEqual({ + text: 0, + usage: 0, + finishReason: 0, + }); + + await expect(result.text).resolves.toBe("Streamed response"); + await expect(result.usage).resolves.toEqual({ + inputTokens: 10, + outputTokens: 5, + totalTokens: 15, + }); + await expect(result.finishReason).resolves.toBe("stop"); + expect(getterAccesses).toEqual({ + text: 1, + usage: 1, + finishReason: 1, + }); + }); + it("pre-creates streaming message ids and forwards them to UI streams", async () => { const agent = new Agent({ name: "TestAgent", diff --git a/packages/core/src/agent/agent.ts b/packages/core/src/agent/agent.ts index abd863747..df3f7a64c 100644 --- a/packages/core/src/agent/agent.ts +++ b/packages/core/src/agent/agent.ts @@ -1961,7 +1961,7 @@ export class Agent { const guardrailStreamingEnabled = guardrailSet.output.length > 0; let guardrailPipeline: GuardrailPipeline | null = null; - let sanitizedTextPromise!: PromiseLike; + let sanitizedTextPromise: Promise | undefined; const { result, modelName: effectiveModelName } = await this.executeWithModelFallback({ oc, operation: "streamText", @@ -2190,7 +2190,7 @@ export class Agent { finalText = bailedResult.response; } } else if (guardrailPipeline) { - finalText = await sanitizedTextPromise; + finalText = await getSanitizedTextPromise(); } else if (guardrailSet.output.length > 0) { finalText = await executeOutputGuardrails({ output: finalResult.text, @@ -2512,31 +2512,28 @@ export class Agent { ? createBaseFullStream() : undefined; - if (guardrailStreamingEnabled) { - guardrailPipeline = createGuardrailPipeline( - baseFullStreamForPipeline as AsyncIterable, - result.textStream, - guardrailContext, - ); - sanitizedTextPromise = guardrailPipeline.finalizePromise.then(async () => { - const sanitized = guardrailPipeline?.runner?.getSanitizedText(); - if (typeof sanitized === "string" && sanitized.length > 0) { - return sanitized; - } - // Wait for AI SDK text first (stream must complete) - const aiSdkText = await result.text; + const createSanitizedTextPromise = (): Promise => { + if (guardrailPipeline) { + return guardrailPipeline.finalizePromise.then(async () => { + const sanitized = guardrailPipeline?.runner?.getSanitizedText(); + if (typeof sanitized === "string" && sanitized.length > 0) { + return sanitized; + } + // Wait for AI SDK text first (stream must complete) + const aiSdkText = await result.text; + + // NOW check for bailed result (set during stream processing) + const bailedResult = oc.systemContext.get("bailedResult") as + | { agentName: string; response: string } + | undefined; + return bailedResult?.response || aiSdkText; + }); + } - // NOW check for bailed result (set during stream processing) - const bailedResult = oc.systemContext.get("bailedResult") as - | { agentName: string; response: string } - | undefined; - return bailedResult?.response || aiSdkText; - }); - } else { // Wrap result.text with a bail check // IMPORTANT: Wait for AI SDK text first (stream must complete/abort) // This ensures createStepHandler has processed tool results and set bailedResult - sanitizedTextPromise = result.text.then((aiSdkText) => { + return Promise.resolve(result.text).then((aiSdkText) => { // NOW check if bailed (set by createStepHandler during stream processing) const bailedResult = oc.systemContext.get("bailedResult") as | { agentName: string; response: string } @@ -2545,6 +2542,23 @@ export class Agent { // Return bailed subagent's result instead of supervisor's (if bailed) return bailedResult?.response || aiSdkText; }); + }; + + const getSanitizedTextPromise = (): Promise => { + sanitizedTextPromise ??= createSanitizedTextPromise(); + return sanitizedTextPromise; + }; + + if (guardrailStreamingEnabled) { + guardrailPipeline = createGuardrailPipeline( + baseFullStreamForPipeline as AsyncIterable, + result.textStream, + guardrailContext, + ); + void guardrailPipeline.finalizePromise.catch(() => { + // The guarded streams surface this error to their consumers. Keep the + // internal finalizer promise from leaking when text is never requested. + }); } const getGuardrailAwareFullStream = (): AsyncIterable => { @@ -2676,15 +2690,21 @@ export class Agent { // Create a wrapper that includes context and delegates to the original result const resultWithContext: StreamTextResultWithContext = { - text: sanitizedTextPromise, + get text() { + return getSanitizedTextPromise(); + }, get textStream() { return getGuardrailAwareTextStream(); }, get fullStream() { return getGuardrailAwareFullStream(); }, - usage: result.usage, - finishReason: result.finishReason, + get usage() { + return result.usage; + }, + get finishReason() { + return result.finishReason; + }, get partialOutputStream() { return result.partialOutputStream; },