Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
235 changes: 117 additions & 118 deletions lib/internal/streams/iter/from.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
const {
ArrayBufferIsView,
ArrayIsArray,
ArrayPrototypeEvery,
ArrayPrototypePush,
ArrayPrototypeSlice,
DataViewPrototypeGetBuffer,
Expand Down Expand Up @@ -295,14 +294,6 @@ async function* normalizeAsyncValue(value) {
return;
}

// Handle arrays (which are also iterable, but check first for efficiency)
if (ArrayIsArray(value)) {
for (let i = 0; i < value.length; i++) {
yield* normalizeAsyncValue(value[i]);
}
return;
}

// Handle async iterables (check before sync iterables since some objects
// have both)
if (isAsyncIterable(value)) {
Expand All @@ -312,10 +303,22 @@ async function* normalizeAsyncValue(value) {
return;
}

// Handle arrays (which are also sync iterable, but check first for efficiency)
if (ArrayIsArray(value)) {
for (let i = 0; i < value.length; i++) {
// Note: array elements must be sync values; arrays containing
// async iterables are invalid.
yield* normalizeSyncValue(value[i]);
}
return;
}

// Handle sync iterables
if (isSyncIterable(value)) {
// Note: this iteration is synchronous, since async iterables
// may not be nested within sync iterables.
for (const item of value) {
yield* normalizeAsyncValue(item);
yield* normalizeSyncValue(item);
}
return;
}
Expand Down Expand Up @@ -391,16 +394,18 @@ async function* normalizeAsyncSource(source) {
yield batch;
batch = [];
}
let asyncBatch = [];
for await (const chunk of normalizeAsyncValue(value)) {
ArrayPrototypePush(asyncBatch, chunk);
if (asyncBatch.length === FROM_BATCH_SIZE) {
yield asyncBatch;
asyncBatch = [];
let syncBatch = [];
// Note: this iteration is synchronous, since async iterables
// may not be nested within sync iterables.
for (const chunk of normalizeSyncValue(value)) {
ArrayPrototypePush(syncBatch, chunk);
if (syncBatch.length === FROM_BATCH_SIZE) {
yield syncBatch;
syncBatch = [];
}
}
if (asyncBatch.length > 0) {
yield asyncBatch;
if (syncBatch.length > 0) {
yield syncBatch;
}
}

Expand Down Expand Up @@ -443,12 +448,18 @@ function fromSync(input) {
};
}

// Check toStreamable protocol (takes precedence over iteration protocols).
// toAsyncStreamable is ignored entirely in fromSync.
if (typeof input[toStreamable] === 'function') {
return fromSync(input[toStreamable]());
}

// Fast path: Uint8Array[] - yield in bounded sub-batches.
// Yielding the entire array as one batch forces downstream transforms
// to process all data at once, causing peak memory proportional to total
// data volume. Sub-batching keeps peak memory bounded while preserving
// the throughput benefit of batched processing.
if (ArrayIsArray(input)) {
if (isUint8ArrayBatch(input)) {
if (input.length === 0) {
return {
__proto__: null,
Expand All @@ -457,64 +468,37 @@ function fromSync(input) {
},
};
}
// Check if it's an array of Uint8Array (common case)
if (isUint8Array(input[0])) {
const allUint8 = ArrayPrototypeEvery(input, isUint8Array);
if (allUint8) {
const batch = input;
return {
__proto__: null,
*[SymbolIterator]() {
if (batch.length <= FROM_BATCH_SIZE) {
yield batch;
} else {
for (let i = 0; i < batch.length; i += FROM_BATCH_SIZE) {
yield ArrayPrototypeSlice(batch, i, i + FROM_BATCH_SIZE);
}
}
},
};
}
}
}

// Check toStreamable protocol (takes precedence over iteration protocols).
// toAsyncStreamable is ignored entirely in fromSync.
if (typeof input[toStreamable] === 'function') {
return fromSync(input[toStreamable]());
}

// Reject explicit async inputs
if (isAsyncIterable(input)) {
throw new ERR_INVALID_ARG_TYPE(
'input',
'a synchronous input (not AsyncIterable)',
input,
);
}
if (typeof input === 'object' && input !== null && typeof input.then === 'function') {
throw new ERR_INVALID_ARG_TYPE(
'input',
'a synchronous input (not Promise)',
input,
);
const batch = input;
return {
__proto__: null,
*[SymbolIterator]() {
if (batch.length <= FROM_BATCH_SIZE) {
yield batch;
} else {
for (let i = 0; i < batch.length; i += FROM_BATCH_SIZE) {
yield ArrayPrototypeSlice(batch, i, i + FROM_BATCH_SIZE);
}
}
},
};
}

// Must be a SyncStreamable
if (!isSyncIterable(input)) {
throw new ERR_INVALID_ARG_TYPE(
'input',
['string', 'ArrayBuffer', 'ArrayBufferView', 'Iterable', 'toStreamable'],
input,
);
// Other sync Streamables
if (isSyncIterable(input)) {
return {
__proto__: null,
*[SymbolIterator]() {
yield* normalizeSyncSource(input);
},
};
}

return {
__proto__: null,
*[SymbolIterator]() {
yield* normalizeSyncSource(input);
},
};
throw new ERR_INVALID_ARG_TYPE(
'input',
['string', 'ArrayBuffer', 'ArrayBufferView', 'Iterable', 'toStreamable'],
input,
);
}

/**
Expand Down Expand Up @@ -543,40 +527,6 @@ function from(input) {
};
}

// Fast path: Uint8Array[] - yield in bounded sub-batches.
// Yielding the entire array as one batch forces downstream transforms
// to process all data at once, causing peak memory proportional to total
// data volume. Sub-batching keeps peak memory bounded while preserving
// the throughput benefit of batched processing.
if (ArrayIsArray(input)) {
if (input.length === 0) {
return {
__proto__: null,
async *[SymbolAsyncIterator]() {
// Empty - yield nothing
},
};
}
if (isUint8Array(input[0])) {
const allUint8 = ArrayPrototypeEvery(input, isUint8Array);
if (allUint8) {
const batch = input;
return {
__proto__: null,
async *[SymbolAsyncIterator]() {
if (batch.length <= FROM_BATCH_SIZE) {
yield batch;
} else {
for (let i = 0; i < batch.length; i += FROM_BATCH_SIZE) {
yield ArrayPrototypeSlice(batch, i, i + FROM_BATCH_SIZE);
}
}
},
};
}
}
}

// Check toAsyncStreamable protocol (takes precedence over toStreamable and
// iteration protocols)
if (typeof input[toAsyncStreamable] === 'function') {
Expand All @@ -592,30 +542,79 @@ function from(input) {
// itself (if tagged) and the resolved value.
const resolved = await result;
if (resolved?.[kValidatedSource]) {
yield* resolved[SymbolAsyncIterator]();
yield* resolved;
return;
}
yield* from(resolved)[SymbolAsyncIterator]();
yield* from(resolved);
},
};
}

// Check toStreamable protocol (takes precedence over iteration protocols)
if (typeof input[toStreamable] === 'function') {
return from(input[toStreamable]());
return {
__proto__: null,
async *[SymbolAsyncIterator]() {
// Note: use fromSync here, since toStreamable must not return
// an async source.
yield* fromSync(input[toStreamable]());
},
};
}

// Must be a Streamable (sync or async iterable)
if (!isSyncIterable(input) && !isAsyncIterable(input)) {
throw new ERR_INVALID_ARG_TYPE(
'input',
['string', 'ArrayBuffer', 'ArrayBufferView', 'Iterable',
'AsyncIterable', 'toStreamable', 'toAsyncStreamable'],
input,
);
// Check async Streamable before sync cases
if (isAsyncIterable(input)) {
return normalizeAsyncSource(input);
}

// Fast path: Uint8Array[] - yield in bounded sub-batches.
// Yielding the entire array as one batch forces downstream transforms
// to process all data at once, causing peak memory proportional to total
// data volume. Sub-batching keeps peak memory bounded while preserving
// the throughput benefit of batched processing.
if (isUint8ArrayBatch(input)) {
if (input.length === 0) {
return {
__proto__: null,
async *[SymbolAsyncIterator]() {
// Empty - yield nothing
},
};
}

const batch = input;
return {
__proto__: null,
async *[SymbolAsyncIterator]() {
if (batch.length <= FROM_BATCH_SIZE) {
yield batch;
} else {
for (let i = 0; i < batch.length; i += FROM_BATCH_SIZE) {
yield ArrayPrototypeSlice(batch, i, i + FROM_BATCH_SIZE);
}
}
},
};
}

return normalizeAsyncSource(input);
// Other sync Streamables
if (isSyncIterable(input)) {
return {
__proto__: null,
async *[SymbolAsyncIterator]() {
// Note: use normalizeSyncSource here, since sync iterables
// containing nested async iterables should raise an error.
yield* normalizeSyncSource(input);
},
};
}

throw new ERR_INVALID_ARG_TYPE(
'input',
['string', 'ArrayBuffer', 'ArrayBufferView', 'Iterable',
'AsyncIterable', 'toStreamable', 'toAsyncStreamable'],
input,
);
}

// =============================================================================
Expand Down
Loading