From 63afe6a9c6bf2329b051f451a1bb864e9f803f62 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Wed, 1 Apr 2026 15:38:53 +0100 Subject: [PATCH 01/18] set min-release-age --- .changeset/plain-breads-follow.md | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .changeset/plain-breads-follow.md diff --git a/.changeset/plain-breads-follow.md b/.changeset/plain-breads-follow.md new file mode 100644 index 000000000..2adfe057a --- /dev/null +++ b/.changeset/plain-breads-follow.md @@ -0,0 +1,7 @@ +--- +'@openfn/runtime': patch +'@openfn/cli': patch +'@openfn/ws-worker': patch +--- + +When installing adaptors, only install versions that have been released for 24hours From 88b3298c6ad97eeeed200825cc4292bf0083f093 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Wed, 1 Apr 2026 16:51:27 +0100 Subject: [PATCH 02/18] versions --- .changeset/plain-breads-follow.md | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 .changeset/plain-breads-follow.md diff --git a/.changeset/plain-breads-follow.md b/.changeset/plain-breads-follow.md deleted file mode 100644 index 2adfe057a..000000000 --- a/.changeset/plain-breads-follow.md +++ /dev/null @@ -1,7 +0,0 @@ ---- -'@openfn/runtime': patch -'@openfn/cli': patch -'@openfn/ws-worker': patch ---- - -When installing adaptors, only install versions that have been released for 24hours From 5f99a414572c79e7e00da582d17b0dca3629ac8a Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 2 Apr 2026 09:15:16 +0100 Subject: [PATCH 03/18] debugging flaky test --- integration-tests/worker/test/integration.test.ts | 3 ++- packages/ws-worker/src/util/send-event.ts | 2 ++ 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/integration-tests/worker/test/integration.test.ts b/integration-tests/worker/test/integration.test.ts index dd7492ded..b7b5093a6 100644 --- a/integration-tests/worker/test/integration.test.ts +++ b/integration-tests/worker/test/integration.test.ts @@ -920,7 +920,8 @@ test.serial('set a default payload limit on the worker', (t) => { }); }); -test.serial('override the worker payload through run options', (t) => { +// this is being flaky!! is this new flakiness? +test.serial.only('override the worker payload through run options', (t) => { return new Promise(async (done) => { if (!worker.destroyed) { await worker.destroy(); diff --git a/packages/ws-worker/src/util/send-event.ts b/packages/ws-worker/src/util/send-event.ts index f4cad446f..0f08184aa 100644 --- a/packages/ws-worker/src/util/send-event.ts +++ b/packages/ws-worker/src/util/send-event.ts @@ -20,6 +20,8 @@ export const sendEvent = ( const { channel, logger, id: runId = '' } = context; + console.log('>> ', event); + return new Promise((resolve, reject) => { const report = (error: any) => { logger.error(`${runId} :: ${event} :: ERR: ${error.message || error}`); From a7519d7d20826bd6c878450e2bf1786d8e6e4c62 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 2 Apr 2026 16:51:45 +0100 Subject: [PATCH 04/18] fix an issue where the batch is never clear --- packages/ws-worker/src/api/process-events.ts | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/packages/ws-worker/src/api/process-events.ts b/packages/ws-worker/src/api/process-events.ts index bf5aceca6..c781131e0 100644 --- a/packages/ws-worker/src/api/process-events.ts +++ b/packages/ws-worker/src/api/process-events.ts @@ -73,7 +73,7 @@ export function eventProcessor( let activeBatch: string | null = null; let batch: any = []; - let batchTimeout: NodeJS.Timeout; + let batchTimeout: NodeJS.Timeout | null = null; let didFinish = false; let timeoutHandle: NodeJS.Timeout; @@ -104,7 +104,9 @@ export function eventProcessor( }; const sendBatch = async (name: string) => { - clearTimeout(batchTimeout); + clearTimeout(batchTimeout!); + batchTimeout = null; + // first clear the batch activeBatch = null; await send(name, batch, batch.length); @@ -116,7 +118,9 @@ export function eventProcessor( const start = Date.now(); // @ts-ignore const lightningEvent = eventMap[name] ?? name; + console.log('!! calling ', name); await callbacks[name](context, payload); + console.log('!! finished ', name); if (batchSize) { logger.info( `${planId} :: sent ${lightningEvent} (${batchSize}):: OK :: ${ @@ -140,6 +144,7 @@ export function eventProcessor( const process = async (name: string, event: any) => { // TODO this actually shouldn't be here - should be done separately + console.log('<<<<<<<< ', name); if (name !== 'workflow-log') { Sentry.addBreadcrumb({ category: 'event', @@ -196,9 +201,11 @@ export function eventProcessor( }; const enqueue = (name: string, event: any) => { + console.log('>>>>> ', name); queue.push({ name, event }); if (queue.length == 1) { + // if an event is still in flight, will this cause a duplicate? next(); } }; From e6c0919bc43f7c4ce4cb48727c851a997b013d6b Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 2 Apr 2026 17:58:43 +0100 Subject: [PATCH 05/18] fix a timing issue when sending batch events Big help from claude --- packages/ws-worker/src/api/process-events.ts | 14 +++-- .../ws-worker/test/api/process-event.test.ts | 56 +++++++++++++++++++ 2 files changed, 64 insertions(+), 6 deletions(-) diff --git a/packages/ws-worker/src/api/process-events.ts b/packages/ws-worker/src/api/process-events.ts index c781131e0..b9ff0edcb 100644 --- a/packages/ws-worker/src/api/process-events.ts +++ b/packages/ws-worker/src/api/process-events.ts @@ -74,10 +74,14 @@ export function eventProcessor( let activeBatch: string | null = null; let batch: any = []; let batchTimeout: NodeJS.Timeout | null = null; + let batchSendPromise: Promise | null = null; let didFinish = false; let timeoutHandle: NodeJS.Timeout; const next = async () => { + if (batchSendPromise) { + await batchSendPromise; + } const evt = queue[0]; if (evt) { didFinish = false; @@ -118,9 +122,7 @@ export function eventProcessor( const start = Date.now(); // @ts-ignore const lightningEvent = eventMap[name] ?? name; - console.log('!! calling ', name); await callbacks[name](context, payload); - console.log('!! finished ', name); if (batchSize) { logger.info( `${planId} :: sent ${lightningEvent} (${batchSize}):: OK :: ${ @@ -144,7 +146,6 @@ export function eventProcessor( const process = async (name: string, event: any) => { // TODO this actually shouldn't be here - should be done separately - console.log('<<<<<<<< ', name); if (name !== 'workflow-log') { Sentry.addBreadcrumb({ category: 'event', @@ -188,8 +189,10 @@ export function eventProcessor( // finally wait for a time before sending the batch if (!batchTimeout) { const batchName = activeBatch!; - batchTimeout = setTimeout(async () => { - sendBatch(batchName); + batchTimeout = setTimeout(() => { + batchSendPromise = sendBatch(batchName).finally(() => { + batchSendPromise = null; + }); }, interval); } } else { @@ -201,7 +204,6 @@ export function eventProcessor( }; const enqueue = (name: string, event: any) => { - console.log('>>>>> ', name); queue.push({ name, event }); if (queue.length == 1) { diff --git a/packages/ws-worker/test/api/process-event.test.ts b/packages/ws-worker/test/api/process-event.test.ts index 6687ab53f..d9f57ace4 100644 --- a/packages/ws-worker/test/api/process-event.test.ts +++ b/packages/ws-worker/test/api/process-event.test.ts @@ -710,6 +710,62 @@ test('queue events behind a slow event II', async (t) => { t.is(events[1], 10); }); +// Regression test for ordering race condition when batch timeout fires +// while the queue is empty, and a new event arrives during the slow send. +// +// Timeline: +// t=0ms LOG1 arrives, batch opens, timeout set for t=5ms +// t=5ms Timeout fires: activeBatch=null (sync), then awaits slow send (~50ms) +// t=30ms Job's async work completes, JOB_COMPLETE arrives +// Queue is empty, activeBatch is null -> next() fires immediately +// JOB_COMPLETE sends and completes before the log batch resolves +// t=55ms Log batch send finally resolves +// +// Without a fix: events = ['job-complete', 'log'] (wrong order) +test('batch timeout send should not race with subsequent events', async (t) => { + const engine = await createMockEngine(); + const plan = createPlan( + `fn(async (s) => { + console.log(1); + await new Promise(resolve => setTimeout(resolve, 30)); + return {}; + })` + ); + + const context = { + id: 'a', + plan, + options: {}, + logger, + }; + + const events: string[] = []; + + const callbacks = { + [WORKFLOW_LOG]: async (_ctx: any, _event: any) => { + // Slow send simulates real websocket latency + await new Promise(resolve => setTimeout(resolve, 50)); + events.push('log'); + }, + [JOB_COMPLETE]: async (_ctx: any, _event: any) => { + events.push('job-complete'); + }, + }; + + const options = { + batch: { [WORKFLOW_LOG]: true }, + batchInterval: 5, // fires well before the job's 30ms async completes + }; + + eventProcessor(engine, context as any, callbacks, options); + + await engine.execute(plan, {}); + await waitForAsync(200); + + t.is(events[0], 'log'); + t.is(events[1], 'job-complete'); +}); + test('should timeout and continue processing when event handler hangs', async (t) => { const engine = await createMockEngine(); const plan = createPlan(); From 9b223c2325d1838eefc6941050741aa570101e59 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Tue, 7 Apr 2026 09:08:54 +0100 Subject: [PATCH 06/18] logging --- packages/ws-worker/src/api/process-events.ts | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/packages/ws-worker/src/api/process-events.ts b/packages/ws-worker/src/api/process-events.ts index b9ff0edcb..bcfa4b1c6 100644 --- a/packages/ws-worker/src/api/process-events.ts +++ b/packages/ws-worker/src/api/process-events.ts @@ -108,6 +108,7 @@ export function eventProcessor( }; const sendBatch = async (name: string) => { + console.log('sending batch', name); clearTimeout(batchTimeout!); batchTimeout = null; @@ -145,6 +146,7 @@ export function eventProcessor( }; const process = async (name: string, event: any) => { + console.log('process', name); // TODO this actually shouldn't be here - should be done separately if (name !== 'workflow-log') { Sentry.addBreadcrumb({ @@ -207,8 +209,11 @@ export function eventProcessor( queue.push({ name, event }); if (queue.length == 1) { + console.log(`[${name}] executing immediately`); // if an event is still in flight, will this cause a duplicate? next(); + } else { + console.log(`[${name}] deffering event`); } }; From bf3dd8111830c16ba6068bc16bb805cd8bc95403 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Wed, 8 Apr 2026 16:12:06 +0100 Subject: [PATCH 07/18] add a bunch of more controlled unit tests --- packages/ws-worker/src/api/process-events.ts | 8 +- .../ws-worker/test/api/process-event.test.ts | 253 ++++++++++++++++-- 2 files changed, 240 insertions(+), 21 deletions(-) diff --git a/packages/ws-worker/src/api/process-events.ts b/packages/ws-worker/src/api/process-events.ts index bcfa4b1c6..4958380f6 100644 --- a/packages/ws-worker/src/api/process-events.ts +++ b/packages/ws-worker/src/api/process-events.ts @@ -67,6 +67,7 @@ export function eventProcessor( batchLimit: limit = DEFAULT_BATCH_LIMIT, batchInterval: interval = DEFAULT_BATCH_INTERVAL, timeout_ms, + events, } = options; const queue: any = []; @@ -211,16 +212,19 @@ export function eventProcessor( if (queue.length == 1) { console.log(`[${name}] executing immediately`); // if an event is still in flight, will this cause a duplicate? - next(); + setImmediate(next); } else { console.log(`[${name}] deffering event`); } }; - const e = allEngineEvents.reduce( + const e = (events || allEngineEvents).reduce( (obj, e) => Object.assign(obj, { [e]: (p: any) => enqueue(e, p) }), {} ); engine.listen(planId, e); + + // return debug state + return { queue }; } diff --git a/packages/ws-worker/test/api/process-event.test.ts b/packages/ws-worker/test/api/process-event.test.ts index d9f57ace4..2f6b0ff85 100644 --- a/packages/ws-worker/test/api/process-event.test.ts +++ b/packages/ws-worker/test/api/process-event.test.ts @@ -14,6 +14,7 @@ import createMockEngine from '../../src/mock/runtime-engine'; import type { ExecutionPlan } from '@openfn/lexicon'; import { createMockLogger } from '@openfn/logger'; +import { EventEmitter } from 'node:events'; const logger = createMockLogger(); @@ -40,7 +41,221 @@ const createPlan = (...expressions: string[]) => options: {}, } as ExecutionPlan); -test('should process a workflow-start event and call the callback', async (t) => { +const createFakeEngine = () => { + const bus = new EventEmitter(); + return { + listen: (_id: string, events) => { + for (const evt in events) { + bus.on(evt, (...args) => { + events[evt](...args); + }); + } + }, + emit: (name, payload) => bus.emit(name, payload), + }; +}; + +const createCallbacks = (events: Record) => { + const obj = {}; + for (const event in events) { + const fn = (...args) => { + fn.count++; + return events[event](...args); + }; + fn.count = 0; + obj[event] = fn; + } + + return obj; +}; + +// TODO try simpler tests with full control - don't use the engine +test('should process one event', async (t) => { + const callbacks = createCallbacks({ test: () => {} }); + const engine = createFakeEngine(); + const context = { + logger, + }; + + const { queue } = eventProcessor(engine, context, callbacks, { + events: ['test'], + }); + + t.is(queue.length, 0); + + engine.emit('test', {}); + + t.is(queue.length, 1); + + await waitForAsync(5); + + t.is(callbacks.test.count, 1); +}); + +test('should process several events in order', async (t) => { + const result = []; + + const callbacks = createCallbacks({ + test: (context, evt) => { + result.push(evt.id); + }, + }); + const engine = createFakeEngine(); + const context = { + logger, + }; + + const { queue } = eventProcessor(engine, context, callbacks, { + events: ['test'], + }); + + t.is(queue.length, 0); + + engine.emit('test', { id: 1 }); + engine.emit('test', { id: 2 }); + engine.emit('test', { id: 3 }); + + await waitForAsync(10); + + t.is(callbacks.test.count, 3); + t.deepEqual(result, [1, 2, 3]); +}); + +test('should process 100 events in order', async (t) => { + t.plan(100); + return new Promise((resolve) => { + const results = []; + + const finish = () => { + console.log('finishing'); + results.forEach((r, idx) => { + // the 0th item should be 0, the 1st item 1, etc + t.is(r, idx); + }); + console.log('resolving...'); + resolve(); + }; + + const callbacks = createCallbacks({ + test: (context, evt) => { + results.push(evt.id); + + if (evt.id === 99) { + finish(); + } + }, + }); + const engine = createFakeEngine(); + const context = { + logger, + }; + + eventProcessor(engine, context, callbacks, { + events: ['test'], + }); + + new Array(100).fill(0).forEach((_v, idx) => { + engine.emit('test', { id: idx }); + }); + }); +}); + +test('should send a batch after a default timeout', async (t) => { + return new Promise((resolve) => { + const callbacks = createCallbacks({ + test: (context, evt) => { + // We should have a single event with 11 entries + t.is(evt.length, 11); + t.is(callbacks.test.count, 1); + resolve(); + }, + }); + const engine = createFakeEngine(); + const context = { + logger, + }; + + eventProcessor(engine, context, callbacks, { + events: ['test'], + batch: { test: 1 }, + batchLimit: 20, + batchInterval: 20, + }); + + // send 11 events in quick succession + new Array(11).fill(0).forEach((_v, idx) => { + engine.emit('test', { id: idx }); + }); + }); +}); + +test.only('should send a batch when a limit is hit', async (t) => { + return new Promise((resolve) => { + const callbacks = createCallbacks({ + test: (context, evt) => { + // We should have a single event with 11 entries + t.is(evt.length, 11); + t.is(callbacks.test.count, 1); + resolve(); + }, + }); + const engine = createFakeEngine(); + const context = { + logger, + }; + + eventProcessor(engine, context, callbacks, { + events: ['test'], + batch: { test: 1 }, + batchLimit: 11, + batchInterval: 1000 * 60 * 60, + }); + + // send 11 events in quick succession + new Array(11).fill(0).forEach((_v, idx) => { + engine.emit('test', { id: idx }); + }); + }); +}); + +// should send two batches + +test.only('should send two batches', async (t) => { + return new Promise((resolve) => { + let total = 0; + const callbacks = createCallbacks({ + test: (context, evt) => { + t.is(evt.length, 6); + total += evt.length; + + if (total == 12) { + t.is(callbacks.test.count, 2); + resolve(); + } + }, + }); + const engine = createFakeEngine(); + const context = { + logger, + }; + + eventProcessor(engine, context, callbacks, { + events: ['test'], + batch: { test: 1 }, + batchLimit: 6, + batchInterval: 1000 * 60 * 60, + }); + + // send 12 events in quick succession + new Array(12).fill(0).forEach((_v, idx) => { + engine.emit('test', { id: idx }); + }); + }); +}); + +// should send the batch on interrupt + +test('integration: should process a workflow-start event and call the callback', async (t) => { t.plan(3); const engine = await createMockEngine(); @@ -69,7 +284,7 @@ test('should process a workflow-start event and call the callback', async (t) => await waitForAsync(); }); -test('should process a workflow-complete event and call the callback', async (t) => { +test('integration: should process a workflow-complete event and call the callback', async (t) => { t.plan(4); const engine = await createMockEngine(); @@ -97,7 +312,7 @@ test('should process a workflow-complete event and call the callback', async (t) await waitForAsync(); }); -test('should process a job-start event and call the callback', async (t) => { +test('integration: should process a job-start event and call the callback', async (t) => { t.plan(4); const engine = await createMockEngine(); @@ -125,7 +340,7 @@ test('should process a job-start event and call the callback', async (t) => { await waitForAsync(); }); -test('should process a job-complete event and call the callback', async (t) => { +test('integration: should process a job-complete event and call the callback', async (t) => { t.plan(5); const engine = await createMockEngine(); @@ -154,7 +369,7 @@ test('should process a job-complete event and call the callback', async (t) => { await waitForAsync(); }); -test('should process a workflow-log event and call the callback', async (t) => { +test('integration: should process a workflow-log event and call the callback', async (t) => { t.plan(4); const engine = await createMockEngine(); @@ -182,7 +397,7 @@ test('should process a workflow-log event and call the callback', async (t) => { await waitForAsync(); }); -test('should process a job-error event and call the callback', async (t) => { +test('integration: should process a job-error event and call the callback', async (t) => { t.plan(4); const engine = await createMockEngine(); @@ -210,7 +425,7 @@ test('should process a job-error event and call the callback', async (t) => { await new Promise((resolve) => setTimeout(resolve, 50)); }); -test('should process a workflow-error event and call the callback', async (t) => { +test('integration: should process a workflow-error event and call the callback', async (t) => { t.plan(5); const engine = await createMockEngine(); @@ -239,7 +454,7 @@ test('should process a workflow-error event and call the callback', async (t) => await new Promise((resolve) => setTimeout(resolve, 50)); }); -test('should process events in the correct order', async (t) => { +test('integration: should process events in the correct order', async (t) => { const engine = await createMockEngine(); const plan = createPlan( `fn((s) => { @@ -306,7 +521,7 @@ test('should process events in the correct order', async (t) => { t.assert(events.every((e) => e.workflowId === 'a')); }); -test('should batch sequential log events', async (t) => { +test('integration: should batch sequential log events', async (t) => { t.plan(4); const engine = await createMockEngine(); @@ -348,7 +563,7 @@ test('should batch sequential log events', async (t) => { }); // 3 logs will be sent within 10ms, but they'll be interrupted by step:complete and step:start -test('should interrupt a batch of log events', async (t) => { +test('integration: should interrupt a batch of log events', async (t) => { const engine = await createMockEngine(); const plan = createPlan( `fn((s) => { @@ -399,7 +614,7 @@ test('should interrupt a batch of log events', async (t) => { t.is(second, 1); }); -test('should respect the limit', async (t) => { +test('integration: should respect the limit', async (t) => { const engine = await createMockEngine(); const plan = createPlan( `fn((s) => { @@ -443,7 +658,7 @@ test('should respect the limit', async (t) => { t.is(events[1].length, 2); }); -test('should respect the interval', async (t) => { +test('integration: should respect the interval', async (t) => { const engine = await createMockEngine(); const plan = createPlan( `fn(async (s) => { @@ -486,7 +701,7 @@ test('should respect the interval', async (t) => { t.is(events[1].length, 1); }); -test('should handle two batches of logs', async (t) => { +test('integration: should handle two batches of logs', async (t) => { const engine = await createMockEngine(); // syntax is weird because of how the fake RTE works const plan = createPlan( @@ -535,7 +750,7 @@ test('should handle two batches of logs', async (t) => { t.is(second, 2); }); -test('should process events in the correct order with batching', async (t) => { +test('integration: should process events in the correct order with batching', async (t) => { const engine = await createMockEngine(); const plan = createPlan( `fn((s) => { @@ -597,7 +812,7 @@ test('should process events in the correct order with batching', async (t) => { t.is(events[4].type, 'workflow-complete'); }); -test('queue events behind a slow event', async (t) => { +test('integration: queue events behind a slow event', async (t) => { const engine = await createMockEngine(); const plan = createPlan( `fn((s) => { @@ -650,7 +865,7 @@ test('queue events behind a slow event', async (t) => { // This isn't the most watertight test - but I've debugged it closely and it seems // to do the right thing -test('queue events behind a slow event II', async (t) => { +test('integration: queue events behind a slow event II', async (t) => { const engine = await createMockEngine(); const plan = createPlan( ` @@ -722,7 +937,7 @@ test('queue events behind a slow event II', async (t) => { // t=55ms Log batch send finally resolves // // Without a fix: events = ['job-complete', 'log'] (wrong order) -test('batch timeout send should not race with subsequent events', async (t) => { +test('integration: batch timeout send should not race with subsequent events', async (t) => { const engine = await createMockEngine(); const plan = createPlan( `fn(async (s) => { @@ -744,7 +959,7 @@ test('batch timeout send should not race with subsequent events', async (t) => { const callbacks = { [WORKFLOW_LOG]: async (_ctx: any, _event: any) => { // Slow send simulates real websocket latency - await new Promise(resolve => setTimeout(resolve, 50)); + await new Promise((resolve) => setTimeout(resolve, 50)); events.push('log'); }, [JOB_COMPLETE]: async (_ctx: any, _event: any) => { @@ -766,7 +981,7 @@ test('batch timeout send should not race with subsequent events', async (t) => { t.is(events[1], 'job-complete'); }); -test('should timeout and continue processing when event handler hangs', async (t) => { +test('integration: should timeout and continue processing when event handler hangs', async (t) => { const engine = await createMockEngine(); const plan = createPlan(); From b9dcddae8568bd173b0080a0e353b2574f8dceca Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Wed, 8 Apr 2026 16:39:10 +0100 Subject: [PATCH 08/18] test on interrupt --- .../ws-worker/test/api/process-event.test.ts | 38 ++++++++++++++++--- 1 file changed, 33 insertions(+), 5 deletions(-) diff --git a/packages/ws-worker/test/api/process-event.test.ts b/packages/ws-worker/test/api/process-event.test.ts index 2f6b0ff85..40cbff0ba 100644 --- a/packages/ws-worker/test/api/process-event.test.ts +++ b/packages/ws-worker/test/api/process-event.test.ts @@ -189,7 +189,7 @@ test('should send a batch after a default timeout', async (t) => { }); }); -test.only('should send a batch when a limit is hit', async (t) => { +test('should send a batch when a limit is hit', async (t) => { return new Promise((resolve) => { const callbacks = createCallbacks({ test: (context, evt) => { @@ -218,9 +218,7 @@ test.only('should send a batch when a limit is hit', async (t) => { }); }); -// should send two batches - -test.only('should send two batches', async (t) => { +test('should send two batches', async (t) => { return new Promise((resolve) => { let total = 0; const callbacks = createCallbacks({ @@ -253,7 +251,37 @@ test.only('should send two batches', async (t) => { }); }); -// should send the batch on interrupt +test.only('should send a batch on interrupt', async (t) => { + t.plan(3); + return new Promise((resolve) => { + const callbacks = createCallbacks({ + test: (context, evt) => { + t.is(evt.length, 6); + }, + interrupt: (context, evt) => { + t.is(callbacks.test.count, 1); + t.is(callbacks.interrupt.count, 1); + resolve(); + }, + }); + const engine = createFakeEngine(); + const context = { + logger, + }; + + eventProcessor(engine, context, callbacks, { + events: ['test', 'interrupt'], + batch: { test: 1 }, + batchLimit: 99, + batchInterval: 1000 * 60 * 60, + }); + + new Array(6).fill(0).forEach((_v, idx) => { + engine.emit('test', { id: idx }); + }); + engine.emit('interrupt', { id: 99 }); + }); +}); test('integration: should process a workflow-start event and call the callback', async (t) => { t.plan(3); From 72c0edbed94e0cfb74d2ca2139df8913f17b9c9e Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Wed, 8 Apr 2026 17:27:27 +0100 Subject: [PATCH 09/18] update and fix tests I think this this fixes the actual issue - I just want more good focused tests now --- packages/ws-worker/src/api/process-events.ts | 117 +++++++++++++----- .../ws-worker/test/api/process-event.test.ts | 47 ++++++- 2 files changed, 128 insertions(+), 36 deletions(-) diff --git a/packages/ws-worker/src/api/process-events.ts b/packages/ws-worker/src/api/process-events.ts index 4958380f6..2c7baa6fc 100644 --- a/packages/ws-worker/src/api/process-events.ts +++ b/packages/ws-worker/src/api/process-events.ts @@ -55,6 +55,20 @@ const allEngineEvents = [ * Queuing ensures events are sent in order while allowing batching to reduce network calls. * Batching helps with high-volume logs by sending fewer requests with larger payloads, * reducing websocket latency. Events batch by count or time interval, whichever comes first. + * + * The basic architecture is: + * - events are synchronously and immediately added to a queue + * - items are processed sequentially + * - after an item has been processed, we pull the next item from the queue + * + * If an event is flagged as batchable, we introduce new rules + * - We flag if a batch is "open" + * - A batch event will wait for the batch interval to expire before process () completes + * - So batch will usually block the async loop until the batch has naturally expired + * - The batch event can be interrupted early if a limit is hit, or a new event type comes in + * + * The batch exposes a danger of having two loops running async, so it's managed very carefully. + * After a batch event is sent, in some cases, the batch will trigger next */ export function eventProcessor( engine: RuntimeEngine, @@ -64,15 +78,15 @@ export function eventProcessor( ) { const { id: planId, logger } = context; const { - batchLimit: limit = DEFAULT_BATCH_LIMIT, - batchInterval: interval = DEFAULT_BATCH_INTERVAL, + batchLimit = DEFAULT_BATCH_LIMIT, + batchInterval = DEFAULT_BATCH_INTERVAL, timeout_ms, events, } = options; const queue: any = []; - let activeBatch: string | null = null; + let activeBatch: string | null | number = null; let batch: any = []; let batchTimeout: NodeJS.Timeout | null = null; let batchSendPromise: Promise | null = null; @@ -104,19 +118,34 @@ export function eventProcessor( } await process(evt.name, evt.event); + console.log(`finish ${evt.name}`); finish(); } }; - const sendBatch = async (name: string) => { - console.log('sending batch', name); - clearTimeout(batchTimeout!); - batchTimeout = null; + // If sending the batch early, we break the cycle of the main + // process loop + // So we need to control whether to trigger the next call, + // or whether the calling function will process the next item for us + // TODO: rename to exitEarly = false, and only early exists have to set this + const sendBatch = async (triggerNext = false) => { + if (activeBatch) { + console.log('sending batch', activeBatch, batch.length); + clearTimeout(batchTimeout!); + batchTimeout = null; + + // first clear the batch + const name = activeBatch; + activeBatch = Infinity; + await send(name, batch, batch.length); + activeBatch = null; + batch = []; - // first clear the batch - activeBatch = null; - await send(name, batch, batch.length); - batch = []; + if (triggerNext) { + queue.shift(); + next(); + } + } }; const send = async (name: string, payload: any, batchSize?: number) => { @@ -146,6 +175,15 @@ export function eventProcessor( } }; + const addToBatch = async (event: any) => { + batch.push(event); + + if (batch.length >= batchLimit) { + // If we're at the batch limit, return right away + return sendBatch(true); + } + }; + const process = async (name: string, event: any) => { console.log('process', name); // TODO this actually shouldn't be here - should be done separately @@ -159,15 +197,8 @@ export function eventProcessor( if (name === activeBatch) { // if there's a batch open, just push the event - batch.push(event); - - if (batch.length >= limit) { - await sendBatch(name); - } + await addToBatch(event); return; - } else if (activeBatch) { - // If a different event comes in, send the batch (and carry on processing the event) - await sendBatch(activeBatch); } if (name in callbacks) { @@ -179,24 +210,31 @@ export function eventProcessor( batch.push(event); // Next, peek ahead in the queue for more pending events - while (queue.length > 1 && queue[1].name === name) { - const [nextBatchItem] = queue.splice(1, 1); - batch.push(nextBatchItem.event); + while (queue.length > 1) { + if (queue[1].name === name) { + const [nextBatchItem] = queue.splice(1, 1); + batch.push(nextBatchItem.event); - if (batch.length >= limit) { - // If we're at the batch limit, return right away - return sendBatch(name); + if (batch.length >= batchLimit) { + // If we're at the batch limit, return right away + return sendBatch(true); + } + } else { + // If there's another pending item not a part of this batch, + // just send the batch now + // send the batch early + return sendBatch(true); } } - // finally wait for a time before sending the batch if (!batchTimeout) { - const batchName = activeBatch!; - batchTimeout = setTimeout(() => { - batchSendPromise = sendBatch(batchName).finally(() => { - batchSendPromise = null; - }); - }, interval); + // finally wait for a time before sending the batch + // This is the "natural" batch trigger + return new Promise((resolve) => { + batchTimeout = setTimeout(() => { + sendBatch(false).then(resolve); + }, batchInterval); + }); } } else { await send(name, event); @@ -207,12 +245,25 @@ export function eventProcessor( }; const enqueue = (name: string, event: any) => { + console.log('queue', name); + if (name === 'workflow-log') { + console.log(event.message); + } queue.push({ name, event }); if (queue.length == 1) { + // If this is the only item in the queue, start executing right away console.log(`[${name}] executing immediately`); - // if an event is still in flight, will this cause a duplicate? setImmediate(next); + } else if (activeBatch === name) { + addToBatch(event); + queue.pop(); + } else if (queue.length == 2 && batchTimeout) { + console.log('Sending batch early'); + // If this is the second item in the queue, and we have a batch active, + // send the batch early + // (note that this event will still be deferred) + sendBatch(true); } else { console.log(`[${name}] deffering event`); } diff --git a/packages/ws-worker/test/api/process-event.test.ts b/packages/ws-worker/test/api/process-event.test.ts index 40cbff0ba..caaf7bec4 100644 --- a/packages/ws-worker/test/api/process-event.test.ts +++ b/packages/ws-worker/test/api/process-event.test.ts @@ -251,7 +251,7 @@ test('should send two batches', async (t) => { }); }); -test.only('should send a batch on interrupt', async (t) => { +test('should send a batch on interrupt with a full queue', async (t) => { t.plan(3); return new Promise((resolve) => { const callbacks = createCallbacks({ @@ -276,6 +276,8 @@ test.only('should send a batch on interrupt', async (t) => { batchInterval: 1000 * 60 * 60, }); + // TODO this sends a full queue + // We need to do more deferred stuff with events coming later new Array(6).fill(0).forEach((_v, idx) => { engine.emit('test', { id: idx }); }); @@ -283,6 +285,43 @@ test.only('should send a batch on interrupt', async (t) => { }); }); +test('should send a batch on interrupt with an async queue', async (t) => { + t.plan(3); + return new Promise(async (resolve) => { + const callbacks = createCallbacks({ + test: async (context, evt) => { + t.is(evt.length, 6); + await waitForAsync(5); + }, + interrupt: (context, evt) => { + t.is(callbacks.test.count, 1); + t.is(callbacks.interrupt.count, 1); + resolve(); + }, + }); + const engine = createFakeEngine(); + const context = { + logger, + }; + + eventProcessor(engine, context, callbacks, { + events: ['test', 'interrupt'], + batch: { test: 1 }, + batchLimit: 99, + batchInterval: 1000 * 60 * 60, + }); + + // TODO this sends a full queue + // We need to do more deferred stuff with events coming later + for (let i = 0; i < 6; i++) { + await waitForAsync(2); + engine.emit('test', { id: i }); + } + await waitForAsync(2); + engine.emit('interrupt', { id: 99 }); + }); +}); + test('integration: should process a workflow-start event and call the callback', async (t) => { t.plan(3); @@ -691,7 +730,7 @@ test('integration: should respect the interval', async (t) => { const plan = createPlan( `fn(async (s) => { console.log(1); - await new Promise((resolve) => setTimeout(() => resolve(s), 5)), + await new Promise((resolve) => setTimeout(() => resolve(s), 10)), console.log(3); return {}; })` @@ -708,6 +747,7 @@ test('integration: should respect the interval', async (t) => { const callbacks = { [WORKFLOW_LOG]: (_ctx: any, event: any) => { + console.log({ event }); events.push(event); }, }; @@ -716,7 +756,8 @@ test('integration: should respect the interval', async (t) => { batch: { [WORKFLOW_LOG]: true, }, - batchInterval: 2, + // low interval so I expect this to send two small batches + batchInterval: 1, }; eventProcessor(engine, context as any, callbacks, options); From 189963fceef960363a666078c5facd62b532c75d Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Wed, 8 Apr 2026 17:56:00 +0100 Subject: [PATCH 10/18] tidy logging --- packages/ws-worker/src/api/process-events.ts | 25 +++++++++++++------ .../ws-worker/test/api/process-event.test.ts | 1 - 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/packages/ws-worker/src/api/process-events.ts b/packages/ws-worker/src/api/process-events.ts index 2c7baa6fc..0f13bcc5a 100644 --- a/packages/ws-worker/src/api/process-events.ts +++ b/packages/ws-worker/src/api/process-events.ts @@ -26,6 +26,8 @@ export type EventProcessorOptions = { batchInterval?: number; batchLimit?: number; timeout_ms?: number; + trace?: boolean; + events: Record; }; const DEFAULT_BATCH_LIMIT = 10; @@ -93,6 +95,13 @@ export function eventProcessor( let didFinish = false; let timeoutHandle: NodeJS.Timeout; + // TODO plug this in because the console tracing is super helpful + const trace = (...message) => { + if (options.trace) { + console.log(...message); + } + }; + const next = async () => { if (batchSendPromise) { await batchSendPromise; @@ -118,7 +127,7 @@ export function eventProcessor( } await process(evt.name, evt.event); - console.log(`finish ${evt.name}`); + trace(`finish ${evt.name}`); finish(); } }; @@ -130,7 +139,7 @@ export function eventProcessor( // TODO: rename to exitEarly = false, and only early exists have to set this const sendBatch = async (triggerNext = false) => { if (activeBatch) { - console.log('sending batch', activeBatch, batch.length); + trace('sending batch', activeBatch, batch.length); clearTimeout(batchTimeout!); batchTimeout = null; @@ -185,7 +194,7 @@ export function eventProcessor( }; const process = async (name: string, event: any) => { - console.log('process', name); + trace('process', name); // TODO this actually shouldn't be here - should be done separately if (name !== 'workflow-log') { Sentry.addBreadcrumb({ @@ -245,27 +254,27 @@ export function eventProcessor( }; const enqueue = (name: string, event: any) => { - console.log('queue', name); + trace('queue', name); if (name === 'workflow-log') { - console.log(event.message); + trace(event.message); } queue.push({ name, event }); if (queue.length == 1) { // If this is the only item in the queue, start executing right away - console.log(`[${name}] executing immediately`); + trace(`[${name}] executing immediately`); setImmediate(next); } else if (activeBatch === name) { addToBatch(event); queue.pop(); } else if (queue.length == 2 && batchTimeout) { - console.log('Sending batch early'); + trace('Sending batch early'); // If this is the second item in the queue, and we have a batch active, // send the batch early // (note that this event will still be deferred) sendBatch(true); } else { - console.log(`[${name}] deffering event`); + trace(`[${name}] deffering event`); } }; diff --git a/packages/ws-worker/test/api/process-event.test.ts b/packages/ws-worker/test/api/process-event.test.ts index caaf7bec4..8521229b1 100644 --- a/packages/ws-worker/test/api/process-event.test.ts +++ b/packages/ws-worker/test/api/process-event.test.ts @@ -747,7 +747,6 @@ test('integration: should respect the interval', async (t) => { const callbacks = { [WORKFLOW_LOG]: (_ctx: any, event: any) => { - console.log({ event }); events.push(event); }, }; From f02a09af6a831dc42885faf444c71af925cd8ea2 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Wed, 8 Apr 2026 18:32:09 +0100 Subject: [PATCH 11/18] more tests --- .../ws-worker/test/api/process-event.test.ts | 109 +++++++++++++++++- 1 file changed, 107 insertions(+), 2 deletions(-) diff --git a/packages/ws-worker/test/api/process-event.test.ts b/packages/ws-worker/test/api/process-event.test.ts index 8521229b1..b8566c954 100644 --- a/packages/ws-worker/test/api/process-event.test.ts +++ b/packages/ws-worker/test/api/process-event.test.ts @@ -127,12 +127,10 @@ test('should process 100 events in order', async (t) => { const results = []; const finish = () => { - console.log('finishing'); results.forEach((r, idx) => { // the 0th item should be 0, the 1st item 1, etc t.is(r, idx); }); - console.log('resolving...'); resolve(); }; @@ -160,6 +158,36 @@ test('should process 100 events in order', async (t) => { }); }); +test('should process multiple different event types in order', async (t) => { + const result: number[] = []; + const callbacks = createCallbacks({ + foo: (_context: any, evt: any) => { + result.push(evt.id); + }, + bar: (_context: any, evt: any) => { + result.push(evt.id); + }, + baz: (_context: any, evt: any) => { + result.push(evt.id); + }, + }); + const engine = createFakeEngine(); + const context = { logger }; + + eventProcessor(engine as any, context as any, callbacks, { + events: ['foo', 'bar', 'baz'], + }); + + engine.emit('foo', { id: 1 }); + engine.emit('bar', { id: 2 }); + engine.emit('baz', { id: 3 }); + engine.emit('foo', { id: 4 }); + + await waitForAsync(20); + + t.deepEqual(result, [1, 2, 3, 4]); +}); + test('should send a batch after a default timeout', async (t) => { return new Promise((resolve) => { const callbacks = createCallbacks({ @@ -285,6 +313,35 @@ test('should send a batch on interrupt with a full queue', async (t) => { }); }); +test('should add deferred events directly to an open batch', async (t) => { + return new Promise(async (resolve) => { + const callbacks = createCallbacks({ + test: (_context: any, evt: any) => { + t.is(evt.length, 2); + t.is(evt[0].id, 1); + t.is(evt[1].id, 2); + resolve(); + }, + }); + const engine = createFakeEngine(); + const context = { logger }; + + eventProcessor(engine as any, context as any, callbacks, { + events: ['test'], + batch: { test: true }, + batchLimit: 10, + batchInterval: 30, + }); + + // First event opens the batch and sets activeBatch + engine.emit('test', { id: 1 }); + // Wait for setImmediate to fire so activeBatch is set, but before the timeout + await waitForAsync(5); + // This event arrives while the batch is open — hits the activeBatch === name path + engine.emit('test', { id: 2 }); + }); +}); + test('should send a batch on interrupt with an async queue', async (t) => { t.plan(3); return new Promise(async (resolve) => { @@ -322,6 +379,54 @@ test('should send a batch on interrupt with an async queue', async (t) => { }); }); +test('should continue processing if a callback throws', async (t) => { + const result: number[] = []; + const callbacks = createCallbacks({ + broken: () => { + throw new Error('boom'); + }, + ok: (_context: any, evt: any) => { + result.push(evt.id); + }, + }); + const engine = createFakeEngine(); + const context = { logger }; + + eventProcessor(engine as any, context as any, callbacks, { + events: ['broken', 'ok'], + }); + + engine.emit('broken', {}); + engine.emit('ok', { id: 1 }); + engine.emit('ok', { id: 2 }); + + await waitForAsync(20); + + t.deepEqual(result, [1, 2]); +}); + +test('should send a batch of 1 when the timeout fires with a single queued event', async (t) => { + return new Promise((resolve) => { + const callbacks = createCallbacks({ + test: (_context: any, evt: any) => { + t.is(evt.length, 1); + t.is(callbacks.test.count, 1); + resolve(); + }, + }); + const engine = createFakeEngine(); + const context = { logger }; + + eventProcessor(engine as any, context as any, callbacks, { + events: ['test'], + batch: { test: true }, + batchInterval: 10, + }); + + engine.emit('test', { id: 1 }); + }); +}); + test('integration: should process a workflow-start event and call the callback', async (t) => { t.plan(3); From 7ed1dbc596dc73691329c54eaf00cec20ce9f963 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Wed, 8 Apr 2026 18:32:55 +0100 Subject: [PATCH 12/18] changeset --- .changeset/five-hands-dance.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/five-hands-dance.md diff --git a/.changeset/five-hands-dance.md b/.changeset/five-hands-dance.md new file mode 100644 index 000000000..88a75f1b2 --- /dev/null +++ b/.changeset/five-hands-dance.md @@ -0,0 +1,5 @@ +--- +'@openfn/ws-worker': patch +--- + +Fix a critical issue when processing batch events that could cause runs to be lost From 082b9e880df49aa0af4f1115a15ac3e341fb6fa0 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 9 Apr 2026 16:22:11 +0100 Subject: [PATCH 13/18] types --- packages/ws-worker/src/api/process-events.ts | 15 +-- .../ws-worker/test/api/process-event.test.ts | 106 +++++++++--------- 2 files changed, 61 insertions(+), 60 deletions(-) diff --git a/packages/ws-worker/src/api/process-events.ts b/packages/ws-worker/src/api/process-events.ts index 0f13bcc5a..0cae1f6aa 100644 --- a/packages/ws-worker/src/api/process-events.ts +++ b/packages/ws-worker/src/api/process-events.ts @@ -27,7 +27,7 @@ export type EventProcessorOptions = { batchLimit?: number; timeout_ms?: number; trace?: boolean; - events: Record; + events?: Record; }; const DEFAULT_BATCH_LIMIT = 10; @@ -88,7 +88,7 @@ export function eventProcessor( const queue: any = []; - let activeBatch: string | null | number = null; + let activeBatch: string | null = null; let batch: any = []; let batchTimeout: NodeJS.Timeout | null = null; let batchSendPromise: Promise | null = null; @@ -96,7 +96,7 @@ export function eventProcessor( let timeoutHandle: NodeJS.Timeout; // TODO plug this in because the console tracing is super helpful - const trace = (...message) => { + const trace = (...message: any) => { if (options.trace) { console.log(...message); } @@ -143,9 +143,9 @@ export function eventProcessor( clearTimeout(batchTimeout!); batchTimeout = null; - // first clear the batch - const name = activeBatch; - activeBatch = Infinity; + // first clear the batch (but leave it truthy) + const name = activeBatch as string; + activeBatch = '--'; await send(name, batch, batch.length); activeBatch = null; batch = []; @@ -279,7 +279,8 @@ export function eventProcessor( }; const e = (events || allEngineEvents).reduce( - (obj, e) => Object.assign(obj, { [e]: (p: any) => enqueue(e, p) }), + (obj: any, e: string) => + Object.assign(obj, { [e]: (p: any) => enqueue(e, p) }), {} ); diff --git a/packages/ws-worker/test/api/process-event.test.ts b/packages/ws-worker/test/api/process-event.test.ts index b8566c954..56186ec26 100644 --- a/packages/ws-worker/test/api/process-event.test.ts +++ b/packages/ws-worker/test/api/process-event.test.ts @@ -41,24 +41,24 @@ const createPlan = (...expressions: string[]) => options: {}, } as ExecutionPlan); -const createFakeEngine = () => { +const createFakeEngine = (): any => { const bus = new EventEmitter(); return { - listen: (_id: string, events) => { + listen: (_id: string, events: any) => { for (const evt in events) { bus.on(evt, (...args) => { events[evt](...args); }); } }, - emit: (name, payload) => bus.emit(name, payload), + emit: (name: string, payload: any) => bus.emit(name, payload), }; }; -const createCallbacks = (events: Record) => { - const obj = {}; +const createCallbacks = (events: Record): Record => { + const obj: Record = {}; for (const event in events) { - const fn = (...args) => { + const fn = (...args: any) => { fn.count++; return events[event](...args); }; @@ -73,7 +73,7 @@ const createCallbacks = (events: Record) => { test('should process one event', async (t) => { const callbacks = createCallbacks({ test: () => {} }); const engine = createFakeEngine(); - const context = { + const context: any = { logger, }; @@ -93,15 +93,15 @@ test('should process one event', async (t) => { }); test('should process several events in order', async (t) => { - const result = []; + const result: any = []; const callbacks = createCallbacks({ - test: (context, evt) => { + test: (_context: any, evt: any) => { result.push(evt.id); }, }); const engine = createFakeEngine(); - const context = { + const context: any = { logger, }; @@ -124,10 +124,10 @@ test('should process several events in order', async (t) => { test('should process 100 events in order', async (t) => { t.plan(100); return new Promise((resolve) => { - const results = []; + const results: any = []; const finish = () => { - results.forEach((r, idx) => { + results.forEach((r: any, idx: number) => { // the 0th item should be 0, the 1st item 1, etc t.is(r, idx); }); @@ -135,7 +135,7 @@ test('should process 100 events in order', async (t) => { }; const callbacks = createCallbacks({ - test: (context, evt) => { + test: (_context: any, evt: any) => { results.push(evt.id); if (evt.id === 99) { @@ -144,7 +144,7 @@ test('should process 100 events in order', async (t) => { }, }); const engine = createFakeEngine(); - const context = { + const context: any = { logger, }; @@ -172,7 +172,7 @@ test('should process multiple different event types in order', async (t) => { }, }); const engine = createFakeEngine(); - const context = { logger }; + const context: any = { logger }; eventProcessor(engine as any, context as any, callbacks, { events: ['foo', 'bar', 'baz'], @@ -191,7 +191,7 @@ test('should process multiple different event types in order', async (t) => { test('should send a batch after a default timeout', async (t) => { return new Promise((resolve) => { const callbacks = createCallbacks({ - test: (context, evt) => { + test: (_context: any, evt: any) => { // We should have a single event with 11 entries t.is(evt.length, 11); t.is(callbacks.test.count, 1); @@ -199,13 +199,13 @@ test('should send a batch after a default timeout', async (t) => { }, }); const engine = createFakeEngine(); - const context = { + const context: any = { logger, }; eventProcessor(engine, context, callbacks, { events: ['test'], - batch: { test: 1 }, + batch: { test: true }, batchLimit: 20, batchInterval: 20, }); @@ -220,7 +220,7 @@ test('should send a batch after a default timeout', async (t) => { test('should send a batch when a limit is hit', async (t) => { return new Promise((resolve) => { const callbacks = createCallbacks({ - test: (context, evt) => { + test: (_context: any, evt: any) => { // We should have a single event with 11 entries t.is(evt.length, 11); t.is(callbacks.test.count, 1); @@ -228,13 +228,13 @@ test('should send a batch when a limit is hit', async (t) => { }, }); const engine = createFakeEngine(); - const context = { + const context: any = { logger, }; eventProcessor(engine, context, callbacks, { events: ['test'], - batch: { test: 1 }, + batch: { test: true }, batchLimit: 11, batchInterval: 1000 * 60 * 60, }); @@ -250,7 +250,7 @@ test('should send two batches', async (t) => { return new Promise((resolve) => { let total = 0; const callbacks = createCallbacks({ - test: (context, evt) => { + test: (_context: any, evt: any) => { t.is(evt.length, 6); total += evt.length; @@ -261,13 +261,13 @@ test('should send two batches', async (t) => { }, }); const engine = createFakeEngine(); - const context = { + const context: any = { logger, }; eventProcessor(engine, context, callbacks, { events: ['test'], - batch: { test: 1 }, + batch: { test: true }, batchLimit: 6, batchInterval: 1000 * 60 * 60, }); @@ -283,23 +283,23 @@ test('should send a batch on interrupt with a full queue', async (t) => { t.plan(3); return new Promise((resolve) => { const callbacks = createCallbacks({ - test: (context, evt) => { + test: (_context: any, evt: any) => { t.is(evt.length, 6); }, - interrupt: (context, evt) => { + interrupt: (_context: any) => { t.is(callbacks.test.count, 1); t.is(callbacks.interrupt.count, 1); resolve(); }, }); const engine = createFakeEngine(); - const context = { + const context: any = { logger, }; eventProcessor(engine, context, callbacks, { events: ['test', 'interrupt'], - batch: { test: 1 }, + batch: { test: true }, batchLimit: 99, batchInterval: 1000 * 60 * 60, }); @@ -324,7 +324,7 @@ test('should add deferred events directly to an open batch', async (t) => { }, }); const engine = createFakeEngine(); - const context = { logger }; + const context: any = { logger }; eventProcessor(engine as any, context as any, callbacks, { events: ['test'], @@ -346,24 +346,24 @@ test('should send a batch on interrupt with an async queue', async (t) => { t.plan(3); return new Promise(async (resolve) => { const callbacks = createCallbacks({ - test: async (context, evt) => { + test: async (_context: any, evt: any) => { t.is(evt.length, 6); await waitForAsync(5); }, - interrupt: (context, evt) => { + interrupt: (_context: any) => { t.is(callbacks.test.count, 1); t.is(callbacks.interrupt.count, 1); resolve(); }, }); const engine = createFakeEngine(); - const context = { + const context: any = { logger, }; eventProcessor(engine, context, callbacks, { events: ['test', 'interrupt'], - batch: { test: 1 }, + batch: { test: true }, batchLimit: 99, batchInterval: 1000 * 60 * 60, }); @@ -390,7 +390,7 @@ test('should continue processing if a callback throws', async (t) => { }, }); const engine = createFakeEngine(); - const context = { logger }; + const context: any = { logger }; eventProcessor(engine as any, context as any, callbacks, { events: ['broken', 'ok'], @@ -415,7 +415,7 @@ test('should send a batch of 1 when the timeout fires with a single queued event }, }); const engine = createFakeEngine(); - const context = { logger }; + const context: any = { logger }; eventProcessor(engine as any, context as any, callbacks, { events: ['test'], @@ -433,7 +433,7 @@ test('integration: should process a workflow-start event and call the callback', const engine = await createMockEngine(); const plan = createPlan(); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -462,7 +462,7 @@ test('integration: should process a workflow-complete event and call the callbac const engine = await createMockEngine(); const plan = createPlan('fn(() => ({ data: { x: 10 } }))'); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -490,7 +490,7 @@ test('integration: should process a job-start event and call the callback', asyn const engine = await createMockEngine(); const plan = createPlan(); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -518,7 +518,7 @@ test('integration: should process a job-complete event and call the callback', a const engine = await createMockEngine(); const plan = createPlan('fn(() => ({ data: { result: 42 } }))'); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -547,7 +547,7 @@ test('integration: should process a workflow-log event and call the callback', a const engine = await createMockEngine(); const plan = createPlan('fn((s) => { console.log("test log"); return s; })'); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -575,7 +575,7 @@ test('integration: should process a job-error event and call the callback', asyn const engine = await createMockEngine(); const plan = createPlan('fn(() => { throw new Error("job error"); })'); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -603,7 +603,7 @@ test('integration: should process a workflow-error event and call the callback', const engine = await createMockEngine(); const plan = createPlan('fn(() => ( @~!"@£!4 )'); // Invalid syntax to trigger error - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -637,7 +637,7 @@ test('integration: should process events in the correct order', async (t) => { })` ); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -706,7 +706,7 @@ test('integration: should batch sequential log events', async (t) => { })` ); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -747,7 +747,7 @@ test('integration: should interrupt a batch of log events', async (t) => { })` ); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -798,7 +798,7 @@ test('integration: should respect the limit', async (t) => { })` ); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -841,7 +841,7 @@ test('integration: should respect the interval', async (t) => { })` ); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -891,7 +891,7 @@ test('integration: should handle two batches of logs', async (t) => { })` ); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -934,7 +934,7 @@ test('integration: should process events in the correct order with batching', as })` ); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -994,7 +994,7 @@ test('integration: queue events behind a slow event', async (t) => { } })` ); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -1058,7 +1058,7 @@ test('integration: queue events behind a slow event II', async (t) => { } })` ); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -1120,7 +1120,7 @@ test('integration: batch timeout send should not race with subsequent events', a })` ); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -1158,7 +1158,7 @@ test('integration: should timeout and continue processing when event handler han const engine = await createMockEngine(); const plan = createPlan(); - const context = { + const context: any = { id: 'a', plan, options: {}, From 2fc83fbb06b51d5b74b12865b3e17eb6a2dbe672 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 9 Apr 2026 16:56:45 +0100 Subject: [PATCH 14/18] remove only --- integration-tests/worker/test/integration.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-tests/worker/test/integration.test.ts b/integration-tests/worker/test/integration.test.ts index b7b5093a6..a0613eb73 100644 --- a/integration-tests/worker/test/integration.test.ts +++ b/integration-tests/worker/test/integration.test.ts @@ -921,7 +921,7 @@ test.serial('set a default payload limit on the worker', (t) => { }); // this is being flaky!! is this new flakiness? -test.serial.only('override the worker payload through run options', (t) => { +test.serial('override the worker payload through run options', (t) => { return new Promise(async (done) => { if (!worker.destroyed) { await worker.destroy(); From fd3014f4cae727bd9f1658563144d40d9721e451 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 9 Apr 2026 17:17:58 +0100 Subject: [PATCH 15/18] run tests in serial --- .../worker/test/exit-reasons.test.ts | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/integration-tests/worker/test/exit-reasons.test.ts b/integration-tests/worker/test/exit-reasons.test.ts index 598fa272e..f378c6ecd 100644 --- a/integration-tests/worker/test/exit-reasons.test.ts +++ b/integration-tests/worker/test/exit-reasons.test.ts @@ -34,7 +34,7 @@ const run = async (attempt) => { }); }; -test('crash: syntax error', async (t) => { +test.serial('crash: syntax error', async (t) => { const attempt = { id: crypto.randomUUID(), jobs: [ @@ -54,7 +54,7 @@ test('crash: syntax error', async (t) => { }); // https://github.com/OpenFn/kit/issues/1045 -test('crash: reference error', async (t) => { +test.serial('crash: reference error', async (t) => { const attempt = { id: crypto.randomUUID(), jobs: [ @@ -78,7 +78,7 @@ test('crash: reference error', async (t) => { }); // https://github.com/OpenFn/kit/issues/758 -test('crash: job not found', async (t) => { +test.serial('crash: job not found', async (t) => { lightning.addDataclip('x', {}); const attempt = { @@ -102,7 +102,7 @@ test('crash: job not found', async (t) => { t.regex(error_message, /could not find start job: y/i); }); -test('exception: autoinstall error', async (t) => { +test.serial('exception: autoinstall error', async (t) => { const attempt = { id: crypto.randomUUID(), jobs: [ @@ -125,7 +125,7 @@ test('exception: autoinstall error', async (t) => { ); }); -test('exception: bad credential (not found)', async (t) => { +test.serial('exception: bad credential (not found)', async (t) => { const attempt = { id: crypto.randomUUID(), jobs: [ @@ -154,7 +154,7 @@ test('exception: bad credential (not found)', async (t) => { ); }); -test('exception: credential timeout', async (t) => { +test.serial('exception: credential timeout', async (t) => { const attempt = { id: crypto.randomUUID(), jobs: [ @@ -177,7 +177,7 @@ test('exception: credential timeout', async (t) => { ); }); -test('kill: oom (small, kill worker)', async (t) => { +test.serial('kill: oom (small, kill worker)', async (t) => { const attempt = { id: crypto.randomUUID(), jobs: [ @@ -201,7 +201,8 @@ test('kill: oom (small, kill worker)', async (t) => { t.is(error_message, 'Run exceeded maximum memory usage'); }); -test('kill: oom (large, kill vm)', async (t) => { +// TODO this is failing locally... is it OK in CI? +test.serial('kill: oom (large, kill vm)', async (t) => { const attempt = { id: crypto.randomUUID(), jobs: [ @@ -225,7 +226,7 @@ test('kill: oom (large, kill vm)', async (t) => { t.is(error_message, 'Run exceeded maximum memory usage'); }); -test('crash: process.exit() triggered by postgres', async (t) => { +test.serial('crash: process.exit() triggered by postgres', async (t) => { const attempt = { id: crypto.randomUUID(), jobs: [ From 66243160341c96098250f40a0131e3d4ee9e4c69 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Thu, 9 Apr 2026 17:18:33 +0100 Subject: [PATCH 16/18] worker: tweak event processor and be sure to reset timeout on batch --- packages/ws-worker/src/api/process-events.ts | 9 +++++---- packages/ws-worker/src/util/send-event.ts | 2 -- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/packages/ws-worker/src/api/process-events.ts b/packages/ws-worker/src/api/process-events.ts index 0cae1f6aa..dc7032856 100644 --- a/packages/ws-worker/src/api/process-events.ts +++ b/packages/ws-worker/src/api/process-events.ts @@ -93,7 +93,7 @@ export function eventProcessor( let batchTimeout: NodeJS.Timeout | null = null; let batchSendPromise: Promise | null = null; let didFinish = false; - let timeoutHandle: NodeJS.Timeout; + let processTimeoutHandle: NodeJS.Timeout; // TODO plug this in because the console tracing is super helpful const trace = (...message: any) => { @@ -111,7 +111,7 @@ export function eventProcessor( didFinish = false; const finish = () => { - clearTimeout(timeoutHandle); + clearTimeout(processTimeoutHandle); if (!didFinish) { didFinish = true; queue.shift(); @@ -120,7 +120,7 @@ export function eventProcessor( }; if (timeout_ms) { - timeoutHandle = setTimeout(() => { + processTimeoutHandle = setTimeout(() => { logger.error(`${planId} :: ${evt.name} :: timeout (fallback)`); finish(); }, timeout_ms); @@ -136,7 +136,6 @@ export function eventProcessor( // process loop // So we need to control whether to trigger the next call, // or whether the calling function will process the next item for us - // TODO: rename to exitEarly = false, and only early exists have to set this const sendBatch = async (triggerNext = false) => { if (activeBatch) { trace('sending batch', activeBatch, batch.length); @@ -151,6 +150,7 @@ export function eventProcessor( batch = []; if (triggerNext) { + clearTimeout(processTimeoutHandle); queue.shift(); next(); } @@ -239,6 +239,7 @@ export function eventProcessor( if (!batchTimeout) { // finally wait for a time before sending the batch // This is the "natural" batch trigger + clearTimeout(processTimeoutHandle); return new Promise((resolve) => { batchTimeout = setTimeout(() => { sendBatch(false).then(resolve); diff --git a/packages/ws-worker/src/util/send-event.ts b/packages/ws-worker/src/util/send-event.ts index 0f08184aa..f4cad446f 100644 --- a/packages/ws-worker/src/util/send-event.ts +++ b/packages/ws-worker/src/util/send-event.ts @@ -20,8 +20,6 @@ export const sendEvent = ( const { channel, logger, id: runId = '' } = context; - console.log('>> ', event); - return new Promise((resolve, reject) => { const report = (error: any) => { logger.error(`${runId} :: ${event} :: ERR: ${error.message || error}`); From afe07d3e4682d70eebd1da0ab8128ba8e154f278 Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Fri, 10 Apr 2026 10:06:42 +0100 Subject: [PATCH 17/18] remove comment --- integration-tests/worker/test/integration.test.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/integration-tests/worker/test/integration.test.ts b/integration-tests/worker/test/integration.test.ts index a0613eb73..dd7492ded 100644 --- a/integration-tests/worker/test/integration.test.ts +++ b/integration-tests/worker/test/integration.test.ts @@ -920,7 +920,6 @@ test.serial('set a default payload limit on the worker', (t) => { }); }); -// this is being flaky!! is this new flakiness? test.serial('override the worker payload through run options', (t) => { return new Promise(async (done) => { if (!worker.destroyed) { From f17eca427010e594956d6bfe9934f5c99d20f25e Mon Sep 17 00:00:00 2001 From: Joe Clark Date: Fri, 10 Apr 2026 10:09:18 +0100 Subject: [PATCH 18/18] remove more comments --- packages/ws-worker/src/api/process-events.ts | 1 - packages/ws-worker/test/api/process-event.test.ts | 1 - 2 files changed, 2 deletions(-) diff --git a/packages/ws-worker/src/api/process-events.ts b/packages/ws-worker/src/api/process-events.ts index dc7032856..20718c32d 100644 --- a/packages/ws-worker/src/api/process-events.ts +++ b/packages/ws-worker/src/api/process-events.ts @@ -95,7 +95,6 @@ export function eventProcessor( let didFinish = false; let processTimeoutHandle: NodeJS.Timeout; - // TODO plug this in because the console tracing is super helpful const trace = (...message: any) => { if (options.trace) { console.log(...message); diff --git a/packages/ws-worker/test/api/process-event.test.ts b/packages/ws-worker/test/api/process-event.test.ts index 56186ec26..d554063f4 100644 --- a/packages/ws-worker/test/api/process-event.test.ts +++ b/packages/ws-worker/test/api/process-event.test.ts @@ -69,7 +69,6 @@ const createCallbacks = (events: Record): Record => { return obj; }; -// TODO try simpler tests with full control - don't use the engine test('should process one event', async (t) => { const callbacks = createCallbacks({ test: () => {} }); const engine = createFakeEngine();