Skip to content

Commit 0799b7f

Browse files
committed
fix(lib): tolerate malformed json in sse stream
1 parent 4f2b30f commit 0799b7f

2 files changed

Lines changed: 53 additions & 8 deletions

File tree

sources/lib/lib.integration.test.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,4 +92,23 @@ describe("lib integration", () => {
9292
},
9393
TEST_TIMEOUT_MS
9494
);
95+
96+
it(
97+
"sse stream skips malformed and non-json lines",
98+
async () => {
99+
const runner = createNodeRunner(
100+
'console.log("event: connected"); console.log("data: {\\"id\\":1}"); console.log("not-json"); console.log("{\\"id\\":2}"); console.log("data: [DONE]");'
101+
);
102+
103+
const stream = createJsonSseStream<{ id: number }>(runner);
104+
const received: Array<{ id: number }> = [];
105+
106+
for await (const event of stream.events) {
107+
received.push(event.data);
108+
}
109+
110+
expect(received).toEqual([{ id: 1 }, { id: 2 }]);
111+
},
112+
TEST_TIMEOUT_MS
113+
);
95114
});

sources/lib/sse.ts

Lines changed: 34 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,13 +77,19 @@ export function createJsonSseStream<T = unknown>(
7777
if (!trimmed) {
7878
continue;
7979
}
80-
yield parseJsonLine<T>(trimmed);
80+
const parsed = tryParseJsonLine<T>(trimmed);
81+
if (parsed) {
82+
yield parsed;
83+
}
8184
}
8285
}
8386

8487
const remaining = buffer.trim();
8588
if (remaining) {
86-
yield parseJsonLine<T>(remaining);
89+
const parsed = tryParseJsonLine<T>(remaining);
90+
if (parsed) {
91+
yield parsed;
92+
}
8793
}
8894

8995
const exitCode = await process.exited;
@@ -113,15 +119,35 @@ export function createJsonSseStream<T = unknown>(
113119
return { events, close, process };
114120
}
115121

116-
function parseJsonLine<T>(line: string): JsonSseEvent<T> {
122+
function tryParseJsonLine<T>(line: string): JsonSseEvent<T> | null {
123+
const payload = normalizeJsonPayload(line);
124+
if (payload === null) {
125+
return null;
126+
}
127+
117128
try {
118-
const data = JSON.parse(line) as T;
129+
const data = JSON.parse(payload) as T;
119130
return { data, raw: line };
120-
} catch (error) {
121-
const message =
122-
error instanceof Error ? error.message : "Unknown JSON parse error";
123-
throw new Error(`Invalid JSON from bee stream: ${message}`);
131+
} catch {
132+
return null;
133+
}
134+
}
135+
136+
function normalizeJsonPayload(line: string): string | null {
137+
if (line.startsWith(":")) {
138+
return null;
139+
}
140+
if (line.startsWith("event:") || line.startsWith("id:")) {
141+
return null;
142+
}
143+
if (line.startsWith("data:")) {
144+
const payload = line.slice("data:".length).trimStart();
145+
if (!payload || payload === "[DONE]") {
146+
return null;
147+
}
148+
return payload;
124149
}
150+
return line;
125151
}
126152

127153
function readStream(

0 commit comments

Comments
 (0)