Skip to content

Commit 77fc3b7

Browse files
d-csclaude
andcommitted
fix(webapp): close cross-env mollifier mutation gap + Devin review followups
Four Devin-flagged issues on PR #3756: * Cross-env auth gate on the buffer mutation path (#2). mutateWithFallback and applyMetadataMutationToBufferedRun now verify entry.envId/orgId match the caller's environmentId/organizationId before any buffer write, so a token authed in env A can't mutate a buffered run in env B by guessing the friendlyId. Mismatches return not_found (no existence leak), mirroring the env scoping the PG path already enforces via Prisma filters. * Unhandled error in routeOperationsToRun (#0). The parent/root op fan-out is documented as best-effort but a Redis throw used to 500 the request even though the primary mutation already landed. The buffer fallback now runs through tryCatch and warns instead of throwing. * Silent no-op when parent metadata routes to an internal id (#1). The PG service accepts internal ids, but the buffer is keyed by friendlyId; passing an internal cuid to the fallback was a silent miss. Made it an intentional skip (with a comment explaining why a buffered child's parent is always materialised already). * BufferedReplayInputSchema strips seedMetadata (#3). Replays from a buffered source were silently losing initial metadata vs PG-sourced replays. Added seedMetadata + seedMetadataType to the schema. Tests added: cross-env + cross-org gate cases on both helpers. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent d826845 commit 77fc3b7

6 files changed

Lines changed: 224 additions & 21 deletions

