diff --git a/apps/sim/app/api/table/[tableId]/dispatches/route.ts b/apps/sim/app/api/table/[tableId]/dispatches/route.ts index a5e442f05d..7682ba8299 100644 --- a/apps/sim/app/api/table/[tableId]/dispatches/route.ts +++ b/apps/sim/app/api/table/[tableId]/dispatches/route.ts @@ -5,7 +5,7 @@ import { parseRequest } from '@/lib/api/server' import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid' import { generateRequestId } from '@/lib/core/utils/request' import { withRouteHandler } from '@/lib/core/utils/with-route-handler' -import { countRunningCells, listActiveDispatches } from '@/lib/table/dispatcher' +import { countActiveRunCells, listActiveDispatches } from '@/lib/table/dispatcher' import { accessError, checkAccess } from '@/app/api/table/utils' const logger = createLogger('TableDispatchesAPI') @@ -37,10 +37,8 @@ export const GET = withRouteHandler(async (request: NextRequest, { params }: Rou const result = await checkAccess(tableId, authResult.userId, 'read') if (!result.ok) return accessError(result, requestId, tableId) - const [rows, running] = await Promise.all([ - listActiveDispatches(tableId), - countRunningCells(tableId), - ]) + const rows = await listActiveDispatches(tableId) + const running = await countActiveRunCells(tableId, rows) const dispatches: ActiveDispatch[] = rows.map((r) => ({ id: r.id, status: r.status as 'pending' | 'dispatching', diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/cells/cell-render.tsx b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/cells/cell-render.tsx index 7201a3f7d2..48cd644480 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/cells/cell-render.tsx +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/components/table-grid/cells/cell-render.tsx @@ -303,25 +303,28 @@ const TYPEWRITER_MS_PER_CHAR = 15 */ function useTypewriter(text: string | null): string | null { const [revealed, setRevealed] = useState(text) - const isFirstRunRef = useRef(true) const prevTextRef = useRef(text) + const mountedRef = useRef(false) + const animateRef = useRef(false) - useEffect(() => { - if (isFirstRunRef.current) { - isFirstRunRef.current = false - prevTextRef.current = text - setRevealed(text) - return - } - if (prevTextRef.current === text) return + // Reset synchronously during render when `text` changes (not on first mount) + // so no frame ever shows the full new value before the animation begins — + // an effect-based reset lands one frame late and flashes the whole text. + if (prevTextRef.current !== text) { prevTextRef.current = text + const animate = mountedRef.current && text !== null && text.length > 0 + animateRef.current = animate + setRevealed(animate ? '' : text) + } - if (text === null || text.length === 0) { - setRevealed(text) - return - } + useEffect(() => { + mountedRef.current = true + }, []) - const full = text + useEffect(() => { + if (!animateRef.current) return + animateRef.current = false + const full = text as string const start = performance.now() let raf = 0 const tick = (now: number) => { @@ -329,7 +332,6 @@ function useTypewriter(text: string | null): string | null { setRevealed(full.slice(0, chars)) if (chars < full.length) raf = requestAnimationFrame(tick) } - setRevealed('') raf = requestAnimationFrame(tick) return () => cancelAnimationFrame(raf) }, [text]) diff --git a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts index 7c00d06e33..b56f64a8cc 100644 --- a/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts +++ b/apps/sim/app/workspace/[workspaceId]/tables/[tableId]/hooks/use-table-event-stream.ts @@ -73,11 +73,11 @@ export function useTableEventStream({ let lastEventId = loadPointer(tableId) let reconnectAttempt = 0 - const updateRunStateCounters = ( - rowId: string, - wasInFlight: boolean, - isInFlight: boolean - ): void => { + // Keeps the per-row gutter (`runningByRowId`) live between dispatch events. + // `runningCellCount` (the "X running" badge) is NOT touched here — it's the + // server's dispatch-scope count, seeded optimistically on click and + // re-synced by `applyDispatch` on every window, so live matches reload. + const updateRunningByRow = (rowId: string, wasInFlight: boolean, isInFlight: boolean): void => { if (wasInFlight === isInFlight) return const delta = isInFlight ? 1 : -1 queryClient.setQueryData(tableKeys.activeDispatches(tableId), (prev) => { @@ -87,11 +87,7 @@ export function useTableEventStream({ const nextByRow = { ...prev.runningByRowId } if (nextForRow === 0) delete nextByRow[rowId] else nextByRow[rowId] = nextForRow - return { - ...prev, - runningCellCount: Math.max(0, prev.runningCellCount + delta), - runningByRowId: nextByRow, - } + return { ...prev, runningByRowId: nextByRow } }) } @@ -145,11 +141,7 @@ export function useTableEventStream({ queryKey: tableKeys.activeDispatches(tableId), }) } else { - updateRunStateCounters( - rowId, - wasInFlight, - isExecInFlight({ status } as RowExecutionMetadata) - ) + updateRunningByRow(rowId, wasInFlight, isExecInFlight({ status } as RowExecutionMetadata)) } } @@ -195,6 +187,11 @@ export function useTableEventStream({ merged[idx] = next return { ...base, dispatches: merged } }) + // The dispatcher emits this once per window (after the window's cells + // finish + the cursor advances) and on completion. Re-sync the + // dispatch-scope `runningCellCount` from the server so the badge steps + // down per window and matches a reload exactly. + void queryClient.invalidateQueries({ queryKey: tableKeys.activeDispatches(tableId) }) } const handlePrune = (payload: PrunedEvent): void => { diff --git a/apps/sim/hooks/queries/tables.ts b/apps/sim/hooks/queries/tables.ts index da55705743..3f3f3cd531 100644 --- a/apps/sim/hooks/queries/tables.ts +++ b/apps/sim/hooks/queries/tables.ts @@ -73,7 +73,6 @@ import type { } from '@/lib/table' import { areGroupDepsSatisfied, - areOutputsFilled, isExecInFlight, optimisticallyScheduleNewlyEligibleGroups, } from '@/lib/table/deps' @@ -235,17 +234,34 @@ function countNewlyInFlight(before: RowExecutions, after: RowExecutions): number return n } -/** Add optimistically-stamped cells to the run-state counter so the "X running" - * badge + per-row gutter Stop reflect them instantly (the optimistic stamp - * eats the dispatcher's `pending` SSE, so `applyCell` never bumps the count). - * Returns the prior snapshot for rollback, or `null` when nothing was bumped. */ +/** The table's maintained, unfiltered `rowCount` from the detail cache (or + * `null` when the detail hasn't loaded). This is the right scope for a Run-all + * estimate: the dispatcher runs every row regardless of the active view + * filter, whereas the rows query's `totalCount` is filter-scoped. */ +function readTableRowCount( + queryClient: ReturnType, + tableId: string +): number | null { + const def = queryClient.getQueryData(tableKeys.detail(tableId)) + return typeof def?.rowCount === 'number' ? def.rowCount : null +} + +/** Optimistically reflect a run on the "X running" badge + per-row gutter Stop + * instantly (the optimistic stamp eats the dispatcher's `pending` SSE, so + * `applyCell` never bumps the count, and the server's dispatch-scope count + * isn't live until the first window). `stampedByRow` drives the per-row gutter + * (loaded rows only); `cellCountDelta` is the badge delta — pass the full run + * scope (rows × groups) for Run-all so it matches the server, or omit to use + * the stamped total. Returns the prior snapshot for rollback. */ function bumpRunState( queryClient: ReturnType, tableId: string, - stampedByRow: Record + stampedByRow: Record, + cellCountDelta?: number ): { snapshot: TableRunState | undefined } | null { - const total = Object.values(stampedByRow).reduce((s, n) => s + n, 0) - if (total === 0) return null + const stampedTotal = Object.values(stampedByRow).reduce((s, n) => s + n, 0) + const countDelta = cellCountDelta ?? stampedTotal + if (countDelta === 0 && stampedTotal === 0) return null const snapshot = queryClient.getQueryData(tableKeys.activeDispatches(tableId)) queryClient.setQueryData(tableKeys.activeDispatches(tableId), (prev) => { const base = prev ?? { dispatches: [], runningCellCount: 0, runningByRowId: {} } @@ -255,7 +271,7 @@ function bumpRunState( } return { ...base, - runningCellCount: base.runningCellCount + total, + runningCellCount: base.runningCellCount + countDelta, runningByRowId: nextByRow, } }) @@ -1418,12 +1434,10 @@ export function useRunColumn({ workspaceId, tableId }: RowMutationContext) { // dispatcher regardless of mode. Stamping pending here would leave // the cell flashing Queued indefinitely (no SSE event will arrive). if (group && !areGroupDepsSatisfied(group, r)) continue - // Mirror server eligibility for `mode: 'incomplete'`: skip cells whose - // outputs are filled, regardless of exec status. A cancelled/error - // cell with a leftover value from a prior run was rendering as filled - // but flipping to "queued" optimistically here even though the server - // would skip it. - if (runMode === 'incomplete' && group && areOutputsFilled(group, r)) continue + // Mirror server eligibility for manual `mode: 'incomplete'`: a + // `completed` group is done (even with a blank output) — only "Run + // all" re-runs it. error/cancelled/never-run cells still re-run. + if (runMode === 'incomplete' && exec?.status === 'completed') continue next[groupId] = buildPendingExec(exec) // Mirror the server-side bulk clear: wipe output values so the cell // doesn't render the stale completed value behind a pending badge. @@ -1442,7 +1456,14 @@ export function useRunColumn({ workspaceId, tableId }: RowMutationContext) { return { ...r, data: nextData, executions: next } }) - const bumped = bumpRunState(queryClient, tableId, stampedByRow) + // Badge counts the whole run scope (rows × groups), matching the server's + // dispatch-scope count — not just the loaded rows we could stamp. For + // Run-all that's the table's totalCount; for a scoped run, the rowIds. + const scopeRowCount = targetRowIds + ? targetRowIds.size + : (readTableRowCount(queryClient, tableId) ?? Object.keys(stampedByRow).length) + const cellCountDelta = scopeRowCount * targetGroupIds.size + const bumped = bumpRunState(queryClient, tableId, stampedByRow, cellCountDelta) return { snapshots, runStateSnapshot: bumped?.snapshot, didBumpRunState: bumped !== null } }, onError: (_err, _variables, context) => { diff --git a/apps/sim/lib/table/dispatcher.ts b/apps/sim/lib/table/dispatcher.ts index ab91ff67cb..d4ab6d8aa4 100644 --- a/apps/sim/lib/table/dispatcher.ts +++ b/apps/sim/lib/table/dispatcher.ts @@ -65,67 +65,69 @@ export async function bulkClearWorkflowGroupCells(input: { // Pre-existing outputs on any other row must not be wiped by an auto-fire. if (mode === 'new') return - const outputCols = Array.from(new Set(groups.flatMap((g) => g.outputs.map((o) => o.columnName)))) const groupIds = groups.map((g) => g.id) - - // Step 1: clear the targeted output columns from `data` on every row in - // scope. Identical chain to the previous JSONB-only path. - let dataExpr: SQL = sql`coalesce(${userTableRows.data}, '{}'::jsonb)` - for (const col of outputCols) dataExpr = sql`(${dataExpr}) - ${col}::text` - - const filters: SQL[] = [eq(userTableRows.tableId, tableId)] - if (rowIds && rowIds.length > 0) { - filters.push(inArray(userTableRows.id, rowIds)) - } - if (mode === 'incomplete') { - // Skip rows where all output columns across all targeted groups already - // have a non-empty value — those are "completed-and-filled" and the - // eligibility predicate would skip them anyway. - const filledChecks = outputCols.map( - (col) => sql`coalesce(${userTableRows.data} ->> ${col}, '') != ''` + const rowScope = rowIds && rowIds.length > 0 ? rowIds : null + + if (mode === 'all') { + // Run-all re-runs every targeted group: wipe all their output columns + + // executions for the rows in scope. (Prior in-flight runs were already + // cancelled by the caller.) + const outputCols = Array.from( + new Set(groups.flatMap((g) => g.outputs.map((o) => o.columnName))) ) - const allFilled = filledChecks.reduce((acc, expr) => sql`${acc} AND ${expr}`) - filters.push(sql`NOT (${allFilled})`) - // Also skip rows where ANY targeted group has an in-flight exec — those - // belong to another dispatch and clobbering them would race. Encoded as - // a NOT EXISTS subquery against the sidecar's `(table_id, status)` - // partial index. - filters.push( - sql`NOT EXISTS ( + let dataExpr: SQL = sql`coalesce(${userTableRows.data}, '{}'::jsonb)` + for (const col of outputCols) dataExpr = sql`(${dataExpr}) - ${col}::text` + const filters: SQL[] = [eq(userTableRows.tableId, tableId)] + if (rowScope) filters.push(inArray(userTableRows.id, rowScope)) + + await db.transaction(async (trx) => { + await trx + .update(userTableRows) + .set({ data: dataExpr, updatedAt: new Date() }) + .where(and(...filters)) + const execFilters: SQL[] = [ + eq(tableRowExecutions.tableId, tableId), + inArray(tableRowExecutions.groupId, groupIds), + ] + if (rowScope) execFilters.push(inArray(tableRowExecutions.rowId, rowScope)) + await trx.delete(tableRowExecutions).where(and(...execFilters)) + }) + return + } + + // `incomplete`: clear per-group, not per-row. Only groups that are + // re-runnable (`error` / `cancelled`) get their output columns + exec wiped; + // `completed` and in-flight groups are left fully intact. A row-level "all + // filled" check would otherwise wipe a completed group's data + exec just + // because a *sibling* group on the same row is incomplete, re-running the + // completed one. (`never-run` groups have no exec/output to clear — the + // dispatcher runs them via eligibility.) + await db.transaction(async (trx) => { + for (const group of groups) { + const reRunnable = sql`EXISTS ( SELECT 1 FROM ${tableRowExecutions} re WHERE re.row_id = ${userTableRows.id} - AND re.group_id = ANY(ARRAY[${sql.join( - groupIds.map((gid) => sql`${gid}`), - sql`, ` - )}]::text[]) - AND re.status IN ('queued', 'running', 'pending') + AND re.group_id = ${group.id} + AND re.status IN ('error', 'cancelled') )` - ) - } + const filters: SQL[] = [eq(userTableRows.tableId, tableId), reRunnable] + if (rowScope) filters.push(inArray(userTableRows.id, rowScope)) - await db.transaction(async (trx) => { - await trx - .update(userTableRows) - .set({ data: dataExpr, updatedAt: new Date() }) - .where(and(...filters)) + let dataExpr: SQL = sql`coalesce(${userTableRows.data}, '{}'::jsonb)` + for (const out of group.outputs) dataExpr = sql`(${dataExpr}) - ${out.columnName}::text` + await trx + .update(userTableRows) + .set({ data: dataExpr, updatedAt: new Date() }) + .where(and(...filters)) - // Step 2: delete the targeted groups' executions for the rows in scope. - // Reuse the same row-scope filter via a subquery. - const execFilters: SQL[] = [ - eq(tableRowExecutions.tableId, tableId), - inArray(tableRowExecutions.groupId, groupIds), - ] - if (rowIds && rowIds.length > 0) { - execFilters.push(inArray(tableRowExecutions.rowId, rowIds)) - } - if (mode === 'incomplete') { - // For `incomplete`, only delete entries that aren't already in-flight - // — terminal states (completed/error/cancelled) get wiped so the - // dispatcher re-enqueues; in-flight entries stay so we don't race - // with their worker. - execFilters.push(sql`${tableRowExecutions.status} NOT IN ('queued', 'running', 'pending')`) + const execFilters: SQL[] = [ + eq(tableRowExecutions.tableId, tableId), + eq(tableRowExecutions.groupId, group.id), + sql`${tableRowExecutions.status} IN ('error', 'cancelled')`, + ] + if (rowScope) execFilters.push(inArray(tableRowExecutions.rowId, rowScope)) + await trx.delete(tableRowExecutions).where(and(...execFilters)) } - await trx.delete(tableRowExecutions).where(and(...execFilters)) }) } @@ -193,6 +195,44 @@ export async function countRunningCells( return { total, byRowId } } +/** Authoritative "cells queued or running" count for the table, derived from + * active dispatches so it survives reload and matches the live count. For each + * active dispatch every row in scope ahead of the cursor still has to run each + * targeted group, so remaining work = (rows ahead of cursor) × |groupIds|. + * Exact for Run-all; an upper bound for incomplete/new (rows the eligibility + * filter later skips are still counted). Falls back to the sidecar in-flight + * count when no dispatch is active (orphan stragglers). `byRowId` stays + * sidecar-based — the client overlay renders queued rows ahead of the cursor. */ +export async function countActiveRunCells( + tableId: string, + dispatches?: DispatchRow[] +): Promise<{ total: number; byRowId: Record }> { + const active = dispatches ?? (await listActiveDispatches(tableId)) + if (active.length === 0) return countRunningCells(tableId) + + const countRowsAhead = async (d: DispatchRow): Promise => { + const groupCount = d.scope.groupIds.length + if (groupCount === 0) return 0 + const filters = [eq(userTableRows.tableId, tableId), gt(userTableRows.position, d.cursor)] + if (d.scope.rowIds && d.scope.rowIds.length > 0) { + filters.push(inArray(userTableRows.id, d.scope.rowIds)) + } + const [row] = await db + .select({ rowsAhead: sql`count(*)::int` }) + .from(userTableRows) + .where(and(...filters)) + return (row?.rowsAhead ?? 0) * groupCount + } + + // One round-trip per dispatch + the sidecar count, all in parallel. + const [sidecar, perDispatch] = await Promise.all([ + countRunningCells(tableId), + Promise.all(active.map(countRowsAhead)), + ]) + const total = perDispatch.reduce((sum, n) => sum + n, 0) + return { total, byRowId: sidecar.byRowId } +} + export async function listActiveDispatches(tableId: string): Promise { const rows = await db .select() diff --git a/apps/sim/lib/table/workflow-columns.ts b/apps/sim/lib/table/workflow-columns.ts index b0e7511936..eda70c99f5 100644 --- a/apps/sim/lib/table/workflow-columns.ts +++ b/apps/sim/lib/table/workflow-columns.ts @@ -93,7 +93,14 @@ export function classifyEligibility( if (!isManualRun && completedAndFilled) return 'completed-on-auto' if (!isManualRun && status === 'error') return 'error-on-auto' if (!isManualRun && status === 'cancelled') return 'cancelled-on-auto' - if (mode === 'incomplete' && completedAndFilled) return 'completed-on-incomplete' + // Manual incomplete-mode runs (Run row / Run incomplete) treat a `completed` + // group as done even if an output is blank — only "Run all" re-runs it. The + // auto cascade still re-fills blank outputs (completedAndFilled). + if (mode === 'incomplete') { + if (isManualRun ? status === 'completed' : completedAndFilled) { + return 'completed-on-incomplete' + } + } if (isManualRun && group.autoRun === false) return 'manual-bypass' return areGroupDepsSatisfied(group, row) ? 'eligible' : 'deps-unmet'