Skip to content

Commit 5f2ffe5

Browse files
committed
stream: handle falsy push writer fail reasons
Reject reads based on writer state so explicit falsy fail reasons do not leave reads pending. Fixes: #63568 Signed-off-by: Kamat, Trivikram <16024985+trivikr@users.noreply.github.com> Assisted-by: openai:gpt-5.5
1 parent b9203ee commit 5f2ffe5

2 files changed

Lines changed: 45 additions & 5 deletions

File tree

lib/internal/streams/iter/push.js

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ const {
1212
PromiseReject,
1313
PromiseResolve,
1414
PromiseWithResolvers,
15+
Symbol,
1516
SymbolAsyncDispose,
1617
SymbolAsyncIterator,
1718
SymbolDispose,
@@ -55,6 +56,8 @@ const {
5556
RingBuffer,
5657
} = require('internal/streams/iter/ringbuffer');
5758

59+
const kNoFailReason = Symbol('kNoFailReason');
60+
5861
// =============================================================================
5962
// PushQueue - Internal Queue with Chunk-Based Backpressure
6063
// =============================================================================
@@ -317,14 +320,16 @@ class PushQueue {
317320
* No-op if errored or closed (fully drained).
318321
* If closing (draining), short-circuits the drain.
319322
*/
320-
fail(reason) {
323+
fail(reason = kNoFailReason) {
321324
if (this.#writerState === 'errored' || this.#writerState === 'closed') {
322325
return;
323326
}
324327

325328
const wasClosing = this.#writerState === 'closing';
326329
this.#writerState = 'errored';
327-
this.#error = reason ?? new ERR_INVALID_STATE('Failed');
330+
this.#error = reason === kNoFailReason ?
331+
new ERR_INVALID_STATE('Failed') :
332+
reason;
328333
this.#cleanup();
329334
this.#rejectPendingReads(this.#error);
330335
this.#rejectPendingDrains(this.#error);
@@ -413,7 +418,7 @@ class PushQueue {
413418
return { __proto__: null, value: undefined, done: true };
414419
}
415420

416-
if (this.#writerState === 'errored' && this.#error) {
421+
if (this.#writerState === 'errored') {
417422
throw this.#error;
418423
}
419424

@@ -482,7 +487,7 @@ class PushQueue {
482487
} else if (this.#writerState === 'closed') {
483488
const pending = this.#pendingReads.shift();
484489
pending.resolve({ __proto__: null, value: undefined, done: true });
485-
} else if (this.#writerState === 'errored' && this.#error) {
490+
} else if (this.#writerState === 'errored') {
486491
const pending = this.#pendingReads.shift();
487492
pending.reject(this.#error);
488493
} else {
@@ -659,7 +664,7 @@ class PushWriter {
659664
}
660665

661666
fail(reason) {
662-
this.#queue.fail(reason);
667+
this.#queue.fail(arguments.length === 0 ? kNoFailReason : reason);
663668
}
664669

665670
[SymbolAsyncDispose]() {

test/parallel/test-stream-iter-push-writer.js

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,39 @@ async function testFailRejectsPendingRead() {
323323
);
324324
}
325325

326+
async function testFailRejectsFutureReadWithFalsyReason() {
327+
for (const reason of [0, null]) {
328+
const { writer, readable } = push();
329+
330+
writer.fail(reason);
331+
332+
const iter = readable[Symbol.asyncIterator]();
333+
await iter.next().then(
334+
common.mustNotCall(),
335+
common.mustCall((rejection) => {
336+
assert.strictEqual(rejection, reason);
337+
}),
338+
);
339+
}
340+
}
341+
342+
async function testFailRejectsPendingReadWithFalsyReason() {
343+
const { writer, readable } = push();
344+
345+
const iter = readable[Symbol.asyncIterator]();
346+
const readPromise = iter.next();
347+
348+
await new Promise(setImmediate);
349+
350+
writer.fail(false);
351+
await readPromise.then(
352+
common.mustNotCall(),
353+
common.mustCall((reason) => {
354+
assert.strictEqual(reason, false);
355+
}),
356+
);
357+
}
358+
326359
// end() while writes are pending rejects those writes
327360
async function testEndRejectsPendingWrites() {
328361
const { writer, readable } = push({ highWaterMark: 1, backpressure: 'block' });
@@ -435,6 +468,8 @@ Promise.all([
435468
testConsumerThrowRejectsWrites(),
436469
testEndResolvesPendingRead(),
437470
testFailRejectsPendingRead(),
471+
testFailRejectsFutureReadWithFalsyReason(),
472+
testFailRejectsPendingReadWithFalsyReason(),
438473
testEndRejectsPendingWrites(),
439474
testEndIdempotentWhenClosed(),
440475
testEndRejectsWhenErrored(),

0 commit comments

Comments
 (0)