From 170f6bf7663be75041c75e1f61f03285e827bf08 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=B3=C3=B0i=20Karlsson?= Date: Thu, 4 Jun 2026 11:06:09 +0200 Subject: [PATCH 1/3] feat(nodejs-lib): harden pipeline mapper typings --- .../nodejs-lib/src/stream/pipeline.test.ts | 28 +++++++++++++++++++ packages/nodejs-lib/src/stream/pipeline.ts | 8 +++--- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/packages/nodejs-lib/src/stream/pipeline.test.ts b/packages/nodejs-lib/src/stream/pipeline.test.ts index 650ffac4a..009c68bc4 100644 --- a/packages/nodejs-lib/src/stream/pipeline.test.ts +++ b/packages/nodejs-lib/src/stream/pipeline.test.ts @@ -1,4 +1,5 @@ import { Readable } from 'node:stream' +import { END } from '@naturalcycles/js-lib/types' import { expect, test } from 'vitest' import { Pipeline } from './pipeline.js' import type { ReadableTyped } from './stream.model.js' @@ -16,6 +17,33 @@ test('Pipeline', async () => { expect(r).toEqual(['p_1', 'p_2', 'p_3']) }) +test('forEach returning END aborts the source', async () => { + const seen: string[] = [] + + await Pipeline.from(new HonestReadable(100, 'p')).forEach( + async item => { + seen.push(item) + if (seen.length >= 5) return END + }, + { concurrency: 1 }, + ) + + expect(seen.length).toBeGreaterThanOrEqual(5) + expect(seen.length).toBeLessThan(100) +}) + +test('forEachSync returning END aborts the source', async () => { + const seen: string[] = [] + + await Pipeline.from(new HonestReadable(100, 'p')).forEachSync(item => { + seen.push(item) + if (seen.length >= 5) return END + }) + + expect(seen.length).toBeGreaterThanOrEqual(5) + expect(seen.length).toBeLessThan(100) +}) + /** * Readable that Honestly respects backpressure. */ diff --git a/packages/nodejs-lib/src/stream/pipeline.ts b/packages/nodejs-lib/src/stream/pipeline.ts index 444b9c9ce..8e18d1e4e 100644 --- a/packages/nodejs-lib/src/stream/pipeline.ts +++ b/packages/nodejs-lib/src/stream/pipeline.ts @@ -236,12 +236,12 @@ export class Pipeline { return this } - tap(fn: AsyncIndexedMapper, opt?: TransformOptions): this { + tap(fn: AsyncIndexedMapper, opt?: TransformOptions): this { this.transforms.push(transformTap(fn, opt)) return this } - tapSync(fn: IndexedMapper, opt?: TransformOptions): this { + tapSync(fn: IndexedMapper, opt?: TransformOptions): this { this.transforms.push(transformTapSync(fn, opt)) return this } @@ -420,7 +420,7 @@ export class Pipeline { } async forEach( - fn: AsyncIndexedMapper, + fn: AsyncIndexedMapper, opt: TransformMapOptions & TransformLogProgressOptions = {}, ): Promise { this.transforms.push( @@ -437,7 +437,7 @@ export class Pipeline { } async forEachSync( - fn: IndexedMapper, + fn: IndexedMapper, opt: TransformMapSyncOptions & TransformLogProgressOptions = {}, ): Promise { this.transforms.push( From 1fd36ea4ce4446022f22d4a961504ad1c4e88d0a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=B3=C3=B0i=20Karlsson?= <53127288+frodi-karlsson@users.noreply.github.com> Date: Thu, 4 Jun 2026 19:30:41 +0200 Subject: [PATCH 2/3] Update packages/nodejs-lib/src/stream/pipeline.ts Co-authored-by: Kirill Groshkov --- packages/nodejs-lib/src/stream/pipeline.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/nodejs-lib/src/stream/pipeline.ts b/packages/nodejs-lib/src/stream/pipeline.ts index 8e18d1e4e..facfde92c 100644 --- a/packages/nodejs-lib/src/stream/pipeline.ts +++ b/packages/nodejs-lib/src/stream/pipeline.ts @@ -420,7 +420,7 @@ export class Pipeline { } async forEach( - fn: AsyncIndexedMapper, + fn: AbortableAsyncMapper, opt: TransformMapOptions & TransformLogProgressOptions = {}, ): Promise { this.transforms.push( From 3100efb4174dd0a0612faac76d6ff468c36925dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fr=C3=B3=C3=B0i=20Karlsson?= Date: Thu, 4 Jun 2026 19:32:12 +0200 Subject: [PATCH 3/3] refactor: abortable sync as well --- packages/nodejs-lib/src/stream/pipeline.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/nodejs-lib/src/stream/pipeline.ts b/packages/nodejs-lib/src/stream/pipeline.ts index facfde92c..2309836b7 100644 --- a/packages/nodejs-lib/src/stream/pipeline.ts +++ b/packages/nodejs-lib/src/stream/pipeline.ts @@ -8,6 +8,7 @@ import { createAbortableSignal } from '@naturalcycles/js-lib' import { _passthroughPredicate } from '@naturalcycles/js-lib/types' import type { AbortableAsyncMapper, + AbortableMapper, AsyncIndexedMapper, AsyncPredicate, END, @@ -437,7 +438,7 @@ export class Pipeline { } async forEachSync( - fn: IndexedMapper, + fn: AbortableMapper, opt: TransformMapSyncOptions & TransformLogProgressOptions = {}, ): Promise { this.transforms.push(