
Commit 5a25abf

matt-aitken and claude committed
refactor(webapp): unify Phase 2 retry idempotency check across all 3 branches
After settling the operational contract, three inconsistencies remained between the pre-loop check and the two post-loop race handlers. The contract: ABORTED throws because zero TaskRun records exist for the customer to monitor; every other terminal state returns sealed:true because TaskRun records exist (some may be in a failed state, but per-run signals reach the customer through run monitoring).

1. Seal-failed branch threw "unexpected state" on sealed=true + PENDING, which is the legitimate post-callback "all runs created" state (V2 batchCompletionCallback resets PROCESSING → PENDING and leaves sealed=true). Pre-loop and count-mismatch both accept this state.

2. Count-mismatch branch admitted sealed=true + ABORTED via the bare `currentBatch?.sealed` clause, returning sealed:true. Pre-loop throws on this state. The count-mismatch outcome would silently hide a batch where zero TaskRuns were created.

3. Count-mismatch branch's fall-through return (sealed:false) implies "retry with missing items", which is wrong for ABORTED: a fresh batch is needed.

Extracted the per-status policy into an exported helper: isIdempotentRetrySuccess(status, sealed) returns true for PROCESSING, COMPLETED, PARTIAL_FAILED, or (sealed && PENDING). ABORTED is excluded so the customer's batchTrigger() retry fires.

All three branches now call the same helper. The count-mismatch branch additionally throws explicitly on ABORTED before falling through to the sealed:false return.

Tests (TDD red-then-green):
- New: seal-failed race with sealed=true + PENDING returns sealed:true (was throwing "unexpected state"). Uses racingPrisma to set the exact post-callback shape during the seal updateMany.
- New: count-mismatch race with sealed=true + ABORTED throws ServiceValidationError (was returning sealed:true). Uses a call-counter on findFirst to flip the batch state between the pre-loop read and the re-query.

All 36 tests in streamBatchItems.test.ts pass; webapp typecheck clean.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent e8caa1d commit 5a25abf
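As a rough illustration of the policy described in the commit message, the sketch below tabulates the helper's result for each (status, sealed) pair. The predicate body is copied from the diff; the `BatchStatus` union is a local stand-in for `BatchTaskRunStatus` from `@trigger.dev/database` and is assumed to contain only the statuses named in this commit, and `policyTable` is a hypothetical name used only here.

```typescript
// Local stand-in for BatchTaskRunStatus (assumption: only the statuses
// mentioned in the commit message are listed).
type BatchStatus = "PENDING" | "PROCESSING" | "COMPLETED" | "PARTIAL_FAILED" | "ABORTED";

// Same predicate as the exported helper in the diff, copied so the sketch runs standalone.
function isIdempotentRetrySuccess(
  status: BatchStatus | null | undefined,
  sealed: boolean | null | undefined
): boolean {
  return (
    status === "PROCESSING" ||
    status === "COMPLETED" ||
    status === "PARTIAL_FAILED" ||
    (sealed === true && status === "PENDING")
  );
}

const statuses: BatchStatus[] = ["PENDING", "PROCESSING", "COMPLETED", "PARTIAL_FAILED", "ABORTED"];

// Hypothetical table: one row per (status, sealed) combination.
const policyTable: { status: BatchStatus; sealed: boolean; retryIsSuccess: boolean }[] = [];
for (const status of statuses) {
  for (const sealed of [false, true]) {
    policyTable.push({ status, sealed, retryIsSuccess: isIdempotentRetrySuccess(status, sealed) });
  }
}

console.table(policyTable);
```

Of the ten rows, only unsealed PENDING and both ABORTED rows come out false: unsealed PENDING is the normal "keep streaming" state, and ABORTED is the case the commit wants surfaced as an error.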

2 files changed

Lines changed: 289 additions & 33 deletions


apps/webapp/app/runEngine/services/streamBatchItems.server.ts

Lines changed: 50 additions & 33 deletions
@@ -4,12 +4,45 @@ import {
 } from "@trigger.dev/core/v3";
 import { BatchId } from "@trigger.dev/core/v3/isomorphic";
 import type { BatchItem, RunEngine } from "@internal/run-engine";
+import type { BatchTaskRunStatus } from "@trigger.dev/database";
 import { prisma, type PrismaClientOrTransaction } from "~/db.server";
 import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
 import { logger } from "~/services/logger.server";
 import { ServiceValidationError, WithRunEngine } from "../../v3/services/baseService.server";
 import { BatchPayloadProcessor } from "../concerns/batchPayloads.server";
 
+/**
+ * Phase 2 retry idempotency check (TRI-9944).
+ *
+ * Returns true when the batch is in a state that means the Phase 2 stream's
+ * job has already been done by an earlier (or concurrent) request — every
+ * item is enqueued, runs have been or are being created, and at least one
+ * TaskRun record exists for the customer to monitor. A retry should return
+ * sealed:true in these states so the SDK stops retrying.
+ *
+ * - PROCESSING / sealed=true + PENDING: original sealed; runs are executing
+ *   (PENDING after callback "all runs created") or about to.
+ * - COMPLETED: every run reached a terminal state (tryCompleteBatch).
+ * - PARTIAL_FAILED: at least one TaskRun record exists; per-run failures
+ *   are visible at the run level.
+ *
+ * ABORTED is intentionally excluded — it means ZERO TaskRun records were
+ * created (every per-item attempt failed AND the pre-failed-TaskRun fallback
+ * also failed). The customer has nothing to monitor at the run level, so
+ * the trigger call must throw to give their retry/error handling a chance.
+ */
+export function isIdempotentRetrySuccess(
+  status: BatchTaskRunStatus | null | undefined,
+  sealed: boolean | null | undefined
+): boolean {
+  return (
+    status === "PROCESSING" ||
+    status === "COMPLETED" ||
+    status === "PARTIAL_FAILED" ||
+    (sealed === true && status === "PENDING")
+  );
+}
+
 export type StreamBatchItemsServiceOptions = {
   maxItemBytes: number;
 };
@@ -100,26 +133,7 @@ export class StreamBatchItemsService extends WithRunEngine {
         throw new ServiceValidationError(`Batch ${batchFriendlyId} not found`);
       }
 
-      // Phase 2 retry idempotency (TRI-9944): a successful original request
-      // sealed the batch (sealed=true, status=PROCESSING) and the V2 batch
-      // completion callback can then independently update status to:
-      //   - PENDING (all runs created — sealed stays true)
-      //   - PARTIAL_FAILED (some run creations failed — sealed stays true/false)
-      //   - COMPLETED (set by tryCompleteBatch after every run reaches a final
-      //     state — sealed is NOT set by this path)
-      // For all of these the Phase 2 stream did its job, so a retry should
-      // return sealed:true and the SDK stops retrying.
-      //
-      // ABORTED is explicitly excluded — it means every run-creation attempt
-      // failed and the batch is terminally broken; surface that as an error
-      // rather than masking it as success.
-      const isIdempotentRetrySuccess =
-        batch.status === "PROCESSING" ||
-        batch.status === "COMPLETED" ||
-        batch.status === "PARTIAL_FAILED" ||
-        (batch.sealed && batch.status === "PENDING");
-
-      if (isIdempotentRetrySuccess) {
+      if (isIdempotentRetrySuccess(batch.status, batch.sealed)) {
         logger.info("Batch already sealed/completed - treating Phase 2 retry as success", {
           batchId: batchFriendlyId,
           batchSealed: batch.sealed,
@@ -137,6 +151,8 @@ export class StreamBatchItemsService extends WithRunEngine {
 
       if (batch.status !== "PENDING") {
         // ABORTED or any other unexpected non-PENDING state — surface as an error.
+        // For ABORTED specifically, throwing is required so the customer's
+        // batchTrigger() retries (a new batch) can recreate the runs.
         throw new ServiceValidationError(
           `Batch ${batchFriendlyId} is not in PENDING status (current: ${batch.status})`
         );
@@ -253,18 +269,14 @@ export class StreamBatchItemsService extends WithRunEngine {
           select: { sealed: true, status: true },
         });
 
-        if (
-          currentBatch?.sealed ||
-          currentBatch?.status === "COMPLETED" ||
-          currentBatch?.status === "PARTIAL_FAILED"
-        ) {
+        if (isIdempotentRetrySuccess(currentBatch?.status, currentBatch?.sealed)) {
           logger.info("Batch already sealed before count check (fast completion)", {
             batchId: batchFriendlyId,
             itemsAccepted,
             itemsDeduplicated,
             enqueuedCount,
             expectedCount: batch.runCount,
-            batchStatus: currentBatch.status,
+            batchStatus: currentBatch?.status,
           });
 
           return {
@@ -276,6 +288,15 @@ export class StreamBatchItemsService extends WithRunEngine {
           };
         }
 
+        if (currentBatch?.status === "ABORTED") {
+          // Zero TaskRuns exist — the count-mismatch sealed:false semantics
+          // ("retry with missing items") would mislead the SDK. Throw so the
+          // customer's batchTrigger() retry creates a fresh batch.
+          throw new ServiceValidationError(
+            `Batch ${batchFriendlyId} is not in PENDING status (current: ABORTED)`
+          );
+        }
+
         logger.warn("Batch item count mismatch", {
           batchId: batchFriendlyId,
           expected: batch.runCount,
@@ -337,18 +358,14 @@ export class StreamBatchItemsService extends WithRunEngine {
           },
         });
 
-        if (
-          (currentBatch?.sealed && currentBatch.status === "PROCESSING") ||
-          currentBatch?.status === "COMPLETED" ||
-          currentBatch?.status === "PARTIAL_FAILED"
-        ) {
+        if (isIdempotentRetrySuccess(currentBatch?.status, currentBatch?.sealed)) {
           logger.info("Batch already sealed/completed by concurrent path", {
             batchId: batchFriendlyId,
             itemsAccepted,
             itemsDeduplicated,
             envId: environment.id,
-            batchStatus: currentBatch.status,
-            batchSealed: currentBatch.sealed,
+            batchStatus: currentBatch?.status,
+            batchSealed: currentBatch?.sealed,
           });
 
           span.setAttribute("itemsAccepted", itemsAccepted);
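
The count-mismatch branch now has three distinct outcomes: success, an explicit throw on ABORTED, and the fall-through sealed:false. A minimal sketch of that decision as a pure function follows, assuming simplified types. `resolveCountMismatch`, `BatchStatus`, and the local `ServiceValidationError` class are hypothetical stand-ins for illustration, not the service's actual code.

```typescript
// Local stand-ins (assumptions): the real types live in @trigger.dev/database
// and the webapp's baseService module.
type BatchStatus = "PENDING" | "PROCESSING" | "COMPLETED" | "PARTIAL_FAILED" | "ABORTED";
class ServiceValidationError extends Error {}

// Predicate copied from the diff so the sketch is self-contained.
function isIdempotentRetrySuccess(
  status: BatchStatus | null | undefined,
  sealed: boolean | null | undefined
): boolean {
  return (
    status === "PROCESSING" ||
    status === "COMPLETED" ||
    status === "PARTIAL_FAILED" ||
    (sealed === true && status === "PENDING")
  );
}

// Hypothetical helper mirroring the order of checks in the count-mismatch branch.
function resolveCountMismatch(
  batchFriendlyId: string,
  currentBatch: { status: BatchStatus; sealed: boolean } | null
): { sealed: boolean } {
  // 1. A concurrent path already finished the job: report success.
  if (isIdempotentRetrySuccess(currentBatch?.status, currentBatch?.sealed)) {
    return { sealed: true };
  }
  // 2. Zero TaskRuns exist: throw so the caller starts a fresh batch.
  if (currentBatch?.status === "ABORTED") {
    throw new ServiceValidationError(
      `Batch ${batchFriendlyId} is not in PENDING status (current: ABORTED)`
    );
  }
  // 3. Genuine mismatch: the SDK may retry with the missing items.
  return { sealed: false };
}
```

The ordering matters in this sketch: the ABORTED check has to come before the fall-through, otherwise an aborted batch would be reported as "retry with missing items".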

apps/webapp/test/engine/streamBatchItems.test.ts

Lines changed: 239 additions & 0 deletions
@@ -1083,6 +1083,245 @@ describe("StreamBatchItemsService", () => {
       await engine.quit();
     }
   );
+
+  containerTest(
+    "should return sealed=true when seal-failed race produces sealed=true + PENDING (post-callback all-created)",
+    async ({ prisma, redisOptions }) => {
+      const engine = new RunEngine({
+        prisma,
+        worker: {
+          redis: redisOptions,
+          workers: 1,
+          tasksPerWorker: 10,
+          pollIntervalMs: 100,
+          disabled: true,
+        },
+        queue: {
+          redis: redisOptions,
+        },
+        runLock: {
+          redis: redisOptions,
+        },
+        machines: {
+          defaultMachine: "small-1x",
+          machines: {
+            "small-1x": {
+              name: "small-1x" as const,
+              cpu: 0.5,
+              memory: 0.5,
+              centsPerMs: 0.0001,
+            },
+          },
+          baseCostInCents: 0.0005,
+        },
+        batchQueue: {
+          redis: redisOptions,
+        },
+        tracer: trace.getTracer("test", "0.0.0"),
+      });
+
+      const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
+
+      const batch = await createBatch(prisma, authenticatedEnvironment.id, {
+        runCount: 2,
+        status: "PENDING",
+        sealed: false,
+      });
+
+      await engine.initializeBatch({
+        batchId: batch.id,
+        friendlyId: batch.friendlyId,
+        environmentId: authenticatedEnvironment.id,
+        environmentType: authenticatedEnvironment.type,
+        organizationId: authenticatedEnvironment.organizationId,
+        projectId: authenticatedEnvironment.projectId,
+        runCount: 2,
+        processingConcurrency: 10,
+      });
+
+      await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 0, {
+        task: "test-task",
+        payload: JSON.stringify({ data: "item1" }),
+        payloadType: "application/json",
+      });
+      await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 1, {
+        task: "test-task",
+        payload: JSON.stringify({ data: "item2" }),
+        payloadType: "application/json",
+      });
+
+      // Simulate the race where a concurrent path seals the batch (sealed=true,
+      // PROCESSING), then the V2 batchCompletionCallback fires with all runs
+      // created successfully and resets status to PENDING (sealed stays true).
+      // Our seal updateMany then fails the conditional (sealed=false no longer
+      // matches), and the re-query sees sealed=true + PENDING — a perfectly
+      // valid post-callback state that the SDK retry should treat as success.
+      const racingPrisma = {
+        ...prisma,
+        batchTaskRun: {
+          ...prisma.batchTaskRun,
+          findFirst: prisma.batchTaskRun.findFirst.bind(prisma.batchTaskRun),
+          updateMany: async () => {
+            await prisma.batchTaskRun.update({
+              where: { id: batch.id },
+              data: {
+                sealed: true,
+                sealedAt: new Date(),
+                // Intentionally leave status as PENDING — that's exactly what
+                // the V2 batchCompletionCallback does after all runs are
+                // created (status PROCESSING → PENDING).
+              },
+            });
+            return { count: 0 };
+          },
+          findUnique: prisma.batchTaskRun.findUnique.bind(prisma.batchTaskRun),
+        },
+      } as unknown as PrismaClient;
+
+      const service = new StreamBatchItemsService({
+        prisma: racingPrisma,
+        engine,
+      });
+
+      const result = await service.call(
+        authenticatedEnvironment,
+        batch.friendlyId,
+        itemsToAsyncIterable([]),
+        {
+          maxItemBytes: 1024 * 1024,
+        }
+      );
+
+      expect(result.sealed).toBe(true);
+      expect(result.id).toBe(batch.friendlyId);
+
+      const updatedBatch = await prisma.batchTaskRun.findUnique({
+        where: { id: batch.id },
+      });
+
+      expect(updatedBatch?.sealed).toBe(true);
+      expect(updatedBatch?.status).toBe("PENDING");
+
+      await engine.quit();
+    }
+  );
+
+  containerTest(
+    "should throw when count-mismatch race produces sealed=true + ABORTED (no TaskRuns created)",
+    async ({ prisma, redisOptions }) => {
+      const engine = new RunEngine({
+        prisma,
+        worker: {
+          redis: redisOptions,
+          workers: 1,
+          tasksPerWorker: 10,
+          pollIntervalMs: 100,
+          disabled: true,
+        },
+        queue: {
+          redis: redisOptions,
+        },
+        runLock: {
+          redis: redisOptions,
+        },
+        machines: {
+          defaultMachine: "small-1x",
+          machines: {
+            "small-1x": {
+              name: "small-1x" as const,
+              cpu: 0.5,
+              memory: 0.5,
+              centsPerMs: 0.0001,
+            },
+          },
+          baseCostInCents: 0.0005,
+        },
+        batchQueue: {
+          redis: redisOptions,
+        },
+        tracer: trace.getTracer("test", "0.0.0"),
+      });
+
+      const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
+
+      const batch = await createBatch(prisma, authenticatedEnvironment.id, {
+        runCount: 3,
+        status: "PENDING",
+        sealed: false,
+      });
+
+      await engine.initializeBatch({
+        batchId: batch.id,
+        friendlyId: batch.friendlyId,
+        environmentId: authenticatedEnvironment.id,
+        environmentType: authenticatedEnvironment.type,
+        organizationId: authenticatedEnvironment.organizationId,
+        projectId: authenticatedEnvironment.projectId,
+        runCount: 3,
+        processingConcurrency: 10,
+      });
+
+      // Only enqueue 2 items so the post-loop count check trips into the
+      // mismatch handler. The race we're simulating: between our pre-loop
+      // findFirst and the count-mismatch re-query, a concurrent path sealed
+      // the batch, runs were attempted, every run-creation failed AND the
+      // pre-failed-TaskRun fallback also failed → callback sets ABORTED.
+      // The customer has zero TaskRun records to monitor, so the retry must
+      // throw rather than silently succeed.
+      await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 0, {
+        task: "test-task",
+        payload: JSON.stringify({ data: "item1" }),
+        payloadType: "application/json",
+      });
+      await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 1, {
+        task: "test-task",
+        payload: JSON.stringify({ data: "item2" }),
+        payloadType: "application/json",
+      });
+
+      // Override findFirst to flip the batch to sealed=true + ABORTED on the
+      // re-query that happens INSIDE the count-mismatch branch. The first
+      // findFirst (pre-loop) must still see PENDING + sealed=false so we
+      // pass through and reach the count-mismatch branch.
+      let findFirstCallCount = 0;
+      const racingPrisma = {
+        ...prisma,
+        batchTaskRun: {
+          ...prisma.batchTaskRun,
+          findFirst: async (args: Parameters<typeof prisma.batchTaskRun.findFirst>[0]) => {
+            findFirstCallCount++;
+            if (findFirstCallCount >= 2) {
+              await prisma.batchTaskRun.update({
+                where: { id: batch.id },
+                data: {
+                  sealed: true,
+                  sealedAt: new Date(),
+                  status: "ABORTED",
+                  completedAt: new Date(),
+                },
+              });
+            }
+            return prisma.batchTaskRun.findFirst.call(prisma.batchTaskRun, args);
+          },
+          updateMany: prisma.batchTaskRun.updateMany.bind(prisma.batchTaskRun),
+          findUnique: prisma.batchTaskRun.findUnique.bind(prisma.batchTaskRun),
+        },
+      } as unknown as PrismaClient;
+
+      const service = new StreamBatchItemsService({
+        prisma: racingPrisma,
+        engine,
+      });
+
+      await expect(
+        service.call(authenticatedEnvironment, batch.friendlyId, itemsToAsyncIterable([]), {
+          maxItemBytes: 1024 * 1024,
+        })
+      ).rejects.toThrow(ServiceValidationError);
+
+      await engine.quit();
+    }
+  );
 });
 
 describe("createNdjsonParserStream", () => {
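
Both new tests simulate a race by wrapping a data-access method so that a "concurrent" write lands just before a particular read. The technique can be sketched in isolation, with an in-memory store in place of Prisma. `BatchStore`, `createStore`, `withRaceOnRead`, and `raceDemo` are hypothetical names for illustration only; the real tests wrap `prisma.batchTaskRun` as shown in the diff.

```typescript
type BatchRow = { id: string; status: "PENDING" | "ABORTED"; sealed: boolean };

// Hypothetical minimal store interface standing in for prisma.batchTaskRun.
interface BatchStore {
  findFirst(): Promise<BatchRow>;
  update(data: Partial<BatchRow>): Promise<void>;
}

function createStore(initial: BatchRow): BatchStore {
  const row: BatchRow = { ...initial };
  return {
    findFirst: async () => ({ ...row }),
    update: async (data) => {
      Object.assign(row, data);
    },
  };
}

// Wrap findFirst with a call counter: from the Nth call onward, run the
// "concurrent" write before delegating to the real read.
function withRaceOnRead(
  store: BatchStore,
  raceFromCall: number,
  race: () => Promise<void>
): BatchStore {
  let calls = 0;
  return {
    ...store,
    findFirst: async () => {
      calls++;
      if (calls >= raceFromCall) {
        await race();
      }
      return store.findFirst();
    },
  };
}

async function raceDemo(): Promise<{ first: BatchRow; second: BatchRow }> {
  const store = createStore({ id: "batch_1", status: "PENDING", sealed: false });
  const racing = withRaceOnRead(store, 2, () =>
    store.update({ status: "ABORTED", sealed: true })
  );
  const first = await racing.findFirst(); // pre-loop read: still PENDING, unsealed
  const second = await racing.findFirst(); // re-query: sees the raced ABORTED state
  return { first, second };
}

raceDemo().then(({ first, second }) => console.log(first, second));
```

Because the write is triggered by the call counter, the interleaving is deterministic, which is presumably why the tests use this shape instead of real concurrent requests.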
