@@ -4,12 +4,55 @@ import {
44} from "@trigger.dev/core/v3" ;
55import { BatchId } from "@trigger.dev/core/v3/isomorphic" ;
66import type { BatchItem , RunEngine } from "@internal/run-engine" ;
7+ import type { BatchTaskRunStatus } from "@trigger.dev/database" ;
78import { prisma , type PrismaClientOrTransaction } from "~/db.server" ;
89import type { AuthenticatedEnvironment } from "~/services/apiAuth.server" ;
910import { logger } from "~/services/logger.server" ;
1011import { ServiceValidationError , WithRunEngine } from "../../v3/services/baseService.server" ;
1112import { BatchPayloadProcessor } from "../concerns/batchPayloads.server" ;
1213
/**
 * Idempotency guard for Phase 2 (item streaming) retries — TRI-9944.
 *
 * Answers: "has the Phase 2 stream's work already been done for this batch?"
 * When it has, every item already has a TaskRun record (real or pre-failed)
 * the customer can monitor, so a retry — or the original call losing a race
 * to a fast-completing BatchQueue — should report `sealed: true` and let the
 * SDK stop retrying.
 *
 * The batch counts as "done" in any of these shapes:
 *  1. `status` is PROCESSING, COMPLETED or PARTIAL_FAILED (PROCESSING is set
 *     by our seal, COMPLETED by tryCompleteBatch, PARTIAL_FAILED by the V2
 *     batchCompletionCallback).
 *  2. `status` is PENDING and `sealed` is true: a concurrent
 *     streamBatchItems call sealed the batch and the callback's happy-path
 *     branch then reset the status to PENDING ("all runs created").
 *  3. `status` is PENDING, `sealed` is not true, but `processingCompletedAt`
 *     is set: the cleanup race. BatchQueue finished every item, the callback
 *     fired (stamping processingCompletedAt) and cleanup removed the Redis
 *     metadata before this service could seal. processingCompletedAt is the
 *     discriminator because only the V2 completion callback sets it.
 *
 * ABORTED deliberately returns false: it means zero TaskRun records exist
 * (every per-item attempt failed and so did the pre-failed-TaskRun fallback,
 * or every item hit queue overload). There is nothing to monitor at the run
 * level, so the trigger call must throw and let the customer's retry/error
 * handling create a fresh batch.
 *
 * @param status - Current batch status; null/undefined when no batch row was read.
 * @param sealed - Whether the batch row is flagged as sealed.
 * @param processingCompletedAt - Timestamp written by the V2 completion callback, if any.
 * @returns true when the caller should treat the request as an already-successful retry.
 */
