diff --git a/packages/nodejs-lib/src/stream/pipeline.test.ts b/packages/nodejs-lib/src/stream/pipeline.test.ts index 650ffac4..009c68bc 100644 --- a/packages/nodejs-lib/src/stream/pipeline.test.ts +++ b/packages/nodejs-lib/src/stream/pipeline.test.ts @@ -1,4 +1,5 @@ import { Readable } from 'node:stream' +import { END } from '@naturalcycles/js-lib/types' import { expect, test } from 'vitest' import { Pipeline } from './pipeline.js' import type { ReadableTyped } from './stream.model.js' @@ -16,6 +17,33 @@ test('Pipeline', async () => { expect(r).toEqual(['p_1', 'p_2', 'p_3']) }) +test('forEach returning END aborts the source', async () => { + const seen: string[] = [] + + await Pipeline.from(new HonestReadable(100, 'p')).forEach( + async item => { + seen.push(item) + if (seen.length >= 5) return END + }, + { concurrency: 1 }, + ) + + expect(seen.length).toBeGreaterThanOrEqual(5) + expect(seen.length).toBeLessThan(100) +}) + +test('forEachSync returning END aborts the source', async () => { + const seen: string[] = [] + + await Pipeline.from(new HonestReadable(100, 'p')).forEachSync(item => { + seen.push(item) + if (seen.length >= 5) return END + }) + + expect(seen.length).toBeGreaterThanOrEqual(5) + expect(seen.length).toBeLessThan(100) +}) + /** * Readable that Honestly respects backpressure. */ diff --git a/packages/nodejs-lib/src/stream/pipeline.ts b/packages/nodejs-lib/src/stream/pipeline.ts index 444b9c9c..2309836b 100644 --- a/packages/nodejs-lib/src/stream/pipeline.ts +++ b/packages/nodejs-lib/src/stream/pipeline.ts @@ -8,6 +8,7 @@ import { createAbortableSignal } from '@naturalcycles/js-lib' import { _passthroughPredicate } from '@naturalcycles/js-lib/types' import type { AbortableAsyncMapper, + AbortableMapper, AsyncIndexedMapper, AsyncPredicate, END, @@ -236,12 +237,12 @@ export class Pipeline { return this } - tap(fn: AsyncIndexedMapper, opt?: TransformOptions): this { + tap(fn: AsyncIndexedMapper, opt?: TransformOptions): this { this.transforms.push(transformTap(fn, opt)) return this } - tapSync(fn: IndexedMapper, opt?: TransformOptions): this { + tapSync(fn: IndexedMapper, opt?: TransformOptions): this { this.transforms.push(transformTapSync(fn, opt)) return this } @@ -420,7 +421,7 @@ export class Pipeline { } async forEach( - fn: AsyncIndexedMapper, + fn: AbortableAsyncMapper, opt: TransformMapOptions & TransformLogProgressOptions = {}, ): Promise { this.transforms.push( @@ -437,7 +438,7 @@ export class Pipeline { } async forEachSync( - fn: IndexedMapper, + fn: AbortableMapper, opt: TransformMapSyncOptions & TransformLogProgressOptions = {}, ): Promise { this.transforms.push(