Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -303,33 +303,35 @@ const TYPEWRITER_MS_PER_CHAR = 15
*/
function useTypewriter(text: string | null): string | null {
const [revealed, setRevealed] = useState<string | null>(text)
const isFirstRunRef = useRef(true)
const prevTextRef = useRef<string | null>(text)
const mountedRef = useRef(false)
const animateRef = useRef(false)

useEffect(() => {
if (isFirstRunRef.current) {
isFirstRunRef.current = false
prevTextRef.current = text
setRevealed(text)
return
}
if (prevTextRef.current === text) return
// Reset synchronously during render when `text` changes (not on first mount)
// so no frame ever shows the full new value before the animation begins —
// an effect-based reset lands one frame late and flashes the whole text.
if (prevTextRef.current !== text) {
prevTextRef.current = text
const animate = mountedRef.current && text !== null && text.length > 0
animateRef.current = animate
setRevealed(animate ? '' : text)
}

if (text === null || text.length === 0) {
setRevealed(text)
return
}
useEffect(() => {
mountedRef.current = true
}, [])

const full = text
useEffect(() => {
if (!animateRef.current) return
animateRef.current = false
const full = text as string
const start = performance.now()
let raf = 0
const tick = (now: number) => {
const chars = Math.min(full.length, Math.floor((now - start) / TYPEWRITER_MS_PER_CHAR))
setRevealed(full.slice(0, chars))
if (chars < full.length) raf = requestAnimationFrame(tick)
}
setRevealed('')
raf = requestAnimationFrame(tick)
return () => cancelAnimationFrame(raf)
}, [text])
Expand Down
11 changes: 4 additions & 7 deletions apps/sim/hooks/queries/tables.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,6 @@ import type {
} from '@/lib/table'
import {
areGroupDepsSatisfied,
areOutputsFilled,
isExecInFlight,
optimisticallyScheduleNewlyEligibleGroups,
} from '@/lib/table/deps'
Expand Down Expand Up @@ -1418,12 +1417,10 @@ export function useRunColumn({ workspaceId, tableId }: RowMutationContext) {
// dispatcher regardless of mode. Stamping pending here would leave
// the cell flashing Queued indefinitely (no SSE event will arrive).
if (group && !areGroupDepsSatisfied(group, r)) continue
// Mirror server eligibility for `mode: 'incomplete'`: skip cells whose
// outputs are filled, regardless of exec status. A cancelled/error
// cell with a leftover value from a prior run was rendering as filled
// but flipping to "queued" optimistically here even though the server
// would skip it.
if (runMode === 'incomplete' && group && areOutputsFilled(group, r)) continue
// Mirror server eligibility for manual `mode: 'incomplete'`: a
// `completed` group is done (even with a blank output) — only "Run
// all" re-runs it. error/cancelled/never-run cells still re-run.
if (runMode === 'incomplete' && exec?.status === 'completed') continue
next[groupId] = buildPendingExec(exec)
// Mirror the server-side bulk clear: wipe output values so the cell
// doesn't render the stale completed value behind a pending badge.
Expand Down
110 changes: 56 additions & 54 deletions apps/sim/lib/table/dispatcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,67 +65,69 @@ export async function bulkClearWorkflowGroupCells(input: {
// Pre-existing outputs on any other row must not be wiped by an auto-fire.
if (mode === 'new') return

const outputCols = Array.from(new Set(groups.flatMap((g) => g.outputs.map((o) => o.columnName))))
const groupIds = groups.map((g) => g.id)

// Step 1: clear the targeted output columns from `data` on every row in
// scope. Identical chain to the previous JSONB-only path.
let dataExpr: SQL = sql`coalesce(${userTableRows.data}, '{}'::jsonb)`
for (const col of outputCols) dataExpr = sql`(${dataExpr}) - ${col}::text`

const filters: SQL[] = [eq(userTableRows.tableId, tableId)]
if (rowIds && rowIds.length > 0) {
filters.push(inArray(userTableRows.id, rowIds))
}
if (mode === 'incomplete') {
// Skip rows where all output columns across all targeted groups already
// have a non-empty value — those are "completed-and-filled" and the
// eligibility predicate would skip them anyway.
const filledChecks = outputCols.map(
(col) => sql`coalesce(${userTableRows.data} ->> ${col}, '') != ''`
const rowScope = rowIds && rowIds.length > 0 ? rowIds : null

if (mode === 'all') {
// Run-all re-runs every targeted group: wipe all their output columns +
// executions for the rows in scope. (Prior in-flight runs were already
// cancelled by the caller.)
const outputCols = Array.from(
new Set(groups.flatMap((g) => g.outputs.map((o) => o.columnName)))
)
const allFilled = filledChecks.reduce((acc, expr) => sql`${acc} AND ${expr}`)
filters.push(sql`NOT (${allFilled})`)
// Also skip rows where ANY targeted group has an in-flight exec — those
// belong to another dispatch and clobbering them would race. Encoded as
// a NOT EXISTS subquery against the sidecar's `(table_id, status)`
// partial index.
filters.push(
sql`NOT EXISTS (
let dataExpr: SQL = sql`coalesce(${userTableRows.data}, '{}'::jsonb)`
for (const col of outputCols) dataExpr = sql`(${dataExpr}) - ${col}::text`
const filters: SQL[] = [eq(userTableRows.tableId, tableId)]
if (rowScope) filters.push(inArray(userTableRows.id, rowScope))

await db.transaction(async (trx) => {
await trx
.update(userTableRows)
.set({ data: dataExpr, updatedAt: new Date() })
.where(and(...filters))
const execFilters: SQL[] = [
eq(tableRowExecutions.tableId, tableId),
inArray(tableRowExecutions.groupId, groupIds),
]
if (rowScope) execFilters.push(inArray(tableRowExecutions.rowId, rowScope))
await trx.delete(tableRowExecutions).where(and(...execFilters))
})
return
}

// `incomplete`: clear per-group, not per-row. Only groups that are
// re-runnable (`error` / `cancelled`) get their output columns + exec wiped;
// `completed` and in-flight groups are left fully intact. A row-level "all
// filled" check would otherwise wipe a completed group's data + exec just
// because a *sibling* group on the same row is incomplete, re-running the
// completed one. (`never-run` groups have no exec/output to clear — the
// dispatcher runs them via eligibility.)
await db.transaction(async (trx) => {
for (const group of groups) {
const reRunnable = sql`EXISTS (
SELECT 1 FROM ${tableRowExecutions} re
WHERE re.row_id = ${userTableRows.id}
AND re.group_id = ANY(ARRAY[${sql.join(
groupIds.map((gid) => sql`${gid}`),
sql`, `
)}]::text[])
AND re.status IN ('queued', 'running', 'pending')
AND re.group_id = ${group.id}
AND re.status IN ('error', 'cancelled')
)`
)
}
const filters: SQL[] = [eq(userTableRows.tableId, tableId), reRunnable]
if (rowScope) filters.push(inArray(userTableRows.id, rowScope))

await db.transaction(async (trx) => {
await trx
.update(userTableRows)
.set({ data: dataExpr, updatedAt: new Date() })
.where(and(...filters))

// Step 2: delete the targeted groups' executions for the rows in scope.
// Reuse the same row-scope filter via a subquery.
const execFilters: SQL[] = [
eq(tableRowExecutions.tableId, tableId),
inArray(tableRowExecutions.groupId, groupIds),
]
if (rowIds && rowIds.length > 0) {
execFilters.push(inArray(tableRowExecutions.rowId, rowIds))
}
if (mode === 'incomplete') {
// For `incomplete`, only delete entries that aren't already in-flight
// — terminal states (completed/error/cancelled) get wiped so the
// dispatcher re-enqueues; in-flight entries stay so we don't race
// with their worker.
execFilters.push(sql`${tableRowExecutions.status} NOT IN ('queued', 'running', 'pending')`)
let dataExpr: SQL = sql`coalesce(${userTableRows.data}, '{}'::jsonb)`
for (const out of group.outputs) dataExpr = sql`(${dataExpr}) - ${out.columnName}::text`
await trx
.update(userTableRows)
.set({ data: dataExpr, updatedAt: new Date() })
.where(and(...filters))

const execFilters: SQL[] = [
eq(tableRowExecutions.tableId, tableId),
eq(tableRowExecutions.groupId, group.id),
sql`${tableRowExecutions.status} IN ('error', 'cancelled')`,
]
if (rowScope) execFilters.push(inArray(tableRowExecutions.rowId, rowScope))
await trx.delete(tableRowExecutions).where(and(...execFilters))
}
await trx.delete(tableRowExecutions).where(and(...execFilters))
})
}

Expand Down
9 changes: 8 additions & 1 deletion apps/sim/lib/table/workflow-columns.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,14 @@ export function classifyEligibility(
if (!isManualRun && completedAndFilled) return 'completed-on-auto'
if (!isManualRun && status === 'error') return 'error-on-auto'
if (!isManualRun && status === 'cancelled') return 'cancelled-on-auto'
if (mode === 'incomplete' && completedAndFilled) return 'completed-on-incomplete'
// Manual incomplete-mode runs (Run row / Run incomplete) treat a `completed`
// group as done even if an output is blank — only "Run all" re-runs it. The
// auto cascade still re-fills blank outputs (completedAndFilled).
if (mode === 'incomplete') {
if (isManualRun ? status === 'completed' : completedAndFilled) {
return 'completed-on-incomplete'
}
}

if (isManualRun && group.autoRun === false) return 'manual-bypass'
return areGroupDepsSatisfied(group, row) ? 'eligible' : 'deps-unmet'
Expand Down
Loading