export function isIdempotentRetrySuccess(
  status: BatchTaskRunStatus | null | undefined,
  sealed: boolean | null | undefined,
  processingCompletedAt: Date | null | undefined
): boolean {
  switch (status) {
    // The batch has moved past PENDING into a state where runs exist.
    case "PROCESSING":
    case "COMPLETED":
    case "PARTIAL_FAILED":
      return true;

    // Still PENDING: only "done" if something else sealed it or the
    // completion callback has already stamped the batch.
    case "PENDING":
      if (sealed === true) {
        return true;
      }
      return processingCompletedAt != null;

    // ABORTED, missing status, or anything unexpected.
    default:
      return false;
  }
}
55+
// Options bag for StreamBatchItemsService.
// NOTE(review): maxItemBytes is presumably the per-item size ceiling in bytes —
// confirm against the code that consumes it (not visible in this hunk).
1356export type StreamBatchItemsServiceOptions = {
1457 maxItemBytes : number ;
1558} ;
@@ -93,20 +136,35 @@ export class StreamBatchItemsService extends WithRunEngine {
93136 runCount : true ,
94137 sealed : true ,
95138 batchVersion : true ,
139+ processingCompletedAt : true ,
96140 } ,
97141 } ) ;
98142
99143 if ( ! batch ) {
100144 throw new ServiceValidationError ( `Batch ${ batchFriendlyId } not found` ) ;
101145 }
102146
103- if ( batch . sealed ) {
104- throw new ServiceValidationError (
105- `Batch ${ batchFriendlyId } is already sealed and cannot accept more items`
106- ) ;
147+ if ( isIdempotentRetrySuccess ( batch . status , batch . sealed , batch . processingCompletedAt ) ) {
148+ logger . info ( "Batch already sealed/completed - treating Phase 2 retry as success" , {
149+ batchId : batchFriendlyId ,
150+ batchSealed : batch . sealed ,
151+ batchStatus : batch . status ,
152+ processingCompletedAt : batch . processingCompletedAt ,
153+ } ) ;
154+
155+ return {
156+ id : batchFriendlyId ,
157+ itemsAccepted : 0 ,
158+ itemsDeduplicated : 0 ,
159+ sealed : true ,
160+ runCount : batch . runCount ,
161+ } ;
107162 }
108163
109164 if ( batch . status !== "PENDING" ) {
165+ // ABORTED or any other unexpected non-PENDING state — surface as an error.
166+ // For ABORTED specifically, throwing is required so the customer's
167+ // batchTrigger() retries (a new batch) can recreate the runs.
110168 throw new ServiceValidationError (
111169 `Batch ${ batchFriendlyId } is not in PENDING status (current: ${ batch . status } )`
112170 ) ;
@@ -220,17 +278,24 @@ export class StreamBatchItemsService extends WithRunEngine {
220278 // COMPLETED (sealed by the BatchQueue completion path before we got here).
221279 const currentBatch = await this . _prisma . batchTaskRun . findFirst ( {
222280 where : { id : batchId } ,
223- select : { sealed : true , status : true } ,
281+ select : { sealed : true , status : true , processingCompletedAt : true } ,
224282 } ) ;
225283
226- if ( currentBatch ?. sealed || currentBatch ?. status === "COMPLETED" ) {
284+ if (
285+ isIdempotentRetrySuccess (
286+ currentBatch ?. status ,
287+ currentBatch ?. sealed ,
288+ currentBatch ?. processingCompletedAt
289+ )
290+ ) {
227291 logger . info ( "Batch already sealed before count check (fast completion)" , {
228292 batchId : batchFriendlyId ,
229293 itemsAccepted,
230294 itemsDeduplicated,
231295 enqueuedCount,
232296 expectedCount : batch . runCount ,
233- batchStatus : currentBatch . status ,
297+ batchStatus : currentBatch ?. status ,
298+ processingCompletedAt : currentBatch ?. processingCompletedAt ,
234299 } ) ;
235300
236301 return {
@@ -242,6 +307,15 @@ export class StreamBatchItemsService extends WithRunEngine {
242307 } ;
243308 }
244309
310+ if ( currentBatch ?. status === "ABORTED" ) {
311+ // Zero TaskRuns exist — the count-mismatch sealed:false semantics
312+ // ("retry with missing items") would mislead the SDK. Throw so the
313+ // customer's batchTrigger() retry creates a fresh batch.
314+ throw new ServiceValidationError (
315+ `Batch ${ batchFriendlyId } is not in PENDING status (current: ABORTED)`
316+ ) ;
317+ }
318+
245319 logger . warn ( "Batch item count mismatch" , {
246320 batchId : batchFriendlyId ,
247321 expected : batch . runCount ,
@@ -300,20 +374,25 @@ export class StreamBatchItemsService extends WithRunEngine {
300374 friendlyId : true ,
301375 status : true ,
302376 sealed : true ,
377+ processingCompletedAt : true ,
303378 } ,
304379 } ) ;
305380
306381 if (
307- ( currentBatch ?. sealed && currentBatch . status === "PROCESSING" ) ||
308- currentBatch ?. status === "COMPLETED"
382+ isIdempotentRetrySuccess (
383+ currentBatch ?. status ,
384+ currentBatch ?. sealed ,
385+ currentBatch ?. processingCompletedAt
386+ )
309387 ) {
310388 logger . info ( "Batch already sealed/completed by concurrent path" , {
311389 batchId : batchFriendlyId ,
312390 itemsAccepted,
313391 itemsDeduplicated,
314392 envId : environment . id ,
315- batchStatus : currentBatch . status ,
316- batchSealed : currentBatch . sealed ,
393+ batchStatus : currentBatch ?. status ,
394+ batchSealed : currentBatch ?. sealed ,
395+ processingCompletedAt : currentBatch ?. processingCompletedAt ,
317396 } ) ;
318397
319398 span . setAttribute ( "itemsAccepted" , itemsAccepted ) ;
0 commit comments