From c1d14728e86be9ef2e5a480eff88d85bcfef69b0 Mon Sep 17 00:00:00 2001 From: cookesan <6601329+cookesan@users.noreply.github.com> Date: Thu, 21 May 2026 04:48:14 -0400 Subject: [PATCH] fs: handle early writeFile stream errors Attach a temporary error listener to readable stream inputs before opening the destination file. This lets writeFile() reject with the stream error instead of allowing an early source error to become an uncaught exception. Remove the listener when the write finishes or when opening the destination fails. Signed-off-by: cookesan <6601329+cookesan@users.noreply.github.com> --- lib/internal/fs/promises.js | 84 +++++++++++++++---- .../test-fs-promises-file-handle-writeFile.js | 30 +++++++ test/parallel/test-fs-promises-writefile.js | 57 +++++++++++++ 3 files changed, 155 insertions(+), 16 deletions(-) diff --git a/lib/internal/fs/promises.js b/lib/internal/fs/promises.js index 720bd1319b381f..c238ab868af77a 100644 --- a/lib/internal/fs/promises.js +++ b/lib/internal/fs/promises.js @@ -107,7 +107,11 @@ const EventEmitter = require('events'); const { StringDecoder } = require('string_decoder'); const { kFSWatchStart, watch } = require('internal/fs/watchers'); const nonNativeWatcher = require('internal/fs/recursive_watch'); -const { isIterable } = require('internal/streams/utils'); +const { + isIterable, + isReadableErrored, + isReadableNodeStream, +} = require('internal/streams/utils'); const assert = require('internal/assert'); const permission = require('internal/process/permission'); @@ -1116,24 +1120,62 @@ function checkAborted(signal) { throw new AbortError(undefined, { cause: signal.reason }); } -async function writeFileHandle(filehandle, data, signal, encoding) { - checkAborted(signal); +function makeWriteFileStreamErrorHandler(data) { + if (!isReadableNodeStream(data) || + typeof data.removeListener !== 'function') { + return undefined; + } + + let error; + let errored = false; + function onError(err) { + error = err; + errored = true; + } + const streamError = isReadableErrored(data); + if (streamError != null) + onError(streamError); + data.on('error', onError); + + return { + __proto__: null, + check() { + if (errored) + throw error; + }, + cleanup() { + data.removeListener('error', onError); + }, + }; +} + +async function writeFileHandle(filehandle, data, signal, encoding, streamErrorHandler) { if (isCustomIterable(data)) { - for await (const buf of data) { + streamErrorHandler ??= makeWriteFileStreamErrorHandler(data); + try { checkAborted(signal); - const toWrite = - isArrayBufferView(buf) ? buf : Buffer.from(buf, encoding || 'utf8'); - let remaining = toWrite.byteLength; - while (remaining > 0) { - const writeSize = MathMin(kWriteFileMaxChunkSize, remaining); - const { bytesWritten } = await write( - filehandle, toWrite, toWrite.byteLength - remaining, writeSize); - remaining -= bytesWritten; + streamErrorHandler?.check(); + for await (const buf of data) { checkAborted(signal); + streamErrorHandler?.check(); + const toWrite = + isArrayBufferView(buf) ? buf : Buffer.from(buf, encoding || 'utf8'); + let remaining = toWrite.byteLength; + while (remaining > 0) { + const writeSize = MathMin(kWriteFileMaxChunkSize, remaining); + const { bytesWritten } = await write( + filehandle, toWrite, toWrite.byteLength - remaining, writeSize); + remaining -= bytesWritten; + checkAborted(signal); + streamErrorHandler?.check(); + } } + } finally { + streamErrorHandler?.cleanup(); } return; } + checkAborted(signal); data = new Uint8Array(data.buffer, data.byteOffset, data.byteLength); let remaining = data.byteLength; if (remaining === 0) return; @@ -1891,13 +1933,23 @@ async function writeFile(path, data, options) { } validateAbortSignal(options.signal); + checkAborted(options.signal); + const streamErrorHandler = makeWriteFileStreamErrorHandler(data); + if (path instanceof FileHandle) - return writeFileHandle(path, data, options.signal, options.encoding); + return writeFileHandle( + path, data, options.signal, options.encoding, streamErrorHandler); - checkAborted(options.signal); + let fd; + try { + fd = await open(path, flag, options.mode); + } catch (err) { + streamErrorHandler?.cleanup(); + throw err; + } - const fd = await open(path, flag, options.mode); - let writeOp = writeFileHandle(fd, data, options.signal, options.encoding); + let writeOp = writeFileHandle( + fd, data, options.signal, options.encoding, streamErrorHandler); if (flush) { writeOp = handleFdSync(writeOp, fd); diff --git a/test/parallel/test-fs-promises-file-handle-writeFile.js b/test/parallel/test-fs-promises-file-handle-writeFile.js index 2c1a80e4f52d49..7635ec57567db0 100644 --- a/test/parallel/test-fs-promises-file-handle-writeFile.js +++ b/test/parallel/test-fs-promises-file-handle-writeFile.js @@ -48,6 +48,7 @@ async function doWriteAndCancel() { const dest = path.resolve(tmpDir, 'tmp.txt'); const otherDest = path.resolve(tmpDir, 'tmp-2.txt'); +const errorDest = path.resolve(tmpDir, 'tmp-error.txt'); const stream = Readable.from(['a', 'b', 'c']); const stream2 = Readable.from(['ümlaut', ' ', 'sechzig']); const iterable = { @@ -65,6 +66,15 @@ function iterableWith(value) { } }; } + +function createEarlyErrorStream(error) { + const stream = new Readable({ + read() {} + }); + process.nextTick(() => stream.destroy(error)); + return stream; +} + const bufferIterable = { expected: 'abc', *[Symbol.iterator]() { @@ -94,6 +104,25 @@ async function doWriteStream() { } } +async function doWriteStreamError() { + const fileHandle = await open(errorDest, 'w+'); + const error = new Error('early file handle writeFile stream error'); + const stream = createEarlyErrorStream(error); + const uncaughtException = common.mustNotCall( + 'stream errors should reject FileHandle.writeFile()'); + + process.once('uncaughtException', uncaughtException); + try { + await assert.rejects( + fileHandle.writeFile(stream), + { message: error.message } + ); + } finally { + process.removeListener('uncaughtException', uncaughtException); + await fileHandle.close(); + } +} + async function doWriteStreamWithCancel() { const controller = new AbortController(); const { signal } = controller; @@ -190,6 +219,7 @@ async function doWriteInvalidValues() { await validateWriteFile(); await doWriteAndCancel(); await doWriteStream(); + await doWriteStreamError(); await doWriteStreamWithCancel(); await doWriteIterable(); await doWriteInvalidIterable(); diff --git a/test/parallel/test-fs-promises-writefile.js b/test/parallel/test-fs-promises-writefile.js index 25df61b2b48414..b536cf54c04c85 100644 --- a/test/parallel/test-fs-promises-writefile.js +++ b/test/parallel/test-fs-promises-writefile.js @@ -13,6 +13,7 @@ tmpdir.refresh(); const dest = path.resolve(tmpDir, 'tmp.txt'); const otherDest = path.resolve(tmpDir, 'tmp-2.txt'); +const errorDest = path.resolve(tmpDir, 'tmp-error.txt'); const buffer = Buffer.from('abc'.repeat(1000)); const buffer2 = Buffer.from('xyz'.repeat(1000)); const stream = Readable.from(['a', 'b', 'c']); @@ -25,6 +26,16 @@ const iterable = { yield 'c'; } }; +const streamLikeIterable = { + expected: 'abc', + pipe: common.mustNotCall('pipe should not be called for custom iterables'), + on: common.mustNotCall('on should not be called without removeListener'), + *[Symbol.iterator]() { + yield 'a'; + yield 'b'; + yield 'c'; + } +}; const veryLargeBuffer = { expected: 'dogs running'.repeat(512 * 1024), @@ -40,6 +51,15 @@ function iterableWith(value) { } }; } + +function createEarlyErrorStream(error) { + const stream = new Readable({ + read() {} + }); + process.nextTick(() => stream.destroy(error)); + return stream; +} + const bufferIterable = { expected: 'abc', *[Symbol.iterator]() { @@ -70,6 +90,34 @@ async function doWriteStream() { assert.deepStrictEqual(data, expected); } +async function doWriteStreamError() { + const error = new Error('early writeFile stream error'); + const stream = createEarlyErrorStream(error); + const uncaughtException = common.mustNotCall( + 'stream errors should reject writeFile()'); + + process.once('uncaughtException', uncaughtException); + try { + await assert.rejects( + fsPromises.writeFile(errorDest, stream), + { message: error.message } + ); + assert.strictEqual(stream.listenerCount('error'), 0); + } finally { + process.removeListener('uncaughtException', uncaughtException); + } +} + +async function doWriteStreamOpenError() { + const stream = Readable.from(['a']); + + await assert.rejects( + fsPromises.writeFile(path.resolve(tmpDir, 'not-found', 'tmp.txt'), stream), + { code: 'ENOENT' } + ); + assert.strictEqual(stream.listenerCount('error'), 0); +} + async function doWriteStreamWithCancel() { const controller = new AbortController(); const { signal } = controller; @@ -86,6 +134,12 @@ async function doWriteIterable() { assert.deepStrictEqual(data, iterable.expected); } +async function doWriteStreamLikeIterable() { + await fsPromises.writeFile(dest, streamLikeIterable); + const data = fs.readFileSync(dest, 'utf-8'); + assert.deepStrictEqual(data, streamLikeIterable.expected); +} + async function doWriteInvalidIterable() { await Promise.all( [42, 42n, {}, Symbol('42'), true, undefined, null, NaN].map((value) => @@ -168,8 +222,11 @@ async function doReadWithEncoding() { await doRead(); await doReadWithEncoding(); await doWriteStream(); + await doWriteStreamError(); + await doWriteStreamOpenError(); await doWriteStreamWithCancel(); await doWriteIterable(); + await doWriteStreamLikeIterable(); await doWriteInvalidIterable(); await doWriteIterableWithEncoding(); await doWriteBufferIterable();