|
1 | 1 | import { describe, expect, it } from "vitest"; |
2 | | -import { AppendRecord, BatchTransform } from "@s2-dev/streamstore"; |
3 | 2 |
|
4 | 3 | import { ChatChunkTooLargeError, isChatChunkTooLargeError } from "../errors.js"; |
5 | 4 | import { encodeChunkOrError } from "./streamsWriterV2.js"; |
@@ -76,65 +75,3 @@ describe("isChatChunkTooLargeError", () => { |
76 | 75 | expect(isChatChunkTooLargeError(undefined)).toBe(false); |
77 | 76 | }); |
78 | 77 | }); |
79 | | - |
80 | | -// Regression guard for the `@s2-dev/streamstore` linger-timer race that |
81 | | -// surfaced as `TASK_RUN_UNCAUGHT_EXCEPTION` ("Invalid state: Unable to |
82 | | -// enqueue") in `chat.agent`. `StreamsWriterV2` pipes records through a |
83 | | -// `BatchTransform` into S2's `session.writable`. When a run aborts mid-turn |
84 | | -// while a record is still buffered in the linger window, the writable is |
85 | | -// aborted and the transform's readable controller errors — but the pending |
86 | | -// linger `setTimeout` still fires and calls `controller.enqueue()` on the |
87 | | -// now-dead controller, throwing from a timer callback (so it's uncaught). |
88 | | -// |
89 | | -// Fixed upstream in `@s2-dev/streamstore@0.22.10` by wrapping the linger |
90 | | -// flush in a try/catch that discards the closed-controller `TypeError`. This |
91 | | -// test exercises the *real* `BatchTransform` (no mock) and fails if the |
92 | | -// dependency is ever downgraded below the fix. |
93 | | -describe("BatchTransform linger-timer abort safety (s2 dependency contract)", () => { |
94 | | - it("does not throw an uncaught error when the controller dies before the linger fires", async () => { |
95 | | - const lingerDurationMillis = 50; |
96 | | - const captured: unknown[] = []; |
97 | | - const onUncaught = (err: unknown) => captured.push(err); |
98 | | - |
99 | | - // Intercept uncaught errors from the linger timer for the duration of the |
100 | | - // test — the throw happens in a `setTimeout`, so it can't be caught with a |
101 | | - // surrounding try/catch. |
102 | | - const prevUncaught = process.listeners("uncaughtException"); |
103 | | - const prevUnhandled = process.listeners("unhandledRejection"); |
104 | | - process.removeAllListeners("uncaughtException"); |
105 | | - process.removeAllListeners("unhandledRejection"); |
106 | | - process.on("uncaughtException", onUncaught); |
107 | | - process.on("unhandledRejection", onUncaught); |
108 | | - |
109 | | - try { |
110 | | - // Mirror the StreamsWriterV2 pipeline shape: source -> BatchTransform -> |
111 | | - // session.writable. The downstream never acks, so the buffered record |
112 | | - // stays in the linger window until we abort. |
113 | | - const batcher = new BatchTransform({ lingerDurationMillis }); |
114 | | - const downstream = new WritableStream({ |
115 | | - write() { |
116 | | - return new Promise(() => {}); |
117 | | - }, |
118 | | - }); |
119 | | - batcher.readable.pipeTo(downstream).catch(() => {}); |
120 | | - |
121 | | - const writer = batcher.writable.getWriter(); |
122 | | - // Buffer a record — this arms the linger setTimeout. |
123 | | - await writer.write(AppendRecord.string({ body: "hello" })); |
124 | | - // Abort the downstream before the linger fires (== run suspend/abort -> |
125 | | - // session.writable.abort()), which errors the transform's readable side. |
126 | | - await downstream.abort?.("aborted").catch(() => {}); |
127 | | - writer.abort("aborted").catch(() => {}); |
128 | | - // Wait past the linger window so the pending timer fires on the dead |
129 | | - // controller. |
130 | | - await new Promise((r) => setTimeout(r, lingerDurationMillis + 150)); |
131 | | - } finally { |
132 | | - process.removeListener("uncaughtException", onUncaught); |
133 | | - process.removeListener("unhandledRejection", onUncaught); |
134 | | - prevUncaught.forEach((l) => process.on("uncaughtException", l)); |
135 | | - prevUnhandled.forEach((l) => process.on("unhandledRejection", l)); |
136 | | - } |
137 | | - |
138 | | - expect(captured).toEqual([]); |
139 | | - }); |
140 | | -}); |
0 commit comments