Skip to content

Commit e7ad034

Browse files
Roman SnapkoMarceloRGonc
andauthored
Save workflow run when trigger failed (#1577)
<!-- Ensure the title clearly reflects what was changed. Provide a clear and concise description of the changes made. The PR should only contain the changes related to the issue, and no other unrelated changes. --> Fixes OPS-2926. https://www.loom.com/share/ef9620a0a5404edebae9a019cff75385 --------- Co-authored-by: Marcelo Gonçalves <marcelo@openops.com>
1 parent 970aeb8 commit e7ad034

5 files changed

Lines changed: 277 additions & 15 deletions

File tree

packages/server/api/src/app/flows/flow-run/flow-run-service.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ import {
2525
RunEnvironment,
2626
SeekPage,
2727
spreadIfDefined,
28+
StepOutput,
29+
StepOutputStatus,
2830
TERMINAL_STATUSES,
2931
} from '@openops/shared';
3032
import { nanoid } from 'nanoid';
@@ -131,6 +133,58 @@ function getEffectiveProgressUpdateType(
131133
}
132134

133135
export const flowRunService = {
136+
async recordTriggerFailure({
137+
projectId,
138+
flowVersionId,
139+
errorMessage,
140+
reason,
141+
triggerInput,
142+
}: {
143+
projectId: ProjectId;
144+
flowVersionId: FlowVersionId;
145+
errorMessage: string;
146+
reason: string;
147+
triggerInput?: unknown;
148+
}): Promise<void> {
149+
const flowVersion = await flowVersionService.getOneOrThrow(flowVersionId);
150+
151+
const executionState: ExecutionState = {
152+
steps: {
153+
[flowVersion.trigger.name]: {
154+
type: flowVersion.trigger.type,
155+
status: StepOutputStatus.FAILED,
156+
input: triggerInput,
157+
errorMessage,
158+
},
159+
} as Record<string, StepOutput>,
160+
};
161+
162+
const logsFileId = await updateLogs({
163+
logsFileId: null,
164+
projectId,
165+
executionState,
166+
});
167+
168+
const failedRun = {
169+
id: openOpsId(),
170+
projectId,
171+
flowId: flowVersion.flowId,
172+
flowVersionId: flowVersion.id,
173+
environment: RunEnvironment.PRODUCTION,
174+
flowDisplayName: flowVersion.displayName,
175+
startTime: new Date().toISOString(),
176+
finishTime: new Date().toISOString(),
177+
status: FlowRunStatus.FAILED,
178+
triggerSource: FlowRunTriggerSource.TRIGGERED,
179+
terminationReason: reason,
180+
tasks: 0,
181+
duration: 0,
182+
tags: [],
183+
logsFileId,
184+
};
185+
186+
await flowRunRepo().save(failedRun);
187+
},
134188
async list({
135189
projectId,
136190
flowId,

packages/server/api/src/app/workers/engine-controller.ts

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,19 @@ export const flowEngineWorker: FastifyPluginAsyncTypebox = async (app) => {
110110
projectId,
111111
success,
112112
});
113+
114+
if (success === false) {
115+
const { flowVersionId, errorMessage, reason, triggerInput } =
116+
request.body as Extract<UpdateFailureCountRequest, { success: false }>;
117+
118+
await flowRunService.recordTriggerFailure({
119+
projectId,
120+
flowVersionId,
121+
errorMessage,
122+
reason,
123+
triggerInput,
124+
});
125+
}
113126
});
114127

115128
app.post('/update-run', UpdateStepProgress, async (request) => {
Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,161 @@
1+
export const mockedRepo = {
2+
findOneBy: jest.fn(),
3+
createQueryBuilder: jest.fn(),
4+
save: jest.fn(),
5+
update: jest.fn(),
6+
};
7+
8+
jest.mock('../../../src/app/core/db/repo-factory', () => {
9+
return {
10+
repoFactory: () => jest.fn().mockReturnValue(mockedRepo),
11+
};
12+
});
13+
14+
const fileServiceMock = {
15+
save: jest.fn(),
16+
};
17+
jest.mock('../../../src/app/file/file.service', () => {
18+
return {
19+
fileService: fileServiceMock,
20+
};
21+
});
22+
23+
const flowVersionServiceMock = {
24+
getOneOrThrow: jest.fn(),
25+
};
26+
jest.mock('../../../src/app/flows/flow-version/flow-version.service', () => {
27+
return {
28+
flowVersionService: flowVersionServiceMock,
29+
};
30+
});
31+
32+
const logSerializerMock = {
33+
serialize: jest.fn(),
34+
};
35+
jest.mock('../../../src/app/flows/flow-run/log-serializer', () => {
36+
return {
37+
logSerializer: logSerializerMock,
38+
};
39+
});
40+
41+
import {
42+
FileCompression,
43+
FileType,
44+
FlowRunStatus,
45+
FlowRunTriggerSource,
46+
RunEnvironment,
47+
StepOutputStatus,
48+
} from '@openops/shared';
49+
import { flowRunService } from '../../../src/app/flows/flow-run/flow-run-service';
50+
51+
describe('flowRunService.recordTriggerFailure', () => {
52+
const now = new Date('2024-01-02T03:04:05.000Z');
53+
54+
beforeAll(() => {
55+
jest.useFakeTimers();
56+
});
57+
58+
beforeEach(() => {
59+
jest.setSystemTime(now);
60+
jest.clearAllMocks();
61+
logSerializerMock.serialize.mockResolvedValue(
62+
Buffer.from('compressed-logs'),
63+
);
64+
fileServiceMock.save.mockResolvedValue({ id: 'file_123' });
65+
flowVersionServiceMock.getOneOrThrow.mockResolvedValue({
66+
id: 'fv_1',
67+
flowId: 'flow_1',
68+
displayName: 'My Flow V1',
69+
trigger: {
70+
name: 'triggerStep',
71+
type: 'POLLING',
72+
},
73+
});
74+
});
75+
76+
afterAll(() => {
77+
jest.useRealTimers();
78+
});
79+
80+
it('should create a failed flow run with logs when triggerInput is provided', async () => {
81+
await flowRunService.recordTriggerFailure({
82+
projectId: 'proj_1',
83+
flowVersionId: 'fv_1',
84+
errorMessage: 'Trigger failed to execute',
85+
reason: 'TRIGGER_ERROR',
86+
triggerInput: { a: 1 },
87+
});
88+
89+
expect(logSerializerMock.serialize).toHaveBeenCalledTimes(1);
90+
const serializeArg = (logSerializerMock.serialize as jest.Mock).mock
91+
.calls[0][0];
92+
expect(serializeArg).toEqual({
93+
executionState: {
94+
steps: {
95+
triggerStep: {
96+
type: 'POLLING',
97+
status: StepOutputStatus.FAILED,
98+
input: { a: 1 },
99+
errorMessage: 'Trigger failed to execute',
100+
},
101+
},
102+
},
103+
});
104+
105+
expect(fileServiceMock.save).toHaveBeenCalledTimes(1);
106+
const saveArg = (fileServiceMock.save as jest.Mock).mock.calls[0][0];
107+
expect(saveArg).toEqual(
108+
expect.objectContaining({
109+
fileId: expect.any(String),
110+
data: Buffer.from('compressed-logs'),
111+
type: FileType.FLOW_RUN_LOG,
112+
compression: FileCompression.GZIP,
113+
projectId: 'proj_1',
114+
}),
115+
);
116+
117+
expect(mockedRepo.save).toHaveBeenCalledTimes(1);
118+
const saved = mockedRepo.save.mock.calls[0][0];
119+
expect(saved).toMatchObject({
120+
projectId: 'proj_1',
121+
flowId: 'flow_1',
122+
flowVersionId: 'fv_1',
123+
environment: RunEnvironment.PRODUCTION,
124+
flowDisplayName: 'My Flow V1',
125+
startTime: now.toISOString(),
126+
finishTime: now.toISOString(),
127+
status: FlowRunStatus.FAILED,
128+
triggerSource: FlowRunTriggerSource.TRIGGERED,
129+
terminationReason: 'TRIGGER_ERROR',
130+
tasks: 0,
131+
duration: 0,
132+
tags: [],
133+
});
134+
expect(saved.logsFileId).toBe(saveArg.fileId);
135+
136+
expect(typeof saved.id).toBe('string');
137+
expect(saved.id.length).toBeGreaterThan(0);
138+
});
139+
140+
it('should create a failed flow run without triggerInput when not provided', async () => {
141+
await flowRunService.recordTriggerFailure({
142+
projectId: 'proj_2',
143+
flowVersionId: 'fv_1',
144+
errorMessage: 'Boom',
145+
reason: 'TRIGGER_ERROR_NO_INPUT',
146+
});
147+
148+
const serializeArg = (logSerializerMock.serialize as jest.Mock).mock
149+
.calls[0][0];
150+
expect(serializeArg.executionState.steps.triggerStep).toEqual({
151+
type: 'POLLING',
152+
status: StepOutputStatus.FAILED,
153+
input: undefined,
154+
errorMessage: 'Boom',
155+
});
156+
157+
const saved = mockedRepo.save.mock.calls[0][0];
158+
expect(saved.projectId).toBe('proj_2');
159+
expect(saved.terminationReason).toBe('TRIGGER_ERROR_NO_INPUT');
160+
});
161+
});

packages/server/shared/src/lib/job/index.ts

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -39,11 +39,22 @@ export const UpdateJobRequest = Type.Object({
3939
});
4040
export type UpdateJobRequest = Static<typeof UpdateJobRequest>;
4141

42-
export const UpdateFailureCountRequest = Type.Object({
43-
flowId: Type.String(),
44-
projectId: Type.String(),
45-
success: Type.Boolean(),
46-
});
42+
export const UpdateFailureCountRequest = Type.Union([
43+
Type.Object({
44+
flowId: Type.String(),
45+
projectId: Type.String(),
46+
success: Type.Literal(true),
47+
}),
48+
Type.Object({
49+
flowId: Type.String(),
50+
projectId: Type.String(),
51+
success: Type.Literal(false),
52+
flowVersionId: Type.String(),
53+
reason: Type.String(),
54+
errorMessage: Type.String(),
55+
triggerInput: Type.Optional(Type.Unknown()),
56+
}),
57+
]);
4758

4859
export type UpdateFailureCountRequest = Static<
4960
typeof UpdateFailureCountRequest

packages/server/worker/src/lib/trigger/hooks/extract-trigger-payload-hooks.ts

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,8 @@
1-
import { logger, rejectedPromiseHandler } from '@openops/server-shared';
1+
import {
2+
logger,
3+
rejectedPromiseHandler,
4+
UpdateFailureCountRequest,
5+
} from '@openops/server-shared';
26
import {
37
ApplicationError,
48
ErrorCode,
@@ -43,8 +47,15 @@ export async function extractPayloads(
4347
},
4448
'Failed to execute trigger',
4549
);
46-
handleFailureFlow(flowVersion, projectId, engineToken, false);
50+
const errorMessage =
51+
result?.message ?? 'Trigger execution failed due to an unknown issue.';
4752

53+
handleFailureFlow(flowVersion, projectId, engineToken, false, {
54+
reason: 'TRIGGER_HOOK_FAILED',
55+
flowVersionId: flowVersion.id,
56+
errorMessage,
57+
triggerInput: result.input,
58+
});
4859
return [];
4960
}
5061
} catch (e) {
@@ -61,7 +72,12 @@ export async function extractPayloads(
6172
},
6273
'Failed to execute trigger due to timeout',
6374
);
64-
handleFailureFlow(flowVersion, projectId, engineToken, false);
75+
76+
handleFailureFlow(flowVersion, projectId, engineToken, false, {
77+
reason: 'TRIGGER_TIMEOUT',
78+
flowVersionId: flowVersion.id,
79+
errorMessage: 'Trigger execution timed out',
80+
});
6581
return [];
6682
}
6783
throw e;
@@ -73,16 +89,23 @@ function handleFailureFlow(
7389
projectId: ProjectId,
7490
engineToken: string,
7591
success: boolean,
92+
failureDetails?: {
93+
reason: string;
94+
flowVersionId: string;
95+
errorMessage: string;
96+
triggerInput?: unknown;
97+
},
7698
): void {
7799
const engineController = engineApiService(engineToken);
78100

79-
rejectedPromiseHandler(
80-
engineController.updateFailureCount({
81-
flowId: flowVersion.flowId,
82-
projectId,
83-
success,
84-
}),
85-
);
101+
const request = {
102+
flowId: flowVersion.flowId,
103+
projectId,
104+
success,
105+
...(!success && failureDetails ? failureDetails : {}),
106+
} as UpdateFailureCountRequest;
107+
108+
rejectedPromiseHandler(engineController.updateFailureCount(request));
86109
}
87110

88111
type ExecuteTrigger = {

0 commit comments

Comments
 (0)