Skip to content

Commit b75de8c

Browse files
committed
fix a timing issue when sending batch events
Big help from Claude
1 parent 3c61be4 commit b75de8c

2 files changed

Lines changed: 64 additions & 6 deletions

File tree

packages/ws-worker/src/api/process-events.ts

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,10 +74,14 @@ export function eventProcessor(
7474
let activeBatch: string | null = null;
7575
let batch: any = [];
7676
let batchTimeout: NodeJS.Timeout | null = null;
77+
let batchSendPromise: Promise<void> | null = null;
7778
let didFinish = false;
7879
let timeoutHandle: NodeJS.Timeout;
7980

8081
const next = async () => {
82+
if (batchSendPromise) {
83+
await batchSendPromise;
84+
}
8185
const evt = queue[0];
8286
if (evt) {
8387
didFinish = false;
@@ -118,9 +122,7 @@ export function eventProcessor(
118122
const start = Date.now();
119123
// @ts-ignore
120124
const lightningEvent = eventMap[name] ?? name;
121-
console.log('!! calling ', name);
122125
await callbacks[name](context, payload);
123-
console.log('!! finished ', name);
124126
if (batchSize) {
125127
logger.info(
126128
`${planId} :: sent ${lightningEvent} (${batchSize}):: OK :: ${
@@ -144,7 +146,6 @@ export function eventProcessor(
144146

145147
const process = async (name: string, event: any) => {
146148
// TODO this actually shouldn't be here - should be done separately
147-
console.log('<<<<<<<< ', name);
148149
if (name !== 'workflow-log') {
149150
Sentry.addBreadcrumb({
150151
category: 'event',
@@ -188,8 +189,10 @@ export function eventProcessor(
188189
// finally wait for a time before sending the batch
189190
if (!batchTimeout) {
190191
const batchName = activeBatch!;
191-
batchTimeout = setTimeout(async () => {
192-
sendBatch(batchName);
192+
batchTimeout = setTimeout(() => {
193+
batchSendPromise = sendBatch(batchName).finally(() => {
194+
batchSendPromise = null;
195+
});
193196
}, interval);
194197
}
195198
} else {
@@ -201,7 +204,6 @@ export function eventProcessor(
201204
};
202205

203206
const enqueue = (name: string, event: any) => {
204-
console.log('>>>>> ', name);
205207
queue.push({ name, event });
206208

207209
if (queue.length == 1) {

packages/ws-worker/test/api/process-event.test.ts

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -710,6 +710,62 @@ test('queue events behind a slow event II', async (t) => {
710710
t.is(events[1], 10);
711711
});
712712

713+
// Regression test for ordering race condition when batch timeout fires
714+
// while the queue is empty, and a new event arrives during the slow send.
715+
//
716+
// Timeline:
717+
// t=0ms LOG1 arrives, batch opens, timeout set for t=5ms
718+
// t=5ms Timeout fires: activeBatch=null (sync), then awaits slow send (~50ms)
719+
// t=30ms Job's async work completes, JOB_COMPLETE arrives
720+
// Queue is empty, activeBatch is null -> next() fires immediately
721+
// JOB_COMPLETE sends and completes before the log batch resolves
722+
// t=55ms Log batch send finally resolves
723+
//
724+
// Without the fix: events = ['job-complete', 'log'] (wrong order)
725+
test('batch timeout send should not race with subsequent events', async (t) => {
726+
const engine = await createMockEngine();
727+
const plan = createPlan(
728+
`fn(async (s) => {
729+
console.log(1);
730+
await new Promise(resolve => setTimeout(resolve, 30));
731+
return {};
732+
})`
733+
);
734+
735+
const context = {
736+
id: 'a',
737+
plan,
738+
options: {},
739+
logger,
740+
};
741+
742+
const events: string[] = [];
743+
744+
const callbacks = {
745+
[WORKFLOW_LOG]: async (_ctx: any, _event: any) => {
746+
// Slow send simulates real websocket latency
747+
await new Promise(resolve => setTimeout(resolve, 50));
748+
events.push('log');
749+
},
750+
[JOB_COMPLETE]: async (_ctx: any, _event: any) => {
751+
events.push('job-complete');
752+
},
753+
};
754+
755+
const options = {
756+
batch: { [WORKFLOW_LOG]: true },
757+
batchInterval: 5, // fires well before the job's 30ms async completes
758+
};
759+
760+
eventProcessor(engine, context as any, callbacks, options);
761+
762+
await engine.execute(plan, {});
763+
await waitForAsync(200);
764+
765+
t.is(events[0], 'log');
766+
t.is(events[1], 'job-complete');
767+
});
768+
713769
test('should timeout and continue processing when event handler hangs', async (t) => {
714770
const engine = await createMockEngine();
715771
const plan = createPlan();

0 commit comments

Comments
 (0)