Skip to content

Commit e5de72a

Browse files
change back to cell based trigger jobs
1 parent 52b3dc1 commit e5de72a

20 files changed

Lines changed: 1515 additions & 567 deletions

File tree

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
import { db } from '@sim/db'
2+
import { userTableRows } from '@sim/db/schema'
3+
import { createLogger } from '@sim/logger'
4+
import { generateId } from '@sim/utils/id'
5+
import { and, asc, eq } from 'drizzle-orm'
6+
import { type NextRequest, NextResponse } from 'next/server'
7+
import { z } from 'zod'
8+
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
9+
import { generateRequestId } from '@/lib/core/utils/request'
10+
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
11+
import { batchUpdateRows } from '@/lib/table'
12+
import type { RowData, TableRow, WorkflowCellValue } from '@/lib/table'
13+
import { areWorkflowColumnDepsSatisfied } from '@/lib/table/workflow-columns'
14+
import { accessError, checkAccess } from '@/app/api/table/utils'
15+
16+
const logger = createLogger('TableRunColumnAPI')
17+
18+
const RunSchema = z.object({
19+
workspaceId: z.string().min(1, 'Workspace ID is required'),
20+
})
21+
22+
interface RouteParams {
23+
params: Promise<{ tableId: string; columnName: string }>
24+
}
25+
26+
/**
27+
* POST /api/table/[tableId]/columns/[columnName]/run
28+
*
29+
* Manually triggers a workflow column run for every row in the table. Each
30+
* cell is force-reset to `pending`, which fires the scheduler and enqueues
31+
* a per-cell trigger.dev job.
32+
*/
33+
export const POST = withRouteHandler(async (request: NextRequest, { params }: RouteParams) => {
34+
const requestId = generateRequestId()
35+
const { tableId, columnName } = await params
36+
37+
try {
38+
const authResult = await checkSessionOrInternalAuth(request, { requireWorkflowId: false })
39+
if (!authResult.success || !authResult.userId) {
40+
return NextResponse.json({ error: 'Authentication required' }, { status: 401 })
41+
}
42+
43+
const body = await request.json()
44+
const validated = RunSchema.parse(body)
45+
46+
const result = await checkAccess(tableId, authResult.userId, 'write')
47+
if (!result.ok) return accessError(result, requestId, tableId)
48+
const { table } = result
49+
50+
if (table.workspaceId !== validated.workspaceId) {
51+
return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 })
52+
}
53+
54+
const column = table.schema.columns.find((c) => c.name === columnName)
55+
if (!column || column.type !== 'workflow' || !column.workflowConfig?.workflowId) {
56+
return NextResponse.json(
57+
{ error: 'Column is not a configured workflow column' },
58+
{ status: 400 }
59+
)
60+
}
61+
62+
const workflowId = column.workflowConfig.workflowId
63+
const columnIndex = table.schema.columns.findIndex((c) => c.name === columnName)
64+
65+
const allRows = await db
66+
.select({
67+
id: userTableRows.id,
68+
position: userTableRows.position,
69+
data: userTableRows.data,
70+
createdAt: userTableRows.createdAt,
71+
updatedAt: userTableRows.updatedAt,
72+
})
73+
.from(userTableRows)
74+
.where(
75+
and(
76+
eq(userTableRows.tableId, tableId),
77+
eq(userTableRows.workspaceId, validated.workspaceId)
78+
)
79+
)
80+
.orderBy(asc(userTableRows.position))
81+
82+
if (allRows.length === 0) {
83+
return NextResponse.json({ success: true, data: { triggered: 0 } })
84+
}
85+
86+
// Only target rows whose deps are satisfied AND aren't already running.
87+
// Forcing every row through `pending` would leave dep-unsatisfied rows
88+
// stuck pending forever (the scheduler's eligibility predicate filters
89+
// them out), and would re-issue runs that are already in flight.
90+
const eligibleRows = allRows.filter((r) => {
91+
const tableRow: TableRow = {
92+
id: r.id,
93+
data: r.data as RowData,
94+
position: r.position,
95+
createdAt: r.createdAt,
96+
updatedAt: r.updatedAt,
97+
}
98+
const cell = (r.data as RowData)[columnName] as WorkflowCellValue | null | undefined
99+
if (cell?.status === 'running') return false
100+
try {
101+
return areWorkflowColumnDepsSatisfied(column, columnIndex, tableRow, table.schema)
102+
} catch {
103+
return false
104+
}
105+
})
106+
107+
if (eligibleRows.length === 0) {
108+
return NextResponse.json({ success: true, data: { triggered: 0 } })
109+
}
110+
111+
const updates = eligibleRows.map((r) => {
112+
const pendingCell: WorkflowCellValue = {
113+
executionId: generateId(),
114+
jobId: null,
115+
workflowId,
116+
status: 'pending',
117+
output: null,
118+
error: null,
119+
}
120+
return {
121+
rowId: r.id,
122+
data: { [columnName]: pendingCell as unknown as RowData[string] } as RowData,
123+
}
124+
})
125+
126+
const opResult = await batchUpdateRows(
127+
{
128+
tableId,
129+
updates,
130+
workspaceId: validated.workspaceId,
131+
},
132+
table,
133+
requestId
134+
)
135+
136+
return NextResponse.json({
137+
success: true,
138+
data: { triggered: opResult.affectedCount },
139+
})
140+
} catch (error) {
141+
if (error instanceof z.ZodError) {
142+
return NextResponse.json(
143+
{ error: 'Invalid request data', details: error.errors },
144+
{ status: 400 }
145+
)
146+
}
147+
logger.error(`run-column failed for ${tableId}/${columnName}:`, error)
148+
return NextResponse.json({ error: 'Failed to run column' }, { status: 500 })
149+
}
150+
})