apps/webapp/app/routes/api.v1.runs.$runId.metadata.ts

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -86,20 +86,41 @@ async function routeOperationsToRun(
8686
if (!error) return;
8787

8888
// PG service threw — commonly "Cannot update metadata for a completed
89-
// run", but it could also be a transient PG failure. The parent/root
90-
// ops are auxiliary, so we stay best-effort and don't surface this to
91-
// the caller — but we must not swallow the failure silently, otherwise
92-
// a genuine PG outage on these ops is invisible. Warn, then try the
93-
// buffer in case the target is itself buffered.
94-
logger.warn("metadata route: parent/root PG op failed, falling back to buffer", {
89+
// run", but it could also be a transient PG failure. Parent/root ops
90+
// are auxiliary (the caller's primary mutation already landed); stay
91+
// best-effort and don't surface this to the caller — but warn so a
92+
// genuine PG outage on these ops isn't invisible.
93+
logger.warn("metadata route: parent/root PG op failed", {
9594
targetRunId,
9695
error: error instanceof Error ? error.message : String(error),
9796
});
9897

99-
await applyMetadataMutationToBufferedRun({
100-
runId: targetRunId,
101-
body: { operations },
102-
});
98+
// Buffer fallback only makes sense for friendlyId-keyed entries. The
99+
// PG-side parent/root IDs are internal cuids; the buffer keys entries
100+
// by friendlyId, so passing the internal id would silently no-op.
101+
// Skip explicitly — a buffered child's parent is always materialised
102+
// in PG already (a buffered run hasn't executed, so it can't have
103+
// triggered the child), so the buffered-parent branch isn't actually
104+
// reachable. Treating the no-op as intentional rather than incidental.
105+
if (!targetRunId.startsWith("run_")) return;
106+
107+
// Best-effort buffer fallback. Wrap so a transient Redis throw on
108+
// this auxiliary op can't 500 the request after the primary mutation
109+
// already succeeded.
110+
const [bufferError] = await tryCatch(
111+
applyMetadataMutationToBufferedRun({
112+
runId: targetRunId,
113+
environmentId: env.id,
114+
organizationId: env.organizationId,
115+
body: { operations },
116+
})
117+
);
118+
if (bufferError) {
119+
logger.warn("metadata route: buffer fallback for parent/root op failed", {
120+
targetRunId,
121+
error: bufferError instanceof Error ? bufferError.message : String(bufferError),
122+
});
123+
}
103124
}
104125

105126
const { action } = createActionApiRoute(
@@ -133,6 +154,8 @@ const { action } = createActionApiRoute(
133154
// PG miss. Target run is either buffered or genuinely absent.
134155
const bufferOutcome = await applyMetadataMutationToBufferedRun({
135156
runId,
157+
environmentId: env.id,
158+
organizationId: env.organizationId,
136159
body: { metadata: body.metadata, operations: body.operations },
137160
});
138161

apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,13 @@ const BufferedReplayInputSchema = z.object({
3838
workerQueue: z.string().nullable().optional(),
3939
machinePreset: z.string().nullable().optional(),
4040
realtimeStreamsVersion: z.string().nullable().optional(),
41+
// ReplayTaskRunService.getExistingMetadata reads these to preserve
42+
// the original run's metadata on replay. Without them in the schema
43+
// they'd be stripped by Zod's default key-passthrough behaviour, and
44+
// a buffered-source replay would silently lose metadata that a
45+
// PG-source replay carries over.
46+
seedMetadata: z.string().nullable().optional(),
47+
seedMetadataType: z.string().nullable().optional(),
4148
});
4249

4350
export async function action({ request, params }: ActionFunctionArgs) {

apps/webapp/app/v3/mollifier/applyMetadataMutation.server.ts

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,13 @@ export type ApplyMetadataMutationOutcome =
1919
// callers never lose an increment / append / set.
2020
export async function applyMetadataMutationToBufferedRun(input: {
2121
runId: string;
22+
// Env+org scoping closes a cross-environment write gap on the buffer
23+
// path: the route's PG path is already env-scoped via Prisma filters,
24+
// and this helper now enforces the same isolation before any buffer
25+
// write so a caller authed in env A can't mutate a buffered run that
26+
// belongs to env B.
27+
environmentId: string;
28+
organizationId: string;
2229
body: Pick<FlushedRunMetadata, "metadata" | "operations">;
2330
buffer?: MollifierBuffer | null;
2431
maxRetries?: number;
@@ -37,6 +44,14 @@ export async function applyMetadataMutationToBufferedRun(input: {
3744
for (let attempt = 0; attempt <= maxRetries; attempt++) {
3845
const entry = await buffer.getEntry(input.runId);
3946
if (!entry) return { kind: "not_found" };
47+
// Env+org check: an entry from a different env is treated as a
48+
// miss (not 403) so existence in other envs doesn't leak.
49+
if (
50+
entry.envId !== input.environmentId ||
51+
entry.orgId !== input.organizationId
52+
) {
53+
return { kind: "not_found" };
54+
}
4055
if (entry.status !== "QUEUED" || entry.materialised) {
4156
return { kind: "busy" };
4257
}

apps/webapp/app/v3/mollifier/mutateWithFallback.server.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,27 @@ export async function mutateWithFallback<TResponse>(
8282
return { kind: "not_found" };
8383
}
8484

85+
// Env-scoped authorization for the buffer path. The replica/writer
86+
// lookups above are already env-scoped via findRunInPg; this closes
87+
// the same gap on the buffer side so a caller authed in env A can't
88+
// mutate a buffered run that belongs to env B (or a different org)
89+
// by guessing its friendlyId. Non-atomic w.r.t. the mutateSnapshot
90+
// call below, but the TOCTOU is benign: runIds are globally unique,
91+
// so a cross-env entry can't suddenly appear after a same-env check.
92+
// A genuinely-missing entry (entry === null) falls through and is
93+
// handled by the existing not_found / writer-recovery path below.
94+
const entryForAuth = await buffer.getEntry(input.runId);
95+
if (
96+
entryForAuth &&
97+
(entryForAuth.envId !== input.environmentId ||
98+
entryForAuth.orgId !== input.organizationId)
99+
) {
100+
// Hide existence on env mismatch: return not_found, same shape as
101+
// a true miss, rather than 403 which would leak that the runId
102+
// exists in some other env.
103+
return { kind: "not_found" };
104+
}
105+
85106
// Path 2 — buffer snapshot mutation.
86107
const result: MutateSnapshotResult = await buffer.mutateSnapshot(
87108
input.runId,

apps/webapp/test/mollifierApplyMetadataMutation.test.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ describe("applyMetadataMutationToBufferedRun — retry behaviour", () => {
8686
const { buffer, state } = makeBufferStub();
8787
const result = await applyMetadataMutationToBufferedRun({
8888
runId: "run_1",
89+
environmentId: "env_a",
90+
organizationId: "org_1",
8991
body: { metadata: { counter: 1 } },
9092
buffer,
9193
});
@@ -99,6 +101,8 @@ describe("applyMetadataMutationToBufferedRun — retry behaviour", () => {
99101
state.pendingConflictsForNextN = 5;
100102
const result = await applyMetadataMutationToBufferedRun({
101103
runId: "run_1",
104+
environmentId: "env_a",
105+
organizationId: "org_1",
102106
body: { operations: [{ type: "increment", key: "counter", value: 1 }] },
103107
buffer,
104108
});
@@ -124,6 +128,8 @@ describe("applyMetadataMutationToBufferedRun — retry behaviour", () => {
124128
stub.state.pendingConflictsForNextN = 11;
125129
const result = await applyMetadataMutationToBufferedRun({
126130
runId: "run_1",
131+
environmentId: "env_a",
132+
organizationId: "org_1",
127133
body: { operations: [{ type: "increment", key: "counter", value: 1 }] },
128134
buffer: stub.buffer,
129135
});
@@ -137,6 +143,8 @@ describe("applyMetadataMutationToBufferedRun — retry behaviour", () => {
137143
stub.state.pendingConflictsForNextN = 99;
138144
const result = await applyMetadataMutationToBufferedRun({
139145
runId: "run_1",
146+
environmentId: "env_a",
147+
organizationId: "org_1",
140148
body: { operations: [{ type: "increment", key: "counter", value: 1 }] },
141149
buffer: stub.buffer,
142150
maxRetries: 12,
@@ -152,13 +160,46 @@ describe("applyMetadataMutationToBufferedRun — retry behaviour", () => {
152160
stub.state.pendingConflictsForNextN = 8;
153161
const result = await applyMetadataMutationToBufferedRun({
154162
runId: "run_1",
163+
environmentId: "env_a",
164+
organizationId: "org_1",
155165
body: { operations: [{ type: "increment", key: "counter", value: 1 }] },
156166
buffer: stub.buffer,
157167
maxRetries: 3,
158168
});
159169
expect(result.kind).toBe("version_exhausted");
160170
});
161171

172+
it("returns not_found when the buffered entry belongs to a different env (cross-env auth gate)", async () => {
173+
// Same shape as a normal apply call, but the caller's environmentId
174+
// doesn't match the entry's envId. The helper must refuse the
175+
// mutation and return not_found (without leaking existence) and
176+
// must NOT call casSetMetadata.
177+
const stub = makeBufferStub();
178+
const result = await applyMetadataMutationToBufferedRun({
179+
runId: "run_1",
180+
environmentId: "env_OTHER",
181+
organizationId: "org_1",
182+
body: { metadata: { counter: 1 } },
183+
buffer: stub.buffer,
184+
});
185+
expect(result.kind).toBe("not_found");
186+
expect(stub.buffer.casSetMetadata).not.toHaveBeenCalled();
187+
expect(stub.state.version).toBe(0);
188+
});
189+
190+
it("returns not_found when the buffered entry belongs to a different org (cross-org auth gate)", async () => {
191+
const stub = makeBufferStub();
192+
const result = await applyMetadataMutationToBufferedRun({
193+
runId: "run_1",
194+
environmentId: "env_a",
195+
organizationId: "org_OTHER",
196+
body: { metadata: { counter: 1 } },
197+
buffer: stub.buffer,
198+
});
199+
expect(result.kind).toBe("not_found");
200+
expect(stub.buffer.casSetMetadata).not.toHaveBeenCalled();
201+
});
202+
162203
it("N-way concurrent applies all converge under default budget", async () => {
163204
// Simulate N parallel writers against a shared state. Each writer
164205
// reads, applies a delta, CAS-writes. The Lua CAS forces them to
@@ -173,6 +214,8 @@ describe("applyMetadataMutationToBufferedRun — retry behaviour", () => {
173214
const calls = Array.from({ length: N }, () =>
174215
applyMetadataMutationToBufferedRun({
175216
runId: "run_1",
217+
environmentId: "env_a",
218+
organizationId: "org_1",
176219
body: { operations: [{ type: "increment", key: "counter", value: 1 }] },
177220
buffer: sharedStub.buffer,
178221
}),

apps/webapp/test/mollifierMutateWithFallback.test.ts

Lines changed: 105 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,18 +23,37 @@ function fakePrisma(rows: Array<TaskRun | null>): PrismaStub {
2323
return { taskRun: { findFirst: fn } };
2424
}
2525

26+
// Env-matching entry returned by the env-pre-check getEntry call that
27+
// mutateWithFallback now does before any buffer write (cross-env auth
28+
// gate). Same envId/orgId as `baseInput` so the check passes and the
29+
// flow under test proceeds to mutateSnapshot.
30+
const preCheckEntry = (): BufferEntry =>
31+
({
32+
envId: "env_a",
33+
orgId: "org_1",
34+
status: "QUEUED",
35+
materialised: false,
36+
}) as unknown as BufferEntry;
37+
2638
function bufferReturning(result: MutateSnapshotResult): MollifierBuffer {
39+
const getEntry = vi.fn(async () => preCheckEntry());
2740
return {
2841
mutateSnapshot: vi.fn(async () => result),
29-
getEntry: vi.fn(async () => null),
42+
getEntry,
3043
} as unknown as MollifierBuffer;
3144
}
3245

3346
// Buffer whose mutateSnapshot returns "busy" and whose getEntry walks a
34-
// scripted sequence of entry states (the drainer's progress). The last
35-
// element repeats once the sequence is exhausted.
47+
// scripted sequence of entry states. The pre-check getEntry call (one
48+
// extra read before the busy-wait loop, used for env authorization)
49+
// consumes the first scripted result, then the busy-wait loop pops the
50+
// remainder; the last element repeats once the sequence is exhausted.
3651
function bufferBusy(entries: Array<BufferEntry | null>): MollifierBuffer {
3752
const getEntry = vi.fn();
53+
// Pre-check consumes one entry. Use a QUEUED env-matching entry so
54+
// the env-check passes and the flow reaches mutateSnapshot (which
55+
// returns "busy") and enters the wait-loop.
56+
getEntry.mockResolvedValueOnce(preCheckEntry());
3857
for (const e of entries) getEntry.mockResolvedValueOnce(e);
3958
getEntry.mockResolvedValue(entries.length ? entries[entries.length - 1] : null);
4059
return {
@@ -44,11 +63,26 @@ function bufferBusy(entries: Array<BufferEntry | null>): MollifierBuffer {
4463
}
4564

4665
const entryDraining = (): BufferEntry =>
47-
({ status: "DRAINING", materialised: false }) as unknown as BufferEntry;
66+
({
67+
envId: "env_a",
68+
orgId: "org_1",
69+
status: "DRAINING",
70+
materialised: false,
71+
}) as unknown as BufferEntry;
4872
const entryQueued = (): BufferEntry =>
49-
({ status: "QUEUED", materialised: false }) as unknown as BufferEntry;
73+
({
74+
envId: "env_a",
75+
orgId: "org_1",
76+
status: "QUEUED",
77+
materialised: false,
78+
}) as unknown as BufferEntry;
5079
const entryMaterialised = (): BufferEntry =>
51-
({ status: "DRAINING", materialised: true }) as unknown as BufferEntry;
80+
({
81+
envId: "env_a",
82+
orgId: "org_1",
83+
status: "DRAINING",
84+
materialised: true,
85+
}) as unknown as BufferEntry;
5286

5387
const fakeRun = (overrides: Partial<TaskRun> = {}): TaskRun =>
5488
({
@@ -150,8 +184,9 @@ describe("mutateWithFallback", () => {
150184
});
151185
expect(result).toEqual({ kind: "pg", response: "pg-after-wait" });
152186
expect(pgMutation).toHaveBeenCalledWith(row);
153-
// Detection happened against Redis (3 polls), the primary exactly once.
154-
expect(buffer.getEntry).toHaveBeenCalledTimes(3);
187+
// One env-pre-check call + 3 busy-wait polls = 4 getEntry reads;
188+
// primary read exactly once.
189+
expect(buffer.getEntry).toHaveBeenCalledTimes(4);
155190
expect(writer.taskRun.findFirst).toHaveBeenCalledTimes(1);
156191
});
157192

@@ -227,7 +262,8 @@ describe("mutateWithFallback", () => {
227262
random: () => 0,
228263
});
229264
expect(result).toEqual({ kind: "pg", response: "pg-after-requeue" });
230-
expect(buffer.getEntry).toHaveBeenCalledTimes(3);
265+
// One env-pre-check + 3 busy-wait polls.
266+
expect(buffer.getEntry).toHaveBeenCalledTimes(4);
231267
expect(writer.taskRun.findFirst).toHaveBeenCalledTimes(1);
232268
});
233269

@@ -278,8 +314,8 @@ describe("mutateWithFallback", () => {
278314
abortSignal: controller.signal,
279315
});
280316
expect(result).toEqual({ kind: "timed_out" });
281-
// One buffer poll happened before the sleep+abort; primary untouched.
282-
expect(buffer.getEntry).toHaveBeenCalledTimes(1);
317+
// One env-pre-check + one busy-wait poll before sleep+abort; primary untouched.
318+
expect(buffer.getEntry).toHaveBeenCalledTimes(2);
283319
expect(writer.taskRun.findFirst).toHaveBeenCalledTimes(0);
284320
});
285321

@@ -313,6 +349,64 @@ describe("mutateWithFallback", () => {
313349
).rejects.toThrow(/limit_exceeded/);
314350
});
315351

352+
it("replica miss + buffer entry belongs to a different env → not_found (cross-env auth gate)", async () => {
353+
// Same flow as the applied_to_snapshot test, except the entry's
354+
// envId doesn't match input.environmentId. mutateWithFallback must
355+
// refuse the write and return not_found (without leaking that the
356+
// runId exists in another env), and must NOT call mutateSnapshot.
357+
const crossEnvEntry: BufferEntry = {
358+
envId: "env_OTHER",
359+
orgId: "org_1",
360+
status: "QUEUED",
361+
materialised: false,
362+
} as unknown as BufferEntry;
363+
const mutateSnapshot = vi.fn(async () => "applied_to_snapshot" as const);
364+
const buffer = {
365+
mutateSnapshot,
366+
getEntry: vi.fn(async () => crossEnvEntry),
367+
} as unknown as MollifierBuffer;
368+
369+
const pgMutation = vi.fn(async () => "pg");
370+
const synthesisedResponse = vi.fn(() => "snap");
371+
const result = await mutateWithFallback({
372+
...baseInput,
373+
pgMutation,
374+
synthesisedResponse,
375+
prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica,
376+
prismaWriter: fakePrisma([]) as unknown as typeof import("~/db.server").prisma,
377+
getBuffer: () => buffer,
378+
});
379+
expect(result).toEqual({ kind: "not_found" });
380+
expect(mutateSnapshot).not.toHaveBeenCalled();
381+
expect(pgMutation).not.toHaveBeenCalled();
382+
expect(synthesisedResponse).not.toHaveBeenCalled();
383+
});
384+
385+
it("replica miss + buffer entry belongs to a different org → not_found (cross-org auth gate)", async () => {
386+
const crossOrgEntry: BufferEntry = {
387+
envId: "env_a",
388+
orgId: "org_OTHER",
389+
status: "QUEUED",
390+
materialised: false,
391+
} as unknown as BufferEntry;
392+
const mutateSnapshot = vi.fn(async () => "applied_to_snapshot" as const);
393+
const buffer = {
394+
mutateSnapshot,
395+
getEntry: vi.fn(async () => crossOrgEntry),
396+
} as unknown as MollifierBuffer;
397+
398+
const result = await mutateWithFallback({
399+
...baseInput,
400+
pgMutation: async () => "pg",
401+
synthesisedResponse: () => "snap",
402+
prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica,
403+
prismaWriter: fakePrisma([]) as unknown as typeof import("~/db.server").prisma,
404+
getBuffer: () => buffer,
405+
});
406+
expect(result).toEqual({ kind: "not_found" });
407+
expect(mutateSnapshot).not.toHaveBeenCalled();
408+
});
409+
316410
it("buffer is null (mollifier disabled) → not_found after replica miss", async () => {
317411
const result = await mutateWithFallback({
318412
...baseInput,

0 commit comments

Comments
 (0)