Commit e9d3e62

d-cs and claude committed
perf(redis-worker): pipeline HGETALLs in MollifierBuffer.listEntriesForEnv
The stale-sweep calls listEntriesForEnv with maxCount=1000 per env per pass. The previous implementation issued one HGETALL per runId returned by LRANGE, serially: ~1000 round-trips per env per pass. At any meaningful drainer backlog this dominated sweep wall-time and made the sweep run longer than its interval (the inFlight guard then drops every subsequent tick until it finishes).

Refactor the per-entry fetch into a single ioredis pipeline so one network batch covers every runId LRANGE returned. Behaviour is preserved end-to-end:

- Empty/missing entry hashes are still skipped (a concurrent drainer ack/fail between LRANGE and our HGETALL DELs the hash, so a runId on the queue with no backing hash is an expected race outcome, not an error).
- Per-entry parse failures are logged and skipped without aborting the batch.
- Per-entry pipeline errors are logged and skipped (pipeline.exec does not abort on individual command errors).

Existing tests for happy-path + empty queue + maxCount<=0 unchanged. Added a new test characterising the torn-down-entry race (`mollifier:entries:r_a` DEL'd between accept and listEntriesForEnv). It passes against both serial and pipelined implementations, so it acts as a safety net for this refactor and future ones.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
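The effect of the inFlight guard described above can be illustrated with a small simulation. This is a minimal sketch under stated assumptions: `simulateSweepTicks`, the tick interval, and the sweep durations are all hypothetical and do not come from the repository. It only models the rule that a tick is skipped while a previous sweep is still running.

```typescript
// Hypothetical model of an interval-driven sweep protected by an in-flight guard.
// Names and numbers are illustrative; they are not taken from the repository.
interface TickStats {
  started: number; // ticks that actually began a sweep
  dropped: number; // ticks skipped because a sweep was still running
}

function simulateSweepTicks(intervalMs: number, sweepMs: number, tickCount: number): TickStats {
  let busyUntil = 0; // simulated time at which the running sweep finishes
  let started = 0;
  let dropped = 0;
  for (let tick = 0; tick < tickCount; tick++) {
    const now = tick * intervalMs;
    if (now < busyUntil) {
      dropped++; // guard: a sweep is still in flight, so this tick is skipped
      continue;
    }
    started++;
    busyUntil = now + sweepMs;
  }
  return { started, dropped };
}

// A sweep that takes 3.5 intervals versus one that takes a fifth of an interval.
const slowSweepStats = simulateSweepTicks(1000, 3500, 10);
const fastSweepStats = simulateSweepTicks(1000, 200, 10);
```

With these assumed numbers, the slow sweep starts on only 3 of 10 ticks and drops the other 7, while the fast sweep runs on every tick. That is the motivation for shortening the per-pass fetch.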
1 parent df406ff commit e9d3e62

3 files changed

Lines changed: 79 additions & 6 deletions

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+---
+"@trigger.dev/redis-worker": patch
+---
+
+Pipeline the per-entry `HGETALL` fetches in `MollifierBuffer.listEntriesForEnv`. The previous serial implementation issued one Redis round-trip per runId returned by `LRANGE`, which dominated stale-sweep wall-time at any meaningful backlog (at the sweep's default maxCount=1000, this is ~1000 RTTs per env per pass). Behaviour is unchanged: entries are still skipped when the entry hash has been torn down by a concurrent drainer ack/fail between the LRANGE and the HGETALL.
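The round-trip arithmetic in the changeset note can be made concrete with a back-of-envelope estimate. This is a rough sketch, not a measurement: `estimateNetworkWaitMs` is a hypothetical helper, and the 0.5 ms round-trip time is an assumed value chosen only for illustration. It counts network waits and ignores server-side execution and payload transfer time.

```typescript
// Back-of-envelope model: time spent waiting on network round-trips only.
// Server-side execution and payload transfer time are deliberately ignored.
function estimateNetworkWaitMs(entryCount: number, rttMs: number, pipelined: boolean): number {
  const lrangeTrips = 1; // the LRANGE that lists runIds
  // Serial: one HGETALL round-trip per runId. Pipelined: one batch for all of them.
  const hgetallTrips = pipelined ? (entryCount > 0 ? 1 : 0) : entryCount;
  return (lrangeTrips + hgetallTrips) * rttMs;
}

const assumedRttMs = 0.5; // assumption for illustration, not a measured value
const serialWaitMs = estimateNetworkWaitMs(1000, assumedRttMs, false);
const pipelinedWaitMs = estimateNetworkWaitMs(1000, assumedRttMs, true);
```

Under that assumed RTT, the serial path waits about 500.5 ms per env per pass and the pipelined path about 1 ms. Real figures depend on the deployment's actual latency and on how long Redis takes to serve the batch.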

packages/redis-worker/src/mollifier/buffer.test.ts

Lines changed: 35 additions & 0 deletions
@@ -2341,6 +2341,41 @@ describe("MollifierBuffer.listEntriesForEnv", () => {
     }
   });
 
+  redisTest(
+    "skips entries whose hash was torn down between LRANGE and HGETALL (concurrent drainer ack/fail race)",
+    { timeout: 20_000 },
+    async ({ redisContainer }) => {
+      // The drainer can RPOP + ack/fail an entry between our LRANGE and
+      // the per-runId HGETALL; its DEL of the entry hash races our read.
+      // listEntriesForEnv must tolerate this: skip the runId, return
+      // every other entry. This is exercised here by simulating the race:
+      // LPUSH a runId onto the queue without an accompanying entry hash.
+      const buffer = new MollifierBuffer({
+        redisOptions: {
+          host: redisContainer.getHost(),
+          port: redisContainer.getPort(),
+          password: redisContainer.getPassword(),
+        },
+        logger: new Logger("test", "log"),
+      });
+
+      try {
+        await buffer.accept({ runId: "r_a", envId: "env_race", orgId: "org_1", payload: "{}" });
+        await buffer.accept({ runId: "r_b", envId: "env_race", orgId: "org_1", payload: "{}" });
+
+        // Tear down r_a's hash to simulate the drainer winning the race.
+        // The runId stays on the queue LIST but its entry hash is gone;
+        // listEntriesForEnv must tolerate the missing HGETALL result.
+        await buffer["redis"].del("mollifier:entries:r_a");
+
+        const entries = await buffer.listEntriesForEnv("env_race", 10);
+        expect(entries.map((e) => e.runId).sort()).toEqual(["r_b"]);
+      } finally {
+        await buffer.close();
+      }
+    },
+  );
+
   redisTest("maxCount <= 0 returns empty without hitting redis", { timeout: 20_000 }, async ({ redisContainer }) => {
     const buffer = new MollifierBuffer({
       redisOptions: {
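The race that the new test simulates can also be modelled without a Redis container. The sketch below is an in-memory stand-in, not the `MollifierBuffer` implementation: `FakeBufferStore` and `listRunIdsWithEntries` are hypothetical names, and the stored fields are simplified. It relies on one Redis behaviour, namely that `HGETALL` on a missing key yields an empty result, which ioredis surfaces as an empty object.

```typescript
// In-memory stand-in for the two Redis structures the test touches.
// This is a hypothetical model, not the MollifierBuffer implementation.
class FakeBufferStore {
  private queues = new Map<string, string[]>();
  private hashes = new Map<string, Record<string, string>>();

  accept(envId: string, runId: string, payload: string): void {
    const queue = this.queues.get(envId) ?? [];
    queue.unshift(runId); // LPUSH: newest entry at the head
    this.queues.set(envId, queue);
    this.hashes.set(runId, { runId, envId, payload });
  }

  // Mirrors DEL on the entry hash while leaving the runId on the queue list.
  deleteEntryHash(runId: string): void {
    this.hashes.delete(runId);
  }

  // Mirrors HGETALL, which yields an empty object for a missing key.
  hgetall(runId: string): Record<string, string> {
    return this.hashes.get(runId) ?? {};
  }

  // Mirrors LRANGE for non-negative indices (stop is inclusive).
  lrange(envId: string, start: number, stop: number): string[] {
    return (this.queues.get(envId) ?? []).slice(start, stop + 1);
  }
}

function listRunIdsWithEntries(store: FakeBufferStore, envId: string, maxCount: number): string[] {
  if (maxCount <= 0) return [];
  return store
    .lrange(envId, 0, maxCount - 1)
    .filter((runId) => Object.keys(store.hgetall(runId)).length > 0);
}

const raceStore = new FakeBufferStore();
raceStore.accept("env_race", "r_a", "{}");
raceStore.accept("env_race", "r_b", "{}");
raceStore.deleteEntryHash("r_a"); // the drainer "wins" the race for r_a
const survivingRunIds = listRunIdsWithEntries(raceStore, "env_race", 10);
```

Here `survivingRunIds` contains only `"r_b"`: the queue still lists `r_a`, but its empty hash is treated as a skipped entry rather than an error.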

packages/redis-worker/src/mollifier/buffer.ts

Lines changed: 39 additions & 6 deletions
@@ -251,20 +251,53 @@ export class MollifierBuffer {
   // Used by the stale-sweep to compute per-entry dwell time, so order is
   // immaterial: LRANGE returns them newest-first (LPUSH head) but the
   // caller scans the whole window. Non-destructive: the drainer still
-  // RPOPs these entries in FIFO order. Each entry hash is fetched
-  // separately; a `null` from getEntry (entry torn down by a concurrent
-  // drainer ack/fail between LRANGE and HGETALL) is skipped.
+  // RPOPs these entries in FIFO order.
+  //
+  // The entry HGETALLs are issued in a single pipelined batch (one
+  // network round-trip instead of N); at the stale-sweep's default
+  // maxCount=1000 the serial implementation cost ~1000 RTTs per env,
+  // which dominated sweep wall-time at any meaningful backlog.
+  //
+  // A missing entry (empty hash) is skipped: the drainer's RPOP+DEL of
+  // the entry hash can race our LRANGE→HGETALL window, so a runId on
+  // the queue with no backing hash is an expected concurrency outcome,
+  // not an error.
   async listEntriesForEnv(envId: string, maxCount: number): Promise<BufferEntry[]> {
     if (maxCount <= 0) return [];
     const runIds = await this.redis.lrange(
       `mollifier:queue:${envId}`,
       0,
       maxCount - 1,
     );
-    const entries: BufferEntry[] = [];
+    if (runIds.length === 0) return [];
+
+    const pipeline = this.redis.pipeline();
     for (const runId of runIds) {
-      const entry = await this.getEntry(runId);
-      if (entry) entries.push(entry);
+      pipeline.hgetall(`mollifier:entries:${runId}`);
+    }
+    const results = await pipeline.exec();
+    if (!results) return [];
+
+    const entries: BufferEntry[] = [];
+    for (let i = 0; i < results.length; i++) {
+      const [err, raw] = results[i] as [Error | null, Record<string, string> | null];
+      if (err) {
+        this.logger.error("MollifierBuffer.listEntriesForEnv: hgetall failed", {
+          runId: runIds[i],
+          err: err.message,
+        });
+        continue;
+      }
+      if (!raw || Object.keys(raw).length === 0) continue;
+      const parsed = BufferEntrySchema.safeParse(raw);
+      if (!parsed.success) {
+        this.logger.error("MollifierBuffer.listEntriesForEnv: invalid entry shape", {
+          runId: runIds[i],
+          errors: parsed.error.flatten(),
+        });
+        continue;
+      }
+      entries.push(parsed.data);
     }
     return entries;
   }
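The three skip paths in the new loop (command error, empty hash, invalid shape) can be exercised in isolation by feeding hand-built `[error, result]` tuples to a pure function. The following is a sketch under assumptions: `SketchEntry`, `isSketchEntry`, and `collectEntries` are hypothetical, and the hand-written type guard stands in for `BufferEntrySchema`, whose definition is not shown in this commit. It records skip reasons in a list where the real code logs them.

```typescript
// Hypothetical entry shape and validator; the real BufferEntrySchema is not shown here.
interface SketchEntry {
  runId: string;
  envId: string;
  payload: string;
}

// Same [error, result] tuple shape that the loop above destructures.
type ExecResult = [Error | null, unknown];

function isSketchEntry(raw: unknown): raw is SketchEntry {
  if (typeof raw !== "object" || raw === null) return false;
  const r = raw as Record<string, unknown>;
  return typeof r.runId === "string" && typeof r.envId === "string" && typeof r.payload === "string";
}

interface CollectOutcome {
  entries: SketchEntry[];
  skipped: { runId: string; reason: string }[];
}

function collectEntries(runIds: string[], results: ExecResult[]): CollectOutcome {
  const outcome: CollectOutcome = { entries: [], skipped: [] };
  results.forEach(([err, raw], i) => {
    // Results are matched to runIds by position, as in the diff.
    const runId = runIds[i] ?? "(unknown)";
    if (err) {
      outcome.skipped.push({ runId, reason: "command error" });
      return;
    }
    if (!raw || Object.keys(raw as object).length === 0) {
      outcome.skipped.push({ runId, reason: "missing hash" });
      return;
    }
    if (!isSketchEntry(raw)) {
      outcome.skipped.push({ runId, reason: "invalid shape" });
      return;
    }
    outcome.entries.push(raw);
  });
  return outcome;
}

const sketchOutcome = collectEntries(
  ["r_a", "r_b", "r_c", "r_d"],
  [
    [null, {}], // hash torn down by the drainer
    [null, { runId: "r_b", envId: "env_1", payload: "{}" }],
    [new Error("simulated command failure"), null],
    [null, { runId: "r_d" }], // fields missing, fails validation
  ],
);
```

In this run only `r_b` is returned, and the other three runIds are skipped without aborting the batch. Keeping the tuple handling separate from the Redis call is a design choice of this sketch, made so the edge cases can be checked without a live connection.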
