
Commit d451918

d-cs and claude committed
fix(webapp,run-engine): three follow-ups on Devin review (PR #3754)
Three unresolved Devin threads, all addressed in this commit. Committed locally only, not pushed.

1. `callWithoutTraceEvents` was inheriting the new `emitRunFailedEvent` default of `true`, so its `createFailedTaskRun` call would fire the `runFailed` bus emit and the listener would write a ClickHouse completion event row with empty `traceId`/`spanId`: an orphan row, directly contradicting the method's "without trace events" contract. Pass `emitRunFailedEvent: false` and enqueue `PerformTaskRunAlertsService` directly, mirroring the `call()` pattern so customers' ERROR channels still see the failure.

2. The cjson empty-tags defense lived only on `createCancelledRun`, not on `engine.trigger`. When the mollifier buffer's mutate-side Lua re-serialises a payload (e.g. `append_tags` on a buffered run that never had tags), an empty Lua table encodes as `{}` and decodes back to a JS object, and the previous `tags.length === 0` check passes that object straight to Prisma's `String[]` column. Mirror the same `Array.isArray && tags.length > 0 ? tags : undefined` guard `createCancelledRun` already uses. The defense is symmetric with the existing tested case for `createCancelledRun`, so the same contract holds for the trigger replay path.

3. The `runCancelled` handler's `cancelRunEvent` lookup fails for buffered-only runs (no primary trace event exists, since the mollifier gate skipped `repository.traceEvent` for the not-yet-materialised run). The handler's `tryCatch` swallowed the error, but the systematic `[runCancelled] Failed to cancel run event` log fired on every cancelled buffered run. Add `emitRunCancelledEvent: boolean = true` to `createCancelledRun` (symmetric with the existing `emitRunFailedEvent` flag on `createFailedTaskRun`); the drainer handler passes `false`. The CANCELED PG row still writes; only the trace-event mirror is skipped.

Tests:
- `RunEngine.createCancelledRun > emitRunCancelledEvent: false suppresses the bus emit but still writes the CANCELED PG row` pins the new flag's semantics.
- `createDrainerHandler > calls createCancelledRun with emitRunCancelledEvent: false (suppresses orphan trace-event log noise)` pins the call site's contract.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 4a1bcfe commit d451918

5 files changed

Lines changed: 177 additions & 29 deletions


apps/webapp/app/runEngine/services/triggerFailedTask.server.ts

Lines changed: 25 additions & 1 deletion
@@ -291,7 +291,7 @@ export class TriggerFailedTaskService {
         }
       }

-      await this.engine.createFailedTaskRun({
+      const failedRun = await this.engine.createFailedTaskRun({
         friendlyId: failedRunFriendlyId,
         environment: {
           id: opts.environmentId,
@@ -313,8 +313,32 @@ export class TriggerFailedTaskService {
         depth,
         resumeParentOnCompletion: opts.resumeParentOnCompletion,
         batch: opts.batch,
+        // Suppress the engine's `runFailed` bus emit — the listener
+        // (`runEngineHandlers.server.ts` `runFailed`) calls
+        // `completeFailedRunEvent`, which writes a ClickHouse trace event
+        // row keyed on (traceId, spanId). This caller has no trace
+        // context (the method name is literally `callWithoutTraceEvents`)
+        // so the emit would write a row with empty traceId/spanId —
+        // orphan event in the store. We still want alert coverage,
+        // though, so enqueue directly below.
+        emitRunFailedEvent: false,
       });

+      // Alerts side of `runFailed` — the engine emit was suppressed
+      // above so we don't create an orphan trace event; enqueue the
+      // alert directly so customers' ERROR channels still see the
+      // failure. Best-effort, mirroring the `call()` path.
+      try {
+        await PerformTaskRunAlertsService.enqueue(failedRun.id);
+      } catch (alertsError) {
+        logger.warn("TriggerFailedTaskService.callWithoutTraceEvents: alert enqueue failed", {
+          taskId: opts.taskId,
+          friendlyId: failedRun.friendlyId,
+          error:
+            alertsError instanceof Error ? alertsError.message : String(alertsError),
+        });
+      }
+
       return failedRunFriendlyId;
     } catch (createError) {
       logger.error("TriggerFailedTaskService: failed to create pre-failed TaskRun (no trace)", {

apps/webapp/app/v3/mollifier/mollifierDrainerHandler.server.ts

Lines changed: 8 additions & 5 deletions
@@ -53,11 +53,13 @@ export function createDrainerHandler(deps: {
         })
       : context.active();

-    // Cancel-wins-over-trigger. If a cancel API call
-    // landed on this entry while it was QUEUED, the snapshot carries
-    // `cancelledAt` + `cancelReason`. Skip the normal materialise path
-    // and write a CANCELED PG row directly. The existing runCancelled
-    // handler writes the TaskEvent.
+    // Cancel-wins-over-trigger. If a cancel API call landed on this
+    // entry while it was QUEUED, the snapshot carries `cancelledAt` +
+    // `cancelReason`. Skip the normal materialise path and write a
+    // CANCELED PG row directly. The `runCancelled` bus emit is
+    // suppressed here because a buffered-only run never had a primary
+    // trace event written for it — the runCancelled handler's
+    // `cancelRunEvent` lookup would fail and log noise per cancel.
     const cancelledAtStr =
       typeof input.payload.cancelledAt === "string" ? input.payload.cancelledAt : undefined;
     if (cancelledAtStr) {
@@ -79,6 +81,7 @@ export function createDrainerHandler(deps: {
           snapshot: input.payload as any,
           cancelledAt: new Date(cancelledAtStr),
           cancelReason,
+          emitRunCancelledEvent: false,
         },
         deps.prisma,
       );
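
The opt-out flag passed by the drainer can be illustrated in isolation. The following is a minimal sketch, not the real `RunEngine`: `MiniEngine`, `onRunCancelled`, and the in-memory `rows` array are hypothetical stand-ins for the engine, its event bus, and the PG table. It only shows the contract described above, where the row is always written and the event is emitted unless the caller passes `emitRunCancelledEvent: false`.

```typescript
type CancelledEvent = { friendlyId: string; time: Date };
type Listener = (event: CancelledEvent) => void;

// Hypothetical stand-in for the engine; names are illustrative only.
class MiniEngine {
  private listeners: Listener[] = [];
  // Stands in for the PG table of runs.
  readonly rows: { friendlyId: string; status: "CANCELED" }[] = [];

  onRunCancelled(listener: Listener): void {
    this.listeners.push(listener);
  }

  createCancelledRun({
    friendlyId,
    cancelledAt,
    emitRunCancelledEvent = true, // default keeps existing callers unchanged
  }: {
    friendlyId: string;
    cancelledAt: Date;
    emitRunCancelledEvent?: boolean;
  }): { friendlyId: string; status: "CANCELED" } {
    // The row is written regardless of the flag.
    const row = { friendlyId, status: "CANCELED" as const };
    this.rows.push(row);

    // Only the event mirror is optional.
    if (emitRunCancelledEvent) {
      for (const listener of this.listeners) {
        listener({ friendlyId, time: cancelledAt });
      }
    }
    return row;
  }
}

const engine = new MiniEngine();
const captured: CancelledEvent[] = [];
engine.onRunCancelled((event) => {
  captured.push(event);
});

// Default call: row written and event emitted.
engine.createCancelledRun({ friendlyId: "run_a", cancelledAt: new Date() });
// Buffered-only style call: row written, event suppressed.
engine.createCancelledRun({
  friendlyId: "run_b",
  cancelledAt: new Date(),
  emitRunCancelledEvent: false,
});
```

Defaulting the flag to `true` means only the caller that knows no primary trace event exists has to opt out; every other call site keeps its current behaviour.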

apps/webapp/test/mollifierDrainerHandler.test.ts

Lines changed: 39 additions & 0 deletions
@@ -222,6 +222,45 @@ describe("createDrainerHandler", () => {
     expect(createFailedTaskRun).toHaveBeenCalledOnce();
   });

+  it("calls createCancelledRun with emitRunCancelledEvent: false (suppresses orphan trace-event log noise)", async () => {
+    // Buffered-only runs never had a primary trace event written for
+    // them — the mollifier gate skipped `repository.traceEvent` since
+    // the run hadn't materialised in PG yet. The `runCancelled` handler
+    // would log `[runCancelled] Failed to cancel run event` for every
+    // cancelled buffered run if we let the emit fire. Suppress it.
+    const friendlyId = RunId.generate().friendlyId;
+    const createCancelledRun = vi.fn(async () => ({
+      id: "internal",
+      friendlyId,
+      status: "CANCELED",
+    }));
+    const handler = createDrainerHandler({
+      engine: { createCancelledRun } as any,
+      prisma: {} as any,
+    });
+
+    await handler({
+      runId: friendlyId,
+      envId: "env_a",
+      orgId: "org_1",
+      payload: {
+        friendlyId,
+        taskIdentifier: "t",
+        environment: envFixture,
+        cancelledAt: new Date().toISOString(),
+        cancelReason: "Canceled by user",
+      },
+      attempts: 0,
+      createdAt: new Date(),
+    } as any);
+
+    expect(createCancelledRun).toHaveBeenCalledOnce();
+    const arg = createCancelledRun.mock.calls[0][0] as {
+      emitRunCancelledEvent?: boolean;
+    };
+    expect(arg.emitRunCancelledEvent).toBe(false);
+  });
+
   it("honours the cancel when a buffered cancel races a materialised non-CANCELED row", async () => {
     // Cancel-wins-over-trigger. If the normal trigger
     // replay path materialised a live PENDING row before the cancel

internal-packages/run-engine/src/engine/index.ts

Lines changed: 54 additions & 23 deletions
@@ -458,25 +458,45 @@ export class RunEngine {
    * Skips: queue insertion (no execution), waitpoint creation (the
    * mollifier gate refuses to buffer triggerAndWait children, so a
    * cancelled buffered run never has a waiting parent to unblock),
-   * concurrency reservation. Emits `runCancelled` so the existing
-   * TaskEvent handler writes the cancellation event row — the only side
-   * effect PG-side cancel has today per audit.
+   * concurrency reservation. Emits `runCancelled` by default — callers
+   * working on buffered-only runs (no primary trace event exists) can
+   * opt out via `emitRunCancelledEvent: false` to avoid the systematic
+   * "Failed to cancel run event" noise the handler would log when its
+   * `cancelRunEvent` call can't find a span.
    *
    * Idempotent: if a row with the same friendlyId already exists (double
    * drainer pop after requeue), Prisma's P2002 unique-constraint violation
    * is caught and the existing row is returned. The duplicate runCancelled
    * emission is skipped — the original drain's emit already wrote the
-   * TaskEvent.
+   * TaskEvent (when applicable).
    */
   async createCancelledRun(
     {
       snapshot,
       cancelledAt,
       cancelReason,
+      emitRunCancelledEvent = true,
     }: {
       snapshot: TriggerParams;
       cancelledAt: Date;
       cancelReason: string;
+      /**
+       * Whether to emit the `runCancelled` engine-bus event. Defaults to
+       * true.
+       *
+       * Set to `false` for buffered-only runs that never had a primary
+       * trace event written (the mollifier gate never called
+       * `repository.traceEvent` for them). The `runCancelled` handler in
+       * `runEngineHandlers.server.ts` calls `cancelRunEvent`, which
+       * looks up the run's primary span in the event store — for
+       * buffered-only runs that span doesn't exist, so the lookup fails,
+       * the handler's `tryCatch` swallows it, and a "[runCancelled]
+       * Failed to cancel run event" error is logged for every cancelled
+       * buffered run. Suppressing the emit avoids that systematic noise.
+       * The CANCELED PG row is still written; only the trace-event
+       * mirror is skipped.
+       */
+      emitRunCancelledEvent?: boolean;
     },
     tx?: PrismaClientOrTransaction,
   ): Promise<TaskRun> {
@@ -567,24 +587,26 @@ export class RunEngine {
         },
       });

-      this.eventBus.emit("runCancelled", {
-        time: cancelledAt,
-        run: {
-          id: taskRun.id,
-          status: taskRun.status,
-          friendlyId: taskRun.friendlyId,
-          spanId: taskRun.spanId,
-          taskEventStore: taskRun.taskEventStore,
-          createdAt: taskRun.createdAt,
-          completedAt: taskRun.completedAt,
-          error,
-          updatedAt: taskRun.updatedAt,
-          attemptNumber: taskRun.attemptNumber ?? 0,
-        },
-        organization: { id: snapshot.environment.organization.id },
-        project: { id: snapshot.environment.project.id },
-        environment: { id: snapshot.environment.id },
-      });
+      if (emitRunCancelledEvent) {
+        this.eventBus.emit("runCancelled", {
+          time: cancelledAt,
+          run: {
+            id: taskRun.id,
+            status: taskRun.status,
+            friendlyId: taskRun.friendlyId,
+            spanId: taskRun.spanId,
+            taskEventStore: taskRun.taskEventStore,
+            createdAt: taskRun.createdAt,
+            completedAt: taskRun.completedAt,
+            error,
+            updatedAt: taskRun.updatedAt,
+            attemptNumber: taskRun.attemptNumber ?? 0,
+          },
+          organization: { id: snapshot.environment.organization.id },
+          project: { id: snapshot.environment.project.id },
+          environment: { id: snapshot.environment.id },
+        });
+      }

       return taskRun;
     } catch (err) {
@@ -819,7 +841,16 @@ export class RunEngine {
             priorityMs,
             queueTimestamp: queueTimestamp ?? delayUntil ?? new Date(),
             ttl: resolvedTtl,
-            runTags: tags.length === 0 ? undefined : tags,
+            // Defensive: when the mollifier drainer replays a buffered
+            // snapshot whose payload was rewritten by a buffer-side Lua
+            // mutate (e.g. append_tags clears an empty list), cjson
+            // encodes an empty Lua table as `{}` rather than `[]`. JS
+            // parses that back as an empty object, and `{}.length` is
+            // undefined — the original `tags.length === 0` check would
+            // pass `{}` straight to Prisma's `String[]` column. Mirror
+            // the same Array.isArray guard that `createCancelledRun`
+            // uses for symmetry with the trigger replay path.
+            runTags: Array.isArray(tags) && tags.length > 0 ? tags : undefined,
             oneTimeUseToken,
             parentTaskRunId,
             rootTaskRunId,
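
The difference between the old and new `runTags` checks can be reproduced without Redis or Prisma. This is a small sketch under stated assumptions: `normaliseRunTags` is a hypothetical helper name (the commit inlines the expression at the call site), and the JSON string stands in for a payload that a Lua mutate re-serialised with an empty table encoded as `{}`.

```typescript
// Hypothetical helper wrapping the guard expression from the diff.
function normaliseRunTags(tags: unknown): string[] | undefined {
  return Array.isArray(tags) && tags.length > 0 ? (tags as string[]) : undefined;
}

// Stand-in for a replayed snapshot: the empty tag list came back as `{}`.
const replayed = JSON.parse('{"tags":{}}') as { tags: unknown };

// Old check: `{}.length` is undefined, so `=== 0` is false and the
// empty object is passed through unchanged.
const oldResult = (replayed.tags as any).length === 0 ? undefined : replayed.tags;

// New check: anything that is not a non-empty array becomes undefined.
const newResult = normaliseRunTags(replayed.tags);

console.log(Array.isArray(oldResult), newResult);
```

With the old expression the value handed on is still the empty object; with the guard it is `undefined`, which is what the call site already passed for a genuinely empty tag array.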

internal-packages/run-engine/src/engine/tests/createCancelledRun.test.ts

Lines changed: 51 additions & 0 deletions
@@ -144,6 +144,57 @@ describe("RunEngine.createCancelledRun", () => {
     },
   );

+  containerTest(
+    "emitRunCancelledEvent: false suppresses the bus emit but still writes the CANCELED PG row",
+    async ({ prisma, redisOptions }) => {
+      // The mollifier drainer passes `emitRunCancelledEvent: false` for
+      // buffered-only runs because the runCancelled handler's
+      // `cancelRunEvent` lookup fails for them (no primary trace event
+      // span exists — the mollifier gate never called
+      // `repository.traceEvent` for this run). Without the gate, every
+      // cancelled buffered run produces a `[runCancelled] Failed to
+      // cancel run event` error log. This pins the gate's contract: PG
+      // row still lands, bus emit suppressed.
+      const env = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
+      const engine = new RunEngine({ prisma, ...baseEngineOptions(redisOptions) });
+      const captured: EventBusEventArgs<"runCancelled">[0][] = [];
+      engine.eventBus.on("runCancelled", (event) => {
+        captured.push(event);
+      });
+
+      try {
+        const friendlyId = freshRunId();
+        const result = await engine.createCancelledRun({
+          snapshot: {
+            friendlyId,
+            environment: env,
+            taskIdentifier: "test-task",
+            payload: "{}",
+            payloadType: "application/json",
+            context: {},
+            traceContext: {},
+            traceId: "0000000000000000eeee000000000000",
+            spanId: "ffff000000000000",
+            queue: "task/test-task",
+            isTest: false,
+            tags: [],
+          },
+          cancelledAt: new Date(),
+          cancelReason: "Test cancel (silent emit)",
+          emitRunCancelledEvent: false,
+        });
+
+        // PG row still lands.
+        expect(result.status).toBe("CANCELED");
+        expect(result.friendlyId).toBe(friendlyId);
+        // Bus emit suppressed.
+        expect(captured).toHaveLength(0);
+      } finally {
+        await engine.quit();
+      }
+    },
+  );
+
   containerTest(
     "idempotent on double-pop: second call returns existing row without re-emitting",
     async ({ prisma, redisOptions }) => {
