@@ -2,13 +2,50 @@ import { RunId } from "@trigger.dev/core/v3/isomorphic";
22import type { PrismaClientOrTransaction , TaskRun } from "@trigger.dev/database" ;
33import { logger } from "~/services/logger.server" ;
44import { resolveIdempotencyKeyTTL } from "~/utils/idempotencyKeys.server" ;
5+ import { ServiceValidationError } from "~/v3/services/common.server" ;
56import type { RunEngine } from "~/v3/runEngine.server" ;
67import { shouldIdempotencyKeyBeCleared } from "~/v3/taskStatus" ;
8+ import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server" ;
9+ import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server" ;
10+ import { claimOrAwait } from "~/v3/mollifier/idempotencyClaim.server" ;
11+ import { makeResolveMollifierFlag } from "~/v3/mollifier/mollifierGate.server" ;
712import type { TraceEventConcern , TriggerTaskRequest } from "../types" ;
813
// In-memory per-org mollifier-enabled check, shared with `evaluateGate`
// (same `Organization.featureFlags` JSON, no DB read). Used to gate the
// pre-gate claim's Redis round-trip so that orgs without the mollifier
// enabled don't pay for it during staged rollout — see the comment above
// the claim block in handleTriggerRequest.
//
// Built once at module load so every request reuses the same resolver.
const resolveOrgMollifierFlag = makeResolveMollifierFlag();
20+
/**
 * Claim ownership context returned to the caller when the
 * IdempotencyKeyConcern won a pre-gate claim.
 *
 * The caller MUST resolve the claim: publish the winning runId on pipeline
 * success (`publishClaim`) or release the claim on failure (`releaseClaim`).
 */
export type ClaimedIdempotency = {
  /** Id of the runtime environment the claim was taken in. */
  envId: string;
  /** Identifier of the task being triggered. */
  taskIdentifier: string;
  /** The idempotency key the claim was taken for. */
  idempotencyKey: string;
  /**
   * Ownership token from `claimOrAwait`. The caller's trigger pipeline MUST
   * thread this into publishClaim/releaseClaim so the buffer's
   * compare-and-act protects the slot against a stale predecessor.
   */
  token: string;
};
34+
/**
 * Outcome of idempotency-key handling for a trigger request.
 *
 * - `isCached: true`  — an existing run already owns the key; `run` is
 *   returned to the caller instead of triggering a new one.
 * - `isCached: false` — no reusable run was found; the caller proceeds with
 *   a fresh trigger, carrying the resolved key/expiry (when a key was
 *   supplied) and, optionally, a pre-gate claim it must resolve.
 */
export type IdempotencyKeyConcernResult =
  | { isCached: true; run: TaskRun }
  | {
      isCached: false;
      idempotencyKey?: string;
      idempotencyKeyExpiresAt?: Date;
      /**
       * Set when this trigger holds a pre-gate claim. The caller's trigger
       * pipeline MUST resolve the claim by either publishing the runId on
       * success or releasing it on failure. Undefined when the request has
       * no idempotency key, when the buffer is unavailable, or when the
       * request is a triggerAndWait (the claim path is skipped for those).
       */
      claim?: ClaimedIdempotency;
    };
