diff --git a/.changeset/five-hands-dance.md b/.changeset/five-hands-dance.md new file mode 100644 index 000000000..88a75f1b2 --- /dev/null +++ b/.changeset/five-hands-dance.md @@ -0,0 +1,5 @@ +--- +'@openfn/ws-worker': patch +--- + +Fix a critical issue when processing batch events that could cause runs to be lost diff --git a/integration-tests/worker/test/exit-reasons.test.ts b/integration-tests/worker/test/exit-reasons.test.ts index 598fa272e..f378c6ecd 100644 --- a/integration-tests/worker/test/exit-reasons.test.ts +++ b/integration-tests/worker/test/exit-reasons.test.ts @@ -34,7 +34,7 @@ const run = async (attempt) => { }); }; -test('crash: syntax error', async (t) => { +test.serial('crash: syntax error', async (t) => { const attempt = { id: crypto.randomUUID(), jobs: [ @@ -54,7 +54,7 @@ test('crash: syntax error', async (t) => { }); // https://github.com/OpenFn/kit/issues/1045 -test('crash: reference error', async (t) => { +test.serial('crash: reference error', async (t) => { const attempt = { id: crypto.randomUUID(), jobs: [ @@ -78,7 +78,7 @@ test('crash: reference error', async (t) => { }); // https://github.com/OpenFn/kit/issues/758 -test('crash: job not found', async (t) => { +test.serial('crash: job not found', async (t) => { lightning.addDataclip('x', {}); const attempt = { @@ -102,7 +102,7 @@ test('crash: job not found', async (t) => { t.regex(error_message, /could not find start job: y/i); }); -test('exception: autoinstall error', async (t) => { +test.serial('exception: autoinstall error', async (t) => { const attempt = { id: crypto.randomUUID(), jobs: [ @@ -125,7 +125,7 @@ test('exception: autoinstall error', async (t) => { ); }); -test('exception: bad credential (not found)', async (t) => { +test.serial('exception: bad credential (not found)', async (t) => { const attempt = { id: crypto.randomUUID(), jobs: [ @@ -154,7 +154,7 @@ test('exception: bad credential (not found)', async (t) => { ); }); -test('exception: credential timeout', async (t) => { 
+test.serial('exception: credential timeout', async (t) => { const attempt = { id: crypto.randomUUID(), jobs: [ @@ -177,7 +177,7 @@ test('exception: credential timeout', async (t) => { ); }); -test('kill: oom (small, kill worker)', async (t) => { +test.serial('kill: oom (small, kill worker)', async (t) => { const attempt = { id: crypto.randomUUID(), jobs: [ @@ -201,7 +201,8 @@ test('kill: oom (small, kill worker)', async (t) => { t.is(error_message, 'Run exceeded maximum memory usage'); }); -test('kill: oom (large, kill vm)', async (t) => { +// TODO this is failing locally... is it OK in CI? +test.serial('kill: oom (large, kill vm)', async (t) => { const attempt = { id: crypto.randomUUID(), jobs: [ @@ -225,7 +226,7 @@ test('kill: oom (large, kill vm)', async (t) => { t.is(error_message, 'Run exceeded maximum memory usage'); }); -test('crash: process.exit() triggered by postgres', async (t) => { +test.serial('crash: process.exit() triggered by postgres', async (t) => { const attempt = { id: crypto.randomUUID(), jobs: [ diff --git a/packages/ws-worker/src/api/process-events.ts b/packages/ws-worker/src/api/process-events.ts index bf5aceca6..20718c32d 100644 --- a/packages/ws-worker/src/api/process-events.ts +++ b/packages/ws-worker/src/api/process-events.ts @@ -26,6 +26,8 @@ export type EventProcessorOptions = { batchInterval?: number; batchLimit?: number; timeout_ms?: number; + trace?: boolean; + events?: Record<string, any>; }; const DEFAULT_BATCH_LIMIT = 10; @@ -55,6 +57,20 @@ const allEngineEvents = [ * Queuing ensures events are sent in order while allowing batching to reduce network calls. * Batching helps with high-volume logs by sending fewer requests with larger payloads, * reducing websocket latency. Events batch by count or time interval, whichever comes first. 
+ * + * The basic architecture is: + * - events are synchronously and immediately added to a queue + * - items are processed sequentially + * - after an item has been processed, we pull the next item from the queue + * + * If an event is flagged as batchable, we introduce new rules + * - We flag if a batch is "open" + * - A batch event will wait for the batch interval to expire before process () completes + * - So batch will usually block the async loop until the batch has naturally expired + * - The batch event can be interrupted early if a limit is hit, or a new event type comes in + * + * The batch exposes a danger of having two loops running async, so it's managed very carefully. + * After a batch event is sent, in some cases, the batch will trigger next */ export function eventProcessor( engine: RuntimeEngine, @@ -64,26 +80,37 @@ export function eventProcessor( ) { const { id: planId, logger } = context; const { - batchLimit: limit = DEFAULT_BATCH_LIMIT, - batchInterval: interval = DEFAULT_BATCH_INTERVAL, + batchLimit = DEFAULT_BATCH_LIMIT, + batchInterval = DEFAULT_BATCH_INTERVAL, timeout_ms, + events, } = options; const queue: any = []; let activeBatch: string | null = null; let batch: any = []; - let batchTimeout: NodeJS.Timeout; + let batchTimeout: NodeJS.Timeout | null = null; + let batchSendPromise: Promise<void> | null = null; let didFinish = false; - let timeoutHandle: NodeJS.Timeout; + let processTimeoutHandle: NodeJS.Timeout; + + const trace = (...message: any) => { + if (options.trace) { + console.log(...message); + } + }; const next = async () => { + if (batchSendPromise) { + await batchSendPromise; + } const evt = queue[0]; if (evt) { didFinish = false; const finish = () => { - clearTimeout(timeoutHandle); + clearTimeout(processTimeoutHandle); if (!didFinish) { didFinish = true; queue.shift(); @@ -92,23 +119,41 @@ export function eventProcessor( }; if (timeout_ms) { - timeoutHandle = setTimeout(() => { + processTimeoutHandle = setTimeout(() => { 
logger.error(`${planId} :: ${evt.name} :: timeout (fallback)`); finish(); }, timeout_ms); } await process(evt.name, evt.event); + trace(`finish ${evt.name}`); finish(); } }; - const sendBatch = async (name: string) => { - clearTimeout(batchTimeout); - // first clear the batch - activeBatch = null; - await send(name, batch, batch.length); - batch = []; + // If sending the batch early, we break the cycle of the main + // process loop + // So we need to control whether to trigger the next call, + // or whether the calling function will process the next item for us + const sendBatch = async (triggerNext = false) => { + if (activeBatch) { + trace('sending batch', activeBatch, batch.length); + clearTimeout(batchTimeout!); + batchTimeout = null; + + // first clear the batch (but leave it truthy) + const name = activeBatch as string; + activeBatch = '--'; + await send(name, batch, batch.length); + activeBatch = null; + batch = []; + + if (triggerNext) { + clearTimeout(processTimeoutHandle); + queue.shift(); + next(); + } + } }; const send = async (name: string, payload: any, batchSize?: number) => { @@ -138,7 +183,17 @@ export function eventProcessor( } }; + const addToBatch = async (event: any) => { + batch.push(event); + + if (batch.length >= batchLimit) { + // If we're at the batch limit, return right away + return sendBatch(true); + } + }; + const process = async (name: string, event: any) => { + trace('process', name); // TODO this actually shouldn't be here - should be done separately if (name !== 'workflow-log') { Sentry.addBreadcrumb({ @@ -150,15 +205,8 @@ export function eventProcessor( if (name === activeBatch) { // if there's a batch open, just push the event - batch.push(event); - - if (batch.length >= limit) { - await sendBatch(name); - } + await addToBatch(event); return; - } else if (activeBatch) { - // If a different event comes in, send the batch (and carry on processing the event) - await sendBatch(activeBatch); } if (name in callbacks) { @@ -170,22 
+218,32 @@ export function eventProcessor( batch.push(event); // Next, peek ahead in the queue for more pending events - while (queue.length > 1 && queue[1].name === name) { - const [nextBatchItem] = queue.splice(1, 1); - batch.push(nextBatchItem.event); + while (queue.length > 1) { + if (queue[1].name === name) { + const [nextBatchItem] = queue.splice(1, 1); + batch.push(nextBatchItem.event); - if (batch.length >= limit) { - // If we're at the batch limit, return right away - return sendBatch(name); + if (batch.length >= batchLimit) { + // If we're at the batch limit, return right away + return sendBatch(true); + } + } else { + // If there's another pending item not a part of this batch, + // just send the batch now + // send the batch early + return sendBatch(true); } } - // finally wait for a time before sending the batch if (!batchTimeout) { - const batchName = activeBatch!; - batchTimeout = setTimeout(async () => { - sendBatch(batchName); - }, interval); + // finally wait for a time before sending the batch + // This is the "natural" batch trigger + clearTimeout(processTimeoutHandle); + return new Promise((resolve) => { + batchTimeout = setTimeout(() => { + sendBatch(false).then(resolve); + }, batchInterval); + }); } } else { await send(name, event); @@ -196,17 +254,38 @@ export function eventProcessor( }; const enqueue = (name: string, event: any) => { + trace('queue', name); + if (name === 'workflow-log') { + trace(event.message); + } queue.push({ name, event }); if (queue.length == 1) { - next(); + // If this is the only item in the queue, start executing right away + trace(`[${name}] executing immediately`); + setImmediate(next); + } else if (activeBatch === name) { + addToBatch(event); + queue.pop(); + } else if (queue.length == 2 && batchTimeout) { + trace('Sending batch early'); + // If this is the second item in the queue, and we have a batch active, + // send the batch early + // (note that this event will still be deferred) + sendBatch(true); + } 
else { + trace(`[${name}] deferring event`); + } }; - const e = allEngineEvents.reduce( - (obj, e) => Object.assign(obj, { [e]: (p: any) => enqueue(e, p) }), + const e = (events || allEngineEvents).reduce( + (obj: any, e: string) => + Object.assign(obj, { [e]: (p: any) => enqueue(e, p) }), + {} ); engine.listen(planId, e); + + // return debug state + return { queue }; } diff --git a/packages/ws-worker/test/api/process-event.test.ts b/packages/ws-worker/test/api/process-event.test.ts index 6687ab53f..d554063f4 100644 --- a/packages/ws-worker/test/api/process-event.test.ts +++ b/packages/ws-worker/test/api/process-event.test.ts @@ -14,6 +14,7 @@ import createMockEngine from '../../src/mock/runtime-engine'; import type { ExecutionPlan } from '@openfn/lexicon'; import { createMockLogger } from '@openfn/logger'; +import { EventEmitter } from 'node:events'; const logger = createMockLogger(); @@ -40,13 +41,398 @@ const createPlan = (...expressions: string[]) => options: {}, } as ExecutionPlan); -test('should process a workflow-start event and call the callback', async (t) => { +const createFakeEngine = (): any => { + const bus = new EventEmitter(); + return { + listen: (_id: string, events: any) => { + for (const evt in events) { + bus.on(evt, (...args) => { + events[evt](...args); + }); + } + }, + emit: (name: string, payload: any) => bus.emit(name, payload), + }; +}; + +const createCallbacks = (events: Record<string, any>): Record<string, any> => { + const obj: Record<string, any> = {}; + for (const event in events) { + const fn = (...args: any) => { + fn.count++; + return events[event](...args); + }; + fn.count = 0; + obj[event] = fn; + } + + return obj; +}; + +test('should process one event', async (t) => { + const callbacks = createCallbacks({ test: () => {} }); + const engine = createFakeEngine(); + const context: any = { + logger, + }; + + const { queue } = eventProcessor(engine, context, callbacks, { + events: ['test'], + }); + + t.is(queue.length, 0); + + engine.emit('test', {}); + + t.is(queue.length, 
1); + + await waitForAsync(5); + + t.is(callbacks.test.count, 1); +}); + +test('should process several events in order', async (t) => { + const result: any = []; + + const callbacks = createCallbacks({ + test: (_context: any, evt: any) => { + result.push(evt.id); + }, + }); + const engine = createFakeEngine(); + const context: any = { + logger, + }; + + const { queue } = eventProcessor(engine, context, callbacks, { + events: ['test'], + }); + + t.is(queue.length, 0); + + engine.emit('test', { id: 1 }); + engine.emit('test', { id: 2 }); + engine.emit('test', { id: 3 }); + + await waitForAsync(10); + + t.is(callbacks.test.count, 3); + t.deepEqual(result, [1, 2, 3]); +}); + +test('should process 100 events in order', async (t) => { + t.plan(100); + return new Promise((resolve) => { + const results: any = []; + + const finish = () => { + results.forEach((r: any, idx: number) => { + // the 0th item should be 0, the 1st item 1, etc + t.is(r, idx); + }); + resolve(); + }; + + const callbacks = createCallbacks({ + test: (_context: any, evt: any) => { + results.push(evt.id); + + if (evt.id === 99) { + finish(); + } + }, + }); + const engine = createFakeEngine(); + const context: any = { + logger, + }; + + eventProcessor(engine, context, callbacks, { + events: ['test'], + }); + + new Array(100).fill(0).forEach((_v, idx) => { + engine.emit('test', { id: idx }); + }); + }); +}); + +test('should process multiple different event types in order', async (t) => { + const result: number[] = []; + const callbacks = createCallbacks({ + foo: (_context: any, evt: any) => { + result.push(evt.id); + }, + bar: (_context: any, evt: any) => { + result.push(evt.id); + }, + baz: (_context: any, evt: any) => { + result.push(evt.id); + }, + }); + const engine = createFakeEngine(); + const context: any = { logger }; + + eventProcessor(engine as any, context as any, callbacks, { + events: ['foo', 'bar', 'baz'], + }); + + engine.emit('foo', { id: 1 }); + engine.emit('bar', { id: 2 }); + 
engine.emit('baz', { id: 3 }); + engine.emit('foo', { id: 4 }); + + await waitForAsync(20); + + t.deepEqual(result, [1, 2, 3, 4]); +}); + +test('should send a batch after a default timeout', async (t) => { + return new Promise((resolve) => { + const callbacks = createCallbacks({ + test: (_context: any, evt: any) => { + // We should have a single event with 11 entries + t.is(evt.length, 11); + t.is(callbacks.test.count, 1); + resolve(); + }, + }); + const engine = createFakeEngine(); + const context: any = { + logger, + }; + + eventProcessor(engine, context, callbacks, { + events: ['test'], + batch: { test: true }, + batchLimit: 20, + batchInterval: 20, + }); + + // send 11 events in quick succession + new Array(11).fill(0).forEach((_v, idx) => { + engine.emit('test', { id: idx }); + }); + }); +}); + +test('should send a batch when a limit is hit', async (t) => { + return new Promise((resolve) => { + const callbacks = createCallbacks({ + test: (_context: any, evt: any) => { + // We should have a single event with 11 entries + t.is(evt.length, 11); + t.is(callbacks.test.count, 1); + resolve(); + }, + }); + const engine = createFakeEngine(); + const context: any = { + logger, + }; + + eventProcessor(engine, context, callbacks, { + events: ['test'], + batch: { test: true }, + batchLimit: 11, + batchInterval: 1000 * 60 * 60, + }); + + // send 11 events in quick succession + new Array(11).fill(0).forEach((_v, idx) => { + engine.emit('test', { id: idx }); + }); + }); +}); + +test('should send two batches', async (t) => { + return new Promise((resolve) => { + let total = 0; + const callbacks = createCallbacks({ + test: (_context: any, evt: any) => { + t.is(evt.length, 6); + total += evt.length; + + if (total == 12) { + t.is(callbacks.test.count, 2); + resolve(); + } + }, + }); + const engine = createFakeEngine(); + const context: any = { + logger, + }; + + eventProcessor(engine, context, callbacks, { + events: ['test'], + batch: { test: true }, + batchLimit: 6, + 
batchInterval: 1000 * 60 * 60, + }); + + // send 12 events in quick succession + new Array(12).fill(0).forEach((_v, idx) => { + engine.emit('test', { id: idx }); + }); + }); +}); + +test('should send a batch on interrupt with a full queue', async (t) => { + t.plan(3); + return new Promise((resolve) => { + const callbacks = createCallbacks({ + test: (_context: any, evt: any) => { + t.is(evt.length, 6); + }, + interrupt: (_context: any) => { + t.is(callbacks.test.count, 1); + t.is(callbacks.interrupt.count, 1); + resolve(); + }, + }); + const engine = createFakeEngine(); + const context: any = { + logger, + }; + + eventProcessor(engine, context, callbacks, { + events: ['test', 'interrupt'], + batch: { test: true }, + batchLimit: 99, + batchInterval: 1000 * 60 * 60, + }); + + // TODO this sends a full queue + // We need to do more deferred stuff with events coming later + new Array(6).fill(0).forEach((_v, idx) => { + engine.emit('test', { id: idx }); + }); + engine.emit('interrupt', { id: 99 }); + }); +}); + +test('should add deferred events directly to an open batch', async (t) => { + return new Promise(async (resolve) => { + const callbacks = createCallbacks({ + test: (_context: any, evt: any) => { + t.is(evt.length, 2); + t.is(evt[0].id, 1); + t.is(evt[1].id, 2); + resolve(); + }, + }); + const engine = createFakeEngine(); + const context: any = { logger }; + + eventProcessor(engine as any, context as any, callbacks, { + events: ['test'], + batch: { test: true }, + batchLimit: 10, + batchInterval: 30, + }); + + // First event opens the batch and sets activeBatch + engine.emit('test', { id: 1 }); + // Wait for setImmediate to fire so activeBatch is set, but before the timeout + await waitForAsync(5); + // This event arrives while the batch is open — hits the activeBatch === name path + engine.emit('test', { id: 2 }); + }); +}); + +test('should send a batch on interrupt with an async queue', async (t) => { + t.plan(3); + return new Promise(async (resolve) => { + 
const callbacks = createCallbacks({ + test: async (_context: any, evt: any) => { + t.is(evt.length, 6); + await waitForAsync(5); + }, + interrupt: (_context: any) => { + t.is(callbacks.test.count, 1); + t.is(callbacks.interrupt.count, 1); + resolve(); + }, + }); + const engine = createFakeEngine(); + const context: any = { + logger, + }; + + eventProcessor(engine, context, callbacks, { + events: ['test', 'interrupt'], + batch: { test: true }, + batchLimit: 99, + batchInterval: 1000 * 60 * 60, + }); + + // TODO this sends a full queue + // We need to do more deferred stuff with events coming later + for (let i = 0; i < 6; i++) { + await waitForAsync(2); + engine.emit('test', { id: i }); + } + await waitForAsync(2); + engine.emit('interrupt', { id: 99 }); + }); +}); + +test('should continue processing if a callback throws', async (t) => { + const result: number[] = []; + const callbacks = createCallbacks({ + broken: () => { + throw new Error('boom'); + }, + ok: (_context: any, evt: any) => { + result.push(evt.id); + }, + }); + const engine = createFakeEngine(); + const context: any = { logger }; + + eventProcessor(engine as any, context as any, callbacks, { + events: ['broken', 'ok'], + }); + + engine.emit('broken', {}); + engine.emit('ok', { id: 1 }); + engine.emit('ok', { id: 2 }); + + await waitForAsync(20); + + t.deepEqual(result, [1, 2]); +}); + +test('should send a batch of 1 when the timeout fires with a single queued event', async (t) => { + return new Promise((resolve) => { + const callbacks = createCallbacks({ + test: (_context: any, evt: any) => { + t.is(evt.length, 1); + t.is(callbacks.test.count, 1); + resolve(); + }, + }); + const engine = createFakeEngine(); + const context: any = { logger }; + + eventProcessor(engine as any, context as any, callbacks, { + events: ['test'], + batch: { test: true }, + batchInterval: 10, + }); + + engine.emit('test', { id: 1 }); + }); +}); + +test('integration: should process a workflow-start event and call the 
callback', async (t) => { t.plan(3); const engine = await createMockEngine(); const plan = createPlan(); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -69,13 +455,13 @@ test('should process a workflow-start event and call the callback', async (t) => await waitForAsync(); }); -test('should process a workflow-complete event and call the callback', async (t) => { +test('integration: should process a workflow-complete event and call the callback', async (t) => { t.plan(4); const engine = await createMockEngine(); const plan = createPlan('fn(() => ({ data: { x: 10 } }))'); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -97,13 +483,13 @@ test('should process a workflow-complete event and call the callback', async (t) await waitForAsync(); }); -test('should process a job-start event and call the callback', async (t) => { +test('integration: should process a job-start event and call the callback', async (t) => { t.plan(4); const engine = await createMockEngine(); const plan = createPlan(); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -125,13 +511,13 @@ test('should process a job-start event and call the callback', async (t) => { await waitForAsync(); }); -test('should process a job-complete event and call the callback', async (t) => { +test('integration: should process a job-complete event and call the callback', async (t) => { t.plan(5); const engine = await createMockEngine(); const plan = createPlan('fn(() => ({ data: { result: 42 } }))'); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -154,13 +540,13 @@ test('should process a job-complete event and call the callback', async (t) => { await waitForAsync(); }); -test('should process a workflow-log event and call the callback', async (t) => { +test('integration: should process a workflow-log event and call the callback', async (t) => { t.plan(4); const engine = await createMockEngine(); const plan = 
createPlan('fn((s) => { console.log("test log"); return s; })'); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -182,13 +568,13 @@ test('should process a workflow-log event and call the callback', async (t) => { await waitForAsync(); }); -test('should process a job-error event and call the callback', async (t) => { +test('integration: should process a job-error event and call the callback', async (t) => { t.plan(4); const engine = await createMockEngine(); const plan = createPlan('fn(() => { throw new Error("job error"); })'); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -210,13 +596,13 @@ test('should process a job-error event and call the callback', async (t) => { await new Promise((resolve) => setTimeout(resolve, 50)); }); -test('should process a workflow-error event and call the callback', async (t) => { +test('integration: should process a workflow-error event and call the callback', async (t) => { t.plan(5); const engine = await createMockEngine(); const plan = createPlan('fn(() => ( @~!"@£!4 )'); // Invalid syntax to trigger error - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -239,7 +625,7 @@ test('should process a workflow-error event and call the callback', async (t) => await new Promise((resolve) => setTimeout(resolve, 50)); }); -test('should process events in the correct order', async (t) => { +test('integration: should process events in the correct order', async (t) => { const engine = await createMockEngine(); const plan = createPlan( `fn((s) => { @@ -250,7 +636,7 @@ test('should process events in the correct order', async (t) => { })` ); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -306,7 +692,7 @@ test('should process events in the correct order', async (t) => { t.assert(events.every((e) => e.workflowId === 'a')); }); -test('should batch sequential log events', async (t) => { +test('integration: should batch sequential log events', 
async (t) => { t.plan(4); const engine = await createMockEngine(); @@ -319,7 +705,7 @@ test('should batch sequential log events', async (t) => { })` ); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -348,7 +734,7 @@ test('should batch sequential log events', async (t) => { }); // 3 logs will be sent within 10ms, but they'll be interrupted by step:complete and step:start -test('should interrupt a batch of log events', async (t) => { +test('integration: should interrupt a batch of log events', async (t) => { const engine = await createMockEngine(); const plan = createPlan( `fn((s) => { @@ -360,7 +746,7 @@ test('should interrupt a batch of log events', async (t) => { })` ); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -399,7 +785,7 @@ test('should interrupt a batch of log events', async (t) => { t.is(second, 1); }); -test('should respect the limit', async (t) => { +test('integration: should respect the limit', async (t) => { const engine = await createMockEngine(); const plan = createPlan( `fn((s) => { @@ -411,7 +797,7 @@ test('should respect the limit', async (t) => { })` ); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -443,18 +829,18 @@ test('should respect the limit', async (t) => { t.is(events[1].length, 2); }); -test('should respect the interval', async (t) => { +test('integration: should respect the interval', async (t) => { const engine = await createMockEngine(); const plan = createPlan( `fn(async (s) => { console.log(1); - await new Promise((resolve) => setTimeout(() => resolve(s), 5)), + await new Promise((resolve) => setTimeout(() => resolve(s), 10)), console.log(3); return {}; })` ); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -473,7 +859,8 @@ test('should respect the interval', async (t) => { batch: { [WORKFLOW_LOG]: true, }, - batchInterval: 2, + // low interval so I expect this to send two small batches + batchInterval: 1, }; 
eventProcessor(engine, context as any, callbacks, options); @@ -486,7 +873,7 @@ test('should respect the interval', async (t) => { t.is(events[1].length, 1); }); -test('should handle two batches of logs', async (t) => { +test('integration: should handle two batches of logs', async (t) => { const engine = await createMockEngine(); // syntax is weird because of how the fake RTE works const plan = createPlan( @@ -503,7 +890,7 @@ test('should handle two batches of logs', async (t) => { })` ); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -535,7 +922,7 @@ test('should handle two batches of logs', async (t) => { t.is(second, 2); }); -test('should process events in the correct order with batching', async (t) => { +test('integration: should process events in the correct order with batching', async (t) => { const engine = await createMockEngine(); const plan = createPlan( `fn((s) => { @@ -546,7 +933,7 @@ test('should process events in the correct order with batching', async (t) => { })` ); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -597,7 +984,7 @@ test('should process events in the correct order with batching', async (t) => { t.is(events[4].type, 'workflow-complete'); }); -test('queue events behind a slow event', async (t) => { +test('integration: queue events behind a slow event', async (t) => { const engine = await createMockEngine(); const plan = createPlan( `fn((s) => { @@ -606,7 +993,7 @@ test('queue events behind a slow event', async (t) => { } })` ); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -650,7 +1037,7 @@ test('queue events behind a slow event', async (t) => { // This isn't the most watertight test - but I've debugged it closely and it seems // to do the right thing -test('queue events behind a slow event II', async (t) => { +test('integration: queue events behind a slow event II', async (t) => { const engine = await createMockEngine(); const plan = createPlan( ` @@ 
-670,7 +1057,7 @@ test('queue events behind a slow event II', async (t) => { } })` ); - const context = { + const context: any = { id: 'a', plan, options: {}, @@ -710,11 +1097,67 @@ test('queue events behind a slow event II', async (t) => { t.is(events[1], 10); }); -test('should timeout and continue processing when event handler hangs', async (t) => { +// Regression test for ordering race condition when batch timeout fires +// while the queue is empty, and a new event arrives during the slow send. +// +// Timeline: +// t=0ms LOG1 arrives, batch opens, timeout set for t=5ms +// t=5ms Timeout fires: activeBatch=null (sync), then awaits slow send (~50ms) +// t=30ms Job's async work completes, JOB_COMPLETE arrives +// Queue is empty, activeBatch is null -> next() fires immediately +// JOB_COMPLETE sends and completes before the log batch resolves +// t=55ms Log batch send finally resolves +// +// Without a fix: events = ['job-complete', 'log'] (wrong order) +test('integration: batch timeout send should not race with subsequent events', async (t) => { + const engine = await createMockEngine(); + const plan = createPlan( + `fn(async (s) => { + console.log(1); + await new Promise(resolve => setTimeout(resolve, 30)); + return {}; + })` + ); + + const context: any = { + id: 'a', + plan, + options: {}, + logger, + }; + + const events: string[] = []; + + const callbacks = { + [WORKFLOW_LOG]: async (_ctx: any, _event: any) => { + // Slow send simulates real websocket latency + await new Promise((resolve) => setTimeout(resolve, 50)); + events.push('log'); + }, + [JOB_COMPLETE]: async (_ctx: any, _event: any) => { + events.push('job-complete'); + }, + }; + + const options = { + batch: { [WORKFLOW_LOG]: true }, + batchInterval: 5, // fires well before the job's 30ms async completes + }; + + eventProcessor(engine, context as any, callbacks, options); + + await engine.execute(plan, {}); + await waitForAsync(200); + + t.is(events[0], 'log'); + t.is(events[1], 'job-complete'); +}); + 
+test('integration: should timeout and continue processing when event handler hangs', async (t) => { const engine = await createMockEngine(); const plan = createPlan(); - const context = { + const context: any = { id: 'a', plan, options: {},