Skip to content

Commit 11ad891

Browse files
authored
improvement(cleanup): batchTrigger fan-out, chunked queries, batched S3, faster outlier drain (#4688)
* improvement(cleanup): batchTrigger fan-out, chunked queries, batched S3, faster outlier drain - Fan cleanup-tasks/logs/soft-deletes out via tasks.batchTrigger (500 ws/chunk); bump to large-1x with concurrencyLimit: 5 - Chunk bulk DELETEs (1000 IDs/stmt) and collectChatFiles JSONB SELECT (500 chats/stmt) to bound worker memory and lock duration - Replace per-key position() table scans with one LATERAL unnest scan per 200-key chunk - Route storage deletes through StorageService.deleteFiles (S3 DeleteObjects: 1000 keys/HTTP) - Raise per-run row cap to 100K so long-tail tenants (one prod workspace has 723K doomed rows) drain in days, not weeks * improvement(cleanup): chunk-index labels, clarify upper-bound failure counter Addresses Greptile review feedback: - Disambiguate downstream logs when a plan splits into multiple workspace chunks (e.g. 'free/1', 'free/2') - Document that deleteRowsById's failed counter is an upper bound (chunk rolls back to 0 deletes on error)
1 parent e27afaa commit 11ad891

8 files changed

Lines changed: 370 additions & 316 deletions

File tree

apps/sim/background/cleanup-logs.ts

Lines changed: 32 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,10 @@ import { jobExecutionLogs, pausedExecutions, workflowExecutionLogs } from '@sim/
33
import { createLogger } from '@sim/logger'
44
import { task } from '@trigger.dev/sdk'
55
import { and, eq, inArray, isNull, lt, notInArray, or, sql } from 'drizzle-orm'
6-
import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
6+
import type { CleanupJobPayload } from '@/lib/billing/cleanup-dispatcher'
77
import {
88
batchDeleteByWorkspaceAndTimestamp,
9+
chunkArray,
910
chunkedBatchDelete,
1011
type TableCleanupResult,
1112
} from '@/lib/cleanup/batch-delete'
@@ -27,31 +28,34 @@ interface FileDeleteStats {
2728

2829
const RESUMABLE_PAUSED_STATUSES = ['paused', 'partially_resumed', 'cancelling']
2930

31+
/** Caps the per-row predicate cost: keys-per-row is `O(chunk)` not `O(uniqueKeys)`. */
32+
const REFERENCE_CHECK_KEY_CHUNK_SIZE = 200
33+
34+
/**
35+
* One `LATERAL unnest` scan per chunk replaces N per-key sequential scans
36+
* (each detoasting the entire JSONB column). Substring semantics identical.
37+
*/
3038
async function filterLargeValueKeysWithoutRetainedReferences(
3139
keys: string[],
3240
deletedLogIds: string[]
3341
): Promise<string[]> {
3442
if (keys.length === 0 || deletedLogIds.length === 0) return []
3543

36-
const unreferencedKeys: string[] = []
37-
for (const key of Array.from(new Set(keys))) {
38-
const [referencingLog] = await db
39-
.select({ id: workflowExecutionLogs.id })
40-
.from(workflowExecutionLogs)
41-
.where(
42-
and(
43-
notInArray(workflowExecutionLogs.id, deletedLogIds),
44-
sql`position(${key} in ${workflowExecutionLogs.executionData}::text) > 0`
45-
)
46-
)
47-
.limit(1)
48-
49-
if (!referencingLog) {
50-
unreferencedKeys.push(key)
51-
}
44+
const uniqueKeys = Array.from(new Set(keys))
45+
const referencedKeys = new Set<string>()
46+
47+
for (const keyChunk of chunkArray(uniqueKeys, REFERENCE_CHECK_KEY_CHUNK_SIZE)) {
48+
const rows = await db.execute<{ key: string }>(sql`
49+
SELECT DISTINCT k.key AS key
50+
FROM ${workflowExecutionLogs} AS wel,
51+
unnest(${keyChunk}::text[]) AS k(key)
52+
WHERE wel.id <> ALL(${deletedLogIds}::text[])
53+
AND position(k.key in wel.execution_data::text) > 0
54+
`)
55+
for (const row of rows) referencedKeys.add(row.key)
5256
}
5357

54-
return unreferencedKeys
58+
return uniqueKeys.filter((key) => !referencedKeys.has(key))
5559
}
5660

5761
async function deleteExecutionFiles(files: unknown, stats: FileDeleteStats): Promise<void> {
@@ -154,14 +158,7 @@ async function cleanupWorkflowExecutionLogs(
154158
return { ...dbStats, ...fileStats }
155159
}
156160

157-
async function cleanupFreePlanOrphanedSnapshots(
158-
payload: CleanupJobPayload,
159-
retentionHours: number
160-
): Promise<void> {
161-
if (payload.plan !== 'free') {
162-
return
163-
}
164-
161+
async function cleanupFreePlanOrphanedSnapshots(retentionHours: number): Promise<void> {
165162
try {
166163
const retentionDays = Math.floor(retentionHours / 24)
167164
const snapshotsCleaned = await snapshotService.cleanupOrphanedSnapshots(retentionDays + 1)
@@ -173,20 +170,15 @@ async function cleanupFreePlanOrphanedSnapshots(
173170

174171
export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void> {
175172
const startTime = Date.now()
176-
177-
const scope = await resolveCleanupScope('cleanup-logs', payload)
178-
if (!scope) {
179-
logger.info(`[${payload.plan}] No retention configured, skipping`)
180-
return
181-
}
182-
183-
const { workspaceIds, retentionHours, label } = scope
173+
const { workspaceIds, retentionHours, label, plan, runGlobalHousekeeping } = payload
184174

185175
const retentionDate = new Date(Date.now() - retentionHours * 60 * 60 * 1000)
186176

187177
if (workspaceIds.length === 0) {
188178
logger.info(`[${label}] No workspaces to process`)
189-
await cleanupFreePlanOrphanedSnapshots(payload, retentionHours)
179+
if (runGlobalHousekeeping && plan === 'free') {
180+
await cleanupFreePlanOrphanedSnapshots(retentionHours)
181+
}
190182
return
191183
}
192184

@@ -211,13 +203,17 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void>
211203
tableName: `${label}/job_execution_logs`,
212204
})
213205

214-
await cleanupFreePlanOrphanedSnapshots(payload, retentionHours)
206+
if (runGlobalHousekeeping && plan === 'free') {
207+
await cleanupFreePlanOrphanedSnapshots(retentionHours)
208+
}
215209

216210
const timeElapsed = (Date.now() - startTime) / 1000
217211
logger.info(`[${label}] Job completed in ${timeElapsed.toFixed(2)}s`)
218212
}
219213

220214
export const cleanupLogsTask = task({
221215
id: 'cleanup-logs',
216+
machine: 'large-1x',
217+
queue: { concurrencyLimit: 5 },
222218
run: runCleanupLogs,
223219
})

apps/sim/background/cleanup-soft-deletes.ts

Lines changed: 23 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import {
1515
import { createLogger } from '@sim/logger'
1616
import { task } from '@trigger.dev/sdk'
1717
import { and, inArray, isNotNull, lt } from 'drizzle-orm'
18-
import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
18+
import type { CleanupJobPayload } from '@/lib/billing/cleanup-dispatcher'
1919
import {
2020
batchDeleteByWorkspaceAndTimestamp,
2121
deleteRowsById,
@@ -92,22 +92,26 @@ async function cleanupWorkspaceFileStorage(
9292
const stats = { filesDeleted: 0, filesFailed: 0 }
9393
if (!isUsingCloudStorage()) return stats
9494

95-
const toDelete: Array<{ key: string; context: StorageContext }> = [
96-
...scope.legacyRows.map((r) => ({ key: r.key, context: 'workspace' as StorageContext })),
97-
...scope.multiContextRows.map((r) => ({ key: r.key, context: r.context })),
98-
]
95+
const keysByContext = new Map<StorageContext, string[]>()
96+
for (const r of scope.legacyRows) {
97+
const bucket = keysByContext.get('workspace')
98+
if (bucket) bucket.push(r.key)
99+
else keysByContext.set('workspace', [r.key])
100+
}
101+
for (const r of scope.multiContextRows) {
102+
const bucket = keysByContext.get(r.context)
103+
if (bucket) bucket.push(r.key)
104+
else keysByContext.set(r.context, [r.key])
105+
}
99106

100-
await Promise.all(
101-
toDelete.map(async ({ key, context }) => {
102-
try {
103-
await StorageService.deleteFile({ key, context })
104-
stats.filesDeleted++
105-
} catch (error) {
106-
stats.filesFailed++
107-
logger.error(`Failed to delete storage file ${key} (context: ${context}):`, { error })
108-
}
109-
})
110-
)
107+
for (const [context, keys] of keysByContext) {
108+
const result = await StorageService.deleteFiles(keys, context)
109+
stats.filesDeleted += result.deleted
110+
stats.filesFailed += result.failed.length
111+
for (const { key, error } of result.failed) {
112+
logger.error(`Failed to delete storage file ${key} (context: ${context}):`, { error })
113+
}
114+
}
111115

112116
return stats
113117
}
@@ -160,14 +164,7 @@ const CLEANUP_TARGETS = [
160164

161165
export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise<void> {
162166
const startTime = Date.now()
163-
164-
const scope = await resolveCleanupScope('cleanup-soft-deletes', payload)
165-
if (!scope) {
166-
logger.info(`[${payload.plan}] No retention configured, skipping`)
167-
return
168-
}
169-
170-
const { workspaceIds, retentionHours, label } = scope
167+
const { workspaceIds, retentionHours, label } = payload
171168

172169
if (workspaceIds.length === 0) {
173170
logger.info(`[${label}] No workspaces to process`)
@@ -274,5 +271,7 @@ export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise
274271

275272
export const cleanupSoftDeletesTask = task({
276273
id: 'cleanup-soft-deletes',
274+
machine: 'large-1x',
275+
queue: { concurrencyLimit: 5 },
277276
run: runCleanupSoftDeletes,
278277
})

apps/sim/background/cleanup-tasks.ts

Lines changed: 12 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@ import {
99
} from '@sim/db/schema'
1010
import { createLogger } from '@sim/logger'
1111
import { task } from '@trigger.dev/sdk'
12-
import { and, inArray, lt, sql } from 'drizzle-orm'
13-
import { type CleanupJobPayload, resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
12+
import { and, inArray, lt } from 'drizzle-orm'
13+
import type { CleanupJobPayload } from '@/lib/billing/cleanup-dispatcher'
1414
import {
1515
batchDeleteByWorkspaceAndTimestamp,
1616
deleteRowsById,
@@ -38,27 +38,6 @@ const RUN_CHILD_TABLES = [
3838
},
3939
] as const
4040

41-
async function deleteByRunIds(
42-
table: (typeof RUN_CHILD_TABLES)[number]['table'],
43-
runIdCol: (typeof RUN_CHILD_TABLES)[number]['runIdCol'],
44-
runIds: string[],
45-
tableName: string
46-
): Promise<TableCleanupResult> {
47-
const result: TableCleanupResult = { table: tableName, deleted: 0, failed: 0 }
48-
try {
49-
const deleted = await db
50-
.delete(table)
51-
.where(inArray(runIdCol, runIds))
52-
.returning({ id: sql`id` })
53-
result.deleted = deleted.length
54-
logger.info(`[${tableName}] Deleted ${deleted.length} rows`)
55-
} catch (error) {
56-
result.failed++
57-
logger.error(`[${tableName}] Delete failed:`, { error })
58-
}
59-
return result
60-
}
61-
6241
async function cleanupRunChildren(
6342
workspaceIds: string[],
6443
retentionDate: Date,
@@ -83,20 +62,13 @@ async function cleanupRunChildren(
8362
const ids = runIds.map((r) => r.id)
8463

8564
return Promise.all(
86-
RUN_CHILD_TABLES.map((t) => deleteByRunIds(t.table, t.runIdCol, ids, `${label}/${t.name}`))
65+
RUN_CHILD_TABLES.map((t) => deleteRowsById(t.table, t.runIdCol, ids, `${label}/${t.name}`))
8766
)
8867
}
8968

9069
export async function runCleanupTasks(payload: CleanupJobPayload): Promise<void> {
9170
const startTime = Date.now()
92-
93-
const scope = await resolveCleanupScope('cleanup-tasks', payload)
94-
if (!scope) {
95-
logger.info(`[${payload.plan}] No retention configured, skipping`)
96-
return
97-
}
98-
99-
const { workspaceIds, retentionHours, label } = scope
71+
const { workspaceIds, retentionHours, label } = payload
10072

10173
if (workspaceIds.length === 0) {
10274
logger.info(`[${label}] No workspaces to process`)
@@ -130,26 +102,12 @@ export async function runCleanupTasks(payload: CleanupJobPayload): Promise<void>
130102
}
131103

132104
// Delete feedback — no direct workspaceId, reuse chat IDs collected above
133-
const feedbackResult: TableCleanupResult = {
134-
table: `${label}/copilotFeedback`,
135-
deleted: 0,
136-
failed: 0,
137-
}
138-
try {
139-
if (doomedChatIds.length > 0) {
140-
const deleted = await db
141-
.delete(copilotFeedback)
142-
.where(inArray(copilotFeedback.chatId, doomedChatIds))
143-
.returning({ id: copilotFeedback.feedbackId })
144-
feedbackResult.deleted = deleted.length
145-
logger.info(`[${feedbackResult.table}] Deleted ${deleted.length} rows`)
146-
} else {
147-
logger.info(`[${feedbackResult.table}] No expired rows found`)
148-
}
149-
} catch (error) {
150-
feedbackResult.failed++
151-
logger.error(`[${feedbackResult.table}] Delete failed:`, { error })
152-
}
105+
const feedbackResult = await deleteRowsById(
106+
copilotFeedback,
107+
copilotFeedback.chatId,
108+
doomedChatIds,
109+
`${label}/copilotFeedback`
110+
)
153111

154112
// Delete copilot runs (has workspaceId directly, cascades checkpoints)
155113
const runsResult = await batchDeleteByWorkspaceAndTimestamp({
@@ -198,5 +156,7 @@ export async function runCleanupTasks(payload: CleanupJobPayload): Promise<void>
198156

199157
export const cleanupTasksTask = task({
200158
id: 'cleanup-tasks',
159+
machine: 'large-1x',
160+
queue: { concurrencyLimit: 5 },
201161
run: runCleanupTasks,
202162
})

0 commit comments

Comments
 (0)