Skip to content

Commit ae8689c

Browse files
committed
fix(core,cli): stop chat.agent uncaught exception on mid-stream abort
Bumps @s2-dev/streamstore to 0.22.10. The realtime stream writer batches chunks with a linger timer; when a turn was aborted mid-stream while a record was still buffered, the timer fired enqueue() on an already-closed stream controller and threw from a setTimeout callback — surfacing as TASK_RUN_UNCAUGHT_EXCEPTION ("Invalid state: Unable to enqueue"). The upstream patch wraps the linger flush in a try/catch.
1 parent 9211032 commit ae8689c

6 files changed

Lines changed: 86 additions & 17 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/core": patch
3+
"trigger.dev": patch
4+
---
5+
6+
Bump `@s2-dev/streamstore` to `0.22.10` to fix a `TASK_RUN_UNCAUGHT_EXCEPTION` ("Invalid state: Unable to enqueue") when a `chat.agent` turn is aborted mid-stream.

apps/webapp/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@
112112
"@remix-run/serve": "2.17.4",
113113
"@remix-run/server-runtime": "2.17.4",
114114
"@remix-run/v1-meta": "^0.1.3",
115-
"@s2-dev/streamstore": "^0.22.5",
115+
"@s2-dev/streamstore": "^0.22.10",
116116
"@sentry/remix": "9.46.0",
117117
"@slack/web-api": "7.9.1",
118118
"@socket.io/redis-adapter": "^8.3.0",

packages/cli-v3/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@
9494
"@opentelemetry/resources": "2.0.1",
9595
"@opentelemetry/sdk-trace-node": "2.0.1",
9696
"@opentelemetry/semantic-conventions": "1.36.0",
97-
"@s2-dev/streamstore": "^0.22.5",
97+
"@s2-dev/streamstore": "^0.22.10",
9898
"@trigger.dev/build": "workspace:4.5.0-rc.2",
9999
"@trigger.dev/core": "workspace:4.5.0-rc.2",
100100
"@trigger.dev/schema-to-json": "workspace:4.5.0-rc.2",

packages/core/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -206,7 +206,7 @@
206206
"@opentelemetry/sdk-trace-base": "2.0.1",
207207
"@opentelemetry/sdk-trace-node": "2.0.1",
208208
"@opentelemetry/semantic-conventions": "1.36.0",
209-
"@s2-dev/streamstore": "0.22.5",
209+
"@s2-dev/streamstore": "0.22.10",
210210
"dequal": "^2.0.3",
211211
"eventsource": "^3.0.5",
212212
"eventsource-parser": "^3.0.0",

packages/core/src/v3/realtimeStreams/streamsWriterV2.test.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { describe, expect, it } from "vitest";
2+
import { AppendRecord, BatchTransform } from "@s2-dev/streamstore";
23

34
import { ChatChunkTooLargeError, isChatChunkTooLargeError } from "../errors.js";
45
import { encodeChunkOrError } from "./streamsWriterV2.js";
@@ -75,3 +76,65 @@ describe("isChatChunkTooLargeError", () => {
7576
expect(isChatChunkTooLargeError(undefined)).toBe(false);
7677
});
7778
});
79+
80+
// Regression guard for the `@s2-dev/streamstore` linger-timer race that
81+
// surfaced as `TASK_RUN_UNCAUGHT_EXCEPTION` ("Invalid state: Unable to
82+
// enqueue") in `chat.agent`. `StreamsWriterV2` pipes records through a
83+
// `BatchTransform` into S2's `session.writable`. When a run aborts mid-turn
84+
// while a record is still buffered in the linger window, the writable is
85+
// aborted and the transform's readable controller errors — but the pending
86+
// linger `setTimeout` still fires and calls `controller.enqueue()` on the
87+
// now-dead controller, throwing from a timer callback (so it's uncaught).
88+
//
89+
// Fixed upstream in `@s2-dev/streamstore@0.22.10` by wrapping the linger
90+
// flush in a try/catch that discards the closed-controller `TypeError`. This
91+
// test exercises the *real* `BatchTransform` (no mock) and fails if the
92+
// dependency is ever downgraded below the fix.
93+
describe("BatchTransform linger-timer abort safety (s2 dependency contract)", () => {
94+
it("does not throw an uncaught error when the controller dies before the linger fires", async () => {
95+
const lingerDurationMillis = 50;
96+
const captured: unknown[] = [];
97+
const onUncaught = (err: unknown) => captured.push(err);
98+
99+
// Intercept uncaught errors from the linger timer for the duration of the
100+
// test — the throw happens in a `setTimeout`, so it can't be caught with a
101+
// surrounding try/catch.
102+
const prevUncaught = process.listeners("uncaughtException");
103+
const prevUnhandled = process.listeners("unhandledRejection");
104+
process.removeAllListeners("uncaughtException");
105+
process.removeAllListeners("unhandledRejection");
106+
process.on("uncaughtException", onUncaught);
107+
process.on("unhandledRejection", onUncaught);
108+
109+
try {
110+
// Mirror the StreamsWriterV2 pipeline shape: source -> BatchTransform ->
111+
// session.writable. The downstream never acks, so the buffered record
112+
// stays in the linger window until we abort.
113+
const batcher = new BatchTransform({ lingerDurationMillis });
114+
const downstream = new WritableStream({
115+
write() {
116+
return new Promise(() => {});
117+
},
118+
});
119+
batcher.readable.pipeTo(downstream).catch(() => {});
120+
121+
const writer = batcher.writable.getWriter();
122+
// Buffer a record — this arms the linger setTimeout.
123+
await writer.write(AppendRecord.string({ body: "hello" }));
124+
// Abort the downstream before the linger fires (== run suspend/abort ->
125+
// session.writable.abort()), which errors the transform's readable side.
126+
await downstream.abort?.("aborted").catch(() => {});
127+
writer.abort("aborted").catch(() => {});
128+
// Wait past the linger window so the pending timer fires on the dead
129+
// controller.
130+
await new Promise((r) => setTimeout(r, lingerDurationMillis + 150));
131+
} finally {
132+
process.removeListener("uncaughtException", onUncaught);
133+
process.removeListener("unhandledRejection", onUncaught);
134+
prevUncaught.forEach((l) => process.on("uncaughtException", l));
135+
prevUnhandled.forEach((l) => process.on("unhandledRejection", l));
136+
}
137+
138+
expect(captured).toEqual([]);
139+
});
140+
});

pnpm-lock.yaml

Lines changed: 14 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)