Skip to content

Commit d8f1fd5

Browse files
Move functionality from benchmarks to shared location
1 parent c5b0d70 commit d8f1fd5

7 files changed

Lines changed: 92 additions & 93 deletions

File tree

packages/server/api/src/app/benchmark/benchmark-status.service.ts

Lines changed: 7 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,13 @@ import {
66
BenchmarkStatusResponse,
77
BenchmarkWorkflowStatusItem,
88
ErrorCode,
9-
FlowRunStatus,
109
} from '@openops/shared';
1110
import { IsNull } from 'typeorm';
12-
import { flowRunRepo } from '../flows/flow-run/flow-run-service';
11+
import {
12+
FlowRunSummary,
13+
getLatestRunByFlowId,
14+
resolveRunStatus,
15+
} from '../flows/flow-run/flow-run-status.service';
1316
import { benchmarkFlowRepo } from './benchmark-flow.repo';
1417
import { benchmarkRepo } from './benchmark.repo';
1518

@@ -22,30 +25,6 @@ type FlowRow = {
2225

2326
type FlowRowWithBenchmarkId = FlowRow & { benchmarkId: string };
2427

25-
type FlowRunSummary = {
26-
id: string;
27-
status: FlowRunStatus;
28-
finishTime?: string;
29-
};
30-
31-
function mapFlowRunStatusToBenchmarkStatus(
32-
status: FlowRunStatus,
33-
): BenchmarkStatus {
34-
switch (status) {
35-
case FlowRunStatus.RUNNING:
36-
case FlowRunStatus.PAUSED:
37-
case FlowRunStatus.SCHEDULED:
38-
return BenchmarkStatus.RUNNING;
39-
case FlowRunStatus.FAILED:
40-
case FlowRunStatus.INTERNAL_ERROR:
41-
case FlowRunStatus.TIMEOUT:
42-
case FlowRunStatus.IGNORED:
43-
return BenchmarkStatus.FAILED;
44-
default:
45-
return BenchmarkStatus.SUCCEEDED;
46-
}
47-
}
48-
4928
async function fetchFlowRowsByBenchmarkIds(
5029
benchmarkIds: string[],
5130
): Promise<FlowRowWithBenchmarkId[]> {
@@ -80,63 +59,17 @@ function buildWorkflowStatusItems(
8059
isOrchestrator: row.isOrchestrator,
8160
isCleanup: row.isCleanup,
8261
runStatus: latestRun
83-
? mapFlowRunStatusToBenchmarkStatus(latestRun.status)
62+
? resolveRunStatus(latestRun.status)
8463
: BenchmarkStatus.CREATED,
8564
runId: latestRun?.id,
8665
};
8766
});
8867
}
8968

90-
async function getLatestRunByFlowId(
91-
flowIds: string[],
92-
projectId: string,
93-
): Promise<Record<string, FlowRunSummary | undefined>> {
94-
const rows = await flowRunRepo()
95-
.createQueryBuilder('fr')
96-
.distinctOn(['fr.flowId'])
97-
.select('fr.flowId', 'flowId')
98-
.addSelect('fr.id', 'id')
99-
.addSelect('fr.status', 'status')
100-
.addSelect('fr.finishTime', 'finishTime')
101-
.where('fr.projectId = :projectId', { projectId })
102-
.andWhere('fr.flowId IN (:...flowIds)', { flowIds })
103-
.orderBy('fr.flowId', 'ASC')
104-
.addOrderBy('fr.created', 'DESC')
105-
.addOrderBy('fr.id', 'DESC')
106-
.getRawMany<{
107-
id: string;
108-
flowId: string;
109-
status: FlowRunStatus;
110-
finishTime?: string;
111-
}>();
112-
113-
return mapLatestRuns(flowIds, rows);
114-
}
115-
116-
function mapLatestRuns(
117-
flowIds: string[],
118-
rows: {
119-
flowId: string;
120-
id: string;
121-
status: FlowRunStatus;
122-
finishTime?: string;
123-
}[],
124-
): Record<string, FlowRunSummary | undefined> {
125-
const result: Record<string, FlowRunSummary | undefined> = Object.fromEntries(
126-
flowIds.map((fid) => [fid, undefined]),
127-
);
128-
for (const r of rows) {
129-
result[r.flowId] = { id: r.id, status: r.status, finishTime: r.finishTime };
130-
}
131-
return result;
132-
}
133-
13469
function resolveOrchestratorStatus(
13570
run: FlowRunSummary | undefined,
13671
): BenchmarkStatus {
137-
return run
138-
? mapFlowRunStatusToBenchmarkStatus(run.status)
139-
: BenchmarkStatus.CREATED;
72+
return run ? resolveRunStatus(run.status) : BenchmarkStatus.CREATED;
14073
}
14174

