11import { Logger } from "@trigger.dev/core/logger" ;
2- import pLimit from "p-limit" ;
32import { MollifierBuffer } from "./buffer.js" ;
43import { BufferEntry , deserialiseSnapshot } from "./schemas.js" ;
54
@@ -84,8 +83,8 @@ export class MollifierDrainer<TPayload = unknown> {
8483 private readonly pollIntervalMs : number ;
8584 private readonly maxOrgsPerTick : number ;
8685 private readonly drainBatchSize : number ;
86+ private readonly concurrency : number ;
8787 private readonly logger : Logger ;
88- private readonly limit : ReturnType < typeof pLimit > ;
8988 // Rotation state. `orgCursor` advances through the active-orgs list.
9089 // Each org has its own internal cursor in `perOrgEnvCursors` for
9190 // cycling through that org's envs. Both reset on `start()`.
@@ -104,8 +103,8 @@ export class MollifierDrainer<TPayload = unknown> {
104103 this . pollIntervalMs = options . pollIntervalMs ?? 100 ;
105104 this . maxOrgsPerTick = options . maxOrgsPerTick ?? 500 ;
106105 this . drainBatchSize = Math . max ( 1 , options . drainBatchSize ?? 1 ) ;
106+ this . concurrency = Math . max ( 1 , options . concurrency ) ;
107107 this . logger = options . logger ?? new Logger ( "MollifierDrainer" , "debug" ) ;
108- this . limit = pLimit ( options . concurrency ) ;
109108 }
110109
111110 async runOnce ( ) : Promise < DrainResult > {
@@ -133,64 +132,98 @@ export class MollifierDrainer<TPayload = unknown> {
133132 targets . push ( envId ) ;
134133 }
135134
136- // Pop a batch from each target env in parallel. Within an env we pop
137- // sequentially (each Lua `pop` is atomic; back-to-back pops on the
138- // same env can't be concurrent without a `popBatch` Lua, and Redis
139- // RTT × drainBatchSize is cheap compared to the engine.trigger work
140- // that follows). A pop failure mid-batch aborts only that env's
141- // batch and counts as one failure — same semantics as the previous
142- // one-pop-per-env path, generalised.
143- const envBatches = await Promise . all (
144- targets . map ( async ( envId ) => {
145- const entries : BufferEntry [ ] = [ ] ;
146- let popFailed = false ;
147- for ( let i = 0 ; i < this . drainBatchSize ; i ++ ) {
148- let entry : BufferEntry | null ;
149- try {
150- entry = await this . buffer . pop ( envId ) ;
151- } catch ( err ) {
152- this . logger . error ( "MollifierDrainer.pop failed" , { envId, err } ) ;
153- popFailed = true ;
154- break ;
155- }
156- if ( ! entry ) break ;
157- entries . push ( entry ) ;
135+ if ( targets . length === 0 ) return { drained : 0 , failed : 0 } ;
136+
137+ // Worker-pool draining. We spawn up to `concurrency` workers; each
138+ // worker repeatedly:
139+ // 1. Picks the next env with budget remaining (round-robin),
140+ // atomically claiming one slot of that env's per-tick budget.
141+ // 2. Pops one entry and processes it.
142+ // 3. Repeats until pickNextEnv returns null.
143+ //
144+ // This pattern gives us both invariants the prior two designs traded
145+ // off:
146+ // - Single-env bursts use the full `concurrency` budget. All
147+ // workers can pull from one env, processing `concurrency` entries
148+ // in parallel.
149+ // - The number of entries in "popped-but-not-acked" (DRAINING)
150+ // state at any moment is bounded by the worker count, i.e.
151+ // `concurrency` — same blast radius as the pre-batch
152+ // one-pop-per-env model. A process crash mid-tick strands at
153+ // most `concurrency` entries for stale-sweep to recover, not
154+ // `maxOrgsPerTick × drainBatchSize`.
155+ //
156+ // Fairness: pickNextEnv advances a cursor by 1 each successful pick,
157+ // so workers round-robin across envs at the entry level. Combined
158+ // with the per-env budget cap, an env contributes at most
159+ // `drainBatchSize` entries per tick regardless of how many workers
160+ // are free — a heavy env can't starve siblings within a tick.
161+ const remaining = new Map < string , number > ( ) ;
162+ const skip = new Set < string > ( ) ; // envs with empty queue or pop failure this tick
163+ for ( const envId of targets ) remaining . set ( envId , this . drainBatchSize ) ;
164+
165+ let cursor = 0 ;
166+ const pickNextEnv = ( ) : string | null => {
167+ for ( let i = 0 ; i < targets . length ; i ++ ) {
168+ const idx = ( cursor + i ) % targets . length ;
169+ const envId = targets [ idx ] ! ;
170+ if ( skip . has ( envId ) ) continue ;
171+ const r = remaining . get ( envId ) ?? 0 ;
172+ if ( r > 0 ) {
173+ remaining . set ( envId , r - 1 ) ;
174+ cursor = ( idx + 1 ) % targets . length ;
175+ return envId ;
158176 }
159- return { entries , popFailed } ;
160- } ) ,
161- ) ;
177+ }
178+ return null ;
179+ } ;
162180
163- const popFailures = envBatches . reduce ( ( n , b ) => n + ( b . popFailed ? 1 : 0 ) , 0 ) ;
164- const allEntries = envBatches . flatMap ( ( b ) => b . entries ) ;
165- if ( allEntries . length === 0 ) {
166- return { drained : 0 , failed : popFailures } ;
167- }
181+ let drained = 0 ;
182+ let failed = 0 ;
168183
169- // Dispatch every popped entry through the shared pLimit so the
170- // global in-flight cap is `concurrency` regardless of how many envs
171- // contributed entries this tick. Per-entry errors are caught inside
172- // the closure so a single bad entry can't poison the tick — same
173- // safety net the old `processOneFromEnv` provided.
174- const inflight = allEntries . map ( ( entry ) =>
175- this . limit ( async ( ) => {
184+ const worker = async ( ) : Promise < void > => {
185+ while ( true ) {
186+ const envId = pickNextEnv ( ) ;
187+ if ( envId === null ) return ;
188+ let entry : BufferEntry | null ;
176189 try {
177- return await this . processEntry ( entry ) ;
190+ entry = await this . buffer . pop ( envId ) ;
191+ } catch ( err ) {
192+ // A pop failure on one env aborts that env's batch for this
193+ // tick (don't keep hammering a broken Redis) and counts as
194+ // exactly one failure — same as the pre-batch path on a pop
195+ // blowup. Other envs continue.
196+ this . logger . error ( "MollifierDrainer.pop failed" , { envId, err } ) ;
197+ skip . add ( envId ) ;
198+ failed += 1 ;
199+ continue ;
200+ }
201+ if ( ! entry ) {
202+ // Queue exhausted between scheduling and this pop. Mark the
203+ // env skipped so siblings aren't held up by repeated empty pops.
204+ skip . add ( envId ) ;
205+ continue ;
206+ }
207+ try {
208+ const outcome = await this . processEntry ( entry ) ;
209+ if ( outcome === "drained" ) drained += 1 ;
210+ else failed += 1 ;
178211 } catch ( err ) {
179212 this . logger . error ( "MollifierDrainer.processEntry failed" , {
180- envId : entry . envId ,
213+ envId,
181214 runId : entry . runId ,
182215 err,
183216 } ) ;
184- return " failed" as const ;
217+ failed += 1 ;
185218 }
186- } ) ,
187- ) ;
188-
189- const results = await Promise . all ( inflight ) ;
190- return {
191- drained : results . filter ( ( r ) => r === "drained" ) . length ,
192- failed : results . filter ( ( r ) => r === "failed" ) . length + popFailures ,
219+ }
193220 } ;
221+
222+ const totalBudget = targets . length * this . drainBatchSize ;
223+ const workerCount = Math . min ( this . concurrency , totalBudget ) ;
224+ await Promise . all ( Array . from ( { length : workerCount } , ( ) => worker ( ) ) ) ;
225+
226+ return { drained, failed } ;
194227 }
195228
196229 start ( ) : void {
0 commit comments