Skip to content

Commit 68b1e30

Browse files
d-csclaude
andcommitted
fix(webapp): close metadata-PUT parent/root race with drainer terminal-failure delete
When the metadata-PUT route's primary buffered mutation succeeded, the route then did a second `findRunByIdWithMollifierFallback` read to obtain `parentTaskRunFriendlyId` / `rootTaskRunFriendlyId` for fanning out `body.parentOperations` and `body.rootOperations` to their respective runs. If the drainer's terminal-failure path (`buffer.fail`) ran between the primary mutation landing on the snapshot and the second read — atomically marking FAILED and DELing the entry hash — the second read returned null and the route silently skipped the entire `routeOperationsToRun` fan-out. The caller's parent/root operations were permanently lost even though their primary metadata write had succeeded. Move the parent/root id capture INSIDE `applyMetadataMutationToBufferedRun` — its CAS read already loads the snapshot, so it can extract the ids cheaply on any iteration of its retry loop and surface them on the `applied` outcome. The route then uses the outcome's captured ids directly without a second buffer round trip, closing the race window. FriendlyIds (not internal cuids) are surfaced because the consuming `routeOperationsToRun` helper gates on the `run_…` prefix to decide whether to attempt the buffer fallback; cuids would skip that path. The snapshot's `parentTaskRunId` / `rootTaskRunId` are engine-side cuids, so `RunId.toFriendlyId` converts them — identical to what `readFallback.server.ts` does when assembling its SyntheticRun. Self-fallback to `runId` matches PG semantics: the PG service routes to `taskRun.parentTaskRun?.id ?? taskRun.id` and equivalently for root, so a top-level run's parent/root ops land on itself rather than being silently dropped. Devin follow-up on PR #3756. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 9d861d2 commit 68b1e30

3 files changed

Lines changed: 138 additions & 31 deletions

File tree

apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts

Lines changed: 28 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -204,35 +204,34 @@ const { action } = createActionApiRoute(
204204
// materialised by the time the child is buffered, so the existing
205205
// service handles them; if they're also buffered, the helper
206206
// recurses through the buffered mutation path).
207-
const bufferedEntry = await findRunByIdWithMollifierFallback({
208-
runId,
209-
environmentId: env.id,
210-
organizationId: env.organizationId,
211-
});
212-
if (bufferedEntry) {
213-
// Both parent and root use the friendlyIds derived in
214-
// `readFallback.server.ts` via `internalRunIdToFriendlyId` from the
215-
// internal IDs the engine snapshot carries (`parentTaskRunId` /
216-
// `rootTaskRunId`). The PG-side `UpdateMetadataService` would
217-
// route to `taskRun.parentTaskRun?.id ?? taskRun.id` and
218-
// `taskRun.rootTaskRun?.id ?? taskRun.id` respectively — i.e. fall
219-
// back to the run itself when there's no parent / root. Mirror
220-
// that self-fallback with `?? runId` so a top-level run's
221-
// parent/root ops land on itself (matching PG semantics) instead
222-
// of being silently dropped.
223-
await Promise.all([
224-
routeOperationsToRun(
225-
bufferedEntry.parentTaskRunFriendlyId ?? runId,
226-
body.parentOperations,
227-
env,
228-
),
229-
routeOperationsToRun(
230-
bufferedEntry.rootTaskRunFriendlyId ?? runId,
231-
body.rootOperations,
232-
env,
233-
),
234-
]);
235-
}
207+
//
208+
// Use the parent/root friendlyIds the buffered mutation captured
209+
// during its internal read — NOT a second `findRunByIdWithMollifierFallback`
210+
// call here. The drainer's terminal-failure path DELetes the entry
211+
// hash atomically, so if it fires between the primary mutation
212+
// landing and our route's second read, `bufferedEntry` would come
213+
// back null and the route would silently drop `parentOperations` /
214+
// `rootOperations` after the customer's primary mutation already
215+
// landed on the snapshot. Capturing the ids in the helper's first
216+
// CAS read closes that race.
217+
//
218+
// Self-fallback to `runId` matches PG semantics: the PG service
219+
// routes to `taskRun.parentTaskRun?.id ?? taskRun.id` and
220+
// `taskRun.rootTaskRun?.id ?? taskRun.id`, so a top-level run's
221+
// parent/root ops land on itself rather than being silently
222+
// dropped.
223+
await Promise.all([
224+
routeOperationsToRun(
225+
bufferOutcome.parentTaskRunFriendlyId ?? runId,
226+
body.parentOperations,
227+
env,
228+
),
229+
routeOperationsToRun(
230+
bufferOutcome.rootTaskRunFriendlyId ?? runId,
231+
body.rootOperations,
232+
env,
233+
),
234+
]);
236235

237236
return json({ metadata: bufferOutcome.newMetadata }, { status: 200 });
238237
}

