Skip to content

Commit 6d881e7

Browse files
d-csclaude
andcommitted
fix(webapp): skip buffer-side delayUntil guard on reschedule when entry is materialised
The reschedule route enforces a buffer-side equivalent of the PG service's `taskRun.status !== "DELAYED"` precondition by reading the buffered snapshot's `delayUntil` and 422'ing when absent. Post materialisation the buffer entry hangs around for a 30s grace TTL with `materialised: true`, and `findRunByIdWithMollifierFallback` returns the stale snapshot regardless. If a run was originally buffered without `delayUntil` but subsequently became DELAYED on the PG side (a prior reschedule routed to PG, the engine set the delay through another path, etc.), the guard 422'd a legitimately DELAYED PG row. Switch to a direct `buffer.getEntry()` read and only apply the guard when `entry.materialised !== true`. Post-materialisation we let `mutateWithFallback` route to PG, which runs its own canonical DELAYED check on the actual current state. Also adds env/org scoping at the read — matches the rest of the mutations layer and prevents a cross-environment buffer entry leak from the guard's read. Devin follow-up on PR #3756. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 68b1e30 commit 6d881e7

1 file changed

Lines changed: 29 additions & 14 deletions

File tree

apps/webapp/app/routes/api.v1.runs.$runParam.reschedule.ts

Lines changed: 29 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { logger } from "~/services/logger.server";
1010
import { ServiceValidationError } from "~/v3/services/baseService.server";
1111
import { RescheduleTaskRunService } from "~/v3/services/rescheduleTaskRun.server";
1212
import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server";
13-
import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";
13+
import { getMollifierBuffer } from "~/v3/mollifier/mollifierBuffer.server";
1414
import { parseDelay } from "~/utils/delays";
1515

1616
const ParamsSchema = z.object({
@@ -58,19 +58,34 @@ export async function action({ request, params }: ActionFunctionArgs) {
5858
// SyntheticRun type doesn't carry a "DELAYED" enum value because
5959
// it's not a terminal status the trace API needs to express; the
6060
// buffered analogue is `delayUntil` set in the snapshot. Gate on
61-
// that. Race window between read and write is bounded: if the
62-
// drainer materialises mid-call, mutateWithFallback falls through
63-
// to the PG mutation which has its own DELAYED check.
64-
const buffered = await findRunByIdWithMollifierFallback({
65-
runId: parsed.data.runParam,
66-
environmentId: env.id,
67-
organizationId: env.organizationId,
68-
});
69-
if (buffered && !buffered.delayUntil) {
70-
return json(
71-
{ error: "Cannot reschedule a run that is not delayed" },
72-
{ status: 422 },
73-
);
61+
// that.
62+
//
63+
// Only apply the guard when the buffer entry is NOT yet
64+
// materialised. Post-materialise the entry sticks around for a
65+
// 30s grace TTL with `materialised: true`, but the PG row is now
66+
// canonical — its DELAYED state may differ from what the snapshot
67+
// recorded at trigger time (e.g. a prior reschedule via the PG
68+
// path, or a delay set by the engine through another mechanism).
69+
// Reading from the stale snapshot would 422 a legitimately-DELAYED
70+
// PG row. When `materialised` we let `mutateWithFallback` route to
71+
// PG, which runs its own canonical DELAYED check.
72+
const buffer = getMollifierBuffer();
73+
const entry = buffer ? await buffer.getEntry(parsed.data.runParam) : null;
74+
const isLiveBuffered =
75+
entry !== null &&
76+
entry.materialised !== true &&
77+
entry.envId === env.id &&
78+
entry.orgId === env.organizationId;
79+
if (isLiveBuffered) {
80+
const snapshot = JSON.parse(entry.payload) as Record<string, unknown>;
81+
const snapshotDelayUntil =
82+
typeof snapshot.delayUntil === "string" ? snapshot.delayUntil : undefined;
83+
if (!snapshotDelayUntil) {
84+
return json(
85+
{ error: "Cannot reschedule a run that is not delayed" },
86+
{ status: 422 },
87+
);
88+
}
7489
}
7590

7691
const outcome = await mutateWithFallback<Response>({

0 commit comments

Comments
 (0)