apps/sim/app/api/table/[tableId]/metadata/route.ts

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@ const MetadataSchema = z.object({
1515
metadata: z.object({
1616
columnWidths: z.record(z.number().positive()).optional(),
1717
columnOrder: z.array(z.string()).optional(),
18-
workflowColumnBatchSize: z.number().int().min(1).max(100).optional(),
1918
}),
2019
})
2120

apps/sim/app/api/table/[tableId]/rows/[rowId]/run-workflow-column/route.ts

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@ import { type NextRequest, NextResponse } from 'next/server'
44
import { z } from 'zod'
55
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
66
import { generateRequestId } from '@/lib/core/utils/request'
7-
import { getRowById } from '@/lib/table'
8-
import { runWorkflowColumn } from '@/lib/table/workflow-columns'
7+
import { getRowById, updateRow } from '@/lib/table'
8+
import type { RowData, WorkflowCellValue } from '@/lib/table'
99
import { accessError, checkAccess } from '@/app/api/table/utils'
1010

1111
const logger = createLogger('TableRunWorkflowColumnAPI')
@@ -62,15 +62,27 @@ export async function POST(request: NextRequest, { params }: RouteParams) {
6262
const executionId = generateId()
6363
const workflowId = column.workflowConfig.workflowId
6464

65-
void runWorkflowColumn({
66-
tableId,
67-
tableName: table.name,
68-
rowId,
69-
columnName: validated.columnName,
70-
workflowId,
71-
workspaceId: validated.workspaceId,
65+
// Force the cell back to a non-terminal state so the scheduler's
66+
// eligibility check picks it up. Calling `updateRow` here also fires
67+
// `scheduleWorkflowColumnRuns(table, [row])` from inside the service,
68+
// which enqueues a `workflow-column-execution` job for this cell.
69+
const pendingCell: WorkflowCellValue = {
7270
executionId,
73-
})
71+
jobId: null,
72+
workflowId,
73+
status: 'pending',
74+
output: null,
75+
error: null,
76+
}
77+
const nextData: RowData = {
78+
...row.data,
79+
[validated.columnName]: pendingCell as unknown as RowData[string],
80+
}
81+
await updateRow(
82+
{ tableId, rowId, data: nextData, workspaceId: validated.workspaceId },
83+
table,
84+
requestId
85+
)
7486

7587
return NextResponse.json({ success: true, data: { executionId } })
7688
} catch (error) {

apps/sim/app/api/table/utils.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ export const CreateColumnSchema = z.object({
178178
path: z.string().min(1),
179179
})
180180
)
181-
.optional(),
181+
.min(1, 'Workflow column requires at least one output'),
182182
})
183183
.optional(),
184184
}),
@@ -203,7 +203,7 @@ export const UpdateColumnSchema = z.object({
203203
path: z.string().min(1),
204204
})
205205
)
206-
.optional(),
206+
.min(1, 'Workflow column requires at least one output'),
207207
})
208208
.optional(),
209209
}),

0 commit comments

Comments
 (0)