Skip to content

Commit 8a3eeba

Browse files
Roman SnapkoMarceloRGonc
andauthored
Handle internal errors in trigger payload extraction (#2124)
Fixes OPS-3864 --------- Co-authored-by: Marcelo Gonçalves <marcelo@openops.com>
1 parent 3996653 commit 8a3eeba

11 files changed

Lines changed: 164 additions & 22 deletions

File tree

packages/engine/src/lib/handler/context/engine-constants.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import {
1414
TestRunLimitSettings,
1515
TriggerHookType,
1616
} from '@openops/shared';
17+
import { parseJsonResponse } from '../../helper/response-helper';
1718
import {
1819
createPropsResolver,
1920
PropsResolver,
@@ -232,7 +233,7 @@ export class EngineConstants {
232233
},
233234
});
234235

235-
return (await response.json()) as Project;
236+
return parseJsonResponse<Project>(response);
236237
}
237238
}
238239

packages/engine/src/lib/helper/execution-errors.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,17 @@ export class FetchError extends ExecutionError {
100100
}
101101
}
102102

103+
export class InfrastructureError extends ExecutionError {
104+
constructor(message: string, cause?: unknown) {
105+
super(
106+
'InfrastructureError',
107+
formatMessage(message),
108+
ExecutionErrorType.ENGINE,
109+
cause,
110+
);
111+
}
112+
}
113+
103114
export class ExecutionLimitReachedError extends ExecutionError {
104115
formated: boolean;
105116

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import { logger } from '@openops/server-shared';
2+
import { InfrastructureError } from './execution-errors';
3+
4+
export const parseJsonResponse = async <T>(response: Response): Promise<T> => {
5+
const contentType = response.headers.get('content-type');
6+
const text = await response.text();
7+
8+
if (contentType && !contentType.includes('application/json')) {
9+
logger.warn('Expected JSON response but received non-JSON content type', {
10+
status: response.status,
11+
contentType,
12+
body: text,
13+
});
14+
throw new InfrastructureError(
15+
`Expected JSON response, but received status ${response.status} and ${contentType}.`,
16+
);
17+
}
18+
19+
try {
20+
return JSON.parse(text) as T;
21+
} catch (e) {
22+
logger.warn('Failed to parse JSON response', {
23+
status: response.status,
24+
body: text,
25+
});
26+
throw new InfrastructureError(
27+
`Failed to parse JSON response with status ${response.status}.`,
28+
e,
29+
);
30+
}
31+
};

packages/engine/src/lib/operations.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,10 @@ import {
3434
import { testExecutionContext } from './handler/context/test-execution-context';
3535
import { flowExecutor } from './handler/flow-executor';
3636
import { blockHelper } from './helper/block-helper';
37-
import { ExecutionLimitReachedError } from './helper/execution-errors';
37+
import {
38+
ExecutionLimitReachedError,
39+
InfrastructureError,
40+
} from './helper/execution-errors';
3841
import { triggerHelper } from './helper/trigger-helper';
3942
import { resolveVariable } from './resolve-variable';
4043
import { progressService } from './services/progress.service';
@@ -330,6 +333,10 @@ function evaluateError(error: Error): {
330333
message = error.getMessage();
331334
}
332335

336+
if (error instanceof InfrastructureError) {
337+
status = FlowRunStatus.INFRASTRUCTURE_ERROR;
338+
}
339+
333340
return {
334341
status,
335342
message,

packages/engine/src/lib/services/connections.service.ts

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@ import {
1111
ConnectionExpiredError,
1212
ConnectionLoadingError,
1313
ConnectionNotFoundError,
14-
ExecutionError,
1514
FetchError,
1615
} from '../helper/execution-errors';
16+
import { parseJsonResponse } from '../helper/response-helper';
1717

1818
export const createConnectionService = ({
1919
projectId,
@@ -40,16 +40,13 @@ export const createConnectionService = ({
4040
httpStatus: response.status,
4141
});
4242
}
43-
const connection: AppConnection = await response.json();
43+
const connection: AppConnection =
44+
await parseJsonResponse<AppConnection>(response);
4445
if (connection.status === AppConnectionStatus.ERROR) {
4546
throw new ConnectionExpiredError(connectionName);
4647
}
4748
return getConnectionValue(connection);
4849
} catch (e) {
49-
if (e instanceof ExecutionError) {
50-
throw e;
51-
}
52-
5350
return handleFetchError({
5451
url,
5552
cause: e,

packages/engine/src/lib/services/files.service.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { FilesService, WorkflowFile } from '@openops/blocks-framework';
22
import { isNil } from '@openops/shared';
33
import fs from 'fs/promises';
4+
import { parseJsonResponse } from '../helper/response-helper';
45

56
const FILE_PREFIX_URL = 'file://';
67
const MEMORY_PREFIX_URL = 'memory://';
@@ -129,7 +130,7 @@ async function writeDbFile({
129130
throw new Error('Failed to store entry ' + response.body);
130131
}
131132

132-
const result = await response.json();
133+
const result = await parseJsonResponse<{ url: string }>(response);
133134
return result.url;
134135
}
135136

packages/engine/src/lib/services/storage.service.ts

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ import {
1919
StorageError,
2020
StorageLimitError,
2121
} from '../helper/execution-errors';
22+
import { parseJsonResponse } from '../helper/response-helper';
2223

2324
export const createStorageService = ({
2425
engineToken,
@@ -42,7 +43,7 @@ export const createStorageService = ({
4243
});
4344
}
4445

45-
return await response.json();
46+
return await parseJsonResponse<StoreEntry>(response);
4647
} catch (e) {
4748
return handleFetchError({
4849
url,
@@ -78,7 +79,7 @@ export const createStorageService = ({
7879
});
7980
}
8081

81-
return await response.json();
82+
return await parseJsonResponse<StoreEntry>(response);
8283
} catch (e) {
8384
return handleFetchError({
8485
url,
@@ -136,7 +137,9 @@ export const createStorageService = ({
136137
throw new Error(`Failed to list keys: ${response.statusText}`);
137138
}
138139

139-
const result = await response.json();
140+
const result = await parseJsonResponse<{
141+
entries: Array<{ key: string; value: unknown }>;
142+
}>(response);
140143
return result.entries || [];
141144
} catch (e) {
142145
return handleFetchError({
Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
import { parseJsonResponse } from '../src/lib/helper/response-helper';
2+
import { logger } from '@openops/server-shared';
3+
import { InfrastructureError } from '../src/lib/helper/execution-errors';
4+
5+
jest.mock('@openops/server-shared', () => ({
6+
logger: {
7+
warn: jest.fn(),
8+
},
9+
}));
10+
11+
describe('parseJsonResponse', () => {
12+
beforeEach(() => {
13+
jest.clearAllMocks();
14+
});
15+
16+
it('should return parsed JSON for a valid JSON response', async () => {
17+
const payload = { id: 1, name: 'test' };
18+
const response = {
19+
status: 200,
20+
headers: new Headers({ 'content-type': 'application/json' }),
21+
text: jest.fn().mockResolvedValue(JSON.stringify(payload)),
22+
} as unknown as Response;
23+
24+
const result = await parseJsonResponse(response);
25+
expect(result).toEqual(payload);
26+
expect(logger.warn).not.toHaveBeenCalled();
27+
});
28+
29+
it('should attempt to parse JSON when content-type header is missing', async () => {
30+
const payload = { id: 2 };
31+
const response = {
32+
status: 200,
33+
headers: new Headers(),
34+
text: jest.fn().mockResolvedValue(JSON.stringify(payload)),
35+
} as unknown as Response;
36+
37+
const result = await parseJsonResponse(response);
38+
expect(result).toEqual(payload);
39+
expect(logger.warn).not.toHaveBeenCalled();
40+
});
41+
42+
it('should include full status code and full body in InfrastructureError for non-json content-type', async () => {
43+
const body = '<html><body><h1>502 Bad Gateway</h1><p>Nginx Error</p></body></html>';
44+
const response = {
45+
status: 502,
46+
headers: new Headers({ 'content-type': 'text/html' }),
47+
text: jest.fn().mockResolvedValue(body),
48+
} as unknown as Response;
49+
50+
const promise = parseJsonResponse(response);
51+
await expect(promise).rejects.toThrow(InfrastructureError);
52+
await expect(promise).rejects.toThrow(
53+
'Expected JSON response, but received status 502 and text/html.',
54+
);
55+
expect(logger.warn).toHaveBeenCalledWith(
56+
'Expected JSON response but received non-JSON content type',
57+
{ status: 502, contentType: 'text/html', body },
58+
59+
);
60+
});
61+
62+
it('should include status code in InfrastructureError for invalid json', async () => {
63+
const body = 'invalid json';
64+
const response = {
65+
status: 200,
66+
headers: new Headers({ 'content-type': 'application/json' }),
67+
text: jest.fn().mockResolvedValue(body),
68+
} as unknown as Response;
69+
70+
const promise = parseJsonResponse(response);
71+
await expect(promise).rejects.toThrow(InfrastructureError);
72+
await expect(promise).rejects.toThrow(
73+
'Failed to parse JSON response with status 200.',
74+
);
75+
expect(logger.warn).toHaveBeenCalledWith(
76+
'Failed to parse JSON response',
77+
{ status: 200, body },
78+
);
79+
});
80+
});

packages/react-ui/src/app/features/flow-runs/lib/flow-run-utils.ts

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -83,11 +83,6 @@ export const flowRunUtils = {
8383
variant: 'default',
8484
Icon: CircleStop,
8585
};
86-
case FlowRunStatus.FAILED:
87-
return {
88-
variant: 'error',
89-
Icon: X,
90-
};
9186
case FlowRunStatus.IGNORED:
9287
return {
9388
variant: 'default',
@@ -103,12 +98,10 @@ export const flowRunUtils = {
10398
variant: 'default',
10499
Icon: PauseIcon,
105100
};
106-
case FlowRunStatus.INTERNAL_ERROR:
107-
return {
108-
variant: 'error',
109-
Icon: X,
110-
};
111101
case FlowRunStatus.TIMEOUT:
102+
case FlowRunStatus.INTERNAL_ERROR:
103+
case FlowRunStatus.INFRASTRUCTURE_ERROR:
104+
case FlowRunStatus.FAILED:
112105
return {
113106
variant: 'error',
114107
Icon: X,

packages/server/worker/src/lib/trigger/hooks/extract-trigger-payload-hooks.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
import {
77
ApplicationError,
88
ErrorCode,
9+
FlowRunStatus,
910
FlowVersion,
1011
isNil,
1112
ProjectId,
@@ -38,6 +39,21 @@ export async function extractPayloads(
3839
handleFailureFlow(flowVersion, projectId, engineToken, true);
3940
return result.output as unknown[];
4041
} else {
42+
if (
43+
!isNil(result) &&
44+
typeof result === 'object' &&
45+
'status' in result &&
46+
result.status === FlowRunStatus.INFRASTRUCTURE_ERROR
47+
) {
48+
logger.warn('Failed to execute trigger due to infrastructure issue', {
49+
result,
50+
blockName,
51+
blockVersion,
52+
flowId: flowVersion.flowId,
53+
});
54+
55+
return [];
56+
}
4157
logger.error(
4258
{
4359
result,
@@ -47,6 +63,7 @@ export async function extractPayloads(
4763
},
4864
'Failed to execute trigger',
4965
);
66+
5067
const errorMessage =
5168
result?.message ?? 'Trigger execution failed due to an unknown issue.';
5269

0 commit comments

Comments
 (0)