Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 3 additions & 5 deletions apps/sim/app/api/table/[tableId]/dispatches/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import { parseRequest } from '@/lib/api/server'
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
import { generateRequestId } from '@/lib/core/utils/request'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { countRunningCells, listActiveDispatches } from '@/lib/table/dispatcher'
import { countActiveRunCells, listActiveDispatches } from '@/lib/table/dispatcher'
import { accessError, checkAccess } from '@/app/api/table/utils'

const logger = createLogger('TableDispatchesAPI')
Expand Down Expand Up @@ -37,10 +37,8 @@ export const GET = withRouteHandler(async (request: NextRequest, { params }: Rou
const result = await checkAccess(tableId, authResult.userId, 'read')
if (!result.ok) return accessError(result, requestId, tableId)

const [rows, running] = await Promise.all([
listActiveDispatches(tableId),
countRunningCells(tableId),
])
const rows = await listActiveDispatches(tableId)
const running = await countActiveRunCells(tableId, rows)
const dispatches: ActiveDispatch[] = rows.map((r) => ({
id: r.id,
status: r.status as 'pending' | 'dispatching',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,33 +303,35 @@ const TYPEWRITER_MS_PER_CHAR = 15
*/
function useTypewriter(text: string | null): string | null {
const [revealed, setRevealed] = useState<string | null>(text)
const isFirstRunRef = useRef(true)
const prevTextRef = useRef<string | null>(text)
const mountedRef = useRef(false)
const animateRef = useRef(false)

useEffect(() => {
if (isFirstRunRef.current) {
isFirstRunRef.current = false
prevTextRef.current = text
setRevealed(text)
return
}
if (prevTextRef.current === text) return
// Reset synchronously during render when `text` changes (not on first mount)
// so no frame ever shows the full new value before the animation begins —
// an effect-based reset lands one frame late and flashes the whole text.
if (prevTextRef.current !== text) {
prevTextRef.current = text
const animate = mountedRef.current && text !== null && text.length > 0
animateRef.current = animate
setRevealed(animate ? '' : text)
}

if (text === null || text.length === 0) {
setRevealed(text)
return
}
useEffect(() => {
mountedRef.current = true
}, [])

const full = text
useEffect(() => {
if (!animateRef.current) return
animateRef.current = false
const full = text as string
const start = performance.now()
let raf = 0
const tick = (now: number) => {
const chars = Math.min(full.length, Math.floor((now - start) / TYPEWRITER_MS_PER_CHAR))
setRevealed(full.slice(0, chars))
if (chars < full.length) raf = requestAnimationFrame(tick)
}
setRevealed('')
raf = requestAnimationFrame(tick)
return () => cancelAnimationFrame(raf)
}, [text])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,11 +73,11 @@ export function useTableEventStream({
let lastEventId = loadPointer(tableId)
let reconnectAttempt = 0

const updateRunStateCounters = (
rowId: string,
wasInFlight: boolean,
isInFlight: boolean
): void => {
// Keeps the per-row gutter (`runningByRowId`) live between dispatch events.
// `runningCellCount` (the "X running" badge) is NOT touched here — it's the
// server's dispatch-scope count, seeded optimistically on click and
// re-synced by `applyDispatch` on every window, so live matches reload.
const updateRunningByRow = (rowId: string, wasInFlight: boolean, isInFlight: boolean): void => {
if (wasInFlight === isInFlight) return
const delta = isInFlight ? 1 : -1
queryClient.setQueryData<TableRunState>(tableKeys.activeDispatches(tableId), (prev) => {
Expand All @@ -87,11 +87,7 @@ export function useTableEventStream({
const nextByRow = { ...prev.runningByRowId }
if (nextForRow === 0) delete nextByRow[rowId]
else nextByRow[rowId] = nextForRow
return {
...prev,
runningCellCount: Math.max(0, prev.runningCellCount + delta),
runningByRowId: nextByRow,
}
return { ...prev, runningByRowId: nextByRow }
})
}

Expand Down Expand Up @@ -145,11 +141,7 @@ export function useTableEventStream({
queryKey: tableKeys.activeDispatches(tableId),
})
} else {
updateRunStateCounters(
rowId,
wasInFlight,
isExecInFlight({ status } as RowExecutionMetadata)
)
updateRunningByRow(rowId, wasInFlight, isExecInFlight({ status } as RowExecutionMetadata))
}
}

Expand Down Expand Up @@ -195,6 +187,11 @@ export function useTableEventStream({
merged[idx] = next
return { ...base, dispatches: merged }
})
// The dispatcher emits this once per window (after the window's cells
// finish + the cursor advances) and on completion. Re-sync the
// dispatch-scope `runningCellCount` from the server so the badge steps
// down per window and matches a reload exactly.
void queryClient.invalidateQueries({ queryKey: tableKeys.activeDispatches(tableId) })
}

const handlePrune = (payload: PrunedEvent): void => {
Expand Down
53 changes: 37 additions & 16 deletions apps/sim/hooks/queries/tables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ import type {
} from '@/lib/table'
import {
areGroupDepsSatisfied,
areOutputsFilled,
isExecInFlight,
optimisticallyScheduleNewlyEligibleGroups,
} from '@/lib/table/deps'
Expand Down Expand Up @@ -235,17 +234,34 @@ function countNewlyInFlight(before: RowExecutions, after: RowExecutions): number
return n
}

/** Add optimistically-stamped cells to the run-state counter so the "X running"
* badge + per-row gutter Stop reflect them instantly (the optimistic stamp
* eats the dispatcher's `pending` SSE, so `applyCell` never bumps the count).
* Returns the prior snapshot for rollback, or `null` when nothing was bumped. */
/** The table's maintained, unfiltered `rowCount` from the detail cache (or
* `null` when the detail hasn't loaded). This is the right scope for a Run-all
* estimate: the dispatcher runs every row regardless of the active view
* filter, whereas the rows query's `totalCount` is filter-scoped. */
function readTableRowCount(
queryClient: ReturnType<typeof useQueryClient>,
tableId: string
): number | null {
const def = queryClient.getQueryData<TableDefinition>(tableKeys.detail(tableId))
return typeof def?.rowCount === 'number' ? def.rowCount : null
}

/** Optimistically reflect a run on the "X running" badge + per-row gutter Stop
* instantly (the optimistic stamp eats the dispatcher's `pending` SSE, so
* `applyCell` never bumps the count, and the server's dispatch-scope count
* isn't live until the first window). `stampedByRow` drives the per-row gutter
* (loaded rows only); `cellCountDelta` is the badge delta — pass the full run
* scope (rows × groups) for Run-all so it matches the server, or omit to use
* the stamped total. Returns the prior snapshot for rollback. */
function bumpRunState(
queryClient: ReturnType<typeof useQueryClient>,
tableId: string,
stampedByRow: Record<string, number>
stampedByRow: Record<string, number>,
cellCountDelta?: number
): { snapshot: TableRunState | undefined } | null {
const total = Object.values(stampedByRow).reduce((s, n) => s + n, 0)
if (total === 0) return null
const stampedTotal = Object.values(stampedByRow).reduce((s, n) => s + n, 0)
const countDelta = cellCountDelta ?? stampedTotal
if (countDelta === 0 && stampedTotal === 0) return null
const snapshot = queryClient.getQueryData<TableRunState>(tableKeys.activeDispatches(tableId))
queryClient.setQueryData<TableRunState>(tableKeys.activeDispatches(tableId), (prev) => {
const base = prev ?? { dispatches: [], runningCellCount: 0, runningByRowId: {} }
Expand All @@ -255,7 +271,7 @@ function bumpRunState(
}
return {
...base,
runningCellCount: base.runningCellCount + total,
runningCellCount: base.runningCellCount + countDelta,
runningByRowId: nextByRow,
}
})
Expand Down Expand Up @@ -1418,12 +1434,10 @@ export function useRunColumn({ workspaceId, tableId }: RowMutationContext) {
// dispatcher regardless of mode. Stamping pending here would leave
// the cell flashing Queued indefinitely (no SSE event will arrive).
if (group && !areGroupDepsSatisfied(group, r)) continue
// Mirror server eligibility for `mode: 'incomplete'`: skip cells whose
// outputs are filled, regardless of exec status. A cancelled/error
// cell with a leftover value from a prior run was rendering as filled
// but flipping to "queued" optimistically here even though the server
// would skip it.
if (runMode === 'incomplete' && group && areOutputsFilled(group, r)) continue
// Mirror server eligibility for manual `mode: 'incomplete'`: a
// `completed` group is done (even with a blank output) — only "Run
// all" re-runs it. error/cancelled/never-run cells still re-run.
if (runMode === 'incomplete' && exec?.status === 'completed') continue
next[groupId] = buildPendingExec(exec)
// Mirror the server-side bulk clear: wipe output values so the cell
// doesn't render the stale completed value behind a pending badge.
Expand All @@ -1442,7 +1456,14 @@ export function useRunColumn({ workspaceId, tableId }: RowMutationContext) {
return { ...r, data: nextData, executions: next }
})

const bumped = bumpRunState(queryClient, tableId, stampedByRow)
// Badge counts the whole run scope (rows × groups), matching the server's
// dispatch-scope count — not just the loaded rows we could stamp. For
// Run-all that's the table's totalCount; for a scoped run, the rowIds.
const scopeRowCount = targetRowIds
? targetRowIds.size
: (readTableRowCount(queryClient, tableId) ?? Object.keys(stampedByRow).length)
const cellCountDelta = scopeRowCount * targetGroupIds.size
Comment thread
cursor[bot] marked this conversation as resolved.
const bumped = bumpRunState(queryClient, tableId, stampedByRow, cellCountDelta)
return { snapshots, runStateSnapshot: bumped?.snapshot, didBumpRunState: bumped !== null }
},
onError: (_err, _variables, context) => {
Expand Down
146 changes: 93 additions & 53 deletions apps/sim/lib/table/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,67 +65,69 @@ export async function bulkClearWorkflowGroupCells(input: {
// Pre-existing outputs on any other row must not be wiped by an auto-fire.
if (mode === 'new') return

const outputCols = Array.from(new Set(groups.flatMap((g) => g.outputs.map((o) => o.columnName))))
const groupIds = groups.map((g) => g.id)

// Step 1: clear the targeted output columns from `data` on every row in
// scope. Identical chain to the previous JSONB-only path.
let dataExpr: SQL = sql`coalesce(${userTableRows.data}, '{}'::jsonb)`
for (const col of outputCols) dataExpr = sql`(${dataExpr}) - ${col}::text`

const filters: SQL[] = [eq(userTableRows.tableId, tableId)]
if (rowIds && rowIds.length > 0) {
filters.push(inArray(userTableRows.id, rowIds))
}
if (mode === 'incomplete') {
// Skip rows where all output columns across all targeted groups already
// have a non-empty value — those are "completed-and-filled" and the
// eligibility predicate would skip them anyway.
const filledChecks = outputCols.map(
(col) => sql`coalesce(${userTableRows.data} ->> ${col}, '') != ''`
const rowScope = rowIds && rowIds.length > 0 ? rowIds : null

if (mode === 'all') {
// Run-all re-runs every targeted group: wipe all their output columns +
// executions for the rows in scope. (Prior in-flight runs were already
// cancelled by the caller.)
const outputCols = Array.from(
new Set(groups.flatMap((g) => g.outputs.map((o) => o.columnName)))
)
const allFilled = filledChecks.reduce((acc, expr) => sql`${acc} AND ${expr}`)
filters.push(sql`NOT (${allFilled})`)
// Also skip rows where ANY targeted group has an in-flight exec — those
// belong to another dispatch and clobbering them would race. Encoded as
// a NOT EXISTS subquery against the sidecar's `(table_id, status)`
// partial index.
filters.push(
sql`NOT EXISTS (
let dataExpr: SQL = sql`coalesce(${userTableRows.data}, '{}'::jsonb)`
for (const col of outputCols) dataExpr = sql`(${dataExpr}) - ${col}::text`
const filters: SQL[] = [eq(userTableRows.tableId, tableId)]
if (rowScope) filters.push(inArray(userTableRows.id, rowScope))

await db.transaction(async (trx) => {
await trx
.update(userTableRows)
.set({ data: dataExpr, updatedAt: new Date() })
.where(and(...filters))
const execFilters: SQL[] = [
eq(tableRowExecutions.tableId, tableId),
inArray(tableRowExecutions.groupId, groupIds),
]
if (rowScope) execFilters.push(inArray(tableRowExecutions.rowId, rowScope))
await trx.delete(tableRowExecutions).where(and(...execFilters))
})
return
}

// `incomplete`: clear per-group, not per-row. Only groups that are
// re-runnable (`error` / `cancelled`) get their output columns + exec wiped;
// `completed` and in-flight groups are left fully intact. A row-level "all
// filled" check would otherwise wipe a completed group's data + exec just
// because a *sibling* group on the same row is incomplete, re-running the
// completed one. (`never-run` groups have no exec/output to clear — the
// dispatcher runs them via eligibility.)
await db.transaction(async (trx) => {
for (const group of groups) {
const reRunnable = sql`EXISTS (
SELECT 1 FROM ${tableRowExecutions} re
WHERE re.row_id = ${userTableRows.id}
AND re.group_id = ANY(ARRAY[${sql.join(
groupIds.map((gid) => sql`${gid}`),
sql`, `
)}]::text[])
AND re.status IN ('queued', 'running', 'pending')
AND re.group_id = ${group.id}
AND re.status IN ('error', 'cancelled')
)`
)
}
const filters: SQL[] = [eq(userTableRows.tableId, tableId), reRunnable]
if (rowScope) filters.push(inArray(userTableRows.id, rowScope))

await db.transaction(async (trx) => {
await trx
.update(userTableRows)
.set({ data: dataExpr, updatedAt: new Date() })
.where(and(...filters))
let dataExpr: SQL = sql`coalesce(${userTableRows.data}, '{}'::jsonb)`
for (const out of group.outputs) dataExpr = sql`(${dataExpr}) - ${out.columnName}::text`
await trx
.update(userTableRows)
.set({ data: dataExpr, updatedAt: new Date() })
.where(and(...filters))

// Step 2: delete the targeted groups' executions for the rows in scope.
// Reuse the same row-scope filter via a subquery.
const execFilters: SQL[] = [
eq(tableRowExecutions.tableId, tableId),
inArray(tableRowExecutions.groupId, groupIds),
]
if (rowIds && rowIds.length > 0) {
execFilters.push(inArray(tableRowExecutions.rowId, rowIds))
}
if (mode === 'incomplete') {
// For `incomplete`, only delete entries that aren't already in-flight
// — terminal states (completed/error/cancelled) get wiped so the
// dispatcher re-enqueues; in-flight entries stay so we don't race
// with their worker.
execFilters.push(sql`${tableRowExecutions.status} NOT IN ('queued', 'running', 'pending')`)
const execFilters: SQL[] = [
eq(tableRowExecutions.tableId, tableId),
eq(tableRowExecutions.groupId, group.id),
sql`${tableRowExecutions.status} IN ('error', 'cancelled')`,
]
if (rowScope) execFilters.push(inArray(tableRowExecutions.rowId, rowScope))
await trx.delete(tableRowExecutions).where(and(...execFilters))
}
await trx.delete(tableRowExecutions).where(and(...execFilters))
})
}

Expand Down Expand Up @@ -193,6 +195,44 @@ export async function countRunningCells(
return { total, byRowId }
}

/** Authoritative "cells queued or running" count for the table, derived from
* active dispatches so it survives reload and matches the live count. For each
* active dispatch every row in scope ahead of the cursor still has to run each
* targeted group, so remaining work = (rows ahead of cursor) × |groupIds|.
* Exact for Run-all; an upper bound for incomplete/new (rows the eligibility
* filter later skips are still counted). Falls back to the sidecar in-flight
* count when no dispatch is active (orphan stragglers). `byRowId` stays
* sidecar-based — the client overlay renders queued rows ahead of the cursor. */
export async function countActiveRunCells(
tableId: string,
dispatches?: DispatchRow[]
): Promise<{ total: number; byRowId: Record<string, number> }> {
const active = dispatches ?? (await listActiveDispatches(tableId))
if (active.length === 0) return countRunningCells(tableId)

const countRowsAhead = async (d: DispatchRow): Promise<number> => {
const groupCount = d.scope.groupIds.length
if (groupCount === 0) return 0
const filters = [eq(userTableRows.tableId, tableId), gt(userTableRows.position, d.cursor)]
if (d.scope.rowIds && d.scope.rowIds.length > 0) {
filters.push(inArray(userTableRows.id, d.scope.rowIds))
}
const [row] = await db
.select({ rowsAhead: sql<number>`count(*)::int` })
.from(userTableRows)
.where(and(...filters))
return (row?.rowsAhead ?? 0) * groupCount
}

// One round-trip per dispatch + the sidecar count, all in parallel.
const [sidecar, perDispatch] = await Promise.all([
countRunningCells(tableId),
Promise.all(active.map(countRowsAhead)),
])
const total = perDispatch.reduce((sum, n) => sum + n, 0)
return { total, byRowId: sidecar.byRowId }
}

export async function listActiveDispatches(tableId: string): Promise<DispatchRow[]> {
const rows = await db
.select()
Expand Down
Loading
Loading