Skip to content

Commit 08e6b2b

Browse files
feat(workflows): add GET /workflows/[id]/executions/[executionId] status endpoint
Normalized status (pending|running|paused|completed|failed|cancelled) across workflowExecutionLogs and pausedExecutions in a single response. Surfaces paused-state details (resumeAt, pauseKind, blockedOnBlockId) when a row exists in pausedExecutions, and the error string for failed runs. finalOutput is opt-in via ?includeOutput=true.
1 parent 6d3f752 commit 08e6b2b

3 files changed

Lines changed: 225 additions & 2 deletions

File tree

Lines changed: 169 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,169 @@
1+
import { db } from '@sim/db'
2+
import { pausedExecutions, workflowExecutionLogs } from '@sim/db/schema'
3+
import { createLogger } from '@sim/logger'
4+
import { and, eq } from 'drizzle-orm'
5+
import { type NextRequest, NextResponse } from 'next/server'
6+
import {
7+
getWorkflowExecutionContract,
8+
type WorkflowExecutionStatusResponse,
9+
} from '@/lib/api/contracts/workflows'
10+
import { parseRequest } from '@/lib/api/server'
11+
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
12+
import { validateWorkflowAccess } from '@/app/api/workflows/middleware'
13+
import type { PausePoint } from '@/executor/types'
14+
15+
const logger = createLogger('WorkflowExecutionStatusAPI')
16+
17+
type LogStatus = 'pending' | 'running' | 'completed' | 'failed' | 'cancelled'
18+
19+
interface ExecutionDataShape {
20+
finalOutput?: { error?: string } & Record<string, unknown>
21+
error?: { message?: string } | string
22+
completionFailure?: string
23+
}
24+
25+
function pickEarliestPausePoint(points: PausePoint[]): PausePoint | null {
26+
const active = points.filter((p) => p.resumeStatus === 'paused')
27+
if (active.length === 0) return null
28+
return active.reduce<PausePoint | null>((best, current) => {
29+
if (!best) return current
30+
if (!current.resumeAt) return best
31+
if (!best.resumeAt) return current
32+
return current.resumeAt < best.resumeAt ? current : best
33+
}, null)
34+
}
35+
36+
function normalizePausePoints(raw: unknown): PausePoint[] {
37+
if (!raw) return []
38+
if (Array.isArray(raw)) return raw as PausePoint[]
39+
if (typeof raw === 'object') return Object.values(raw as Record<string, PausePoint>)
40+
return []
41+
}
42+
43+
function extractError(executionData: unknown): string | null {
44+
if (!executionData || typeof executionData !== 'object') return null
45+
const data = executionData as ExecutionDataShape
46+
if (typeof data.error === 'string') return data.error
47+
if (data.error && typeof data.error === 'object' && typeof data.error.message === 'string') {
48+
return data.error.message
49+
}
50+
if (typeof data.finalOutput?.error === 'string') return data.finalOutput.error
51+
if (typeof data.completionFailure === 'string') return data.completionFailure
52+
return null
53+
}
54+
55+
export const GET = withRouteHandler(
56+
async (
57+
request: NextRequest,
58+
context: { params: Promise<{ id: string; executionId: string }> }
59+
) => {
60+
const parsed = await parseRequest(getWorkflowExecutionContract, request, context)
61+
if (!parsed.success) return parsed.response
62+
const { id: workflowId, executionId } = parsed.data.params
63+
const { includeOutput } = parsed.data.query
64+
65+
const access = await validateWorkflowAccess(request, workflowId, false)
66+
if (access.error) {
67+
return NextResponse.json({ error: access.error.message }, { status: access.error.status })
68+
}
69+
70+
const [logRow] = await db
71+
.select({
72+
executionId: workflowExecutionLogs.executionId,
73+
workflowId: workflowExecutionLogs.workflowId,
74+
status: workflowExecutionLogs.status,
75+
level: workflowExecutionLogs.level,
76+
trigger: workflowExecutionLogs.trigger,
77+
startedAt: workflowExecutionLogs.startedAt,
78+
endedAt: workflowExecutionLogs.endedAt,
79+
totalDurationMs: workflowExecutionLogs.totalDurationMs,
80+
executionData: workflowExecutionLogs.executionData,
81+
cost: workflowExecutionLogs.cost,
82+
})
83+
.from(workflowExecutionLogs)
84+
.where(
85+
and(
86+
eq(workflowExecutionLogs.executionId, executionId),
87+
eq(workflowExecutionLogs.workflowId, workflowId)
88+
)
89+
)
90+
.limit(1)
91+
92+
if (!logRow) {
93+
return NextResponse.json({ error: 'Execution not found' }, { status: 404 })
94+
}
95+
96+
const [pausedRow] = await db
97+
.select({
98+
id: pausedExecutions.id,
99+
status: pausedExecutions.status,
100+
pausePoints: pausedExecutions.pausePoints,
101+
resumedCount: pausedExecutions.resumedCount,
102+
pausedAt: pausedExecutions.pausedAt,
103+
nextResumeAt: pausedExecutions.nextResumeAt,
104+
})
105+
.from(pausedExecutions)
106+
.where(eq(pausedExecutions.executionId, executionId))
107+
.limit(1)
108+
109+
const isCurrentlyPaused =
110+
!!pausedRow && (pausedRow.status === 'paused' || pausedRow.status === 'partially_resumed')
111+
112+
let status: WorkflowExecutionStatusResponse['status']
113+
if (isCurrentlyPaused) {
114+
status = 'paused'
115+
} else {
116+
status = logRow.status as LogStatus
117+
}
118+
119+
let paused: WorkflowExecutionStatusResponse['paused'] = null
120+
if (isCurrentlyPaused && pausedRow) {
121+
const points = normalizePausePoints(pausedRow.pausePoints)
122+
const earliest = pickEarliestPausePoint(points)
123+
paused = {
124+
pausedAt: pausedRow.pausedAt.toISOString(),
125+
resumeAt: pausedRow.nextResumeAt?.toISOString() ?? earliest?.resumeAt ?? null,
126+
pauseKind: earliest?.pauseKind ?? null,
127+
blockedOnBlockId: earliest?.blockId ?? null,
128+
pausedExecutionId: pausedRow.id,
129+
pausePointCount: points.length,
130+
resumedCount: pausedRow.resumedCount,
131+
}
132+
}
133+
134+
const cost = logRow.cost
135+
? { total: Number((logRow.cost as { total?: number }).total ?? 0) }
136+
: null
137+
138+
const error = status === 'failed' ? extractError(logRow.executionData) : null
139+
140+
const finalOutput =
141+
includeOutput && status === 'completed' && logRow.executionData
142+
? ((logRow.executionData as ExecutionDataShape).finalOutput ?? null)
143+
: null
144+
145+
const response: WorkflowExecutionStatusResponse = {
146+
executionId: logRow.executionId,
147+
workflowId: logRow.workflowId ?? workflowId,
148+
status,
149+
trigger: logRow.trigger,
150+
level: logRow.level,
151+
startedAt: logRow.startedAt.toISOString(),
152+
endedAt: logRow.endedAt?.toISOString() ?? null,
153+
totalDurationMs: logRow.totalDurationMs ?? null,
154+
paused,
155+
cost,
156+
error,
157+
finalOutput,
158+
}
159+
160+
logger.debug('Fetched execution status', {
161+
workflowId,
162+
executionId,
163+
status,
164+
paused: !!paused,
165+
})
166+
167+
return NextResponse.json(response)
168+
}
169+
)

