Skip to content
This repository was archived by the owner on Mar 4, 2026. It is now read-only.

Commit 210ef50

Browse files
authored
fix: ensure all data is consumed before emitting end event in PartialResultStream (#2516)
1 parent 163534c commit 210ef50

2 files changed

Lines changed: 43 additions & 1 deletion

File tree

src/partial-result-stream.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,7 @@ export class PartialResultStream extends Transform implements ResultEvents {
246246
}
247247

248248
if (chunk.last) {
249-
this.emit('end');
249+
this.push(null);
250250
return;
251251
}
252252

test/partial-result-stream.ts

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,48 @@ describe('PartialResultStream', () => {
200200
});
201201
});
202202
});
203+
204+
it('should not lose data if paused when last chunk is received', done => {
205+
const stream = new PartialResultStream({});
206+
// Pause the stream initially to force buffering
207+
stream.pause();
208+
209+
const rows: any[] = [];
210+
stream.on('data', row => rows.push(row));
211+
stream.on('end', () => {
212+
try {
213+
// We expect 2 rows.
214+
assert.strictEqual(rows.length, 2);
215+
done();
216+
} catch (e) {
217+
done(e);
218+
}
219+
});
220+
221+
const fields = [{name: NAME, type: {code: 'STRING'}}];
222+
223+
// Write a normal chunk
224+
stream.write({
225+
metadata: {rowType: {fields}},
226+
values: [convertToIValue('row1')],
227+
resumeToken: 't1',
228+
});
229+
230+
// Write the last chunk immediately
231+
stream.write({
232+
values: [convertToIValue('row2')],
233+
resumeToken: 't2',
234+
last: true,
235+
});
236+
237+
// Resume after a tick.
238+
// If emit('end') was called synchronously during write, the 'end' event might fire
239+
// and close the stream before we consume the buffered 'row1' and 'row2'.
240+
// With push(null), it waits for buffer to drain.
241+
process.nextTick(() => {
242+
stream.resume();
243+
});
244+
});
203245
});
204246

205247
describe('partialResultStream', () => {

0 commit comments

Comments
 (0)