@@ -3,9 +3,10 @@ import { jobExecutionLogs, pausedExecutions, workflowExecutionLogs } from '@sim/
33import { createLogger } from '@sim/logger'
44import { task } from '@trigger.dev/sdk'
55import { and , eq , inArray , isNull , lt , notInArray , or , sql } from 'drizzle-orm'
6- import { type CleanupJobPayload , resolveCleanupScope } from '@/lib/billing/cleanup-dispatcher'
6+ import type { CleanupJobPayload } from '@/lib/billing/cleanup-dispatcher'
77import {
88 batchDeleteByWorkspaceAndTimestamp ,
9+ chunkArray ,
910 chunkedBatchDelete ,
1011 type TableCleanupResult ,
1112} from '@/lib/cleanup/batch-delete'
@@ -27,33 +28,36 @@ interface FileDeleteStats {
2728
2829const RESUMABLE_PAUSED_STATUSES = [ 'paused' , 'partially_resumed' , 'cancelling' ]
2930
/**
 * Maximum number of keys tested per query. Each scanned log row is
 * substring-checked against at most this many keys, rather than against
 * every unique key at once.
 */
const REFERENCE_CHECK_KEY_CHUNK_SIZE = 200

/**
 * Builds an explicit `ARRAY[$1, $2, ...]::text[]` constructor with one bound
 * parameter per value.
 *
 * A JS array interpolated directly into drizzle's `sql` template is expanded
 * into a parenthesised parameter list `($1, $2, ...)`, which cannot be cast to
 * `text[]`; joining the values inside `ARRAY[...]` avoids that.
 */
function toTextArray(values: string[]) {
  return sql`ARRAY[${sql.join(
    values.map((value) => sql`${value}`),
    sql`, `
  )}]::text[]`
}

/**
 * Returns the keys that are not referenced by any retained execution log.
 *
 * A key counts as referenced when it occurs as a substring of
 * `execution_data::text` in a `workflow_execution_logs` row that belongs to
 * one of `workspaceIds` and whose id is not in `deletedLogIds`.
 *
 * Keys are de-duplicated and checked in chunks of
 * `REFERENCE_CHECK_KEY_CHUNK_SIZE`; each chunk is resolved by a single query
 * that cross-joins the candidate rows with `unnest(keys)`, instead of issuing
 * one query per key.
 *
 * @param keys - Candidate large-value keys; duplicates are ignored.
 * @param deletedLogIds - Ids of log rows being deleted; these rows are excluded from the check.
 * @param workspaceIds - Workspaces whose logs are searched.
 * @returns Unique keys, in first-seen order, with no retained reference.
 *   Returns `[]` when any of the three inputs is empty.
 */
async function filterLargeValueKeysWithoutRetainedReferences(
  keys: string[],
  deletedLogIds: string[],
  workspaceIds: string[]
): Promise<string[]> {
  if (keys.length === 0 || deletedLogIds.length === 0 || workspaceIds.length === 0) return []

  const uniqueKeys = Array.from(new Set(keys))
  const referencedKeys = new Set<string>()

  // Loop-invariant fragments: built once and reused for every chunk.
  // NOTE(review): every id becomes its own bound parameter, so very large
  // `deletedLogIds` batches count against the driver's parameter limit — confirm
  // callers batch accordingly.
  const workspaceIdArray = toTextArray(workspaceIds)
  const deletedLogIdArray = toTextArray(deletedLogIds)

  for (const keyChunk of chunkArray(uniqueKeys, REFERENCE_CHECK_KEY_CHUNK_SIZE)) {
    const rows = await db.execute<{ key: string }>(sql`
      SELECT DISTINCT k.key AS key
      FROM ${workflowExecutionLogs} AS wel,
           unnest(${toTextArray(keyChunk)}) AS k(key)
      WHERE wel.workspace_id = ANY(${workspaceIdArray})
        AND wel.id <> ALL(${deletedLogIdArray})
        AND position(k.key in wel.execution_data::text) > 0
    `)
    for (const row of rows) referencedKeys.add(row.key)
  }

  return uniqueKeys.filter((key) => !referencedKeys.has(key))
}
5862
5963async function deleteExecutionFiles ( files : unknown , stats : FileDeleteStats ) : Promise < void > {
@@ -159,14 +163,7 @@ async function cleanupWorkflowExecutionLogs(
159163 return { ...dbStats , ...fileStats }
160164}
161165
162- async function cleanupFreePlanOrphanedSnapshots (
163- payload : CleanupJobPayload ,
164- retentionHours : number
165- ) : Promise < void > {
166- if ( payload . plan !== 'free' || payload . runGlobalMaintenance !== true ) {
167- return
168- }
169-
166+ async function cleanupFreePlanOrphanedSnapshots ( retentionHours : number ) : Promise < void > {
170167 try {
171168 const retentionDays = Math . floor ( retentionHours / 24 )
172169 const snapshotsCleaned = await snapshotService . cleanupOrphanedSnapshots ( retentionDays + 1 )
@@ -178,20 +175,15 @@ async function cleanupFreePlanOrphanedSnapshots(
178175
179176export async function runCleanupLogs ( payload : CleanupJobPayload ) : Promise < void > {
180177 const startTime = Date . now ( )
181-
182- const scope = await resolveCleanupScope ( 'cleanup-logs' , payload )
183- if ( ! scope ) {
184- logger . info ( `[${ payload . plan } ] No retention configured, skipping` )
185- return
186- }
187-
188- const { workspaceIds, retentionHours, label } = scope
178+ const { workspaceIds, retentionHours, label, plan, runGlobalHousekeeping } = payload
189179
190180 const retentionDate = new Date ( Date . now ( ) - retentionHours * 60 * 60 * 1000 )
191181
192182 if ( workspaceIds . length === 0 ) {
193183 logger . info ( `[${ label } ] No workspaces to process` )
194- await cleanupFreePlanOrphanedSnapshots ( payload , retentionHours )
184+ if ( runGlobalHousekeeping && plan === 'free' ) {
185+ await cleanupFreePlanOrphanedSnapshots ( retentionHours )
186+ }
195187 return
196188 }
197189
@@ -216,17 +208,17 @@ export async function runCleanupLogs(payload: CleanupJobPayload): Promise<void>
216208 tableName : `${ label } /job_execution_logs` ,
217209 } )
218210
219- await cleanupFreePlanOrphanedSnapshots ( payload , retentionHours )
211+ if ( runGlobalHousekeeping && plan === 'free' ) {
212+ await cleanupFreePlanOrphanedSnapshots ( retentionHours )
213+ }
220214
221215 const timeElapsed = ( Date . now ( ) - startTime ) / 1000
222216 logger . info ( `[${ label } ] Job completed in ${ timeElapsed . toFixed ( 2 ) } s` )
223217}
224218
/**
 * Trigger.dev task registered under the id `cleanup-logs`.
 *
 * Each run is handled by `runCleanupLogs`. The task requests the `large-1x`
 * machine preset and sets a concurrency limit of 5 on its queue.
 */
export const cleanupLogsTask = task({
  id: 'cleanup-logs',
  run: runCleanupLogs,
  machine: 'large-1x',
  queue: {
    concurrencyLimit: 5,
  },
})
0 commit comments