Skip to content

Commit 44da36e

Browse files
d-csclaude
andcommitted
fix(webapp): align buffer-path tags success count with PG (post-dedup)
Devin follow-ups on PR #3756. * Tags route synthesisedResponse reported `nonEmptyTags.length` (pre- dedup input count) while the PG path reports `newTags.length` (post- dedup against existing tags). Same API call → different message count across the buffered/materialised boundary. mutateWithFallback now forwards the pre-mutation BufferEntry from its env-auth pre-check into synthesisedResponse + rejectedResponse, so the tags route can dedup against the snapshot's existing tags without an extra Redis round-trip. Same TOCTOU semantics as the PG path; the Lua keeps the actual write atomically deduped regardless. * Replay route's `as unknown as TaskRun` cast bypasses TS for the buffered branch; if ReplayTaskRunService later reads an extra TaskRun field without that field being added to BufferedReplayInputSchema, the buffered branch silently feeds the service `undefined`. Run-time mitigation (safeParse + warn log + 404) already exists; added an inline comment near the cast spelling out the manual sync requirement so future edits notice. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent a55da17 commit 44da36e

4 files changed

Lines changed: 98 additions & 8 deletions

File tree

apps/webapp/app/routes/api.v1.runs.$runId.tags.ts

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import { type ActionFunctionArgs, json } from "@remix-run/server-runtime";
22
import { AddTagsRequestBody } from "@trigger.dev/core/v3";
3+
import type { BufferEntry } from "@trigger.dev/redis-worker";
34
import { z } from "zod";
45
import { prisma } from "~/db.server";
56
import { MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
@@ -8,6 +9,22 @@ import { getRequestAbortSignal } from "~/services/httpAsyncStorage.server";
89
import { logger } from "~/services/logger.server";
910
import { mutateWithFallback } from "~/v3/mollifier/mutateWithFallback.server";
1011

12+
// Pull the existing tags out of a buffer entry's serialised payload so
13+
// the buffer-path response can dedup against them, matching the
14+
// PG-path's `newTags.length` count rather than the pre-dedup input
15+
// count. Returns null on any parse failure / shape mismatch so the
16+
// caller can fall back gracefully.
17+
function parseSnapshotTags(entry: BufferEntry | null): string[] | null {
18+
if (!entry) return null;
19+
try {
20+
const snapshot = JSON.parse(entry.payload) as { tags?: unknown };
21+
if (!Array.isArray(snapshot.tags)) return null;
22+
return snapshot.tags.filter((t): t is string => typeof t === "string");
23+
} catch {
24+
return null;
25+
}
26+
}
27+
1128
const ParamsSchema = z.object({
1229
runId: z.string(),
1330
});
@@ -80,8 +97,23 @@ export async function action({ request, params }: ActionFunctionArgs) {
8097
// MAX_TAGS_PER_RUN via the `maxTags` we pass in `bufferPatch` —
8198
// matching the PG-path cap above so a buffered run can't exceed the
8299
// limit the trigger validator applies at creation.
83-
synthesisedResponse: () =>
84-
json({ message: `Successfully set ${nonEmptyTags.length} new tags.` }, { status: 200 }),
100+
//
101+
// Dedup the success-count off the pre-mutation entry (already
102+
// fetched by mutateWithFallback's env-auth pre-check, so no extra
103+
// Redis read) so the message reports the same `newTags.length` the
104+
// PG path reports — not the pre-dedup request count, which would
105+
// give an inconsistent number across the buffered/materialised
106+
// boundary for the same input.
107+
synthesisedResponse: ({ bufferEntry }) => {
108+
const existing = parseSnapshotTags(bufferEntry);
109+
const newTagsCount = existing
110+
? nonEmptyTags.filter((t) => !existing.includes(t)).length
111+
: nonEmptyTags.length;
112+
return json(
113+
{ message: `Successfully set ${newTagsCount} new tags.` },
114+
{ status: 200 }
115+
);
116+
},
85117
// Buffer rejected the append because it would exceed the cap. We
86118
// don't know the exact deduped overflow count here (the Lua does),
87119
// so report the limit rather than a precise "trying to set N".

apps/webapp/app/routes/api.v1.runs.$runParam.replay.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,18 @@ export async function action({ request, params }: ActionFunctionArgs) {
9393
if (buffered) {
9494
const parsed = BufferedReplayInputSchema.safeParse(buffered);
9595
if (parsed.success) {
96+
// Manual sync point: `BufferedReplayInputSchema` covers only
97+
// the subset of `TaskRun` fields `ReplayTaskRunService.call`
98+
// currently reads from `existingTaskRun`. The cast is `as
99+
// unknown as TaskRun` because the full `TaskRun` type carries
100+
// ~40 fields the service never touches; mirroring all of them
101+
// on a synthetic snapshot would be misleading. If a future
102+
// change to `ReplayTaskRunService` reads an additional
103+
// `existingTaskRun` field, **add it to the schema above** —
104+
// otherwise the buffered path will silently feed the service
105+
// `undefined` for that field while the PG-source replay
106+
// works. The `safeParse` + warn-log + 404 below is the
107+
// run-time fail-safe; this comment is the design fail-safe.
96108
taskRun = parsed.data as unknown as TaskRun;
97109
} else {
98110
logger.warn("replay: buffered fallback failed schema validation", {

apps/webapp/app/v3/mollifier/mutateWithFallback.server.ts

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import type {
2+
BufferEntry,
23
MollifierBuffer,
34
MutateSnapshotResult,
45
SnapshotPatch,
@@ -26,13 +27,26 @@ export type MutateWithFallbackInput<TResponse> = {
2627
// Receives the full TaskRun shape and returns the customer-visible body.
2728
pgMutation: (pgRow: TaskRun) => Promise<TResponse>;
2829
// Called when the patch landed cleanly on the buffer snapshot. The
29-
// drainer will see the patched payload on its next pop.
30-
synthesisedResponse: () => TResponse | Promise<TResponse>;
30+
// drainer will see the patched payload on its next pop. Receives the
31+
// pre-mutation snapshot entry (the one fetched for the env auth
32+
// check above) so the caller can compute response details that
33+
// depend on the prior state — e.g. the tags route needs to dedup
34+
// against the existing tags to report an accurate `newTags` count
35+
// matching the PG path, without an extra Redis round-trip.
36+
// `bufferEntry` is `null` in the rare race where the entry didn't
37+
// exist at pre-check time but appeared before `mutateSnapshot`.
38+
synthesisedResponse: (ctx: {
39+
bufferEntry: BufferEntry | null;
40+
}) => TResponse | Promise<TResponse>;
3141
// Called when the buffer rejected the patch as invalid (e.g. an
3242
// `append_tags` patch carrying `maxTags` would exceed the cap). Required
3343
// only by callers that send a rejectable patch; the helper throws if the
34-
// buffer reports a rejection and no builder was supplied.
35-
rejectedResponse?: () => TResponse | Promise<TResponse>;
44+
// buffer reports a rejection and no builder was supplied. Receives the
45+
// same `bufferEntry` context as `synthesisedResponse` so a rejection
46+
// message can reference the prior state if useful.
47+
rejectedResponse?: (ctx: {
48+
bufferEntry: BufferEntry | null;
49+
}) => TResponse | Promise<TResponse>;
3650
abortSignal?: AbortSignal;
3751
// Override defaults for tests.
3852
safetyNetMs?: number;
@@ -110,7 +124,10 @@ export async function mutateWithFallback<TResponse>(
110124
);
111125

112126
if (result === "applied_to_snapshot") {
113-
return { kind: "snapshot", response: await input.synthesisedResponse() };
127+
return {
128+
kind: "snapshot",
129+
response: await input.synthesisedResponse({ bufferEntry: entryForAuth }),
130+
};
114131
}
115132

116133
if (result === "limit_exceeded") {
@@ -122,7 +139,10 @@ export async function mutateWithFallback<TResponse>(
122139
"mutateWithFallback: buffer returned 'limit_exceeded' but no rejectedResponse was provided",
123140
);
124141
}
125-
return { kind: "rejected", response: await input.rejectedResponse() };
142+
return {
143+
kind: "rejected",
144+
response: await input.rejectedResponse({ bufferEntry: entryForAuth }),
145+
};
126146
}
127147

128148
if (result === "not_found") {

apps/webapp/test/mollifierMutateWithFallback.test.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,32 @@ describe("mutateWithFallback", () => {
133133
expect(pgMutation).not.toHaveBeenCalled();
134134
});
135135

136+
it("applied_to_snapshot forwards the pre-mutation entry to synthesisedResponse (lets callers dedup)", async () => {
137+
// The tags route uses this to compute the same post-dedup count
138+
// the PG path reports, without an extra Redis round-trip.
139+
const synthesised = vi.fn(({ bufferEntry }: { bufferEntry: BufferEntry | null }) => {
140+
// Caller can inspect bufferEntry.payload (or other fields) to
141+
// produce a response that depends on the prior snapshot state.
142+
return bufferEntry ? "snap-with-entry" : "snap-without-entry";
143+
});
144+
const result = await mutateWithFallback({
145+
...baseInput,
146+
pgMutation: async () => "pg",
147+
synthesisedResponse: synthesised,
148+
prismaReplica: fakePrisma([null]) as unknown as typeof import("~/db.server").$replica,
149+
prismaWriter: fakePrisma([]) as unknown as typeof import("~/db.server").prisma,
150+
getBuffer: () => bufferReturning("applied_to_snapshot"),
151+
});
152+
expect(result).toEqual({ kind: "snapshot", response: "snap-with-entry" });
153+
expect(synthesised).toHaveBeenCalledTimes(1);
154+
const ctx = synthesised.mock.calls[0]?.[0];
155+
expect(ctx?.bufferEntry).not.toBeNull();
156+
// The pre-check entry has the env-matching shape set up by
157+
// bufferReturning() / preCheckEntry().
158+
expect(ctx?.bufferEntry?.envId).toBe("env_a");
159+
expect(ctx?.bufferEntry?.orgId).toBe("org_1");
160+
});
161+
136162
it("replica miss + buffer not_found + writer miss → not_found", async () => {
137163
const result = await mutateWithFallback({
138164
...baseInput,

0 commit comments

Comments
 (0)