apps/webapp/app/v3/mollifier/applyMetadataMutation.server.ts

Lines changed: 48 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,33 @@
11
import { applyMetadataOperations } from "@trigger.dev/core/v3";
22
import type { FlushedRunMetadata } from "@trigger.dev/core/v3/schemas";
3+
import { RunId } from "@trigger.dev/core/v3/isomorphic";
34
import type { MollifierBuffer } from "@trigger.dev/redis-worker";
45
import { logger } from "~/services/logger.server";
56
import { getMollifierBuffer } from "./mollifierBuffer.server";
67

8+
// On `applied` we surface the parent/root friendlyIds captured during
9+
// the snapshot read. Callers that fan parent/root metadata operations
10+
// out to their respective runs can use these without a second
11+
// `findRunByIdWithMollifierFallback` round trip — and, more importantly,
12+
// without racing the drainer's terminal-failure path (which atomically
13+
// DELetes the entry hash). Without these on the outcome the second
14+
// read can come back null mid-route, silently dropping the caller's
15+
// parentOperations / rootOperations after the primary mutation already
16+
// landed on the snapshot.
17+
//
18+
// FriendlyIds (not internal cuids) because the consuming
19+
// `routeOperationsToRun` helper gates on the `run_…` prefix to decide
20+
// whether to attempt the buffer fallback; cuids would skip that path.
21+
// The snapshot's `parentTaskRunId` / `rootTaskRunId` are engine-side
22+
// cuids, so we convert via `RunId.toFriendlyId` here — identical to
23+
// what `readFallback.server.ts` does when assembling its SyntheticRun.
724
export type ApplyMetadataMutationOutcome =
8-
| { kind: "applied"; newMetadata: Record<string, unknown> }
25+
| {
26+
kind: "applied";
27+
newMetadata: Record<string, unknown>;
28+
parentTaskRunFriendlyId: string | undefined;
29+
rootTaskRunFriendlyId: string | undefined;
30+
}
931
| { kind: "not_found" }
1032
| { kind: "busy" }
1133
| { kind: "version_exhausted" }
@@ -68,6 +90,25 @@ export async function applyMetadataMutationToBufferedRun(input: {
6890
const currentMetadataType =
6991
typeof snapshot.metadataType === "string" ? snapshot.metadataType : "application/json";
7092

93+
// Capture parent/root ids during this read so the caller can fan
94+
// parent/root operations out without a second buffer.getEntry. If
95+
// the drainer's terminal-failure path runs between our CAS-write
96+
// below and the route's follow-up, the entry hash would be DELd
97+
// and a second read would return null — silently dropping the
98+
// caller's `body.parentOperations` / `body.rootOperations`. The ids
99+
// themselves are immutable for a run, so capturing them on any
100+
// loop iteration is fine.
101+
const snapshotParentTaskRunInternalId =
102+
typeof snapshot.parentTaskRunId === "string" ? snapshot.parentTaskRunId : undefined;
103+
const snapshotParentTaskRunFriendlyId = snapshotParentTaskRunInternalId
104+
? RunId.toFriendlyId(snapshotParentTaskRunInternalId)
105+
: undefined;
106+
const snapshotRootTaskRunInternalId =
107+
typeof snapshot.rootTaskRunId === "string" ? snapshot.rootTaskRunId : undefined;
108+
const snapshotRootTaskRunFriendlyId = snapshotRootTaskRunInternalId
109+
? RunId.toFriendlyId(snapshotRootTaskRunInternalId)
110+
: undefined;
111+
71112
// Match PG semantics: `body.operations` and `body.metadata` are
72113
// mutually exclusive on a single request. The PG service
73114
// (`UpdateMetadataService.#updateRunMetadata`) branches on
@@ -126,7 +167,12 @@ export async function applyMetadataMutationToBufferedRun(input: {
126167
});
127168

128169
if (cas.kind === "applied") {
129-
return { kind: "applied", newMetadata: metadataObject };
170+
return {
171+
kind: "applied",
172+
newMetadata: metadataObject,
173+
parentTaskRunFriendlyId: snapshotParentTaskRunFriendlyId,
174+
rootTaskRunFriendlyId: snapshotRootTaskRunFriendlyId,
175+
};
130176
}
131177
if (cas.kind === "not_found") return { kind: "not_found" };
132178
if (cas.kind === "busy") return { kind: "busy" };

apps/webapp/test/mollifierApplyMetadataMutation.test.ts

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ vi.mock("~/db.server", () => ({ prisma: {}, $replica: {} }));
44

55
import { applyMetadataMutationToBufferedRun } from "~/v3/mollifier/applyMetadataMutation.server";
66
import type { BufferEntry, MollifierBuffer, CasSetMetadataResult } from "@trigger.dev/redis-worker";
7+
import { RunId } from "@trigger.dev/core/v3/isomorphic";
78

89
// Regression for a CAS retry-exhaustion bug: the default `maxRetries`
910
// was 3, matching the PG-side service, but that exhausts fast when N
@@ -261,6 +262,67 @@ describe("applyMetadataMutationToBufferedRun — retry behaviour", () => {
261262
expect(stub.buffer.casSetMetadata).not.toHaveBeenCalled();
262263
});
263264

265+
it("surfaces parent/root friendlyIds on `applied` so the route can fan parent/root ops without a second buffer read", async () => {
266+
// Regression: the metadata route used to do a SECOND
267+
// `findRunByIdWithMollifierFallback` after the primary CAS to
268+
// obtain parent/root friendlyIds for `routeOperationsToRun`.
269+
// If the drainer's terminal-failure path ran between the CAS and
270+
// the second read, the entry hash was DELd and the second read
271+
// came back null — the route silently skipped the entire
272+
// parent/root fan-out, dropping `body.parentOperations` /
273+
// `body.rootOperations` after the primary mutation already
274+
// landed. The helper now captures the ids inside its own read
275+
// loop and surfaces them on the `applied` outcome so the route
276+
// never needs a second round trip.
277+
//
278+
// Engine-side snapshot stores internal cuids; we expect the
279+
// helper to convert via `RunId.toFriendlyId` so the outcome
280+
// matches what `readFallback.server.ts` would have produced.
281+
const parentFriendly = RunId.generate().friendlyId;
282+
const rootFriendly = RunId.generate().friendlyId;
283+
const parentInternal = RunId.fromFriendlyId(parentFriendly);
284+
const rootInternal = RunId.fromFriendlyId(rootFriendly);
285+
const stub = makeBufferStub({
286+
parentTaskRunId: parentInternal,
287+
rootTaskRunId: rootInternal,
288+
});
289+
const result = await applyMetadataMutationToBufferedRun({
290+
runId: "run_1",
291+
environmentId: "env_a",
292+
organizationId: "org_1",
293+
maximumSize: 1024 * 1024,
294+
body: { metadata: { counter: 1 } },
295+
buffer: stub.buffer,
296+
});
297+
expect(result.kind).toBe("applied");
298+
if (result.kind === "applied") {
299+
expect(result.parentTaskRunFriendlyId).toBe(parentFriendly);
300+
expect(result.rootTaskRunFriendlyId).toBe(rootFriendly);
301+
}
302+
});
303+
304+
it("`applied` parent/root ids are undefined when the snapshot carries neither (top-level run)", async () => {
305+
// Top-level runs (parentTaskRunId/rootTaskRunId both undefined in
306+
// the engine-trigger snapshot) must surface as undefined on the
307+
// outcome so the route's `?? runId` self-fallback fires —
308+
// matching the PG service's `taskRun.parentTaskRun?.id ??
309+
// taskRun.id` semantics.
310+
const stub = makeBufferStub({});
311+
const result = await applyMetadataMutationToBufferedRun({
312+
runId: "run_1",
313+
environmentId: "env_a",
314+
organizationId: "org_1",
315+
maximumSize: 1024 * 1024,
316+
body: { metadata: { counter: 1 } },
317+
buffer: stub.buffer,
318+
});
319+
expect(result.kind).toBe("applied");
320+
if (result.kind === "applied") {
321+
expect(result.parentTaskRunFriendlyId).toBeUndefined();
322+
expect(result.rootTaskRunFriendlyId).toBeUndefined();
323+
}
324+
});
325+
264326
it("N-way concurrent applies all converge under default budget", async () => {
265327
// Simulate N parallel writers against a shared state. Each writer
266328
// reads, applies a delta, CAS-writes. The Lua CAS forces them to

0 commit comments

Comments
 (0)