diff --git a/lib/internal/streams/iter/from.js b/lib/internal/streams/iter/from.js index 1efe83e9a04162..2c793fb2f2f0f5 100644 --- a/lib/internal/streams/iter/from.js +++ b/lib/internal/streams/iter/from.js @@ -8,7 +8,6 @@ const { ArrayBufferIsView, ArrayIsArray, - ArrayPrototypeEvery, ArrayPrototypePush, ArrayPrototypeSlice, DataViewPrototypeGetBuffer, @@ -295,14 +294,6 @@ async function* normalizeAsyncValue(value) { return; } - // Handle arrays (which are also iterable, but check first for efficiency) - if (ArrayIsArray(value)) { - for (let i = 0; i < value.length; i++) { - yield* normalizeAsyncValue(value[i]); - } - return; - } - // Handle async iterables (check before sync iterables since some objects // have both) if (isAsyncIterable(value)) { @@ -312,10 +303,22 @@ async function* normalizeAsyncValue(value) { return; } + // Handle arrays (which are also sync iterable, but check first for efficiency) + if (ArrayIsArray(value)) { + for (let i = 0; i < value.length; i++) { + // Note: array elements must be sync values; arrays containing + // async iterables are invalid. + yield* normalizeSyncValue(value[i]); + } + return; + } + // Handle sync iterables if (isSyncIterable(value)) { + // Note: this iteration is synchronous, since async iterables + // may not be nested within sync iterables. for (const item of value) { - yield* normalizeAsyncValue(item); + yield* normalizeSyncValue(item); } return; } @@ -391,16 +394,18 @@ async function* normalizeAsyncSource(source) { yield batch; batch = []; } - let asyncBatch = []; - for await (const chunk of normalizeAsyncValue(value)) { - ArrayPrototypePush(asyncBatch, chunk); - if (asyncBatch.length === FROM_BATCH_SIZE) { - yield asyncBatch; - asyncBatch = []; + let syncBatch = []; + // Note: this iteration is synchronous, since async iterables + // may not be nested within sync iterables. + for (const chunk of normalizeSyncValue(value)) { + ArrayPrototypePush(syncBatch, chunk); + if (syncBatch.length === FROM_BATCH_SIZE) { + yield syncBatch; + syncBatch = []; } } - if (asyncBatch.length > 0) { - yield asyncBatch; + if (syncBatch.length > 0) { + yield syncBatch; } } @@ -443,12 +448,18 @@ function fromSync(input) { }; } + // Check toStreamable protocol (takes precedence over iteration protocols). + // toAsyncStreamable is ignored entirely in fromSync. + if (typeof input[toStreamable] === 'function') { + return fromSync(input[toStreamable]()); + } + // Fast path: Uint8Array[] - yield in bounded sub-batches. // Yielding the entire array as one batch forces downstream transforms // to process all data at once, causing peak memory proportional to total // data volume. Sub-batching keeps peak memory bounded while preserving // the throughput benefit of batched processing. - if (ArrayIsArray(input)) { + if (isUint8ArrayBatch(input)) { if (input.length === 0) { return { __proto__: null, @@ -457,64 +468,37 @@ function fromSync(input) { }, }; } - // Check if it's an array of Uint8Array (common case) - if (isUint8Array(input[0])) { - const allUint8 = ArrayPrototypeEvery(input, isUint8Array); - if (allUint8) { - const batch = input; - return { - __proto__: null, - *[SymbolIterator]() { - if (batch.length <= FROM_BATCH_SIZE) { - yield batch; - } else { - for (let i = 0; i < batch.length; i += FROM_BATCH_SIZE) { - yield ArrayPrototypeSlice(batch, i, i + FROM_BATCH_SIZE); - } - } - }, - }; - } - } - } - - // Check toStreamable protocol (takes precedence over iteration protocols). - // toAsyncStreamable is ignored entirely in fromSync. - if (typeof input[toStreamable] === 'function') { - return fromSync(input[toStreamable]()); - } - // Reject explicit async inputs - if (isAsyncIterable(input)) { - throw new ERR_INVALID_ARG_TYPE( - 'input', - 'a synchronous input (not AsyncIterable)', - input, - ); - } - if (typeof input === 'object' && input !== null && typeof input.then === 'function') { - throw new ERR_INVALID_ARG_TYPE( - 'input', - 'a synchronous input (not Promise)', - input, - ); + const batch = input; + return { + __proto__: null, + *[SymbolIterator]() { + if (batch.length <= FROM_BATCH_SIZE) { + yield batch; + } else { + for (let i = 0; i < batch.length; i += FROM_BATCH_SIZE) { + yield ArrayPrototypeSlice(batch, i, i + FROM_BATCH_SIZE); + } + } + }, + }; } - // Must be a SyncStreamable - if (!isSyncIterable(input)) { - throw new ERR_INVALID_ARG_TYPE( - 'input', - ['string', 'ArrayBuffer', 'ArrayBufferView', 'Iterable', 'toStreamable'], - input, - ); + // Other sync Streamables + if (isSyncIterable(input)) { + return { + __proto__: null, + *[SymbolIterator]() { + yield* normalizeSyncSource(input); + }, + }; } - return { - __proto__: null, - *[SymbolIterator]() { - yield* normalizeSyncSource(input); - }, - }; + throw new ERR_INVALID_ARG_TYPE( + 'input', + ['string', 'ArrayBuffer', 'ArrayBufferView', 'Iterable', 'toStreamable'], + input, + ); } /** @@ -543,40 +527,6 @@ function from(input) { }; } - // Fast path: Uint8Array[] - yield in bounded sub-batches. - // Yielding the entire array as one batch forces downstream transforms - // to process all data at once, causing peak memory proportional to total - // data volume. Sub-batching keeps peak memory bounded while preserving - // the throughput benefit of batched processing. - if (ArrayIsArray(input)) { - if (input.length === 0) { - return { - __proto__: null, - async *[SymbolAsyncIterator]() { - // Empty - yield nothing - }, - }; - } - if (isUint8Array(input[0])) { - const allUint8 = ArrayPrototypeEvery(input, isUint8Array); - if (allUint8) { - const batch = input; - return { - __proto__: null, - async *[SymbolAsyncIterator]() { - if (batch.length <= FROM_BATCH_SIZE) { - yield batch; - } else { - for (let i = 0; i < batch.length; i += FROM_BATCH_SIZE) { - yield ArrayPrototypeSlice(batch, i, i + FROM_BATCH_SIZE); - } - } - }, - }; - } - } - } - // Check toAsyncStreamable protocol (takes precedence over toStreamable and // iteration protocols) if (typeof input[toAsyncStreamable] === 'function') { @@ -592,30 +542,79 @@ function from(input) { // itself (if tagged) and the resolved value. const resolved = await result; if (resolved?.[kValidatedSource]) { - yield* resolved[SymbolAsyncIterator](); + yield* resolved; return; } - yield* from(resolved)[SymbolAsyncIterator](); + yield* from(resolved); }, }; } // Check toStreamable protocol (takes precedence over iteration protocols) if (typeof input[toStreamable] === 'function') { - return from(input[toStreamable]()); + return { + __proto__: null, + async *[SymbolAsyncIterator]() { + // Note: use fromSync here, since toStreamable must not return + // an async source. + yield* fromSync(input[toStreamable]()); + }, + }; } - // Must be a Streamable (sync or async iterable) - if (!isSyncIterable(input) && !isAsyncIterable(input)) { - throw new ERR_INVALID_ARG_TYPE( - 'input', - ['string', 'ArrayBuffer', 'ArrayBufferView', 'Iterable', - 'AsyncIterable', 'toStreamable', 'toAsyncStreamable'], - input, - ); + // Check async Streamable before sync cases + if (isAsyncIterable(input)) { + return normalizeAsyncSource(input); + } + + // Fast path: Uint8Array[] - yield in bounded sub-batches. + // Yielding the entire array as one batch forces downstream transforms + // to process all data at once, causing peak memory proportional to total + // data volume. Sub-batching keeps peak memory bounded while preserving + // the throughput benefit of batched processing. + if (isUint8ArrayBatch(input)) { + if (input.length === 0) { + return { + __proto__: null, + async *[SymbolAsyncIterator]() { + // Empty - yield nothing + }, + }; + } + + const batch = input; + return { + __proto__: null, + async *[SymbolAsyncIterator]() { + if (batch.length <= FROM_BATCH_SIZE) { + yield batch; + } else { + for (let i = 0; i < batch.length; i += FROM_BATCH_SIZE) { + yield ArrayPrototypeSlice(batch, i, i + FROM_BATCH_SIZE); + } + } + }, + }; } - return normalizeAsyncSource(input); + // Other sync Streamables + if (isSyncIterable(input)) { + return { + __proto__: null, + async *[SymbolAsyncIterator]() { + // Note: use normalizeSyncSource here, since sync iterables + // containing nested async iterables should raise an error. + yield* normalizeSyncSource(input); + }, + }; + } + + throw new ERR_INVALID_ARG_TYPE( + 'input', + ['string', 'ArrayBuffer', 'ArrayBufferView', 'Iterable', + 'AsyncIterable', 'toStreamable', 'toAsyncStreamable'], + input, + ); } // =============================================================================