From 3f3f2005c4519e4f37f9a34600cfa580f15fe836 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Wed, 20 May 2026 19:08:12 -0700 Subject: [PATCH 1/2] improvement(cleanup): batchTrigger fan-out, chunked queries, batched S3, faster outlier drain - Fan cleanup-tasks/logs/soft-deletes out via tasks.batchTrigger (500 ws/chunk); bump to large-1x with concurrencyLimit: 5 - Chunk bulk DELETEs (1000 IDs/stmt) and collectChatFiles JSONB SELECT (500 chats/stmt) to bound worker memory and lock duration - Replace per-key position() table scans with one LATERAL unnest scan per 200-key chunk - Route storage deletes through StorageService.deleteFiles (S3 DeleteObjects: 1000 keys/HTTP) - Raise per-run row cap to 100K so long-tail tenants (one prod workspace has 723K doomed rows) drain in days, not weeks --- apps/sim/background/cleanup-logs.ts | 68 +++--- apps/sim/background/cleanup-soft-deletes.ts | 47 ++-- apps/sim/background/cleanup-tasks.ts | 64 +---- apps/sim/lib/billing/cleanup-dispatcher.ts | 244 +++++++++---------- apps/sim/lib/cleanup/batch-delete.ts | 41 +++- apps/sim/lib/cleanup/chat-cleanup.ts | 122 +++++----- apps/sim/lib/uploads/core/storage-service.ts | 43 ++++ apps/sim/lib/uploads/providers/s3/client.ts | 48 ++++ 8 files changed, 361 insertions(+), 316 deletions(-) diff --git a/apps/sim/background/cleanup-logs.ts b/apps/sim/background/cleanup-logs.ts index 1f09fa16ecc..090f2e4a729 100644 --- a/apps/sim/background/cleanup-logs.ts +++ b/apps/sim/background/cleanup-logs.ts @@ -3,9 +3,10 @@ import { jobExecutionLogs, pausedExecutions, workflowExecutionLogs } from '@sim/ import { createLogger } from '@sim/logger' import { task } from '@trigger.dev/sdk' import { and, eq, inArray, isNull, lt, notInArray, or, sql } from 'drizzle-orm' -import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher' +import type { CleanupJobPayload } from '@/lib/billing/cleanup-dispatcher' import { batchDeleteByWorkspaceAndTimestamp, + chunkArray, chunkedBatchDelete, type TableCleanupResult, } from '@/lib/cleanup/batch-delete' @@ -27,31 +28,34 @@ interface FileDeleteStats { const RESUMABLE_PAUSED_STATUSES = ['paused', 'partially_resumed', 'cancelling'] +/** Caps the per-row predicate cost: keys-per-row is `O(chunk)` not `O(uniqueKeys)`. */ +const REFERENCE_CHECK_KEY_CHUNK_SIZE = 200 + +/** + * One `LATERAL unnest` scan per chunk replaces N per-key sequential scans + * (each detoasting the entire JSONB column). Substring semantics identical. + */ async function filterLargeValueKeysWithoutRetainedReferences( keys: string[], deletedLogIds: string[] ): Promise { if (keys.length === 0 || deletedLogIds.length === 0) return [] - const unreferencedKeys: string[] = [] - for (const key of Array.from(new Set(keys))) { - const [referencingLog] = await db - .select({ id: workflowExecutionLogs.id }) - .from(workflowExecutionLogs) - .where( - and( - notInArray(workflowExecutionLogs.id, deletedLogIds), - sql`position(${key} in ${workflowExecutionLogs.executionData}::text) > 0` - ) - ) - .limit(1) - - if (!referencingLog) { - unreferencedKeys.push(key) - } + const uniqueKeys = Array.from(new Set(keys)) + const referencedKeys = new Set() + + for (const keyChunk of chunkArray(uniqueKeys, REFERENCE_CHECK_KEY_CHUNK_SIZE)) { + const rows = await db.execute<{ key: string }>(sql` + SELECT DISTINCT k.key AS key + FROM ${workflowExecutionLogs} AS wel, + unnest(${keyChunk}::text[]) AS k(key) + WHERE wel.id <> ALL(${deletedLogIds}::text[]) + AND position(k.key in wel.execution_data::text) > 0 + `) + for (const row of rows) referencedKeys.add(row.key) } - return unreferencedKeys + return uniqueKeys.filter((key) => !referencedKeys.has(key)) } async function deleteExecutionFiles(files: unknown, stats: FileDeleteStats): Promise { @@ -154,14 +158,7 @@ async function cleanupWorkflowExecutionLogs( return { ...dbStats, ...fileStats } } -async function cleanupFreePlanOrphanedSnapshots( - payload: CleanupJobPayload, - retentionHours: number -): Promise { - if (payload.plan !== 'free') { - return - } - +async function cleanupFreePlanOrphanedSnapshots(retentionHours: number): Promise { try { const retentionDays = Math.floor(retentionHours / 24) const snapshotsCleaned = await snapshotService.cleanupOrphanedSnapshots(retentionDays + 1) @@ -173,20 +170,15 @@ async function cleanupFreePlanOrphanedSnapshots( export async function runCleanupLogs(payload: CleanupJobPayload): Promise { const startTime = Date.now() - - const scope = await resolveCleanupScope('cleanup-logs', payload) - if (!scope) { - logger.info(`[${payload.plan}] No retention configured, skipping`) - return - } - - const { workspaceIds, retentionHours, label } = scope + const { workspaceIds, retentionHours, label, plan, runGlobalHousekeeping } = payload const retentionDate = new Date(Date.now() - retentionHours * 60 * 60 * 1000) if (workspaceIds.length === 0) { logger.info(`[${label}] No workspaces to process`) - await cleanupFreePlanOrphanedSnapshots(payload, retentionHours) + if (runGlobalHousekeeping && plan === 'free') { + await cleanupFreePlanOrphanedSnapshots(retentionHours) + } return } @@ -211,7 +203,9 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise tableName: `${label}/job_execution_logs`, }) - await cleanupFreePlanOrphanedSnapshots(payload, retentionHours) + if (runGlobalHousekeeping && plan === 'free') { + await cleanupFreePlanOrphanedSnapshots(retentionHours) + } const timeElapsed = (Date.now() - startTime) / 1000 logger.info(`[${label}] Job completed in ${timeElapsed.toFixed(2)}s`) @@ -219,5 +213,7 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise export const cleanupLogsTask = task({ id: 'cleanup-logs', + machine: 'large-1x', + queue: { concurrencyLimit: 5 }, run: runCleanupLogs, }) diff --git a/apps/sim/background/cleanup-soft-deletes.ts b/apps/sim/background/cleanup-soft-deletes.ts index 51733074075..9b960ff42f2 100644 --- a/apps/sim/background/cleanup-soft-deletes.ts +++ b/apps/sim/background/cleanup-soft-deletes.ts @@ -15,7 +15,7 @@ import { import { createLogger } from '@sim/logger' import { task } from '@trigger.dev/sdk' import { and, inArray, isNotNull, lt } from 'drizzle-orm' -import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher' +import type { CleanupJobPayload } from '@/lib/billing/cleanup-dispatcher' import { batchDeleteByWorkspaceAndTimestamp, deleteRowsById, @@ -92,22 +92,26 @@ async function cleanupWorkspaceFileStorage( const stats = { filesDeleted: 0, filesFailed: 0 } if (!isUsingCloudStorage()) return stats - const toDelete: Array<{ key: string; context: StorageContext }> = [ - ...scope.legacyRows.map((r) => ({ key: r.key, context: 'workspace' as StorageContext })), - ...scope.multiContextRows.map((r) => ({ key: r.key, context: r.context })), - ] + const keysByContext = new Map() + for (const r of scope.legacyRows) { + const bucket = keysByContext.get('workspace') + if (bucket) bucket.push(r.key) + else keysByContext.set('workspace', [r.key]) + } + for (const r of scope.multiContextRows) { + const bucket = keysByContext.get(r.context) + if (bucket) bucket.push(r.key) + else keysByContext.set(r.context, [r.key]) + } - await Promise.all( - toDelete.map(async ({ key, context }) => { - try { - await StorageService.deleteFile({ key, context }) - stats.filesDeleted++ - } catch (error) { - stats.filesFailed++ - logger.error(`Failed to delete storage file ${key} (context: ${context}):`, { error }) - } - }) - ) + for (const [context, keys] of keysByContext) { + const result = await StorageService.deleteFiles(keys, context) + stats.filesDeleted += result.deleted + stats.filesFailed += result.failed.length + for (const { key, error } of result.failed) { + logger.error(`Failed to delete storage file ${key} (context: ${context}):`, { error }) + } + } return stats } @@ -160,14 +164,7 @@ const CLEANUP_TARGETS = [ export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise { const startTime = Date.now() - - const scope = await resolveCleanupScope('cleanup-soft-deletes', payload) - if (!scope) { - logger.info(`[${payload.plan}] No retention configured, skipping`) - return - } - - const { workspaceIds, retentionHours, label } = scope + const { workspaceIds, retentionHours, label } = payload if (workspaceIds.length === 0) { logger.info(`[${label}] No workspaces to process`) @@ -274,5 +271,7 @@ export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise export const cleanupSoftDeletesTask = task({ id: 'cleanup-soft-deletes', + machine: 'large-1x', + queue: { concurrencyLimit: 5 }, run: runCleanupSoftDeletes, }) diff --git a/apps/sim/background/cleanup-tasks.ts b/apps/sim/background/cleanup-tasks.ts index 25ef67461ed..341f5132eaa 100644 --- a/apps/sim/background/cleanup-tasks.ts +++ b/apps/sim/background/cleanup-tasks.ts @@ -9,8 +9,8 @@ import { } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { task } from '@trigger.dev/sdk' -import { and, inArray, lt, sql } from 'drizzle-orm' -import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher' +import { and, inArray, lt } from 'drizzle-orm' +import type { CleanupJobPayload } from '@/lib/billing/cleanup-dispatcher' import { batchDeleteByWorkspaceAndTimestamp, deleteRowsById, @@ -38,27 +38,6 @@ const RUN_CHILD_TABLES = [ }, ] as const -async function deleteByRunIds( - table: (typeof RUN_CHILD_TABLES)[number]['table'], - runIdCol: (typeof RUN_CHILD_TABLES)[number]['runIdCol'], - runIds: string[], - tableName: string -): Promise { - const result: TableCleanupResult = { table: tableName, deleted: 0, failed: 0 } - try { - const deleted = await db - .delete(table) - .where(inArray(runIdCol, runIds)) - .returning({ id: sql`id` }) - result.deleted = deleted.length - logger.info(`[${tableName}] Deleted ${deleted.length} rows`) - } catch (error) { - result.failed++ - logger.error(`[${tableName}] Delete failed:`, { error }) - } - return result -} - async function cleanupRunChildren( workspaceIds: string[], retentionDate: Date, @@ -83,20 +62,13 @@ async function cleanupRunChildren( const ids = runIds.map((r) => r.id) return Promise.all( - RUN_CHILD_TABLES.map((t) => deleteByRunIds(t.table, t.runIdCol, ids, `${label}/${t.name}`)) + RUN_CHILD_TABLES.map((t) => deleteRowsById(t.table, t.runIdCol, ids, `${label}/${t.name}`)) ) } export async function runCleanupTasks(payload: CleanupJobPayload): Promise { const startTime = Date.now() - - const scope = await resolveCleanupScope('cleanup-tasks', payload) - if (!scope) { - logger.info(`[${payload.plan}] No retention configured, skipping`) - return - } - - const { workspaceIds, retentionHours, label } = scope + const { workspaceIds, retentionHours, label } = payload if (workspaceIds.length === 0) { logger.info(`[${label}] No workspaces to process`) @@ -130,26 +102,12 @@ export async function runCleanupTasks(payload: CleanupJobPayload): Promise } // Delete feedback — no direct workspaceId, reuse chat IDs collected above - const feedbackResult: TableCleanupResult = { - table: `${label}/copilotFeedback`, - deleted: 0, - failed: 0, - } - try { - if (doomedChatIds.length > 0) { - const deleted = await db - .delete(copilotFeedback) - .where(inArray(copilotFeedback.chatId, doomedChatIds)) - .returning({ id: copilotFeedback.feedbackId }) - feedbackResult.deleted = deleted.length - logger.info(`[${feedbackResult.table}] Deleted ${deleted.length} rows`) - } else { - logger.info(`[${feedbackResult.table}] No expired rows found`) - } - } catch (error) { - feedbackResult.failed++ - logger.error(`[${feedbackResult.table}] Delete failed:`, { error }) - } + const feedbackResult = await deleteRowsById( + copilotFeedback, + copilotFeedback.chatId, + doomedChatIds, + `${label}/copilotFeedback` + ) // Delete copilot runs (has workspaceId directly, cascades checkpoints) const runsResult = await batchDeleteByWorkspaceAndTimestamp({ @@ -198,5 +156,7 @@ export async function runCleanupTasks(payload: CleanupJobPayload): Promise export const cleanupTasksTask = task({ id: 'cleanup-tasks', + machine: 'large-1x', + queue: { concurrencyLimit: 5 }, run: runCleanupTasks, }) diff --git a/apps/sim/lib/billing/cleanup-dispatcher.ts b/apps/sim/lib/billing/cleanup-dispatcher.ts index 3c1c5cd81d8..93568887621 100644 --- a/apps/sim/lib/billing/cleanup-dispatcher.ts +++ b/apps/sim/lib/billing/cleanup-dispatcher.ts @@ -7,6 +7,7 @@ import { eq, isNull } from 'drizzle-orm' import { getOrganizationSubscription } from '@/lib/billing/core/billing' import { getHighestPriorityPersonalSubscription } from '@/lib/billing/core/subscription' import { getPlanType, type PlanCategory } from '@/lib/billing/plan-helpers' +import { chunkArray } from '@/lib/cleanup/batch-delete' import { getJobQueue } from '@/lib/core/async-jobs' import { shouldExecuteInline } from '@/lib/core/async-jobs/config' import type { EnqueueOptions } from '@/lib/core/async-jobs/types' @@ -15,8 +16,12 @@ import { isOrganizationWorkspace, WORKSPACE_MODE } from '@/lib/workspaces/policy const logger = createLogger('RetentionDispatcher') +/** Trigger.dev's documented cap on items per `batchTrigger` call (SDK 4.3.1+). */ const BATCH_TRIGGER_CHUNK_SIZE = 1000 +/** Bounds per-run memory + DB connections regardless of plan size. */ +const WORKSPACES_PER_CLEANUP_CHUNK = 500 + export type CleanupJobType = 'cleanup-logs' | 'cleanup-soft-deletes' | 'cleanup-tasks' export type OrganizationRetentionKey = @@ -32,9 +37,14 @@ export type NonEnterprisePlan = Exclude const NON_ENTERPRISE_PLANS = ['free', 'pro', 'team'] as const satisfies readonly NonEnterprisePlan[] -export type CleanupJobPayload = - | { plan: NonEnterprisePlan } - | { plan: 'enterprise'; workspaceId: string } +export interface CleanupJobPayload { + plan: PlanCategory + workspaceIds: string[] + retentionHours: number + label: string + /** Set on exactly one chunk per dispatch so plan-wide housekeeping runs once. */ + runGlobalHousekeeping?: boolean +} interface CleanupJobConfig { key: OrganizationRetentionKey @@ -170,74 +180,6 @@ async function resolvePlanTypesByWorkspaceId( return new Map(entries.filter((entry): entry is PlanResolutionEntry => entry !== null)) } -/** - * Bulk-lookup workspace IDs for a non-enterprise plan category using the same - * effective-plan lookup used by execution, limits, and workspace policy. - * Enterprise is per-workspace (routed through the owning organization's - * retention config). - */ -async function resolveWorkspaceIdsForPlan(plan: NonEnterprisePlan): Promise { - const rows = await listActiveWorkspaceCleanupScopeRows() - const planByWorkspaceId = await resolvePlanTypesByWorkspaceId(rows) - return rows.filter((row) => planByWorkspaceId.get(row.id) === plan).map((row) => row.id) -} - -export interface ResolvedCleanupScope { - workspaceIds: string[] - retentionHours: number - label: string -} - -/** - * Translate a queued cleanup payload into a concrete cleanup scope: the set of - * workspaces and the retention cutoff to apply. Returns `null` when the plan - * has no retention configured (default is null, or the enterprise org has not - * set this key). - */ -export async function resolveCleanupScope( - jobType: CleanupJobType, - payload: CleanupJobPayload -): Promise { - const config = CLEANUP_CONFIG[jobType] - - if (payload.plan !== 'enterprise') { - const retentionHours = config.defaults[payload.plan] - if (retentionHours === null) return null - const workspaceIds = await resolveWorkspaceIdsForPlan(payload.plan) - return { workspaceIds, retentionHours, label: payload.plan } - } - - const [row] = await db - .select({ - id: workspace.id, - organizationId: workspace.organizationId, - workspaceMode: workspace.workspaceMode, - billedAccountUserId: workspace.billedAccountUserId, - settings: organization.dataRetentionSettings, - }) - .from(workspace) - .innerJoin(organization, eq(organization.id, workspace.organizationId)) - .where(eq(workspace.id, payload.workspaceId)) - .limit(1) - - if (!row || !isOrganizationWorkspace(row)) return null - - const organizationId = row.organizationId - if (!organizationId) return null - - const subscription = await getOrganizationSubscription(organizationId, { onError: 'throw' }) - if (getPlanType(subscription?.plan) !== 'enterprise') return null - - const hours = row?.settings?.[config.key] - if (hours == null) return null - - return { - workspaceIds: [payload.workspaceId], - retentionHours: hours, - label: `enterprise/${payload.workspaceId}`, - } -} - async function buildCleanupRunner(jobType: CleanupJobType): Promise { const cleanupRunner = await (async () => { switch (jobType) { @@ -252,91 +194,125 @@ async function buildCleanupRunner(jobType: CleanupJobType): Promise cleanupRunner(payload as CleanupJobPayload)) as EnqueueOptions['runner'] } -/** - * Dispatcher: enqueue cleanup jobs driven by `CLEANUP_CONFIG`. - * - * - One job per non-enterprise plan with a non-null default - * - One enterprise job per workspace whose owning organization has a non-null - * retention value for this job's key - * - * Uses Trigger.dev batchTrigger when available, otherwise parallel enqueue via - * the JobQueueBackend abstraction. On the database backend (no external worker), - * jobs run inline in the same process via fire-and-forget promises. - */ -export async function dispatchCleanupJobs( - jobType: CleanupJobType -): Promise<{ jobIds: string[]; jobCount: number; enterpriseCount: number }> { - const config = CLEANUP_CONFIG[jobType] - const jobQueue = await getJobQueue() - const jobIds: string[] = [] +/** Job type → plan whose housekeeping is global, not per-workspace. */ +const GLOBAL_HOUSEKEEPING_PLAN: Partial> = { + 'cleanup-logs': 'free', +} - const plansWithDefaults = NON_ENTERPRISE_PLANS.filter((plan) => config.defaults[plan] !== null) +async function buildCleanupChunks(jobType: CleanupJobType): Promise { + const config = CLEANUP_CONFIG[jobType] + const activeRows = await listActiveWorkspaceCleanupScopeRows() + const planByWorkspaceId = await resolvePlanTypesByWorkspaceId(activeRows) + + const chunks: CleanupJobPayload[] = [] + + for (const plan of NON_ENTERPRISE_PLANS) { + const retentionHours = config.defaults[plan] + if (retentionHours === null) continue + const workspaceIds = activeRows + .filter((row) => planByWorkspaceId.get(row.id) === plan) + .map((row) => row.id) + if (workspaceIds.length === 0) continue + for (const ws of chunkArray(workspaceIds, WORKSPACES_PER_CLEANUP_CHUNK)) { + chunks.push({ plan, workspaceIds: ws, retentionHours, label: plan }) + } + } - for (const plan of plansWithDefaults) { - const payload: CleanupJobPayload = { plan } - const jobId = await jobQueue.enqueue(jobType, payload, { - runner: shouldExecuteInline() ? await buildCleanupRunner(jobType) : undefined, + for (const row of activeRows) { + if (planByWorkspaceId.get(row.id) !== 'enterprise') continue + const hours = row.organizationSettings?.[config.key] + if (hours == null) continue + chunks.push({ + plan: 'enterprise', + workspaceIds: [row.id], + retentionHours: hours, + label: `enterprise/${row.id}`, }) - jobIds.push(jobId) } - const activeWorkspaceRows = await listActiveWorkspaceCleanupScopeRows() - const planByWorkspaceId = await resolvePlanTypesByWorkspaceId(activeWorkspaceRows) - const enterpriseRows = activeWorkspaceRows.filter( - (row) => - planByWorkspaceId.get(row.id) === 'enterprise' && - row.organizationSettings?.[config.key] != null - ) + const housekeepingPlan = GLOBAL_HOUSEKEEPING_PLAN[jobType] + if (housekeepingPlan) { + const target = chunks.find((c) => c.plan === housekeepingPlan) + if (target) { + target.runGlobalHousekeeping = true + } else if (housekeepingPlan !== 'enterprise') { + // Synthetic empty chunk so housekeeping still fires when the plan has no workspaces. + const retentionHours = config.defaults[housekeepingPlan] + if (retentionHours != null) { + chunks.push({ + plan: housekeepingPlan, + workspaceIds: [], + retentionHours, + label: `${housekeepingPlan}/housekeeping`, + runGlobalHousekeeping: true, + }) + } + } + } + + return chunks +} - const enterpriseCount = enterpriseRows.length +/** + * Resolve the workspace set + retention cutoff once, then fan out one task + * run per `WORKSPACES_PER_CLEANUP_CHUNK` workspaces via `tasks.batchTrigger`. + * Falls back to `JobQueueBackend` enqueue when Trigger.dev isn't available. + */ +export async function dispatchCleanupJobs(jobType: CleanupJobType): Promise<{ + jobIds: string[] + jobCount: number + chunkCount: number + workspaceCount: number +}> { + const chunks = await buildCleanupChunks(jobType) + const workspaceCount = chunks.reduce((sum, c) => sum + c.workspaceIds.length, 0) - const planLabels = plansWithDefaults.join('+') || 'none' logger.info( - `[${jobType}] Dispatching: plans=[${planLabels}] + ${enterpriseCount} enterprise jobs (key: ${config.key})` + `[${jobType}] Dispatching: ${chunks.length} chunk(s) covering ${workspaceCount} workspace(s)` ) - if (enterpriseCount === 0) { - return { jobIds, jobCount: jobIds.length, enterpriseCount: 0 } + if (chunks.length === 0) { + return { jobIds: [], jobCount: 0, chunkCount: 0, workspaceCount: 0 } } + const jobIds: string[] = [] + if (isTriggerAvailable()) { - // Trigger.dev: use batchTrigger, chunked - for (let i = 0; i < enterpriseRows.length; i += BATCH_TRIGGER_CHUNK_SIZE) { - const chunk = enterpriseRows.slice(i, i + BATCH_TRIGGER_CHUNK_SIZE) + for (let i = 0; i < chunks.length; i += BATCH_TRIGGER_CHUNK_SIZE) { + const batch = chunks.slice(i, i + BATCH_TRIGGER_CHUNK_SIZE) const batchResult = await tasks.batchTrigger( jobType, - chunk.map((row) => ({ - payload: { plan: 'enterprise' as const, workspaceId: row.id }, + batch.map((payload) => ({ + payload, options: { - tags: [`workspaceId:${row.id}`, `jobType:${jobType}`], + tags: [`plan:${payload.plan}`, `jobType:${jobType}`], }, })) ) jobIds.push(batchResult.batchId) } - } else { - // Fallback: parallel enqueue via abstraction - const inlineRunner = shouldExecuteInline() ? await buildCleanupRunner(jobType) : undefined - const results = await Promise.allSettled( - enterpriseRows.map(async (row) => { - const payload: CleanupJobPayload = { plan: 'enterprise', workspaceId: row.id } - return jobQueue.enqueue(jobType, payload, { runner: inlineRunner }) - }) - ) - - let succeeded = 0 - let failed = 0 - for (const result of results) { - if (result.status === 'fulfilled') { - jobIds.push(result.value) - succeeded++ - } else { - failed++ - logger.error(`[${jobType}] Failed to enqueue enterprise job:`, { reason: result.reason }) - } + return { jobIds, jobCount: jobIds.length, chunkCount: chunks.length, workspaceCount } + } + + // Fallback: parallel enqueue via abstraction (self-hosted / inline path) + const inlineRunner = shouldExecuteInline() ? await buildCleanupRunner(jobType) : undefined + const jobQueue = await getJobQueue() + const results = await Promise.allSettled( + chunks.map((payload) => jobQueue.enqueue(jobType, payload, { runner: inlineRunner })) + ) + + let succeeded = 0 + let failed = 0 + for (const result of results) { + if (result.status === 'fulfilled') { + jobIds.push(result.value) + succeeded++ + } else { + failed++ + logger.error(`[${jobType}] Failed to enqueue chunk:`, { reason: result.reason }) } - logger.info(`[${jobType}] Enterprise enqueue: ${succeeded} succeeded, ${failed} failed`) } + logger.info(`[${jobType}] Chunk enqueue: ${succeeded} succeeded, ${failed} failed`) - return { jobIds, jobCount: jobIds.length, enterpriseCount } + return { jobIds, jobCount: jobIds.length, chunkCount: chunks.length, workspaceCount } } diff --git a/apps/sim/lib/cleanup/batch-delete.ts b/apps/sim/lib/cleanup/batch-delete.ts index dbf6cf21c51..259f095356f 100644 --- a/apps/sim/lib/cleanup/batch-delete.ts +++ b/apps/sim/lib/cleanup/batch-delete.ts @@ -6,7 +6,8 @@ import type { PgColumn, PgTable } from 'drizzle-orm/pg-core' const logger = createLogger('BatchDelete') export const DEFAULT_BATCH_SIZE = 2000 -export const DEFAULT_MAX_BATCHES_PER_TABLE = 10 +/** 50 × 2000 = 100K row cap per cleanup run; drains long-tail tenants in days, not weeks. */ +export const DEFAULT_MAX_BATCHES_PER_TABLE = 50 /** * Split workspaceIds into this-sized groups before running SELECT/DELETE. Large * IN lists combined with `started_at < X` force Postgres to probe every @@ -14,6 +15,8 @@ export const DEFAULT_MAX_BATCHES_PER_TABLE = 10 * at the scale of the full free tier. */ export const DEFAULT_WORKSPACE_CHUNK_SIZE = 50 +/** Bounds FK cascade trigger queue (per-statement in-memory) and bind-parameter count. */ +export const DEFAULT_DELETE_CHUNK_SIZE = 1000 export function chunkArray(arr: T[], size: number): T[][] { const out: T[][] = [] @@ -215,26 +218,38 @@ export async function batchDeleteByWorkspaceAndTimestamp({ } /** - * Delete rows by an explicit list of IDs. Use this when the IDs were selected - * upstream (e.g., to drive external cleanup like S3 deletes or a backend API - * call) so the DB delete cannot drift from the upstream selection. Paired with - * `batchDeleteByWorkspaceAndTimestamp` for tables with no external side effects. + * Delete by explicit ID list, chunked so each statement is its own transaction. + * Partial progress survives chunk-level failures. */ export async function deleteRowsById( tableDef: PgTable, idCol: PgColumn, ids: string[], - tableName: string + tableName: string, + chunkSize: number = DEFAULT_DELETE_CHUNK_SIZE ): Promise { const result: TableCleanupResult = { table: tableName, deleted: 0, failed: 0 } if (ids.length === 0) return result - try { - const deleted = await db.delete(tableDef).where(inArray(idCol, ids)).returning({ id: idCol }) - result.deleted = deleted.length - logger.info(`[${tableName}] Deleted ${deleted.length} rows`) - } catch (error) { - result.failed++ - logger.error(`[${tableName}] Delete failed:`, { error }) + + const chunks = chunkArray(ids, chunkSize) + for (const [chunkIdx, chunkIds] of chunks.entries()) { + try { + const deleted = await db + .delete(tableDef) + .where(inArray(idCol, chunkIds)) + .returning({ id: idCol }) + result.deleted += deleted.length + } catch (error) { + result.failed += chunkIds.length + logger.error( + `[${tableName}] Delete chunk ${chunkIdx + 1}/${chunks.length} failed (${chunkIds.length} rows):`, + { error } + ) + } } + + logger.info( + `[${tableName}] Deleted ${result.deleted} rows across ${chunks.length} chunk(s)${result.failed > 0 ? `, ${result.failed} failed` : ''}` + ) return result } diff --git a/apps/sim/lib/cleanup/chat-cleanup.ts b/apps/sim/lib/cleanup/chat-cleanup.ts index 5999706e2a0..b154687bc37 100644 --- a/apps/sim/lib/cleanup/chat-cleanup.ts +++ b/apps/sim/lib/cleanup/chat-cleanup.ts @@ -2,6 +2,7 @@ import { db } from '@sim/db' import { copilotChats, workspaceFiles } from '@sim/db/schema' import { createLogger } from '@sim/logger' import { and, inArray, isNull } from 'drizzle-orm' +import { chunkArray } from '@/lib/cleanup/batch-delete' import { SIM_AGENT_API_URL } from '@/lib/copilot/constants' import { env } from '@/lib/core/config/env' import type { StorageContext } from '@/lib/uploads' @@ -10,6 +11,8 @@ import { isUsingCloudStorage, StorageService } from '@/lib/uploads' const logger = createLogger('ChatCleanup') const COPILOT_CLEANUP_BATCH_SIZE = 1000 +/** Bounds JSONB detoast memory: `messages` can be MBs per row. */ +const CHAT_FILE_COLLECT_CHUNK_SIZE = 500 /** * Only storage in these contexts is tied to chat/task lifecycle. Workspace @@ -26,10 +29,9 @@ interface FileRef { } /** - * Collect all file storage keys associated with the given chat IDs. - * Two sources: - * 1. workspaceFiles rows with chatId FK — filtered to chat-scoped contexts only - * 2. fileAttachments[].key inside copilotChats.messages JSONB — all copilot uploads + * Collect all file storage keys for the given chat IDs from two sources: + * 1. workspaceFiles rows with chatId FK (chat-scoped contexts only) + * 2. fileAttachments[].key inside copilotChats.messages JSONB */ export async function collectChatFiles(chatIds: string[]): Promise { const files: FileRef[] = [] @@ -37,47 +39,49 @@ export async function collectChatFiles(chatIds: string[]): Promise { const seen = new Set() - const [linkedFiles, chatsWithMessages] = await Promise.all([ - db - .select({ key: workspaceFiles.key, context: workspaceFiles.context }) - .from(workspaceFiles) - .where( - and( - inArray(workspaceFiles.chatId, chatIds), - isNull(workspaceFiles.deletedAt), - inArray(workspaceFiles.context, [...CHAT_SCOPED_CONTEXTS]) - ) - ), - db - .select({ messages: copilotChats.messages }) - .from(copilotChats) - .where(inArray(copilotChats.id, chatIds)), - ]) - - for (const f of linkedFiles) { - if (!seen.has(f.key)) { - seen.add(f.key) - files.push({ key: f.key, context: f.context as ChatScopedContext }) + for (const chunk of chunkArray(chatIds, CHAT_FILE_COLLECT_CHUNK_SIZE)) { + const [linkedFiles, chatsWithMessages] = await Promise.all([ + db + .select({ key: workspaceFiles.key, context: workspaceFiles.context }) + .from(workspaceFiles) + .where( + and( + inArray(workspaceFiles.chatId, chunk), + isNull(workspaceFiles.deletedAt), + inArray(workspaceFiles.context, [...CHAT_SCOPED_CONTEXTS]) + ) + ), + db + .select({ messages: copilotChats.messages }) + .from(copilotChats) + .where(inArray(copilotChats.id, chunk)), + ]) + + for (const f of linkedFiles) { + if (!seen.has(f.key)) { + seen.add(f.key) + files.push({ key: f.key, context: f.context as ChatScopedContext }) + } } - } - for (const chat of chatsWithMessages) { - const messages = chat.messages as unknown[] - if (!Array.isArray(messages)) continue - for (const msg of messages) { - if (!msg || typeof msg !== 'object') continue - const attachments = (msg as Record).fileAttachments - if (!Array.isArray(attachments)) continue - for (const attachment of attachments) { - if ( - attachment && - typeof attachment === 'object' && - (attachment as Record).key - ) { - const key = (attachment as Record).key as string - if (!seen.has(key)) { - seen.add(key) - files.push({ key, context: 'copilot' }) + for (const chat of chatsWithMessages) { + const messages = chat.messages as unknown[] + if (!Array.isArray(messages)) continue + for (const msg of messages) { + if (!msg || typeof msg !== 'object') continue + const attachments = (msg as Record).fileAttachments + if (!Array.isArray(attachments)) continue + for (const attachment of attachments) { + if ( + attachment && + typeof attachment === 'object' && + (attachment as Record).key + ) { + const key = (attachment as Record).key as string + if (!seen.has(key)) { + seen.add(key) + files.push({ key, context: 'copilot' }) + } } } } @@ -87,9 +91,7 @@ export async function collectChatFiles(chatIds: string[]): Promise { return files } -/** - * Delete files from cloud storage using the correct context/bucket per file. - */ +/** Groups files by storage context so each context can use one batch DELETE call. */ export async function deleteStorageFiles( files: FileRef[], label: string @@ -97,17 +99,23 @@ export async function deleteStorageFiles( const stats = { filesDeleted: 0, filesFailed: 0 } if (files.length === 0 || !isUsingCloudStorage()) return stats - await Promise.all( - files.map(async (file) => { - try { - await StorageService.deleteFile({ key: file.key, context: file.context }) - stats.filesDeleted++ - } catch (error) { - stats.filesFailed++ - logger.error(`[${label}] Failed to delete storage file ${file.key}:`, { error }) - } - }) - ) + const keysByContext = new Map() + for (const file of files) { + const bucket = keysByContext.get(file.context) + if (bucket) bucket.push(file.key) + else keysByContext.set(file.context, [file.key]) + } + + for (const [context, keys] of keysByContext) { + const result = await StorageService.deleteFiles(keys, context) + stats.filesDeleted += result.deleted + stats.filesFailed += result.failed.length + for (const { key, error } of result.failed) { + logger.error(`[${label}] Failed to delete storage file ${key} (context: ${context}):`, { + error, + }) + } + } return stats } diff --git a/apps/sim/lib/uploads/core/storage-service.ts b/apps/sim/lib/uploads/core/storage-service.ts index 75632f2cba7..7a5ba092f29 100644 --- a/apps/sim/lib/uploads/core/storage-service.ts +++ b/apps/sim/lib/uploads/core/storage-service.ts @@ -1,5 +1,6 @@ import { randomBytes } from 'crypto' import { createLogger } from '@sim/logger' +import { getErrorMessage } from '@sim/utils/errors' import { getStorageConfig, USE_BLOB_STORAGE, USE_S3_STORAGE } from '@/lib/uploads/config' import type { BlobConfig } from '@/lib/uploads/providers/blob/types' import type { S3Config } from '@/lib/uploads/providers/s3/types' @@ -239,6 +240,48 @@ export async function deleteFile(options: DeleteFileOptions): Promise { await unlink(filePath) } +/** AWS SDK v3 silently caps HTTP connections at 50/endpoint — stay well under. */ +const PER_FILE_DELETE_CONCURRENCY = 25 + +/** + * Bulk delete via the provider's native multi-object API when available + * (S3 `DeleteObjects`), else bounded-concurrency per-file. All keys must + * share `context`. Idempotent on missing keys. + */ +export async function deleteFiles( + keys: string[], + context: StorageContext +): Promise<{ deleted: number; failed: Array<{ key: string; error: string }> }> { + if (keys.length === 0) return { deleted: 0, failed: [] } + + const config = getStorageConfig(context) + + if (USE_S3_STORAGE) { + const { deleteManyFromS3 } = await import('@/lib/uploads/providers/s3/client') + const { failed } = await deleteManyFromS3(keys, createS3Config(config)) + return { deleted: keys.length - failed.length, failed } + } + + const failed: Array<{ key: string; error: string }> = [] + let cursor = 0 + const runWorker = async (): Promise => { + while (cursor < keys.length) { + const idx = cursor++ + const key = keys[idx] + try { + await deleteFile({ key, context }) + } catch (error) { + failed.push({ key, error: getErrorMessage(error) }) + } + } + } + + const workerCount = Math.min(PER_FILE_DELETE_CONCURRENCY, keys.length) + await Promise.all(Array.from({ length: workerCount }, runWorker)) + + return { deleted: keys.length - failed.length, failed } +} + /** * Check whether an object exists in the configured cloud storage provider. * Returns object size and content-type when present, or null when missing. diff --git a/apps/sim/lib/uploads/providers/s3/client.ts b/apps/sim/lib/uploads/providers/s3/client.ts index 6f347a4bdd6..f31bd21718d 100644 --- a/apps/sim/lib/uploads/providers/s3/client.ts +++ b/apps/sim/lib/uploads/providers/s3/client.ts @@ -3,6 +3,7 @@ import { CompleteMultipartUploadCommand, CreateMultipartUploadCommand, DeleteObjectCommand, + DeleteObjectsCommand, GetObjectCommand, HeadObjectCommand, PutObjectCommand, @@ -10,6 +11,7 @@ import { UploadPartCommand, } from '@aws-sdk/client-s3' import { getSignedUrl } from '@aws-sdk/s3-request-presigner' +import { getErrorMessage } from '@sim/utils/errors' import { generateId } from '@sim/utils/id' import { env } from '@/lib/core/config/env' import { S3_CONFIG, S3_KB_CONFIG } from '@/lib/uploads/config' @@ -251,6 +253,52 @@ export async function deleteFromS3(key: string, customConfig?: S3Config): Promis ) } +/** S3 `DeleteObjects` hard cap. */ +const S3_DELETE_OBJECTS_MAX_KEYS = 1000 + +/** + * Multi-object delete. One HTTP call per 1000 keys; each key still counts + * against the per-prefix DELETE rate limit (3500/sec). + */ +export async function deleteManyFromS3( + keys: string[], + customConfig?: S3Config +): Promise<{ failed: Array<{ key: string; error: string }> }> { + const failed: Array<{ key: string; error: string }> = [] + if (keys.length === 0) return { failed } + + const config = customConfig || { bucket: S3_CONFIG.bucket, region: S3_CONFIG.region } + const s3Client = getS3Client() + + for (let i = 0; i < keys.length; i += S3_DELETE_OBJECTS_MAX_KEYS) { + const chunk = keys.slice(i, i + S3_DELETE_OBJECTS_MAX_KEYS) + try { + const response = await s3Client.send( + new DeleteObjectsCommand({ + Bucket: config.bucket, + Delete: { + Objects: chunk.map((Key) => ({ Key })), + Quiet: true, + }, + }) + ) + for (const error of response.Errors ?? []) { + if (error.Key) { + failed.push({ + key: error.Key, + error: error.Message ?? error.Code ?? 'unknown', + }) + } + } + } catch (error) { + const message = getErrorMessage(error) + for (const Key of chunk) failed.push({ key: Key, error: message }) + } + } + + return { failed } +} + /** * Initiate a multipart upload for S3 */ From ddbdacbd653b2a28c3a16e1447142a51a5f841d0 Mon Sep 17 00:00:00 2001 From: Waleed Latif Date: Wed, 20 May 2026 19:20:48 -0700 Subject: [PATCH 2/2] improvement(cleanup): chunk-index labels, clarify upper-bound failure counter Addresses Greptile review feedback: - Disambiguate downstream logs when a plan splits into multiple workspace chunks (e.g. 'free/1', 'free/2') - Document that deleteRowsById's failed counter is an upper bound (chunk rolls back to 0 deletes on error) --- apps/sim/lib/billing/cleanup-dispatcher.ts | 10 ++++++++-- apps/sim/lib/cleanup/batch-delete.ts | 5 ++++- 2 files changed, 12 insertions(+), 3 deletions(-) diff --git a/apps/sim/lib/billing/cleanup-dispatcher.ts b/apps/sim/lib/billing/cleanup-dispatcher.ts index 93568887621..23c410ee365 100644 --- a/apps/sim/lib/billing/cleanup-dispatcher.ts +++ b/apps/sim/lib/billing/cleanup-dispatcher.ts @@ -213,8 +213,14 @@ async function buildCleanupChunks(jobType: CleanupJobType): Promise planByWorkspaceId.get(row.id) === plan) .map((row) => row.id) if (workspaceIds.length === 0) continue - for (const ws of chunkArray(workspaceIds, WORKSPACES_PER_CLEANUP_CHUNK)) { - chunks.push({ plan, workspaceIds: ws, retentionHours, label: plan }) + const planChunks = chunkArray(workspaceIds, WORKSPACES_PER_CLEANUP_CHUNK) + for (const [idx, ws] of planChunks.entries()) { + chunks.push({ + plan, + workspaceIds: ws, + retentionHours, + label: planChunks.length > 1 ? `${plan}/${idx + 1}` : plan, + }) } } diff --git a/apps/sim/lib/cleanup/batch-delete.ts b/apps/sim/lib/cleanup/batch-delete.ts index 259f095356f..3241bea9df9 100644 --- a/apps/sim/lib/cleanup/batch-delete.ts +++ b/apps/sim/lib/cleanup/batch-delete.ts @@ -240,9 +240,12 @@ export async function deleteRowsById( .returning({ id: idCol }) result.deleted += deleted.length } catch (error) { + // Upper bound: Postgres rolls back the chunk on error, so actual deletes = 0, + // but we can't tell which IDs in the chunk would have matched. The next cron + // run picks up whatever's still expired, so this only inflates the metric. result.failed += chunkIds.length logger.error( - `[${tableName}] Delete chunk ${chunkIdx + 1}/${chunks.length} failed (${chunkIds.length} rows):`, + `[${tableName}] Delete chunk ${chunkIdx + 1}/${chunks.length} failed (up to ${chunkIds.length} rows):`, { error } ) }