apps/sim/lib/api/contracts/workflows.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -509,6 +509,49 @@ const pausedWorkflowExecutionDetailSchema = pausedWorkflowExecutionSummarySchema
509509
queue: z.array(z.record(z.string(), z.unknown())),
510510
})
511511

512+
const workflowExecutionStatusEnum = z.enum([
513+
'pending',
514+
'running',
515+
'paused',
516+
'completed',
517+
'failed',
518+
'cancelled',
519+
])
520+
521+
const workflowExecutionPausedDetailSchema = z.object({
522+
pausedAt: z.string(),
523+
resumeAt: z.string().nullable(),
524+
pauseKind: z.enum(['time', 'human']).nullable(),
525+
blockedOnBlockId: z.string().nullable(),
526+
pausedExecutionId: z.string(),
527+
pausePointCount: z.number(),
528+
resumedCount: z.number(),
529+
})
530+
531+
const workflowExecutionStatusResponseSchema = z.object({
532+
executionId: z.string(),
533+
workflowId: z.string(),
534+
status: workflowExecutionStatusEnum,
535+
trigger: z.string(),
536+
level: z.string(),
537+
startedAt: z.string(),
538+
endedAt: z.string().nullable(),
539+
totalDurationMs: z.number().nullable(),
540+
paused: workflowExecutionPausedDetailSchema.nullable(),
541+
cost: z.object({ total: z.number() }).nullable(),
542+
error: z.string().nullable(),
543+
finalOutput: z.unknown().nullable(),
544+
})
545+
546+
export type WorkflowExecutionStatusResponse = z.output<typeof workflowExecutionStatusResponseSchema>
547+
548+
const workflowExecutionStatusQuerySchema = z.object({
549+
includeOutput: z
550+
.enum(['true', 'false'])
551+
.optional()
552+
.transform((value) => value === 'true'),
553+
})
554+
512555
const cancelWorkflowExecutionResponseSchema = z.object({
513556
success: z.boolean(),
514557
executionId: z.string(),
@@ -793,6 +836,17 @@ export const cancelWorkflowExecutionContract = defineRouteContract({
793836
},
794837
})
795838

839+
export const getWorkflowExecutionContract = defineRouteContract({
840+
method: 'GET',
841+
path: '/api/workflows/[id]/executions/[executionId]',
842+
params: workflowExecutionParamsSchema,
843+
query: workflowExecutionStatusQuerySchema,
844+
response: {
845+
mode: 'json',
846+
schema: workflowExecutionStatusResponseSchema,
847+
},
848+
})
849+
796850
export const streamWorkflowExecutionContract = defineRouteContract({
797851
method: 'GET',
798852
path: '/api/workflows/[id]/executions/[executionId]/stream',

scripts/check-api-validation-contracts.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ const QUERY_HOOKS_DIR = path.join(ROOT, 'apps/sim/hooks/queries')
99
const SELECTOR_HOOKS_DIR = path.join(ROOT, 'apps/sim/hooks/selectors')
1010

1111
const BASELINE = {
12-
totalRoutes: 734,
13-
zodRoutes: 734,
12+
totalRoutes: 735,
13+
zodRoutes: 735,
1414
nonZodRoutes: 0,
1515
} as const
1616

0 commit comments

Comments
 (0)