1249
1350export class IdempotencyKeyConcern {
1451 constructor (
@@ -17,6 +54,86 @@ export class IdempotencyKeyConcern {
1754 private readonly traceEventConcern : TraceEventConcern
1855 ) { }
1956
/**
 * Buffer-side idempotency dedup. Resolves an idempotency key against the
 * mollifier buffer when the PG lookup missed.
 *
 * Fails open: any miss or failure (no buffer configured, lookup error,
 * run-resolution error, expired key) yields `null`, so buffer outages never
 * 500 the trigger hot path — the request simply proceeds as "no cache hit".
 *
 * @param environmentId  Runtime environment the key is scoped to.
 * @param organizationId Organization passed through to the run lookup.
 * @param taskIdentifier Task the key is scoped to.
 * @param idempotencyKey The key to resolve.
 * @returns The buffered run (a SyntheticRun cast to `TaskRun`; per the
 *   original note the route handler only reads `run.id` / `run.friendlyId`),
 *   or `null` when there is nothing usable to return.
 */
private async findBufferedRunWithIdempotency(
  environmentId: string,
  organizationId: string,
  taskIdentifier: string,
  idempotencyKey: string,
): Promise<TaskRun | null> {
  const buffer = getMollifierBuffer();
  if (!buffer) return null;

  let bufferedRunId: string | null;
  try {
    bufferedRunId = await buffer.lookupIdempotency({
      envId: environmentId,
      taskIdentifier,
      idempotencyKey,
    });
  } catch (err) {
    logger.error("IdempotencyKeyConcern: buffer lookupIdempotency failed", {
      environmentId,
      taskIdentifier,
      err: err instanceof Error ? err.message : String(err),
    });
    return null;
  }
  if (!bufferedRunId) return null;

  // Resolving the runId must honour the same fail-open contract as the
  // lookup above: a failure here is treated as a miss instead of rejecting
  // into the trigger request.
  let synthetic: Awaited<ReturnType<typeof findRunByIdWithMollifierFallback>>;
  try {
    synthetic = await findRunByIdWithMollifierFallback({
      runId: bufferedRunId,
      environmentId,
      organizationId,
    });
  } catch (err) {
    logger.error("IdempotencyKeyConcern: buffered run lookup failed", {
      environmentId,
      taskIdentifier,
      bufferedRunId,
      err: err instanceof Error ? err.message : String(err),
    });
    return null;
  }
  if (!synthetic) return null;

  // The PG-resident path enforces idempotency-key expiry
  // (`existingRun.idempotencyKeyExpiresAt < new Date()` clears the key and
  // lets a new run go through). The buffer path needs the same check —
  // without it a customer who passes `idempotencyKeyTTL: "2s"` gets the
  // cached buffered runId returned indefinitely, because the buffer entry
  // persists for its own (hours-long) TTL independent of the customer's
  // key TTL.
  //
  // Returning null isn't enough on its own: the trigger pipeline then
  // proceeds to `mollifyTrigger`, whose `buffer.accept` Lua dedupes by
  // `(envId, taskIdentifier, idempotencyKey)` via SETNX on the same
  // `mollifier:idempotency:*` key and would echo the stale runId as
  // `duplicate_idempotency`. Clear the buffer-side idempotency binding
  // (both the lookup and any in-flight claim) so the next accept goes
  // through as a fresh trigger. Mirrors what `ResetIdempotencyKeyService`
  // does for the explicit reset-via-API path.
  const isExpired =
    synthetic.idempotencyKeyExpiresAt != null &&
    synthetic.idempotencyKeyExpiresAt < new Date();
  if (isExpired) {
    try {
      // `buffer` is already known to be non-null here; no need to fetch
      // the singleton a second time.
      await buffer.resetIdempotency({
        envId: environmentId,
        taskIdentifier,
        idempotencyKey,
      });
    } catch (err) {
      // Best-effort cleanup: a failed reset must not fail the trigger.
      logger.warn("IdempotencyKeyConcern: failed to reset expired buffer idempotency", {
        envId: environmentId,
        taskIdentifier,
        err: err instanceof Error ? err.message : String(err),
      });
    }
    return null;
  }

  return synthetic as unknown as TaskRun;
}
136+
20137 async handleTriggerRequest (
21138 request : TriggerTaskRequest ,
22139 parentStore : string | undefined
@@ -44,6 +161,25 @@ export class IdempotencyKeyConcern {
44161 } )
45162 : undefined ;
46163
164+ // Buffer fallback per the mollifier-idempotency design. PG missed —
165+ // the same key may belong to a buffered run that hasn't materialised
166+ // yet. Skipped when `resumeParentOnCompletion` is set: blocking a
167+ // parent on a buffered child via waitpoint requires a PG row that
168+ // doesn't exist yet. The follow-up accept's SETNX in mollifyTrigger
169+ // still dedupes the trigger itself; the waitpoint just doesn't fire
170+ // for this rare race window.
171+ if ( ! existingRun && idempotencyKey && ! request . body . options ?. resumeParentOnCompletion ) {
172+ const buffered = await this . findBufferedRunWithIdempotency (
173+ request . environment . id ,
174+ request . environment . organizationId ,
175+ request . taskId ,
176+ idempotencyKey ,
177+ ) ;
178+ if ( buffered ) {
179+ return { isCached : true , run : buffered } ;
180+ }
181+ }
182+
47183 if ( existingRun ) {
48184 // The idempotency key has expired
49185 if ( existingRun . idempotencyKeyExpiresAt && existingRun . idempotencyKeyExpiresAt < new Date ( ) ) {
@@ -133,6 +269,133 @@ export class IdempotencyKeyConcern {
133269 return { isCached : true , run : existingRun } ;
134270 }
135271
272+ // Pre-gate claim — closes the PG+buffer race during gate transition.
273+ // All same-key triggers serialise here before evaluateGate decides
274+ // PG-pass-through vs mollify. Skipped for triggerAndWait
275+ // (resumeParentOnCompletion) — that path bypasses the gate entirely
276+ // and its existing PG-side dedup is sufficient.
277+ //
278+ // Also gated on the same per-org mollifier flag the gate uses: when
279+ // `TRIGGER_MOLLIFIER_ENABLED=1` globally for staged rollout, the buffer
280+ // singleton is constructed and `claimOrAwait` would otherwise issue a
281+ // Redis SETNX for EVERY idempotency-keyed trigger — including orgs
282+ // that haven't opted in. Those orgs never enter the mollify branch
283+ // (the gate always returns pass_through for them), so there's no
284+ // buffer activity to serialise against; PG's unique constraint
285+ // already deduplicates concurrent same-key races. Resolving the org
286+ // flag is a pure in-memory read of `Organization.featureFlags` — no
287+ // DB query, same predicate the gate uses — keeping the claim's Redis
288+ // RTT off the hot path for non-opted-in orgs during incremental
289+ // rollout.
290+ // Match the gate's bypass list (`mollifierGate.server.ts:158-175`).
291+ // debounce + oneTimeUseToken triggers always return pass_through from
292+ // the gate, so claiming a Redis SETNX here is wasted RTT on the
293+ // trigger hot path. Excluding them keeps the claim aligned with the
294+ // gate — if the gate would never mollify the request, there's no
295+ // buffer to serialise against.
296+ const claimEligible =
297+ ! request . body . options ?. resumeParentOnCompletion &&
298+ ! request . body . options ?. debounce &&
299+ ! request . options ?. oneTimeUseToken &&
300+ ( await resolveOrgMollifierFlag ( {
301+ envId : request . environment . id ,
302+ orgId : request . environment . organizationId ,
303+ taskId : request . taskId ,
304+ orgFeatureFlags :
305+ ( ( request . environment . organization ?. featureFlags as
306+ | Record < string , unknown >
307+ | null
308+ | undefined ) ?? null ) ,
309+ } ) ) ;
310+ if ( claimEligible ) {
311+ const ttlSeconds = Math . max (
312+ 1 ,
313+ Math . min (
314+ 30 ,
315+ Math . ceil ( ( idempotencyKeyExpiresAt . getTime ( ) - Date . now ( ) ) / 1000 ) ,
316+ ) ,
317+ ) ;
318+ const outcome = await claimOrAwait ( {
319+ envId : request . environment . id ,
320+ taskIdentifier : request . taskId ,
321+ idempotencyKey,
322+ ttlSeconds,
323+ } ) ;
324+ if ( outcome . kind === "resolved" ) {
325+ // Another concurrent trigger committed first. Re-resolve via the
326+ // existing checks: writer-side PG findFirst first (defeats
327+ // replica lag), then buffer fallback for the buffered case.
328+ const writerRun = await this . prisma . taskRun . findFirst ( {
329+ where : {
330+ runtimeEnvironmentId : request . environment . id ,
331+ idempotencyKey,
332+ taskIdentifier : request . taskId ,
333+ } ,
334+ include : { associatedWaitpoint : true } ,
335+ } ) ;
336+ if ( writerRun ) {
337+ return { isCached : true , run : writerRun } ;
338+ }
339+ const buffered = await this . findBufferedRunWithIdempotency (
340+ request . environment . id ,
341+ request . environment . organizationId ,
342+ request . taskId ,
343+ idempotencyKey ,
344+ ) ;
345+ if ( buffered ) {
346+ return { isCached : true , run : buffered } ;
347+ }
348+ // Claim resolved to a runId nothing can find — the run was
349+ // genuinely lost (claimant errored after publish, drain failed,
350+ // or both the PG row and buffer entry TTL'd out). This is
351+ // terminal, not transient: `lookupIdempotency` self-heals a
352+ // dangling pointer, and `ack` keeps the entry hash as a
353+ // read-fallback past the PG write, so re-polling cannot conjure
354+ // a run that is gone. Falling through to a fresh trigger is the
355+ // correct recovery.
356+ //
357+ // Why falling through claimless is safe (no duplicate runs):
358+ // concurrent triggers that also fall through here converge on a
359+ // single run via the same dedup backstops the claim layer relies
360+ // on — the PG unique constraint on the idempotency key
361+ // (RunDuplicateIdempotencyKeyError → retry resolves to the
362+ // winner) for the pass-through path, and `accept`'s idempotency
363+ // SETNX (`duplicate_idempotency`) for the mollify path. Once the
364+ // first fall-through commits a run, later callers find it via the
365+ // writer-PG / buffer lookups above despite the stale `resolved:`
366+ // slot, which the slot's TTL clears within ~30s. The residual
367+ // cost is a few redundant (deduped) trigger attempts in that
368+ // window, not duplicate runs.
369+ logger . warn ( "idempotency claim resolved but runId not findable" , {
370+ envId : request . environment . id ,
371+ taskIdentifier : request . taskId ,
372+ claimedRunId : outcome . runId ,
373+ } ) ;
374+ }
375+ if ( outcome . kind === "timed_out" ) {
376+ throw new ServiceValidationError (
377+ "Idempotency claim resolution timed out" ,
378+ 503 ,
379+ ) ;
380+ }
381+ if ( outcome . kind === "claimed" ) {
382+ // Caller MUST publish/release. Signalled via the result's
383+ // `claim` field, including the ownership token so the buffer
384+ // can compare-and-act on the slot we now own.
385+ return {
386+ isCached : false ,
387+ idempotencyKey,
388+ idempotencyKeyExpiresAt,
389+ claim : {
390+ envId : request . environment . id ,
391+ taskIdentifier : request . taskId ,
392+ idempotencyKey,
393+ token : outcome . token ,
394+ } ,
395+ } ;
396+ }
397+ }
398+
136399 return { isCached : false , idempotencyKey, idempotencyKeyExpiresAt } ;
137400 }
138401}
0 commit comments