Skip to content

Commit dc638c3

Browse files
authored
Enable flow triggers and schedule initialization during bulk flow creation (#2208)
Part of OPS-4028
1 parent 584b85a commit dc638c3

2 files changed

Lines changed: 170 additions & 18 deletions

File tree

packages/server/api/src/app/flows/flow/flow-bulk-create.ts

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,21 @@
1+
import { TriggerStrategy } from '@openops/blocks-framework';
12
import {
23
AppConnectionsWithSupportedBlocks,
4+
BlockTrigger,
5+
flowHelper,
6+
FlowScheduleOptions,
37
FlowStatus,
48
FlowVersion,
59
FlowVersionState,
6-
TriggerWithOptionalId,
7-
flowHelper,
10+
isNil,
811
openOpsId,
12+
ScheduleType,
13+
TriggerWithOptionalId,
914
} from '@openops/shared';
1015
import fs from 'node:fs/promises';
1116
import { EntityManager } from 'typeorm';
17+
import { triggerHooks } from '../trigger';
18+
import { triggerUtils } from '../trigger/hooks/trigger-utils';
1219
import { flowRepo } from './flow.repo';
1320

1421
export async function loadWorkflowTemplate(
@@ -41,7 +48,7 @@ type FlowInsertRecord = {
4148
status: FlowStatus;
4249
publishedVersionId: null;
4350
isInternal: boolean;
44-
schedule: null;
51+
schedule?: FlowScheduleOptions | null;
4552
};
4653

4754
export async function bulkCreateAndPublishFlows(
@@ -58,6 +65,8 @@ export async function bulkCreateAndPublishFlows(
5865
buildFlowAndVersion(template, projectId, folderId, connections),
5966
);
6067

68+
await enableFlowTriggers(flowsWithVersions, projectId);
69+
6170
await flowRepo().manager.transaction(async (trx) => {
6271
await trx
6372
.getRepository('flow')
@@ -156,3 +165,42 @@ function buildFlowAndVersion(
156165

157166
return { flow, version: lockedVersion };
158167
}
168+
169+
async function enableFlowTriggers(
170+
flowsWithVersions: Array<{
171+
flow: FlowInsertRecord;
172+
version: FlowVersion;
173+
}>,
174+
projectId: string,
175+
): Promise<void> {
176+
for (const { flow, version } of flowsWithVersions) {
177+
const trigger = version.trigger as BlockTrigger;
178+
179+
const blockTrigger = await triggerUtils.getBlockTriggerOrThrow({
180+
trigger,
181+
projectId,
182+
});
183+
184+
if (blockTrigger.type === TriggerStrategy.WEBHOOK) {
185+
continue;
186+
}
187+
188+
const enableResult = await triggerHooks.enable({
189+
flowVersion: version,
190+
projectId: flow.projectId,
191+
simulate: false,
192+
});
193+
194+
const scheduleOptions = enableResult?.result.scheduleOptions;
195+
196+
if (isNil(scheduleOptions)) {
197+
continue;
198+
}
199+
200+
flow.schedule = {
201+
...scheduleOptions,
202+
type: ScheduleType.CRON_EXPRESSION,
203+
failureCount: 0,
204+
};
205+
}
206+
}

packages/server/api/test/unit/flow/flow-bulk-create.test.ts

Lines changed: 119 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,24 @@
1-
/* eslint-disable @typescript-eslint/no-explicit-any */
1+
import { TriggerStrategy } from '@openops/blocks-framework';
2+
import {
3+
AppConnectionStatus,
4+
AppConnectionsWithSupportedBlocks,
5+
AppConnectionType,
6+
BlockType,
7+
flowHelper,
8+
FlowStatus,
9+
FlowVersionState,
10+
openOpsId,
11+
PackageType,
12+
ScheduleType,
13+
TriggerType,
14+
} from '@openops/shared';
15+
import {
16+
bulkCreateAndPublishFlows,
17+
WorkflowTemplate,
18+
} from '../../../src/app/flows/flow/flow-bulk-create';
19+
import { triggerHooks } from '../../../src/app/flows/trigger';
20+
import { triggerUtils } from '../../../src/app/flows/trigger/hooks/trigger-utils';
21+
222
const mockInsert = jest.fn().mockResolvedValue(undefined);
323
const mockQuery = jest.fn().mockResolvedValue(undefined);
424
const mockGetRepository = jest.fn().mockReturnValue({ insert: mockInsert });
@@ -12,7 +32,7 @@ const mockFlowRepo = jest.fn().mockReturnValue({
1232
});
1333

1434
jest.mock('../../../src/app/flows/flow/flow.repo', () => ({
15-
flowRepo: mockFlowRepo,
35+
flowRepo: (): object => mockFlowRepo(),
1636
}));
1737

1838
jest.mock('@openops/shared', () => ({
@@ -24,34 +44,73 @@ jest.mock('@openops/shared', () => ({
2444
},
2545
}));
2646

27-
import {
28-
flowHelper,
29-
FlowStatus,
30-
FlowVersionState,
31-
openOpsId,
32-
} from '@openops/shared';
33-
import { bulkCreateAndPublishFlows } from '../../../src/app/flows/flow/flow-bulk-create';
47+
jest.mock('../../../src/app/flows/trigger/hooks/trigger-utils', () => ({
48+
triggerUtils: {
49+
getBlockTriggerOrThrow: jest.fn(),
50+
},
51+
}));
52+
53+
jest.mock('../../../src/app/flows/trigger', () => ({
54+
triggerHooks: {
55+
enable: jest.fn(),
56+
},
57+
}));
3458

3559
const mockOpenOpsId = openOpsId as jest.Mock;
3660
const mockGetImportOperations = flowHelper.getImportOperations as jest.Mock;
3761
const mockApply = flowHelper.apply as jest.Mock;
62+
const mockGetBlockTriggerOrThrow =
63+
triggerUtils.getBlockTriggerOrThrow as jest.Mock;
64+
const mockTriggerHooksEnable = triggerHooks.enable as jest.Mock;
3865

39-
const baseTemplate = {
66+
const baseTemplate: WorkflowTemplate = {
4067
template: {
4168
displayName: 'Test Flow',
42-
trigger: { type: 'WEBHOOK', name: 'trigger' } as any,
69+
trigger: {
70+
type: TriggerType.BLOCK,
71+
name: 'trigger',
72+
displayName: 'Trigger',
73+
settings: {
74+
blockName: 'test-block',
75+
blockVersion: '1.0.0',
76+
triggerName: 'test-trigger',
77+
blockType: BlockType.OFFICIAL,
78+
packageType: PackageType.REGISTRY,
79+
input: {},
80+
inputUiInfo: {},
81+
},
82+
valid: true,
83+
nextAction: null,
84+
},
4385
},
4486
};
4587

46-
const baseConnections = [
47-
{ id: 'conn-1', name: 'Test', authProviderKey: 'aws', supportedBlocks: [] },
48-
] as any[];
88+
const baseConnections: AppConnectionsWithSupportedBlocks[] = [
89+
{
90+
id: 'conn-1',
91+
name: 'Test',
92+
authProviderKey: 'aws',
93+
supportedBlocks: [],
94+
projectId: 'project-1',
95+
type: AppConnectionType.SECRET_TEXT,
96+
value: {
97+
type: AppConnectionType.SECRET_TEXT,
98+
secret_text: 'secret',
99+
},
100+
created: '2021-01-01T00:00:00Z',
101+
updated: '2021-01-01T00:00:00Z',
102+
status: AppConnectionStatus.ACTIVE,
103+
},
104+
] as unknown as AppConnectionsWithSupportedBlocks[];
49105

50106
describe('bulkCreateAndPublishFlows', () => {
51107
beforeEach(() => {
52108
jest.clearAllMocks();
53109
mockGetImportOperations.mockReturnValue([]);
54110
mockApply.mockImplementation((version) => version);
111+
mockGetBlockTriggerOrThrow.mockResolvedValue({
112+
type: TriggerStrategy.WEBHOOK,
113+
});
55114

56115
let idCounter = 0;
57116
mockOpenOpsId.mockImplementation(() => `generated-id-${++idCounter}`);
@@ -222,9 +281,54 @@ describe('bulkCreateAndPublishFlows', () => {
222281
);
223282

224283
expect(mockGetImportOperations).toHaveBeenCalledWith(
225-
expect.objectContaining({ type: 'WEBHOOK' }),
284+
expect.objectContaining({ type: TriggerType.BLOCK }),
226285
baseConnections,
227286
);
228287
expect(mockApply).toHaveBeenCalledWith(expect.any(Object), mockOp);
229288
});
289+
290+
it('enables polling triggers and sets schedule', async () => {
291+
mockGetBlockTriggerOrThrow.mockResolvedValue({
292+
type: TriggerStrategy.POLLING,
293+
});
294+
mockTriggerHooksEnable.mockResolvedValue({
295+
result: {
296+
scheduleOptions: {
297+
cronExpression: '*/5 * * * *',
298+
timezone: 'UTC',
299+
},
300+
},
301+
});
302+
303+
await bulkCreateAndPublishFlows(
304+
[baseTemplate],
305+
baseConnections,
306+
'project-1',
307+
'folder-1',
308+
);
309+
310+
const flowInsertCall = mockInsert.mock.calls[0][0];
311+
expect(flowInsertCall[0].schedule).toEqual({
312+
cronExpression: '*/5 * * * *',
313+
timezone: 'UTC',
314+
type: ScheduleType.CRON_EXPRESSION,
315+
failureCount: 0,
316+
});
317+
expect(mockTriggerHooksEnable).toHaveBeenCalled();
318+
});
319+
320+
it('does not call triggerHooks.enable for WEBHOOK triggers', async () => {
321+
mockGetBlockTriggerOrThrow.mockResolvedValue({
322+
type: TriggerStrategy.WEBHOOK,
323+
});
324+
325+
await bulkCreateAndPublishFlows(
326+
[baseTemplate],
327+
baseConnections,
328+
'project-1',
329+
'folder-1',
330+
);
331+
332+
expect(mockTriggerHooksEnable).not.toHaveBeenCalled();
333+
});
230334
});

0 commit comments

Comments
 (0)