Skip to content

Commit 6076610

Browse files
d-csclaude
andcommitted
feat(webapp): drainer emits admin-only LOG event with buffered window
After engine.trigger lands the PG row, the drainer calls recordRunDebugLog with the original bufferedAt as startTime and the dwell duration as the event duration. The helper flips this to TaskEventKind.LOG, which the trace view + logs download already gate behind admin (eventRepository.server.ts:108, resources.runs.\$runParam.logs.download.ts:118). Admins now see "Mollifier buffered for Xms" rendered at the historical instant inside the run's existing trace, sitting between trigger and dequeue. Customers see no change — the LOG-kind filter strips the event from their view. No schema change on TaskRun; the audit trail lives in the same ClickHouse store the rest of the trace events use. Best-effort: recordRunDebugLog has its own try/catch and returns a result. The drainer logs non-RUN_NOT_FOUND failures but never fails materialisation because the audit trail couldn't be written. Skipped on the cancel-bifurcation path (the run never ran) and on the terminal SYSTEM_FAILURE path (the customer-visible outcome is the failure row, not a "buffered for Xms" note). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent af49aa1 commit 6076610

2 files changed

Lines changed: 161 additions & 0 deletions

File tree

apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import type {
77
MollifierDrainerTerminalFailureHandler,
88
} from "@trigger.dev/redis-worker";
99
import { logger } from "~/services/logger.server";
10+
import { recordRunDebugLog } from "~/v3/eventRepository/index.server";
1011
import { PerformTaskRunAlertsService } from "~/v3/services/alerts/performTaskRunAlerts.server";
1112
import { startSpan } from "~/v3/tracing.server";
1213
import type { MollifierSnapshot } from "./mollifierSnapshot.server";
@@ -129,8 +130,10 @@ export function createDrainerHandler(deps: {
129130
span.setAttribute("mollifier.run_friendly_id", input.runId);
130131
span.setAttribute("taskRunId", input.runId);
131132

133+
let triggerSucceeded = false;
132134
try {
133135
await deps.engine.trigger(input.payload as any, deps.prisma);
136+
triggerSucceeded = true;
134137
} catch (err) {
135138
// The retryable-PG class re-throws so the drainer's outer
136139
// worker loop can `buffer.requeue` (handled in
@@ -179,6 +182,47 @@ export function createDrainerHandler(deps: {
179182
throw err;
180183
}
181184
}
185+
186+
// Admin-only audit trail emitted once engine.trigger has
187+
// landed a PG row. `recordRunDebugLog` flips this to
188+
// `TaskEventKind.LOG`, which the trace view + logs download
189+
// already gate behind admin
190+
// (`eventRepository.server.ts:108`,
191+
// `resources.runs.$runParam.logs.download.ts:118`). Encoding
192+
// the buffered window as `startTime` + `duration` makes the
193+
// event render at the historical instant inside the run's
194+
// existing trace — admins see "Mollifier buffered for Xms"
195+
// sitting between trigger and dequeue. Best-effort: the
196+
// helper has its own try/catch and returns a result, so it
197+
// never throws into the materialisation path. Failures are
198+
// logged but not surfaced because the customer-visible run
199+
// has already landed.
200+
if (triggerSucceeded) {
201+
const debugResult = await recordRunDebugLog(
202+
RunId.fromFriendlyId(input.runId),
203+
`Mollifier buffered for ${dwellMs}ms before materialising`,
204+
{
205+
attributes: {
206+
runId: input.runId,
207+
metadata: {
208+
"mollifier.bufferedAt": input.createdAt.toISOString(),
209+
"mollifier.materialisedAt": new Date().toISOString(),
210+
"mollifier.dwellMs": dwellMs,
211+
"mollifier.attempts": input.attempts,
212+
},
213+
},
214+
startTime: input.createdAt,
215+
duration: dwellMs * 1_000_000,
216+
parentId: snapshotSpanId,
217+
}
218+
);
219+
if (!debugResult.success && debugResult.code !== "RUN_NOT_FOUND") {
220+
logger.warn("mollifier drainer: failed to record admin debug log", {
221+
runId: input.runId,
222+
code: debugResult.code,
223+
});
224+
}
225+
}
182226
});
183227
});
184228
};

apps/webapp/test/mollifierDrainerHandler.test.ts

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,22 @@ vi.mock("~/v3/services/alerts/performTaskRunAlerts.server", () => ({
1919
},
2020
}));
2121

