Skip to content

Commit 3a7038b

Browse files
authored
Worker: support optional batching of log events (#1162)
* worker: re-write core event handler to allow more control of sequencing * tidy tests and finish * types * add basic tests * more testing * Add batching and tests * more tests * options and more tests * types * copy new log stuff from other branch * updates for live testing against lightning Works great against new and legacy app, with or without batching * options for interval and limit * fix test * copy across old test * fix type * add batch log test * upate default batch size * changeset * run integration tests in batch mode * update tests * remove logging * docs
1 parent 641c460 commit 3a7038b

21 files changed

Lines changed: 1428 additions & 217 deletions

File tree

.changeset/fair-papayas-stop.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
'@openfn/ws-worker': minor
3+
---
4+
5+
Allow logs to be sent to Lightning in batches. This behavior is disabled by default for back-compatibility: set `WORKER_BATCH_LOGS=true` to enable.

integration-tests/worker/src/init.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ export const initWorker = async (
4747
secret: crypto.randomUUID(),
4848
collectionsVersion: '1.0.0',
4949
messageTimeoutSeconds: 0.01,
50+
batchLogs: true,
5051
...workerArgs,
5152
});
5253

integration-tests/worker/test/integration.test.ts

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -988,9 +988,11 @@ test.serial('Redact logs which exceed the payload limit', (t) => {
988988
};
989989

990990
lightning.on('run:log', (evt) => {
991-
if (evt.payload.source === 'JOB') {
992-
t.regex(evt.payload.message[0], /redacted/i);
993-
}
991+
evt.payload.logs.forEach((log) => {
992+
if (log.source === 'JOB') {
993+
t.regex(log.message[0], /redacted/i);
994+
}
995+
});
994996
});
995997

996998
lightning.enqueueRun(run);
@@ -1066,11 +1068,13 @@ test.serial(
10661068
const rtLogs = [];
10671069

10681070
lightning.on('run:log', (e) => {
1069-
if (e.payload.source === 'JOB') {
1070-
jobLogs.push(e.payload);
1071-
} else if (e.payload.source === 'R/T') {
1072-
rtLogs.push(e.payload);
1073-
}
1071+
e.payload.logs.forEach((log) => {
1072+
if (log.source === 'JOB') {
1073+
jobLogs.push(log);
1074+
} else if (log.source === 'R/T') {
1075+
rtLogs.push(log);
1076+
}
1077+
});
10741078
});
10751079

10761080
lightning.once('run:complete', () => {

packages/lexicon/lightning.d.ts

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -173,15 +173,23 @@ export type RunCompletePayload = ExitReason & {
173173
};
174174
export type RunCompleteReply = undefined;
175175

176-
export type RunLogPayload = {
176+
export type RunLogLine = {
177177
message: Array<string | object>;
178178
timestamp: TimeInMicroSeconds;
179-
run_id: string;
180179
level?: string;
181180
source?: string; // namespace
182-
job_id?: string;
183181
step_id?: string;
184182
};
183+
184+
export type RunLogPayload = {
185+
run_id: string;
186+
logs: RunLogLine[];
187+
};
188+
189+
export type LegacyRunLogPayload = {
190+
run_id: string;
191+
} & RunLogLine;
192+
185193
export type RunLogReply = void;
186194

187195
export type StepStartPayload = {

packages/lightning-mock/src/api-sockets.ts

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import type {
2121
StepCompleteReply,
2222
StepStartPayload,
2323
StepStartReply,
24+
LegacyRunLogPayload,
2425
} from '@openfn/lexicon/lightning';
2526

2627
import createPheonixMockSocketServer, {
@@ -320,27 +321,41 @@ const createSocketAPI = (
320321
evt: PhoenixEvent<RunLogPayload>
321322
) {
322323
const { ref, join_ref, topic } = evt;
323-
const { run_id: runId } = evt.payload;
324-
325-
state.pending[runId].logs.push(evt.payload);
326324

327325
let payload: any = {
328326
status: 'ok',
329327
};
330328

331-
if (
332-
!evt.payload.message ||
333-
!evt.payload.source ||
334-
!evt.payload.timestamp ||
335-
!evt.payload.level
336-
) {
337-
payload = {
338-
status: 'error',
339-
response: 'Missing property on log',
340-
};
341-
}
329+
const { run_id: runId } = evt.payload;
330+
331+
if (evt.payload.logs) {
332+
// handle batch logs
333+
evt.payload.logs.forEach((log) => {
334+
state.pending[runId].logs.push(log);
342335

343-
logger?.info(`LOG [${runId}] ${evt.payload.message[0]}`);
336+
logger?.info(`LOG [${runId}] ${log.message}`);
337+
338+
if (!log.message || !log.source || !log.timestamp || !log.level) {
339+
payload = {
340+
status: 'error',
341+
response: 'Missing property on log',
342+
};
343+
}
344+
});
345+
} else {
346+
// handle legacy logs
347+
const log = evt.payload as unknown as LegacyRunLogPayload;
348+
state.pending[runId].logs.push(log);
349+
350+
logger?.info(`LOG [${runId}] ${log.message}`);
351+
352+
if (!log.message || !log.source || !log.timestamp || !log.level) {
353+
payload = {
354+
status: 'error',
355+
response: 'Missing property on log',
356+
};
357+
}
358+
}
344359

345360
ws.reply<RunLogReply>({
346361
ref,

packages/lightning-mock/src/server.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import createLogger, {
88
Logger,
99
} from '@openfn/logger';
1010
import type { StepId } from '@openfn/lexicon';
11-
import type { RunLogPayload, LightningPlan } from '@openfn/lexicon/lightning';
11+
import type { LightningPlan, RunLogLine } from '@openfn/lexicon/lightning';
1212

1313
import createWebSocketAPI from './api-sockets';
1414
import createDevAPI from './api-dev';
@@ -20,7 +20,7 @@ type JobId = string;
2020

2121
export type RunState = {
2222
status: 'queued' | 'started' | 'complete';
23-
logs: RunLogPayload[];
23+
logs: RunLogLine[];
2424
steps: Record<JobId, StepId>;
2525
};
2626

packages/lightning-mock/test/events/log.test.ts

Lines changed: 58 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,14 @@ test.serial('acknowledge valid message (run log)', async (t) => {
1818

1919
const event = {
2020
run_id: run.id,
21-
message: 'blah',
22-
level: 'info',
23-
source: 'R/T',
24-
timestamp: '123',
21+
logs: [
22+
{
23+
message: 'blah',
24+
level: 'info',
25+
source: 'R/T',
26+
timestamp: '123',
27+
},
28+
],
2529
};
2630

2731
const channel = await join(client, run.id);
@@ -41,11 +45,15 @@ test.serial('acknowledge valid message (job log)', async (t) => {
4145

4246
const event = {
4347
run_id: run.id,
44-
job_id: 'a',
45-
message: 'blah',
46-
level: 'info',
47-
source: 'R/T',
48-
timestamp: '123',
48+
logs: [
49+
{
50+
job_id: 'a',
51+
message: 'blah',
52+
level: 'info',
53+
source: 'R/T',
54+
timestamp: '123',
55+
},
56+
],
4957
};
5058

5159
const channel = await join(client, run.id);
@@ -65,19 +73,23 @@ test.serial('save log to state', async (t) => {
6573

6674
const event = {
6775
run_id: run.id,
68-
job_id: 'a',
69-
message: 'blah',
70-
level: 'info',
71-
source: 'R/T',
72-
timestamp: '123',
76+
logs: [
77+
{
78+
job_id: 'a',
79+
message: 'blah',
80+
level: 'info',
81+
source: 'R/T',
82+
timestamp: '123',
83+
},
84+
],
7385
};
7486

7587
const channel = await join(client, run.id);
7688

7789
channel.push(RUN_LOG, event).receive('ok', () => {
7890
const { pending } = server.getState();
7991
const [savedLog] = pending[run.id].logs;
80-
t.deepEqual(savedLog, event);
92+
t.deepEqual(savedLog, event.logs[0]);
8193
done();
8294
});
8395
});
@@ -91,10 +103,14 @@ test.serial('error if no message', async (t) => {
91103

92104
const event = {
93105
run_id: run.id,
94-
job_id: 'a',
95-
level: 'info',
96-
source: 'R/T',
97-
timestamp: '123',
106+
logs: [
107+
{
108+
job_id: 'a',
109+
level: 'info',
110+
source: 'R/T',
111+
timestamp: '123',
112+
},
113+
],
98114
};
99115
const channel = await join(client, run.id);
100116

@@ -113,10 +129,14 @@ test.serial('error if no source', async (t) => {
113129

114130
const event = {
115131
run_id: run.id,
116-
job_id: 'a',
117-
message: 'blah',
118-
level: 'info',
119-
timestamp: '123',
132+
logs: [
133+
{
134+
job_id: 'a',
135+
message: 'blah',
136+
level: 'info',
137+
timestamp: '123',
138+
},
139+
],
120140
};
121141
const channel = await join(client, run.id);
122142

@@ -135,9 +155,13 @@ test.serial('error if no timestamp', async (t) => {
135155

136156
const event = {
137157
run_id: run.id,
138-
message: 'blah',
139-
level: 'info',
140-
source: 'R/T',
158+
logs: [
159+
{
160+
message: 'blah',
161+
level: 'info',
162+
source: 'R/T',
163+
},
164+
],
141165
};
142166
const channel = await join(client, run.id);
143167

@@ -156,10 +180,14 @@ test.serial('error if no level', async (t) => {
156180

157181
const event = {
158182
run_id: run.id,
159-
job_id: 'a',
160-
message: 'blah',
161-
source: 'R/T',
162-
timestamp: '123',
183+
logs: [
184+
{
185+
job_id: 'a',
186+
message: 'blah',
187+
source: 'R/T',
188+
timestamp: '123',
189+
},
190+
],
163191
};
164192
const channel = await join(client, run.id);
165193

0 commit comments

Comments
 (0)