Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 26 additions & 13 deletions packages/workflow-executor/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,15 @@ export abstract class WorkflowExecutorError extends Error {
}
}

// Domain error CATEGORIES (transport-agnostic). toHttpError maps each category to one HTTP status,
// so a new request-level error gets the right status simply by extending the right category — no
// per-error binding in the HTTP layer. These are for Runner/request-level outcomes that surface
// over HTTP; step-execution failures (RecordNotFoundError, …) stay plain WorkflowExecutorError —
// they never reach toHttpError (the step executor turns them into a step error outcome).
export abstract class NotFoundError extends WorkflowExecutorError {}
export abstract class AccessDeniedError extends WorkflowExecutorError {}
export abstract class UnavailableError extends WorkflowExecutorError {}

export class MissingToolCallError extends WorkflowExecutorError {
constructor() {
super(
Expand Down Expand Up @@ -117,7 +126,7 @@ export class UnsupportedActionFormError extends WorkflowExecutorError {
}
}

export class RunStorePortError extends WorkflowExecutorError {
export class RunStorePortError extends UnavailableError {
constructor(operation: string, cause: unknown) {
super(
`Run store "${operation}" failed: ${cause instanceof Error ? cause.message : String(cause)}`,
Expand Down Expand Up @@ -271,7 +280,7 @@ export class SchemaNotCachedError extends WorkflowExecutorError {
}
}

export class WorkflowPortError extends WorkflowExecutorError {
export class WorkflowPortError extends UnavailableError {
constructor(operation: string, cause: unknown) {
super(
`Workflow port "${operation}" failed: ${
Expand Down Expand Up @@ -312,27 +321,31 @@ export class ConfigurationError extends Error {
}
}

export class RunNotFoundError extends Error {
cause?: unknown;

// Run lifecycle/access errors raised by the Runner. Each extends a domain category, so toHttpError
// maps them by category (404/409/403) — no per-error HTTP binding.
export class RunNotFoundError extends NotFoundError {
constructor(runId: string, cause?: unknown) {
super(`Run "${runId}" not found or unavailable`);
this.name = 'RunNotFoundError';
super(`Run "${runId}" not found or unavailable`, 'Run not found or unavailable');
if (cause !== undefined) this.cause = cause;
}
}

export class UserMismatchError extends Error {
constructor(runId: string) {
super(`User not authorized for run "${runId}"`);
this.name = 'UserMismatchError';
export class UserMismatchError extends AccessDeniedError {
// The bearer/owner ids stay in the technical `message` (logged via the cause) but NOT in the
// userMessage — the HTTP body must never leak who owns a run.
constructor(runId: string, bearerUserId: number, ownerUserId: number) {
super(
`User ${bearerUserId} not authorized for run "${runId}" (owned by user ${ownerUserId})`,
'You are not authorized to access this run.',
);
}
}

export class RunAlreadyInFlightError extends Error {
// Stays uncategorized: it maps to 400 (see toHttpError) rather than a category status. Kept as a
// distinct class so toHttpError can flag it as expected churn (a double trigger isn't log-worthy).
export class RunAlreadyInFlightError extends WorkflowExecutorError {
constructor(runId: string) {
super(`Run "${runId}" is already being processed`);
this.name = 'RunAlreadyInFlightError';
}
}

Expand Down
124 changes: 45 additions & 79 deletions packages/workflow-executor/src/http/executor-http-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ import http from 'http';
import Koa from 'koa';
import koaJwt from 'koa-jwt';

import {
BadRequestHttpError,
ForbiddenHttpError,
ServiceUnavailableHttpError,
toHttpError,
} from './http-errors';
import serializeStepForWire from './step-serializer';
import ConsoleLogger from '../adapters/console-logger';
import {
RunAlreadyInFlightError,
RunNotFoundError,
UserMismatchError,
WorkflowExecutorError,
extractErrorMessage,
} from '../errors';
import { extractErrorMessage } from '../errors';

export interface ExecutorHttpServerOptions {
port: number;
Expand All @@ -39,28 +39,43 @@ export default class ExecutorHttpServer {
this.logger = options.logger ?? new ConsoleLogger();
this.app = new Koa();

// Error middleware — catches all errors (including JWT 401) and returns structured JSON
// Error-translation middleware — the single place converting thrown errors (typed HTTP
// errors, domain errors via toHttpError, JWT 401) into HTTP responses. Handlers just throw.
this.app.use(async (ctx, next) => {
try {
await next();
} catch (err: unknown) {
const { status } = err as { status?: number };

if (status === 401) {
ctx.status = 401;
ctx.body = { error: 'Unauthorized' };
const httpError = toHttpError(err);

if (!httpError) {
this.logger.error('Unhandled HTTP error', {
method: ctx.method,
path: ctx.path,
error: extractErrorMessage(err),
stack: err instanceof Error ? err.stack : undefined,
});
ctx.status = 500;
ctx.body = { error: 'Internal server error' };

return;
}

this.logger.error('Unhandled HTTP error', {
method: ctx.method,
path: ctx.path,
error: extractErrorMessage(err),
stack: err instanceof Error ? err.stack : undefined,
});
ctx.status = 500;
ctx.body = { error: 'Internal server error' };
if (httpError.log) {
this.logger.error('HTTP request failed', {
method: ctx.method,
path: ctx.path,
status: httpError.status,
error: extractErrorMessage(httpError.cause ?? httpError),
// Prefer the cause's stack (points at the real fault site); fall back to the HTTP
// error's own stack so a log:true error without a cause never logs an empty stack.
stack:
(httpError.cause instanceof Error ? httpError.cause.stack : undefined) ??
httpError.stack,
});
}

ctx.status = httpError.status;
ctx.body = { error: httpError.userMessage };
}
});

Expand Down Expand Up @@ -133,30 +148,26 @@ export default class ExecutorHttpServer {

private async hasRunAccessMiddleware(ctx: Koa.Context, next: Koa.Next): Promise<void> {
const user = ctx.state.user as StepUser;
let allowed: boolean;

try {
const allowed = await this.options.workflowPort.hasRunAccess(ctx.params.runId, user);

if (!allowed) {
ctx.status = 403;
ctx.body = { error: 'Forbidden' };

return;
}
allowed = await this.options.workflowPort.hasRunAccess(ctx.params.runId, user);
} catch (err) {
// Logged here rather than by the translation middleware: the runId context lives here.
this.logger.error('Failed to check run access', {
runId: ctx.params.runId,
method: ctx.method,
path: ctx.path,
error: extractErrorMessage(err),
stack: err instanceof Error ? err.stack : undefined,
});
ctx.status = 503;
ctx.body = { error: 'Service unavailable' };

return;
// log:false — already logged above with the richer runId context.
throw new ServiceUnavailableHttpError('Service unavailable', { cause: err });
}

if (!allowed) throw new ForbiddenHttpError();

await next();
}

Expand All @@ -171,57 +182,12 @@ export default class ExecutorHttpServer {
const bearerUserId = typeof rawId === 'number' ? rawId : Number(rawId);

if (!Number.isFinite(bearerUserId)) {
ctx.status = 400;
ctx.body = { error: 'Missing or invalid user id in token' };

return;
throw new BadRequestHttpError('Missing or invalid user id in token');
}

const pendingData = (ctx.request.body as { pendingData?: unknown })?.pendingData;

try {
await this.options.runner.triggerPoll(runId, {
pendingData,
bearerUserId,
});
} catch (err) {
if (err instanceof RunNotFoundError) {
ctx.status = 404;
ctx.body = { error: 'Run not found or unavailable' };

return;
}

if (err instanceof RunAlreadyInFlightError) {
ctx.status = 400;
ctx.body = { error: err.message };

return;
}

if (err instanceof UserMismatchError) {
this.logger.error('User mismatch on trigger', { runId, bearerUserId });
ctx.status = 403;
ctx.body = { error: 'Forbidden' };

return;
}

if (err instanceof WorkflowExecutorError) {
this.logger.error('Malformed run on trigger', {
runId,
bearerUserId,
error: extractErrorMessage(err),
stack: err.stack,
});
ctx.status = 400;
ctx.body = { error: err.userMessage };

return;
}

throw err;
}
await this.options.runner.triggerPoll(runId, { pendingData, bearerUserId });

ctx.status = 200;
ctx.body = { triggered: true };
Expand Down
96 changes: 96 additions & 0 deletions packages/workflow-executor/src/http/http-errors.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
/* eslint-disable max-classes-per-file */
import {
AccessDeniedError,
NotFoundError,
RunAlreadyInFlightError,
UnavailableError,
WorkflowExecutorError,
} from '../errors';

// HTTP-typed errors: each carries its response semantics (status + user-facing body) so the
// error-translation middleware can render any thrown error without per-handler branching. One
// concrete class per status; handlers throw them directly, and toHttpError maps domain error
// categories onto them.
export abstract class BaseHttpError extends Error {
readonly status: number;
readonly userMessage: string;
// When true, the translation middleware logs the error (with its cause's stack) at error level.
// Off by default so expected client churn (completed run, double trigger) is not noise-logged.
readonly log: boolean;
cause?: unknown;

constructor(
status: number,
userMessage: string,
options: { log?: boolean; cause?: unknown } = {},
) {
super(userMessage);
this.name = this.constructor.name;
this.status = status;
this.userMessage = userMessage;
this.log = options.log ?? false;
if (options.cause !== undefined) this.cause = options.cause;
}
}

export class BadRequestHttpError extends BaseHttpError {
constructor(userMessage: string, options?: { log?: boolean; cause?: unknown }) {
super(400, userMessage, options);
}
}

export class UnauthorizedHttpError extends BaseHttpError {
constructor() {
super(401, 'Unauthorized');
}
}

export class ForbiddenHttpError extends BaseHttpError {
// 403 bodies stay opaque ('Forbidden') on purpose — never echo why access was denied.
constructor(options?: { log?: boolean; cause?: unknown }) {
super(403, 'Forbidden', options);
}
}

export class NotFoundHttpError extends BaseHttpError {
constructor(userMessage: string, options?: { log?: boolean; cause?: unknown }) {
super(404, userMessage, options);
}
}

export class ServiceUnavailableHttpError extends BaseHttpError {
constructor(userMessage: string, options?: { log?: boolean; cause?: unknown }) {
super(503, userMessage, options);
}
}

// The single domain→HTTP mapping. Maps by domain category, not concrete class, so a new
// NotFoundError/ConflictError/… is translated correctly without touching this function. Returns
// null when the error has no HTTP translation — the middleware then responds 500 without leaking
// internals.
export function toHttpError(err: unknown): BaseHttpError | null {
if (err instanceof BaseHttpError) return err;

// koa-jwt rejects with an error object carrying status 401.
if ((err as { status?: number })?.status === 401) return new UnauthorizedHttpError();

// Category branches MUST precede the WorkflowExecutorError catch-all: every category extends it.
if (err instanceof NotFoundError) return new NotFoundHttpError(err.userMessage, { cause: err });
if (err instanceof AccessDeniedError) return new ForbiddenHttpError({ log: true, cause: err });

// Expected client churn (a double trigger): 400, not logged. Precedes the catch-all, which logs.
if (err instanceof RunAlreadyInFlightError) {
return new BadRequestHttpError(err.userMessage, { cause: err });
}

if (err instanceof UnavailableError) {
return new ServiceUnavailableHttpError(err.userMessage, { log: true, cause: err });
}

// Uncategorized domain error: 400 with its userMessage (safe default — never a silent 500).
if (err instanceof WorkflowExecutorError) {
return new BadRequestHttpError(err.userMessage, { log: true, cause: err });
}

return null;
}
2 changes: 1 addition & 1 deletion packages/workflow-executor/src/runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ export default class Runner {
const { step, auth } = dispatch;

if (options?.bearerUserId !== undefined && step.user.id !== options.bearerUserId) {
throw new UserMismatchError(runId);
throw new UserMismatchError(runId, options.bearerUserId, step.user.id);
}

await this.executeStep(step, auth.forestServerToken, options?.pendingData);
Expand Down
Loading
Loading