22+
// The drainer calls `recordRunDebugLog` after a successful engine.trigger
23+
// to emit an admin-only LOG-kind event encoding the buffered window.
24+
// The real implementation imports the configured event repository (prisma
25+
// + clickhouse + env), which has heavy side-effects on first import.
26+
// Stub it to a vi.fn so the unit tests can assert call shape without
27+
// dragging the whole eventRepository graph into webapp test setup.
28+
// `vi.hoisted` is required because `vi.mock` factories are hoisted above
29+
// regular `const`s — referencing a top-level variable from inside the
30+
// factory otherwise fires `Cannot access 'X' before initialization`.
31+
const { recordRunDebugLogMock } = vi.hoisted(() => ({
32+
recordRunDebugLogMock: vi.fn(async () => ({ success: true as const })),
33+
}));
34+
vi.mock("~/v3/eventRepository/index.server", () => ({
35+
recordRunDebugLog: recordRunDebugLogMock,
36+
}));
37+
2238
import {
2339
createDrainerHandler,
2440
isRetryablePgError,
@@ -371,4 +387,105 @@ describe("createDrainerHandler", () => {
371387
).rejects.toThrow("engine rejected the snapshot");
372388
expect(createFailedTaskRun).not.toHaveBeenCalled();
373389
});
390+
391+
it("emits an admin-only LOG-kind event with the buffered window after engine.trigger succeeds", async () => {
392+
// The drainer's audit trail rides the existing TaskEventKind.LOG
393+
// filter pattern (`eventRepository.server.ts:108` + `logs.download.ts:118`)
394+
// — admins see the buffered window in the trace; non-admins don't.
395+
recordRunDebugLogMock.mockClear();
396+
const trigger = vi.fn(async () => ({ friendlyId: "run_z" }));
397+
const handler = createDrainerHandler({
398+
engine: { trigger } as any,
399+
prisma: {} as any,
400+
});
401+
402+
const bufferedAt = new Date(Date.now() - 4_000);
403+
await handler({
404+
runId: "run_z",
405+
envId: "env_a",
406+
orgId: "org_1",
407+
payload: { taskIdentifier: "t", spanId: "snapspan", traceId: "snaptrace" },
408+
attempts: 2,
409+
createdAt: bufferedAt,
410+
} as any);
411+
412+
expect(recordRunDebugLogMock).toHaveBeenCalledOnce();
413+
const [callRunId, message, options] = recordRunDebugLogMock.mock.calls[0] as [
414+
string,
415+
string,
416+
any,
417+
];
418+
// Internal cuid derived from the friendlyId, mirroring what
419+
// `findRunForEventCreation` queries on.
420+
expect(callRunId).toBe("z");
421+
expect(message).toMatch(/Mollifier buffered for \d+ms/);
422+
// Encodes the historical buffered window so the trace view places
423+
// the LOG event between trigger and dequeue (not at "now").
424+
expect(options.startTime).toBe(bufferedAt);
425+
expect(options.duration).toBeGreaterThan(0);
426+
expect(options.parentId).toBe("snapspan");
427+
expect(options.attributes.metadata["mollifier.bufferedAt"]).toBe(bufferedAt.toISOString());
428+
expect(options.attributes.metadata["mollifier.attempts"]).toBe(2);
429+
});
430+
431+
it("does NOT emit the admin LOG event when engine.trigger fails non-retryably", async () => {
432+
// The audit trail is for runs that actually materialised. On a
433+
// terminal SYSTEM_FAILURE path the customer-visible outcome is the
434+
// failure row; emitting a "buffered for Xms" event next to it would
435+
// imply the buffered window completed normally.
436+
recordRunDebugLogMock.mockClear();
437+
const trigger = vi.fn(async () => {
438+
throw new Error("engine rejected the snapshot");
439+
});
440+
const createFailedTaskRun = vi.fn(async () => ({ id: "internal" }));
441+
const handler = createDrainerHandler({
442+
engine: { trigger, createFailedTaskRun } as any,
443+
prisma: {} as any,
444+
});
445+
446+
await handler({
447+
runId: "run_z",
448+
envId: "env_a",
449+
orgId: "org_1",
450+
payload: { taskIdentifier: "t", environment: envFixture },
451+
attempts: 0,
452+
createdAt: new Date(),
453+
} as any);
454+
455+
expect(recordRunDebugLogMock).not.toHaveBeenCalled();
456+
});
457+
458+
it("does NOT emit the admin LOG event on the cancel-bifurcation path", async () => {
459+
// Cancel-bifurcation writes a CANCELED row directly without calling
460+
// engine.trigger. There's no buffered-then-materialised window to
461+
// describe — the run never ran.
462+
recordRunDebugLogMock.mockClear();
463+
const friendlyId = RunId.generate().friendlyId;
464+
const createCancelledRun = vi.fn(async () => ({
465+
id: "internal",
466+
friendlyId,
467+
status: "CANCELED",
468+
}));
469+
const handler = createDrainerHandler({
470+
engine: { createCancelledRun } as any,
471+
prisma: {} as any,
472+
});
473+
474+
await handler({
475+
runId: friendlyId,
476+
envId: "env_a",
477+
orgId: "org_1",
478+
payload: {
479+
friendlyId,
480+
taskIdentifier: "t",
481+
environment: envFixture,
482+
cancelledAt: new Date().toISOString(),
483+
cancelReason: "Canceled by user",
484+
},
485+
attempts: 0,
486+
createdAt: new Date(),
487+
} as any);
488+
489+
expect(recordRunDebugLogMock).not.toHaveBeenCalled();
490+
});
374491
});

0 commit comments

Comments
 (0)