Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 32 additions & 36 deletions apps/sim/background/cleanup-logs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@ import { jobExecutionLogs, pausedExecutions, workflowExecutionLogs } from '@sim/
import { createLogger } from '@sim/logger'
import { task } from '@trigger.dev/sdk'
import { and, eq, inArray, isNull, lt, notInArray, or, sql } from 'drizzle-orm'
import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
import type { CleanupJobPayload } from '@/lib/billing/cleanup-dispatcher'
import {
batchDeleteByWorkspaceAndTimestamp,
chunkArray,
chunkedBatchDelete,
type TableCleanupResult,
} from '@/lib/cleanup/batch-delete'
Expand All @@ -27,31 +28,34 @@ interface FileDeleteStats {

const RESUMABLE_PAUSED_STATUSES = ['paused', 'partially_resumed', 'cancelling']

/** Caps the per-row predicate cost: each scanned row is checked against one chunk of keys at a time (`O(chunk)`), not every unique key (`O(uniqueKeys)`). */
const REFERENCE_CHECK_KEY_CHUNK_SIZE = 200

/**
* One `LATERAL unnest` scan per chunk replaces N per-key sequential scans
* (each detoasting the entire JSONB column). Substring semantics identical.
*/
async function filterLargeValueKeysWithoutRetainedReferences(
keys: string[],
deletedLogIds: string[]
): Promise<string[]> {
if (keys.length === 0 || deletedLogIds.length === 0) return []

const unreferencedKeys: string[] = []
for (const key of Array.from(new Set(keys))) {
const [referencingLog] = await db
.select({ id: workflowExecutionLogs.id })
.from(workflowExecutionLogs)
.where(
and(
notInArray(workflowExecutionLogs.id, deletedLogIds),
sql`position(${key} in ${workflowExecutionLogs.executionData}::text) > 0`
)
)
.limit(1)

if (!referencingLog) {
unreferencedKeys.push(key)
}
const uniqueKeys = Array.from(new Set(keys))
const referencedKeys = new Set<string>()

for (const keyChunk of chunkArray(uniqueKeys, REFERENCE_CHECK_KEY_CHUNK_SIZE)) {
const rows = await db.execute<{ key: string }>(sql`
SELECT DISTINCT k.key AS key
FROM ${workflowExecutionLogs} AS wel,
unnest(${keyChunk}::text[]) AS k(key)
WHERE wel.id <> ALL(${deletedLogIds}::text[])
AND position(k.key in wel.execution_data::text) > 0
`)
for (const row of rows) referencedKeys.add(row.key)
}

return unreferencedKeys
return uniqueKeys.filter((key) => !referencedKeys.has(key))
}

async function deleteExecutionFiles(files: unknown, stats: FileDeleteStats): Promise<void> {
Expand Down Expand Up @@ -154,14 +158,7 @@ async function cleanupWorkflowExecutionLogs(
return { ...dbStats, ...fileStats }
}

async function cleanupFreePlanOrphanedSnapshots(
payload: CleanupJobPayload,
retentionHours: number
): Promise<void> {
if (payload.plan !== 'free') {
return
}

async function cleanupFreePlanOrphanedSnapshots(retentionHours: number): Promise<void> {
try {
const retentionDays = Math.floor(retentionHours / 24)
const snapshotsCleaned = await snapshotService.cleanupOrphanedSnapshots(retentionDays + 1)
Expand All @@ -173,20 +170,15 @@ async function cleanupFreePlanOrphanedSnapshots(

export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void> {
const startTime = Date.now()

const scope = await resolveCleanupScope('cleanup-logs', payload)
if (!scope) {
logger.info(`[${payload.plan}] No retention configured, skipping`)
return
}

const { workspaceIds, retentionHours, label } = scope
const { workspaceIds, retentionHours, label, plan, runGlobalHousekeeping } = payload

const retentionDate = new Date(Date.now() - retentionHours * 60 * 60 * 1000)

if (workspaceIds.length === 0) {
logger.info(`[${label}] No workspaces to process`)
await cleanupFreePlanOrphanedSnapshots(payload, retentionHours)
if (runGlobalHousekeeping && plan === 'free') {
await cleanupFreePlanOrphanedSnapshots(retentionHours)
}
return
}

Expand All @@ -211,13 +203,17 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void>
tableName: `${label}/job_execution_logs`,
})

await cleanupFreePlanOrphanedSnapshots(payload, retentionHours)
if (runGlobalHousekeeping && plan === 'free') {
await cleanupFreePlanOrphanedSnapshots(retentionHours)
}

const timeElapsed = (Date.now() - startTime) / 1000
logger.info(`[${label}] Job completed in ${timeElapsed.toFixed(2)}s`)
}

/**
 * Task registration (via `task` from `@trigger.dev/sdk`) for the log cleanup job.
 * Runs `runCleanupLogs` with the `CleanupJobPayload` it is triggered with.
 */
export const cleanupLogsTask = task({
id: 'cleanup-logs',
// NOTE(review): machine preset name; sizing presumably chosen for the JSONB scans — confirm.
machine: 'large-1x',
// NOTE(review): presumably limits this task's queue to 5 concurrent runs — confirm against Trigger.dev docs.
queue: { concurrencyLimit: 5 },
run: runCleanupLogs,
})
47 changes: 23 additions & 24 deletions apps/sim/background/cleanup-soft-deletes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import {
import { createLogger } from '@sim/logger'
import { task } from '@trigger.dev/sdk'
import { and, inArray, isNotNull, lt } from 'drizzle-orm'
import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
import type { CleanupJobPayload } from '@/lib/billing/cleanup-dispatcher'
import {
batchDeleteByWorkspaceAndTimestamp,
deleteRowsById,
Expand Down Expand Up @@ -92,22 +92,26 @@ async function cleanupWorkspaceFileStorage(
const stats = { filesDeleted: 0, filesFailed: 0 }
if (!isUsingCloudStorage()) return stats

const toDelete: Array<{ key: string; context: StorageContext }> = [
...scope.legacyRows.map((r) => ({ key: r.key, context: 'workspace' as StorageContext })),
...scope.multiContextRows.map((r) => ({ key: r.key, context: r.context })),
]
const keysByContext = new Map<StorageContext, string[]>()
for (const r of scope.legacyRows) {
const bucket = keysByContext.get('workspace')
if (bucket) bucket.push(r.key)
else keysByContext.set('workspace', [r.key])
}
for (const r of scope.multiContextRows) {
const bucket = keysByContext.get(r.context)
if (bucket) bucket.push(r.key)
else keysByContext.set(r.context, [r.key])
}

await Promise.all(
toDelete.map(async ({ key, context }) => {
try {
await StorageService.deleteFile({ key, context })
stats.filesDeleted++
} catch (error) {
stats.filesFailed++
logger.error(`Failed to delete storage file ${key} (context: ${context}):`, { error })
}
})
)
for (const [context, keys] of keysByContext) {
const result = await StorageService.deleteFiles(keys, context)
stats.filesDeleted += result.deleted
stats.filesFailed += result.failed.length
for (const { key, error } of result.failed) {
logger.error(`Failed to delete storage file ${key} (context: ${context}):`, { error })
}
}

return stats
}
Expand Down Expand Up @@ -160,14 +164,7 @@ const CLEANUP_TARGETS = [

export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise<void> {
const startTime = Date.now()

const scope = await resolveCleanupScope('cleanup-soft-deletes', payload)
if (!scope) {
logger.info(`[${payload.plan}] No retention configured, skipping`)
return
}

const { workspaceIds, retentionHours, label } = scope
const { workspaceIds, retentionHours, label } = payload

if (workspaceIds.length === 0) {
logger.info(`[${label}] No workspaces to process`)
Expand Down Expand Up @@ -274,5 +271,7 @@ export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise

/**
 * Task registration (via `task` from `@trigger.dev/sdk`) for the soft-delete cleanup job.
 * Runs `runCleanupSoftDeletes` with the `CleanupJobPayload` it is triggered with.
 */
export const cleanupSoftDeletesTask = task({
id: 'cleanup-soft-deletes',
// NOTE(review): machine preset name — confirm the sizing rationale with the job owner.
machine: 'large-1x',
// NOTE(review): presumably limits this task's queue to 5 concurrent runs — confirm against Trigger.dev docs.
queue: { concurrencyLimit: 5 },
run: runCleanupSoftDeletes,
})
64 changes: 12 additions & 52 deletions apps/sim/background/cleanup-tasks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import {
} from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { task } from '@trigger.dev/sdk'
import { and, inArray, lt, sql } from 'drizzle-orm'
import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
import { and, inArray, lt } from 'drizzle-orm'
import type { CleanupJobPayload } from '@/lib/billing/cleanup-dispatcher'
import {
batchDeleteByWorkspaceAndTimestamp,
deleteRowsById,
Expand Down Expand Up @@ -38,27 +38,6 @@ const RUN_CHILD_TABLES = [
},
] as const

/**
 * Deletes the rows of one run-child table whose run-id column matches any of `runIds`.
 *
 * @param table - Table to delete from (the `table` of a `RUN_CHILD_TABLES` entry).
 * @param runIdCol - Column compared against `runIds` (the `runIdCol` of a `RUN_CHILD_TABLES` entry).
 * @param runIds - Run IDs whose child rows should be removed.
 * @param tableName - Label stored in the result's `table` field and used as the log prefix.
 * @returns A `TableCleanupResult`: `deleted` is the number of rows returned by the DELETE;
 *   `failed` is incremented once if the statement throws (it counts the failed statement, not rows).
 */
async function deleteByRunIds(
table: (typeof RUN_CHILD_TABLES)[number]['table'],
runIdCol: (typeof RUN_CHILD_TABLES)[number]['runIdCol'],
runIds: string[],
tableName: string
): Promise<TableCleanupResult> {
const result: TableCleanupResult = { table: tableName, deleted: 0, failed: 0 }
try {
// One DELETE covering every id in `runIds` (no chunking in this function).
// RETURNING the raw `id` column is used only so the deleted rows can be counted.
const deleted = await db
.delete(table)
.where(inArray(runIdCol, runIds))
.returning({ id: sql`id` })
result.deleted = deleted.length
logger.info(`[${tableName}] Deleted ${deleted.length} rows`)
} catch (error) {
// Errors are logged and reported through `failed`; they are not rethrown to the caller.
result.failed++
logger.error(`[${tableName}] Delete failed:`, { error })
}
return result
}

async function cleanupRunChildren(
workspaceIds: string[],
retentionDate: Date,
Expand All @@ -83,20 +62,13 @@ async function cleanupRunChildren(
const ids = runIds.map((r) => r.id)

return Promise.all(
RUN_CHILD_TABLES.map((t) => deleteByRunIds(t.table, t.runIdCol, ids, `${label}/${t.name}`))
RUN_CHILD_TABLES.map((t) => deleteRowsById(t.table, t.runIdCol, ids, `${label}/${t.name}`))
)
}

export async function runCleanupTasks(payload: CleanupJobPayload): Promise<void> {
const startTime = Date.now()

const scope = await resolveCleanupScope('cleanup-tasks', payload)
if (!scope) {
logger.info(`[${payload.plan}] No retention configured, skipping`)
return
}

const { workspaceIds, retentionHours, label } = scope
const { workspaceIds, retentionHours, label } = payload

if (workspaceIds.length === 0) {
logger.info(`[${label}] No workspaces to process`)
Expand Down Expand Up @@ -130,26 +102,12 @@ export async function runCleanupTasks(payload: CleanupJobPayload): Promise<void>
}

// Delete feedback — no direct workspaceId, reuse chat IDs collected above
const feedbackResult: TableCleanupResult = {
table: `${label}/copilotFeedback`,
deleted: 0,
failed: 0,
}
try {
if (doomedChatIds.length > 0) {
const deleted = await db
.delete(copilotFeedback)
.where(inArray(copilotFeedback.chatId, doomedChatIds))
.returning({ id: copilotFeedback.feedbackId })
feedbackResult.deleted = deleted.length
logger.info(`[${feedbackResult.table}] Deleted ${deleted.length} rows`)
} else {
logger.info(`[${feedbackResult.table}] No expired rows found`)
}
} catch (error) {
feedbackResult.failed++
logger.error(`[${feedbackResult.table}] Delete failed:`, { error })
}
const feedbackResult = await deleteRowsById(
copilotFeedback,
copilotFeedback.chatId,
doomedChatIds,
`${label}/copilotFeedback`
)

// Delete copilot runs (has workspaceId directly, cascades checkpoints)
const runsResult = await batchDeleteByWorkspaceAndTimestamp({
Expand Down Expand Up @@ -198,5 +156,7 @@ export async function runCleanupTasks(payload: CleanupJobPayload): Promise<void>

/**
 * Task registration (via `task` from `@trigger.dev/sdk`) for the tasks cleanup job.
 * Runs `runCleanupTasks` with the `CleanupJobPayload` it is triggered with.
 */
export const cleanupTasksTask = task({
id: 'cleanup-tasks',
// NOTE(review): machine preset name — confirm the sizing rationale with the job owner.
machine: 'large-1x',
// NOTE(review): presumably limits this task's queue to 5 concurrent runs — confirm against Trigger.dev docs.
queue: { concurrencyLimit: 5 },
run: runCleanupTasks,
})
Loading
Loading