Commit e195077

d-cs and claude authored
feat(webapp): mollifier API mutations on buffered runs (#3756)
## Summary

Cancel, replay, reschedule, metadata, tags, and idempotency-key-reset now succeed against a run that's still in the mollifier buffer. Mutations are applied to the buffered snapshot via Lua CAS; the drainer carries the mutation forward when it replays.

Primitives added:

- `mutateWithFallback`: PG-first / buffer-fallback resolver with bounded-wait safety net for entries that transition mid-mutation.
- `applyMetadataMutation`: buffered metadata PUT mirroring the PG-side retry loop with CAS atomicity.
- `resolveRunForMutation`: discriminated-union resolver used by route `findResource` so the route builder's pre-action 404 check sees buffered runs.

Routes wired (whole files, no GET/POST splits):

- `api.v2.runs.$runParam.cancel.ts`
- `api.v1.runs.$runParam.replay.ts`
- `api.v1.runs.$runParam.reschedule.ts`
- `api.v1.runs.$runId.metadata.ts`
- `api.v1.runs.$runId.tags.ts`
- `resetIdempotencyKey.server.ts`

Stacked on the reads PR.

## Test plan

- [x] `pnpm run typecheck --filter webapp` passes
- [x] `pnpm run test --filter webapp test/mollifierMutateWithFallback.test.ts` passes
- [x] `pnpm run test --filter webapp test/mollifierApplyMetadataMutation.test.ts` passes
- [x] `pnpm run test --filter webapp test/mollifierResolveRunForMutation.test.ts` passes

---------

Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
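The "PG-first / buffer-fallback" resolution and the discriminated-union result described in the summary can be sketched roughly as follows. This is an illustrative sketch only: `PgRun`, `BufferedRun`, `ResolvedRun`, `Lookups`, `resolveRun`, and `summarize` are hypothetical names, the lookups are synchronous for brevity, and none of this is the actual `resolveRunForMutation` or `mutateWithFallback` code.

```typescript
// Hypothetical shapes: the real resolver's types are not shown on this page.
type PgRun = { friendlyId: string; status: string };
type BufferedRun = { friendlyId: string; snapshotVersion: number };

// Discriminated union: callers branch on `source` and the compiler narrows `run`.
type ResolvedRun =
  | { source: "pg"; run: PgRun }
  | { source: "buffer"; run: BufferedRun }
  | { source: "not_found" };

// Injected lookups stand in for the Postgres query and the buffer read (assumption).
type Lookups = {
  findInPg: (friendlyId: string) => PgRun | undefined;
  findInBuffer: (friendlyId: string) => BufferedRun | undefined;
};

// PG first; fall back to the buffer only when PG has no row.
function resolveRun(friendlyId: string, lookups: Lookups): ResolvedRun {
  const pgRun = lookups.findInPg(friendlyId);
  if (pgRun) return { source: "pg", run: pgRun };

  const buffered = lookups.findInBuffer(friendlyId);
  if (buffered) return { source: "buffer", run: buffered };

  return { source: "not_found" };
}

// Example consumer: a pre-action check can 404 only when both stores miss.
function summarize(resolved: ResolvedRun): string {
  switch (resolved.source) {
    case "pg":
      return `pg:${resolved.run.status}`;
    case "buffer":
      return `buffer:v${resolved.run.snapshotVersion}`;
    case "not_found":
      return "404";
  }
}
```

Returning a tagged union rather than `PgRun | BufferedRun | null` keeps the "which store answered" decision explicit at every call site.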
1 parent a1dc3c5 commit e195077

18 files changed

Lines changed: 2616 additions & 120 deletions

.changeset/mollifier-tag-cap.md

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+---
+"@trigger.dev/redis-worker": patch
+---
+
+Mollifier `mutateSnapshot` now enforces a tag cap: an `append_tags` patch carrying `maxTags` returns `"limit_exceeded"` (writing nothing) when the deduped tag count would exceed the limit, so a buffered run can't accumulate more tags via the tags API than the trigger validator allows at creation.
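The tag-cap rule in this changeset (dedupe first, then reject the whole patch if the result would exceed `maxTags`) can be illustrated with a small sketch. The changeset describes the check as part of `mutateSnapshot`; the TypeScript below is only a model of the rule, and `appendTagsWithCap` and `AppendTagsResult` are hypothetical names that do not appear in the commit.

```typescript
// Hypothetical result type mirroring the "limit_exceeded" outcome named in the changeset.
type AppendTagsResult =
  | { kind: "applied"; tags: string[] }
  | { kind: "limit_exceeded" };

// Merge incoming tags into the existing list, dropping duplicates, and
// refuse the whole patch (writing nothing) if the deduped count exceeds maxTags.
function appendTagsWithCap(
  existing: readonly string[],
  incoming: readonly string[],
  maxTags?: number
): AppendTagsResult {
  const merged: string[] = [];
  for (const tag of [...existing, ...incoming]) {
    if (merged.indexOf(tag) === -1) merged.push(tag);
  }

  // The cap applies to the deduped total, so re-sending an existing tag is free.
  if (maxTags !== undefined && merged.length > maxTags) {
    return { kind: "limit_exceeded" };
  }
  return { kind: "applied", tags: merged };
}
```

Checking the cap after deduplication means an idempotent re-append of tags already on the run never trips the limit.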
Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+---
+area: webapp
+type: feature
+---
+
+Mollifier API mutations on buffered runs: tag, metadata, replay, reschedule, cancel, and idempotency-key reset via a buffer-snapshot fallback. When a mutation races a mid-drain run, the wait-and-bounce loop watches the buffer entry in Redis (cheap) and reads the primary exactly once for the actual mutation, instead of polling the writer on a fixed cadence; polls use jittered exponential backoff.
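For readers unfamiliar with the term, "jittered exponential backoff" can be sketched as below. The changeset does not say which jitter variant or which constants the wait-and-bounce loop uses, so this sketch assumes the common "full jitter" scheme; `backoffDelayMs`, `baseMs`, and `capMs` are hypothetical names and values.

```typescript
// Full-jitter backoff (an assumed variant): the delay ceiling doubles per
// attempt up to `capMs`, and the actual delay is drawn uniformly from [0, ceiling).
// `random` is injectable so the schedule can be tested deterministically.
function backoffDelayMs(
  attempt: number,
  baseMs: number,
  capMs: number,
  random: () => number = Math.random
): number {
  const ceiling = Math.min(capMs, baseMs * Math.pow(2, attempt));
  return Math.floor(random() * ceiling);
}
```

A polling loop would call `backoffDelayMs(attempt, ...)` before each check of the buffer entry, incrementing `attempt` each time; the randomisation spreads out concurrent waiters so they do not all hit Redis on the same tick.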

apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts

Lines changed: 236 additions & 9 deletions
@@ -1,15 +1,161 @@
+import type { LoaderFunctionArgs } from "@remix-run/server-runtime";
 import { json } from "@remix-run/server-runtime";
 import { tryCatch } from "@trigger.dev/core/utils";
+import type { RunMetadataChangeOperation } from "@trigger.dev/core/v3/schemas";
 import { UpdateMetadataRequestBody } from "@trigger.dev/core/v3";
 import { z } from "zod";
+import { $replica } from "~/db.server";
+// Aliased to avoid shadowing the local `env: AuthenticatedEnvironment`
+// parameter the route handler and `routeOperationsToRun` use.
+import { env as appEnv } from "~/env.server";
+import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
+import { authenticateApiRequest } from "~/services/apiAuth.server";
+import { logger } from "~/services/logger.server";
 import { updateMetadataService } from "~/services/metadata/updateMetadataInstance.server";
 import { createActionApiRoute } from "~/services/routeBuilders/apiBuilder.server";
 import { ServiceValidationError } from "~/v3/services/common.server";
+import { applyMetadataMutationToBufferedRun } from "~/v3/mollifier/applyMetadataMutation.server";
+import { findRunByIdWithMollifierFallback } from "~/v3/mollifier/readFallback.server";

 const ParamsSchema = z.object({
   runId: z.string(),
 });

+// GET handler added to fix the pre-existing route bug where this URL
+// returned a Remix "no loader" 400: only PUT (update) was exported, so
+// GET had no handler. Returns `{ metadata, metadataType }` from either
+// the Postgres row or the mollifier buffer snapshot.
+export async function loader({ request, params }: LoaderFunctionArgs) {
+  const authenticationResult = await authenticateApiRequest(request);
+  if (!authenticationResult) {
+    return json({ error: "Invalid or Missing API Key" }, { status: 401 });
+  }
+
+  const parsed = ParamsSchema.safeParse(params);
+  if (!parsed.success) {
+    return json({ error: "Invalid or missing run ID" }, { status: 400 });
+  }
+
+  const env = authenticationResult.environment;
+
+  const pgRun = await $replica.taskRun.findFirst({
+    where: { friendlyId: parsed.data.runId, runtimeEnvironmentId: env.id },
+    select: { metadata: true, metadataType: true },
+  });
+  if (pgRun) {
+    return json({ metadata: pgRun.metadata, metadataType: pgRun.metadataType }, { status: 200 });
+  }
+
+  const buffered = await findRunByIdWithMollifierFallback({
+    runId: parsed.data.runId,
+    environmentId: env.id,
+    organizationId: env.organizationId,
+  });
+  if (buffered) {
+    return json(
+      {
+        metadata: buffered.metadata ?? null,
+        metadataType: buffered.metadataType ?? "application/json",
+      },
+      { status: 200 }
+    );
+  }
+
+  return json({ error: "Run not found" }, { status: 404 });
+}
+
+// Route parent/root operations to the existing PG service by directly
+// invoking it against the parent/root runId. The service ingests via
+// its batching worker, which targets PG by id. If the parent/root is
+// itself buffered we recurse through our buffered-mutation helper.
+// `_ingestion_only` flag: a synthetic body that has the operations
+// promoted to top-level `operations` so the service applies them to
+// `targetRunId` directly.
+// Exported so the silent-failure logging behaviour can be unit-tested.
+// The route handler itself isn't an attractive test target (createActionApiRoute
+// wraps it in auth + body parsing + error-handler middleware), but the
+// fan-out helper carries the load-bearing logic, including the
+// ops-visibility branch this change adds.
+export async function routeOperationsToRun(
+  targetRunId: string | undefined,
+  operations: RunMetadataChangeOperation[] | undefined,
+  env: AuthenticatedEnvironment
+): Promise<void> {
+  if (!targetRunId || !operations || operations.length === 0) return;
+
+  // Try PG first via the existing service (this is how parent/root
+  // operations have always landed; preserve that). Accepts the full
+  // AuthenticatedEnvironment so we don't have to recover the unsafe
+  // `as unknown` cast that the previous narrowed `{ id, organizationId }`
+  // signature forced on us.
+  //
+  // Two non-success outcomes from `call`:
+  // * throws: PG threw (e.g. "Cannot update metadata for a completed
+  // run", or a transient PG outage).
+  // * resolves with undefined: PG row didn't exist (the target may be
+  // buffered, not yet materialised).
+  // Either way we want to try the buffer fallback below; treating the
+  // undefined-return as success would make the fallback unreachable.
+  const [error, result] = await tryCatch(
+    updateMetadataService.call(targetRunId, { operations }, env)
+  );
+  if (!error && result !== undefined) return;
+
+  if (error) {
+    // PG threw: auxiliary op, stay best-effort and don't surface this
+    // to the caller (the caller's primary mutation already landed). But
+    // warn so a genuine PG outage on these ops isn't invisible.
+    logger.warn("metadata route: parent/root PG op failed", {
+      targetRunId,
+      error: error instanceof Error ? error.message : String(error),
+    });
+  }
+
+  // Buffer fallback only makes sense for friendlyId-keyed entries. The
+  // PG-side parent/root IDs are internal cuids; the buffer keys entries
+  // by friendlyId, so passing the internal id would silently no-op.
+  // Skip explicitly: a buffered child's parent is always materialised
+  // in PG already (a buffered run hasn't executed, so it can't have
+  // triggered the child), so the buffered-parent branch isn't actually
+  // reachable. Treating the no-op as intentional rather than incidental.
+  if (!targetRunId.startsWith("run_")) return;
+
+  // Best-effort buffer fallback. Wrap so a transient Redis throw on
+  // this auxiliary op can't 500 the request after the primary mutation
+  // already succeeded.
+  const [bufferError, bufferOutcome] = await tryCatch(
+    applyMetadataMutationToBufferedRun({
+      runId: targetRunId,
+      environmentId: env.id,
+      organizationId: env.organizationId,
+      maximumSize: appEnv.TASK_RUN_METADATA_MAXIMUM_SIZE,
+      body: { operations },
+    })
+  );
+  if (bufferError) {
+    logger.warn("metadata route: buffer fallback for parent/root op failed", {
+      targetRunId,
+      error: bufferError instanceof Error ? bufferError.message : String(bufferError),
+    });
+    return;
+  }
+  // `applyMetadataMutationToBufferedRun` reports non-throw failures via
+  // its returned outcome kind: `not_found`, `busy`, `version_exhausted`,
+  // `metadata_too_large`. Without inspecting `.kind`, the parent/root
+  // operation can silently disappear: no PG row landed it (handled
+  // above) and the buffer rejected it for one of these reasons but the
+  // helper returned cleanly. Surface a warn log per non-success branch
+  // so ops can trace why a parent/root op went missing. The customer's
+  // primary mutation has already succeeded by this point; this remains
+  // best-effort, so we still don't bubble these to the response.
+  if (bufferOutcome && bufferOutcome.kind !== "applied") {
+    logger.warn("metadata route: parent/root buffer op did not apply", {
+      targetRunId,
+      kind: bufferOutcome.kind,
+    });
+  }
+}
+
 const { action } = createActionApiRoute(
   {
     params: ParamsSchema,
@@ -18,23 +164,104 @@ const { action } = createActionApiRoute(
     method: "PUT",
   },
   async ({ authentication, body, params }) => {
-    const [error, result] = await tryCatch(
-      updateMetadataService.call(params.runId, body, authentication.environment)
-    );
+    const env = authentication.environment;
+    const runId = params.runId;

-    if (error) {
-      if (error instanceof ServiceValidationError) {
-        return json({ error: error.message }, { status: error.status ?? 422 });
+    // PG-canonical path. If the run is in PG, the existing service
+    // owns the full request shape including parent/root operations,
+    // metadataVersion CAS, batching, validation, none of which the
+    // buffer side needs to reimplement.
+    const [pgError, pgResult] = await tryCatch(
+      updateMetadataService.call(runId, body, env)
+    );
+    if (pgError) {
+      if (pgError instanceof ServiceValidationError) {
+        return json({ error: pgError.message }, { status: pgError.status ?? 422 });
       }
-
       return json({ error: "Internal Server Error" }, { status: 500 });
     }
+    if (pgResult) {
+      return json(pgResult, { status: 200 });
+    }

-    if (!result) {
+    // PG miss. Target run is either buffered or genuinely absent.
+    const bufferOutcome = await applyMetadataMutationToBufferedRun({
+      runId,
+      environmentId: env.id,
+      organizationId: env.organizationId,
+      maximumSize: appEnv.TASK_RUN_METADATA_MAXIMUM_SIZE,
+      body: { metadata: body.metadata, operations: body.operations },
+    });
+
+    if (bufferOutcome.kind === "not_found") {
       return json({ error: "Task Run not found" }, { status: 404 });
     }
+    if (bufferOutcome.kind === "metadata_too_large") {
+      // Mirror PG's `MetadataTooLargeError` (413).
+      return json(
+        {
+          error: `Metadata exceeds maximum size of ${bufferOutcome.maximumSize} bytes`,
+        },
+        { status: 413 }
+      );
+    }
+    if (bufferOutcome.kind === "busy") {
+      // Entry is materialising. Best path is to retry the PG call;
+      // the row may be visible now. We don't waste a roundtrip in
+      // the happy path, but a 503 here would be customer-visible
+      // breakage for legitimately-burst workloads. Hand back 503 with
+      // a retry hint; SDK retry policy converges.
+      return json({ error: "Run materialising, retry shortly" }, { status: 503 });
+    }
+    if (bufferOutcome.kind === "version_exhausted") {
+      // Pathological contention: many concurrent metadata writers on
+      // the same buffered runId. Surface as 503 rather than silently
+      // dropping the request.
+      return json({ error: "Metadata write contention; retry shortly" }, { status: 503 });
+    }
+
+    // Buffered metadata mutation succeeded. Fan parent/root operations
+    // out to their respective runs (parent/root are typically
+    // PG-materialised by the time the child is buffered, so the existing
+    // service handles them; if they're also buffered, the helper
+    // recurses through the buffered mutation path).
+    //
+    // Use the parent/root friendlyIds the buffered mutation captured
+    // during its internal read, NOT a second `findRunByIdWithMollifierFallback`
+    // call here. The drainer's terminal-failure path DELetes the entry
+    // hash atomically, so if it fires between the primary mutation
+    // landing and our route's second read, `bufferedEntry` would come
+    // back null and the route would silently drop `parentOperations` /
+    // `rootOperations` after the customer's primary mutation already
+    // landed on the snapshot. Capturing the ids in the helper's first
+    // CAS read closes that race.
+    //
+    // Self-fallback to `runId` matches PG semantics: the PG service
+    // routes to `taskRun.parentTaskRun?.id ?? taskRun.id` and
+    // `taskRun.rootTaskRun?.id ?? taskRun.id`, so a top-level run's
+    // parent/root ops land on itself rather than being silently
+    // dropped.
+    await Promise.all([
+      routeOperationsToRun(
+        bufferOutcome.parentTaskRunFriendlyId ?? runId,
+        body.parentOperations,
+        env,
+      ),
+      routeOperationsToRun(
+        bufferOutcome.rootTaskRunFriendlyId ?? runId,
+        body.rootOperations,
+        env,
+      ),
+    ]);

-    return json(result, { status: 200 });
+    // Wire-shape parity with the PG branch. `UpdateMetadataService.call`
+    // returns `{ metadata: <object> }` (see `updateMetadata.server.ts:356-358`),
+    // sourced from `applyResults.newMetadata` / `parsePacket(metadataPacket)`
+    // (both parsed `Record<string, unknown>`). `bufferOutcome.newMetadata`
+    // is typed identically (`applyMetadataMutation.server.ts:27`). SDK
+    // consumers see the same response shape regardless of which branch
+    // serves the request.
+    return json({ metadata: bufferOutcome.newMetadata }, { status: 200 });
   }
 );
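The PUT handler in the metadata route maps each buffered-mutation outcome kind to an HTTP status through a chain of `if` checks. As a reading aid, the same mapping can be written as an exhaustive `switch` over a discriminated union. This is a hypothetical restatement, not code from the commit: `BufferOutcome`, `HttpReply`, `assertNever`, and `toHttpReply` are illustrative names, and the real outcome type carries additional fields (for example the parent/root friendlyIds) that are omitted here.

```typescript
// Assumed outcome shape, reduced to the fields the status mapping needs.
type BufferOutcome =
  | { kind: "applied"; newMetadata: Record<string, unknown> }
  | { kind: "not_found" }
  | { kind: "busy" }
  | { kind: "version_exhausted" }
  | { kind: "metadata_too_large"; maximumSize: number };

type HttpReply = { status: number; body: Record<string, unknown> };

// Compile-time exhaustiveness guard: adding a new kind without a case
// makes the `default` branch fail to type-check.
function assertNever(value: never): never {
  throw new Error(`Unhandled outcome: ${JSON.stringify(value)}`);
}

function toHttpReply(outcome: BufferOutcome): HttpReply {
  switch (outcome.kind) {
    case "applied":
      return { status: 200, body: { metadata: outcome.newMetadata } };
    case "not_found":
      return { status: 404, body: { error: "Task Run not found" } };
    case "metadata_too_large":
      return {
        status: 413,
        body: { error: `Metadata exceeds maximum size of ${outcome.maximumSize} bytes` },
      };
    case "busy":
      return { status: 503, body: { error: "Run materialising, retry shortly" } };
    case "version_exhausted":
      return { status: 503, body: { error: "Metadata write contention; retry shortly" } };
    default:
      return assertNever(outcome);
  }
}
```

The statuses and messages match the route as shown in the diff; the only design difference is that the `switch` form makes a missing case a type error rather than a silent fall-through to the success path.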