14275
async function resolveStatusByBenchmarkId(

packages/server/api/src/app/benchmark/create-benchmark.service.ts

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,11 @@ import {
88
ErrorCode,
99
Folder,
1010
} from '@openops/shared';
11-
import fs from 'node:fs/promises';
1211
import { IsNull } from 'typeorm';
1312
import { getConnectionsWithBlockSupport } from '../app-connection/connections-with-block-support';
1413
import {
1514
bulkCreateAndPublishFlows,
15+
loadWorkflowTemplate,
1616
type WorkflowTemplate,
1717
} from '../flows/flow/flow-bulk-create';
1818
import { flowService } from '../flows/flow/flow.service';
@@ -137,16 +137,6 @@ export async function deleteFlowsForExistingBenchmark(params: {
137137
);
138138
}
139139

140-
async function loadWorkflowTemplate(
141-
filePath: string,
142-
): Promise<WorkflowTemplate> {
143-
const content = await fs.readFile(filePath, 'utf-8');
144-
const parsed = JSON.parse(content) as {
145-
template: WorkflowTemplate['template'];
146-
};
147-
return { template: parsed.template };
148-
}
149-
150140
type CategorizedWorkflowTemplates = {
151141
orchestrator: WorkflowTemplate;
152142
cleanup: WorkflowTemplate;
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
import { FlowRunStatus, SimplifiedRunStatus } from '@openops/shared';
2+
import { flowRunRepo } from './flow-run-service';
3+
4+
export type FlowRunSummary = {
5+
id: string;
6+
status: FlowRunStatus;
7+
finishTime?: string;
8+
};
9+
10+
export function resolveRunStatus(
11+
status: FlowRunStatus,
12+
): SimplifiedRunStatus {
13+
switch (status) {
14+
case FlowRunStatus.RUNNING:
15+
case FlowRunStatus.PAUSED:
16+
case FlowRunStatus.SCHEDULED:
17+
return SimplifiedRunStatus.RUNNING;
18+
case FlowRunStatus.FAILED:
19+
case FlowRunStatus.INTERNAL_ERROR:
20+
case FlowRunStatus.TIMEOUT:
21+
case FlowRunStatus.IGNORED:
22+
return SimplifiedRunStatus.FAILED;
23+
default:
24+
return SimplifiedRunStatus.SUCCEEDED;
25+
}
26+
}
27+
28+
export async function getLatestRunByFlowId(
29+
flowIds: string[],
30+
projectId: string,
31+
): Promise<Record<string, FlowRunSummary | undefined>> {
32+
if (flowIds.length === 0) {
33+
return {};
34+
}
35+
36+
const rows = await flowRunRepo()
37+
.createQueryBuilder('fr')
38+
.distinctOn(['fr.flowId'])
39+
.select('fr.flowId', 'flowId')
40+
.addSelect('fr.id', 'id')
41+
.addSelect('fr.status', 'status')
42+
.addSelect('fr.finishTime', 'finishTime')
43+
.where('fr.projectId = :projectId', { projectId })
44+
.andWhere('fr.flowId IN (:...flowIds)', { flowIds })
45+
.orderBy('fr.flowId', 'ASC')
46+
.addOrderBy('fr.created', 'DESC')
47+
.addOrderBy('fr.id', 'DESC')
48+
.getRawMany<{
49+
id: string;
50+
flowId: string;
51+
status: FlowRunStatus;
52+
finishTime?: string;
53+
}>();
54+
55+
const result: Record<string, FlowRunSummary | undefined> = Object.fromEntries(
56+
flowIds.map((fid) => [fid, undefined]),
57+
);
58+
for (const r of rows) {
59+
result[r.flowId] = { id: r.id, status: r.status, finishTime: r.finishTime };
60+
}
61+
return result;
62+
}

packages/server/api/src/app/flows/flow/flow-bulk-create.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,20 @@ import {
77
flowHelper,
88
openOpsId,
99
} from '@openops/shared';
10+
import fs from 'node:fs/promises';
1011
import { EntityManager } from 'typeorm';
1112
import { flowRepo } from './flow.repo';
1213

14+
export async function loadWorkflowTemplate(
15+
filePath: string,
16+
): Promise<WorkflowTemplate> {
17+
const content = await fs.readFile(filePath, 'utf-8');
18+
const parsed = JSON.parse(content) as {
19+
template: WorkflowTemplate['template'];
20+
};
21+
return { template: parsed.template };
22+
}
23+
1324
export type WorkflowTemplate = {
1425
template: {
1526
displayName: string;

packages/shared/src/lib/benchmark/dto/benchmark-status-response.ts

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,13 @@
11
import { Static, Type } from '@sinclair/typebox';
2+
import { SimplifiedRunStatus } from '../../common/run-status';
23
import { BenchmarkWorkflowBase } from './create-benchmark-response';
34

4-
export enum BenchmarkStatus {
5-
CREATED = 'CREATED',
6-
RUNNING = 'RUNNING',
7-
SUCCEEDED = 'SUCCEEDED',
8-
FAILED = 'FAILED',
9-
}
5+
export { SimplifiedRunStatus as BenchmarkStatus };
106

117
export const BenchmarkWorkflowStatusItem = Type.Intersect([
128
BenchmarkWorkflowBase,
139
Type.Object({
14-
runStatus: Type.Enum(BenchmarkStatus),
10+
runStatus: Type.Enum(SimplifiedRunStatus),
1511
runId: Type.Optional(Type.String()),
1612
}),
1713
]);
@@ -22,7 +18,7 @@ export type BenchmarkWorkflowStatusItem = Static<
2218

2319
export const BenchmarkStatusResponse = Type.Object({
2420
benchmarkId: Type.String(),
25-
status: Type.Enum(BenchmarkStatus),
21+
status: Type.Enum(SimplifiedRunStatus),
2622
workflows: Type.Array(BenchmarkWorkflowStatusItem),
2723
lastRunId: Type.Optional(Type.String()),
2824
lastRunFinishedAt: Type.Optional(Type.String()),
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
export * from './base-model';
22
export * from './locale';
3+
export * from './run-status';
34
export * from './security';
45
export * from './utils';
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
export enum SimplifiedRunStatus {
2+
CREATED = 'CREATED',
3+
RUNNING = 'RUNNING',
4+
SUCCEEDED = 'SUCCEEDED',
5+
FAILED = 'FAILED',
6+
}

0 commit comments

Comments
 (0)