Commit e8caa1d

matt-aitken and claude committed
fix(webapp): tighten Phase 2 idempotency check; cover PARTIAL_FAILED race
Addresses two PR review findings:

CodeRabbit: sealed=true + ABORTED would silently succeed under the previous `if (batch.sealed || ...)` check. V2's batch completion callback can set status=ABORTED (failedRunCount > 0 && successfulRunCount === 0) on a batch that streamBatchItems already sealed, leaving sealed=true alongside a terminally-failed batch. A Phase 2 retry of such a batch must surface the error, not mask it.

Devin: PARTIAL_FAILED (failedRunCount > 0 with at least one success) is a real V2 completion-callback status, but neither the pre-loop check nor the post-loop race handlers (lines 226 and 306) accepted it as success. A retry whose original stream succeeded would either 422 at the pre-loop or hit "unexpected state" at the post-loop seal-failed branch.

Changes:
- Pre-loop: replace the broad `sealed || PROCESSING || COMPLETED` check with an `isIdempotentRetrySuccess` boolean that admits PROCESSING, COMPLETED, PARTIAL_FAILED, or (sealed && PENDING); ABORTED falls through to the throw.
- Post-loop count-mismatch (line 226 region): add PARTIAL_FAILED to the success short-circuit alongside sealed and COMPLETED.
- Post-loop seal-failed (line 306 region): add PARTIAL_FAILED to the success short-circuit alongside (sealed && PROCESSING) and COMPLETED.

Tests (TDD red-then-green):
- New: pre-loop sealed=true + ABORTED → throws (CodeRabbit's case).
- New: pre-loop PARTIAL_FAILED → returns sealed:true.
- New: post-loop seal-failed race with PARTIAL_FAILED → returns sealed:true (uses the same racingPrisma pattern as the existing COMPLETED race test).

All 34 tests in streamBatchItems.test.ts pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent deaa29b commit e8caa1d
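
The pre-loop rule from the commit message can be read as a pure predicate over the batch's `sealed` flag and `status`. The following is a minimal sketch, not the service code: the `BatchState` type, the status union, and the standalone functions `previousCheck` and `isIdempotentRetrySuccess` are assumptions made for illustration (in the commit, `isIdempotentRetrySuccess` is an inline boolean computed from the loaded batch row).

```typescript
// Hypothetical, simplified view of a batch row. The status union lists only the
// statuses named in this commit; the real model may define more.
type BatchStatus = "PENDING" | "PROCESSING" | "COMPLETED" | "PARTIAL_FAILED" | "ABORTED";

interface BatchState {
  sealed: boolean;
  status: BatchStatus;
}

// The check before this commit: any sealed batch counted as a successful retry.
function previousCheck(batch: BatchState): boolean {
  return batch.sealed || batch.status === "PROCESSING" || batch.status === "COMPLETED";
}

// The check after this commit: sealed only counts while the status is PENDING,
// so a sealed batch that was later ABORTED falls through to the error path.
function isIdempotentRetrySuccess(batch: BatchState): boolean {
  return (
    batch.status === "PROCESSING" ||
    batch.status === "COMPLETED" ||
    batch.status === "PARTIAL_FAILED" ||
    (batch.sealed && batch.status === "PENDING")
  );
}

// CodeRabbit's case: sealed=true + ABORTED.
const sealedAborted: BatchState = { sealed: true, status: "ABORTED" };
console.log(previousCheck(sealedAborted)); // true  (failure masked as success)
console.log(isIdempotentRetrySuccess(sealedAborted)); // false (falls through to the throw)

// Devin's case: PARTIAL_FAILED without sealed.
const partialFailed: BatchState = { sealed: false, status: "PARTIAL_FAILED" };
console.log(previousCheck(partialFailed)); // false (retry would 422)
console.log(isIdempotentRetrySuccess(partialFailed)); // true
```

An unsealed PENDING batch is rejected by both versions, which is what lets a genuine first Phase 2 request proceed into the streaming loop.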

2 files changed

Lines changed: 278 additions & 8 deletions

apps/webapp/app/runEngine/services/streamBatchItems.server.ts

Lines changed: 27 additions & 8 deletions
@@ -100,12 +100,26 @@ export class StreamBatchItemsService extends WithRunEngine {
         throw new ServiceValidationError(`Batch ${batchFriendlyId} not found`);
       }
 
-      // Phase 2 retry idempotency (TRI-9944): if the batch is already sealed
-      // or has moved past PENDING into PROCESSING/COMPLETED, this is a retry
-      // of a request whose response was lost — the original successful request
-      // already enqueued every item and sealed the batch. Returning sealed:true
-      // makes the SDK stop retrying instead of throwing a customer-visible 422.
-      if (batch.sealed || batch.status === "PROCESSING" || batch.status === "COMPLETED") {
+      // Phase 2 retry idempotency (TRI-9944): a successful original request
+      // sealed the batch (sealed=true, status=PROCESSING) and the V2 batch
+      // completion callback can then independently update status to:
+      //   - PENDING (all runs created — sealed stays true)
+      //   - PARTIAL_FAILED (some run creations failed — sealed stays true/false)
+      //   - COMPLETED (set by tryCompleteBatch after every run reaches a final
+      //     state — sealed is NOT set by this path)
+      // For all of these the Phase 2 stream did its job, so a retry should
+      // return sealed:true and the SDK stops retrying.
+      //
+      // ABORTED is explicitly excluded — it means every run-creation attempt
+      // failed and the batch is terminally broken; surface that as an error
+      // rather than masking it as success.
+      const isIdempotentRetrySuccess =
+        batch.status === "PROCESSING" ||
+        batch.status === "COMPLETED" ||
+        batch.status === "PARTIAL_FAILED" ||
+        (batch.sealed && batch.status === "PENDING");
+
+      if (isIdempotentRetrySuccess) {
         logger.info("Batch already sealed/completed - treating Phase 2 retry as success", {
           batchId: batchFriendlyId,
           batchSealed: batch.sealed,
@@ -239,7 +253,11 @@ export class StreamBatchItemsService extends WithRunEngine {
           select: { sealed: true, status: true },
         });
 
-        if (currentBatch?.sealed || currentBatch?.status === "COMPLETED") {
+        if (
+          currentBatch?.sealed ||
+          currentBatch?.status === "COMPLETED" ||
+          currentBatch?.status === "PARTIAL_FAILED"
+        ) {
           logger.info("Batch already sealed before count check (fast completion)", {
             batchId: batchFriendlyId,
             itemsAccepted,
@@ -321,7 +339,8 @@ export class StreamBatchItemsService extends WithRunEngine {
 
         if (
           (currentBatch?.sealed && currentBatch.status === "PROCESSING") ||
-          currentBatch?.status === "COMPLETED"
+          currentBatch?.status === "COMPLETED" ||
+          currentBatch?.status === "PARTIAL_FAILED"
         ) {
           logger.info("Batch already sealed/completed by concurrent path", {
             batchId: batchFriendlyId,
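
The two post-loop hunks apply slightly different short-circuit conditions, which is easier to see when both are written as predicates side by side. This is a sketch under assumptions: the `CurrentBatch` type and the function names `countMismatchAlreadyHandled` and `sealFailedAlreadyHandled` are hypothetical and do not appear in the service, which evaluates these conditions inline on the re-fetched `currentBatch`.

```typescript
// Statuses named in this commit; the real status type may include others.
type BatchStatus = "PENDING" | "PROCESSING" | "COMPLETED" | "PARTIAL_FAILED" | "ABORTED";

// Mirrors the `select: { sealed: true, status: true }` re-fetch; null if the row is gone.
type CurrentBatch = { sealed: boolean; status: BatchStatus } | null;

// Count-mismatch branch (second hunk): sealed on its own is enough.
function countMismatchAlreadyHandled(current: CurrentBatch): boolean {
  return Boolean(
    current?.sealed ||
      current?.status === "COMPLETED" ||
      current?.status === "PARTIAL_FAILED"
  );
}

// Seal-failed branch (third hunk): sealed only counts together with PROCESSING.
function sealFailedAlreadyHandled(current: CurrentBatch): boolean {
  return (
    (current?.sealed === true && current.status === "PROCESSING") ||
    current?.status === "COMPLETED" ||
    current?.status === "PARTIAL_FAILED"
  );
}

// PARTIAL_FAILED set by the callback without sealing now short-circuits in both branches.
const raced: CurrentBatch = { sealed: false, status: "PARTIAL_FAILED" };
console.log(countMismatchAlreadyHandled(raced)); // true
console.log(sealFailedAlreadyHandled(raced)); // true
```

As written in the diff, the count-mismatch condition accepts `sealed` alone, while the seal-failed condition requires `sealed` together with `PROCESSING`; only the latter rejects a sealed batch whose status has since become `ABORTED`.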

apps/webapp/test/engine/streamBatchItems.test.ts

Lines changed: 251 additions & 0 deletions
@@ -375,6 +375,257 @@ describe("StreamBatchItemsService", () => {
     }
   );
 
+  containerTest(
+    "should throw when batch is sealed but ABORTED (callback aborted post-seal must surface as error)",
+    async ({ prisma, redisOptions }) => {
+      const engine = new RunEngine({
+        prisma,
+        worker: {
+          redis: redisOptions,
+          workers: 1,
+          tasksPerWorker: 10,
+          pollIntervalMs: 100,
+          disabled: true,
+        },
+        queue: {
+          redis: redisOptions,
+        },
+        runLock: {
+          redis: redisOptions,
+        },
+        machines: {
+          defaultMachine: "small-1x",
+          machines: {
+            "small-1x": {
+              name: "small-1x" as const,
+              cpu: 0.5,
+              memory: 0.5,
+              centsPerMs: 0.0001,
+            },
+          },
+          baseCostInCents: 0.0005,
+        },
+        batchQueue: {
+          redis: redisOptions,
+        },
+        tracer: trace.getTracer("test", "0.0.0"),
+      });
+
+      const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
+
+      // V2 batch completion callback sets status=ABORTED (failedRunCount > 0 &&
+      // successfulRunCount === 0) without touching sealed=true that the seal
+      // step previously set. The Phase 2 retry must NOT mask this terminal
+      // failure as success — every run failed.
+      const batch = await createBatch(prisma, authenticatedEnvironment.id, {
+        runCount: 2,
+        status: "ABORTED",
+        sealed: true,
+      });
+
+      const service = new StreamBatchItemsService({
+        prisma,
+        engine,
+      });
+
+      await expect(
+        service.call(authenticatedEnvironment, batch.friendlyId, itemsToAsyncIterable([]), {
+          maxItemBytes: 1024 * 1024,
+        })
+      ).rejects.toThrow(ServiceValidationError);
+
+      await engine.quit();
+    }
+  );
+
+  containerTest(
+    "should return sealed=true when batch is PARTIAL_FAILED (Phase 2 retry idempotency)",
+    async ({ prisma, redisOptions }) => {
+      const engine = new RunEngine({
+        prisma,
+        worker: {
+          redis: redisOptions,
+          workers: 1,
+          tasksPerWorker: 10,
+          pollIntervalMs: 100,
+          disabled: true,
+        },
+        queue: {
+          redis: redisOptions,
+        },
+        runLock: {
+          redis: redisOptions,
+        },
+        machines: {
+          defaultMachine: "small-1x",
+          machines: {
+            "small-1x": {
+              name: "small-1x" as const,
+              cpu: 0.5,
+              memory: 0.5,
+              centsPerMs: 0.0001,
+            },
+          },
+          baseCostInCents: 0.0005,
+        },
+        batchQueue: {
+          redis: redisOptions,
+        },
+        tracer: trace.getTracer("test", "0.0.0"),
+      });
+
+      const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
+
+      // V2 completion callback sets PARTIAL_FAILED when some run-creation
+      // attempts failed but at least one succeeded. The Phase 2 stream itself
+      // did its job (items were enqueued and processed), so a retry should
+      // see this as terminal success — the per-item failures are visible on
+      // the individual run records.
+      const batch = await createBatch(prisma, authenticatedEnvironment.id, {
+        runCount: 2,
+        status: "PARTIAL_FAILED",
+        sealed: false,
+      });
+
+      const service = new StreamBatchItemsService({
+        prisma,
+        engine,
+      });
+
+      const result = await service.call(
+        authenticatedEnvironment,
+        batch.friendlyId,
+        itemsToAsyncIterable([]),
+        {
+          maxItemBytes: 1024 * 1024,
+        }
+      );
+
+      expect(result.sealed).toBe(true);
+      expect(result.id).toBe(batch.friendlyId);
+      expect(result.itemsAccepted).toBe(0);
+      expect(result.itemsDeduplicated).toBe(0);
+
+      await engine.quit();
+    }
+  );
+
+  containerTest(
+    "should return sealed=true when batch is PARTIAL_FAILED by callback before seal attempt",
+    async ({ prisma, redisOptions }) => {
+      const engine = new RunEngine({
+        prisma,
+        worker: {
+          redis: redisOptions,
+          workers: 1,
+          tasksPerWorker: 10,
+          pollIntervalMs: 100,
+          disabled: true,
+        },
+        queue: {
+          redis: redisOptions,
+        },
+        runLock: {
+          redis: redisOptions,
+        },
+        machines: {
+          defaultMachine: "small-1x",
+          machines: {
+            "small-1x": {
+              name: "small-1x" as const,
+              cpu: 0.5,
+              memory: 0.5,
+              centsPerMs: 0.0001,
+            },
+          },
+          baseCostInCents: 0.0005,
+        },
+        batchQueue: {
+          redis: redisOptions,
+        },
+        tracer: trace.getTracer("test", "0.0.0"),
+      });
+
+      const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
+
+      const batch = await createBatch(prisma, authenticatedEnvironment.id, {
+        runCount: 2,
+        status: "PENDING",
+        sealed: false,
+      });
+
+      await engine.initializeBatch({
+        batchId: batch.id,
+        friendlyId: batch.friendlyId,
+        environmentId: authenticatedEnvironment.id,
+        environmentType: authenticatedEnvironment.type,
+        organizationId: authenticatedEnvironment.organizationId,
+        projectId: authenticatedEnvironment.projectId,
+        runCount: 2,
+        processingConcurrency: 10,
+      });
+
+      await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 0, {
+        task: "test-task",
+        payload: JSON.stringify({ data: "item1" }),
+        payloadType: "application/json",
+      });
+      await engine.enqueueBatchItem(batch.id, authenticatedEnvironment.id, 1, {
+        task: "test-task",
+        payload: JSON.stringify({ data: "item2" }),
+        payloadType: "application/json",
+      });
+
+      // Simulate the race where V2's batchCompletionCallback runs between
+      // getEnqueuedCount and the seal updateMany — some runs failed to create
+      // but at least one succeeded, so callback sets status=PARTIAL_FAILED
+      // without setting sealed=true.
+      const racingPrisma = {
+        ...prisma,
+        batchTaskRun: {
+          ...prisma.batchTaskRun,
+          findFirst: prisma.batchTaskRun.findFirst.bind(prisma.batchTaskRun),
+          updateMany: async () => {
+            await prisma.batchTaskRun.update({
+              where: { id: batch.id },
+              data: {
+                status: "PARTIAL_FAILED",
+              },
+            });
+            return { count: 0 };
+          },
+          findUnique: prisma.batchTaskRun.findUnique.bind(prisma.batchTaskRun),
+        },
+      } as unknown as PrismaClient;
+
+      const service = new StreamBatchItemsService({
+        prisma: racingPrisma,
+        engine,
+      });
+
+      const result = await service.call(
+        authenticatedEnvironment,
+        batch.friendlyId,
+        itemsToAsyncIterable([]),
+        {
+          maxItemBytes: 1024 * 1024,
+        }
+      );
+
+      expect(result.sealed).toBe(true);
+      expect(result.id).toBe(batch.friendlyId);
+
+      const updatedBatch = await prisma.batchTaskRun.findUnique({
+        where: { id: batch.id },
+      });
+
+      expect(updatedBatch?.status).toBe("PARTIAL_FAILED");
+      expect(updatedBatch?.sealed).toBe(false);
+
+      await engine.quit();
+    }
+  );
+
   containerTest(
     "should return sealed=true when concurrent request already sealed the batch during seal attempt",
     async ({ prisma, redisOptions }) => {
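
The `racingPrisma` pattern used in the last new test replaces a single client method so that a concurrent write lands at exactly the moment the service attempts to seal. The sketch below shows the same idea without Prisma or a database. Everything in it is an assumption for illustration: the in-memory `BatchStore`, `createStore`, `withRace`, and `trySeal` are hypothetical and only imitate the seal-failed branch shown in the diff.

```typescript
type BatchStatus = "PENDING" | "PROCESSING" | "COMPLETED" | "PARTIAL_FAILED" | "ABORTED";

interface BatchRow {
  sealed: boolean;
  status: BatchStatus;
}

// Hypothetical stand-in for the two client calls the seal step relies on.
interface BatchStore {
  updateMany(): Promise<{ count: number }>;
  findUnique(): Promise<BatchRow | null>;
}

// In-memory store: the conditional seal applies only to an unsealed PENDING row.
function createStore(row: BatchRow): BatchStore {
  return {
    async updateMany() {
      if (!row.sealed && row.status === "PENDING") {
        row.sealed = true;
        row.status = "PROCESSING";
        return { count: 1 };
      }
      return { count: 0 };
    },
    async findUnique() {
      return { ...row };
    },
  };
}

// Wrap a store so the seal attempt loses a race: a concurrent writer changes
// the status first, and the conditional update then matches zero rows.
function withRace(store: BatchStore, row: BatchRow, raceStatus: BatchStatus): BatchStore {
  return {
    ...store,
    async updateMany() {
      row.status = raceStatus;
      return { count: 0 };
    },
  };
}

// Imitates the seal-failed branch: re-read the row and decide whether the
// zero-row update is a benign race or an unexpected state.
async function trySeal(store: BatchStore): Promise<{ sealed: boolean }> {
  const result = await store.updateMany();
  if (result.count > 0) return { sealed: true };

  const current = await store.findUnique();
  if (
    (current?.sealed && current.status === "PROCESSING") ||
    current?.status === "COMPLETED" ||
    current?.status === "PARTIAL_FAILED"
  ) {
    return { sealed: true };
  }
  throw new Error(
    `Batch in unexpected state: sealed=${current?.sealed}, status=${current?.status}`
  );
}

// Usage: the callback wins the race with PARTIAL_FAILED, and the retry still reports success.
const row: BatchRow = { sealed: false, status: "PENDING" };
trySeal(withRace(createStore(row), row, "PARTIAL_FAILED")).then((r) => console.log(r.sealed));
```

Overriding only `updateMany` keeps the rest of the store real, which is why the test in the diff can still assert on the row afterwards (`status` is `PARTIAL_FAILED`, `sealed` remains `false`).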
