Skip to content

Commit f2280b2

Browse files
d-csclaude
andcommitted
fix(core,webapp): coerce numeric concurrencyKey to string
`concurrencyKey` validation accepted only `z.string().optional()` on the single-trigger and V2/V3 batch endpoints, and the Phase-2 streaming NDJSON endpoint accepted `z.record(z.unknown()).optional()` for the entire `options` field. Callers passing `concurrencyKey: someNumericId` (e.g. `payload.userId`) either failed schema validation on the first two paths or sailed through on Phase-2 and then failed downstream at `prisma.taskRun.create` with `Argument concurrencyKey: Expected String or Null, provided Int`. The schema now accepts `string | number` for `concurrencyKey` and stringifies on the way in, across all three paths. The Phase-2 NDJSON `options` is tightened to reuse the strict `BatchTriggerTaskItem.options` shape so it validates identically to the V2/V3 batch endpoints. A defensive `typeof === "number"` coercion at the `engine.trigger` call site in `RunEngineTriggerTaskService` covers in-flight Redis-stored batch items enqueued before the schema fix — those items are rebuilt from a `Record<string, unknown>` shape that bypasses the new schema and would otherwise continue failing for up to their TTL. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 181d9ba commit f2280b2

5 files changed

Lines changed: 188 additions & 5 deletions

File tree

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@trigger.dev/core": patch
3+
---
4+
5+
Coerce numeric `concurrencyKey` values to string at the API boundary across `tasks.trigger`, `tasks.batchTrigger`, and the Phase-2 streaming batch endpoint.

apps/webapp/app/runEngine/services/triggerTask.server.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,14 @@ export class RunEngineTriggerTaskService {
467467
taskVersion: lockedToBackgroundWorker?.version,
468468
sdkVersion: lockedToBackgroundWorker?.sdkVersion,
469469
cliVersion: lockedToBackgroundWorker?.cliVersion,
470-
concurrencyKey: body.options?.concurrencyKey,
470+
// Schema-level coercion now lands `body.options.concurrencyKey`
471+
// as `string` on the API path, but the BatchQueue worker rebuilds
472+
// body.options from Redis-stored items (Record<string, unknown>),
473+
// which can still carry the pre-fix shape from in-flight batches.
474+
concurrencyKey:
475+
typeof body.options?.concurrencyKey === "number"
476+
? String(body.options.concurrencyKey)
477+
: body.options?.concurrencyKey,
471478
queue: queueName,
472479
lockedQueueId,
473480
workerQueue,

apps/webapp/test/engine/triggerTask.test.ts

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,76 @@ describe("RunEngineTriggerTaskService", () => {
253253
await engine.quit();
254254
});
255255

256+
// The BatchQueue worker rebuilds body.options from Redis-stored items
257+
// (Record<string, unknown>), so the Phase-2 schema coercion doesn't apply
258+
// to in-flight items enqueued before the schema fix. The defensive
259+
// `typeof === "number"` coercion at the engine.trigger call site is what
260+
// prevents these from failing at prisma.taskRun.create with
261+
// "Argument concurrencyKey: Expected String or Null, provided Int".
262+
containerTest(
263+
"coerces a numeric concurrencyKey to a string at the engine.trigger boundary",
264+
async ({ prisma, redisOptions }) => {
265+
const engine = new RunEngine({
266+
prisma,
267+
worker: {
268+
redis: redisOptions,
269+
workers: 1,
270+
tasksPerWorker: 10,
271+
pollIntervalMs: 100,
272+
},
273+
queue: { redis: redisOptions },
274+
runLock: { redis: redisOptions },
275+
machines: {
276+
defaultMachine: "small-1x",
277+
machines: {
278+
"small-1x": {
279+
name: "small-1x" as const,
280+
cpu: 0.5,
281+
memory: 0.5,
282+
centsPerMs: 0.0001,
283+
},
284+
},
285+
baseCostInCents: 0.0005,
286+
},
287+
tracer: trace.getTracer("test", "0.0.0"),
288+
});
289+
290+
const authenticatedEnvironment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
291+
const taskIdentifier = "test-task";
292+
await setupBackgroundWorker(engine, authenticatedEnvironment, taskIdentifier);
293+
294+
const triggerTaskService = new RunEngineTriggerTaskService({
295+
engine,
296+
prisma,
297+
payloadProcessor: new MockPayloadProcessor(),
298+
queueConcern: new DefaultQueueManager(prisma, engine),
299+
idempotencyKeyConcern: new IdempotencyKeyConcern(
300+
prisma,
301+
engine,
302+
new MockTraceEventConcern()
303+
),
304+
validator: new MockTriggerTaskValidator(),
305+
traceEventConcern: new MockTraceEventConcern(),
306+
tracer: trace.getTracer("test", "0.0.0"),
307+
metadataMaximumSize: 1024 * 1024 * 1,
308+
});
309+
310+
const result = await triggerTaskService.call({
311+
taskId: taskIdentifier,
312+
environment: authenticatedEnvironment,
313+
// Cast through `any` to simulate the in-flight Redis batch-item shape
314+
// (Record<string, unknown>) that bypasses the BatchItemNDJSON schema.
315+
body: { payload: { userId: 51262 }, options: { concurrencyKey: 51262 as any } },
316+
});
317+
318+
expect(result).toBeDefined();
319+
const run = await prisma.taskRun.findUnique({ where: { id: result!.run.id } });
320+
expect(run?.concurrencyKey).toBe("51262");
321+
322+
await engine.quit();
323+
}
324+
);
325+
256326
containerTest("should handle idempotency keys correctly", async ({ prisma, redisOptions }) => {
257327
const engine = new RunEngine({
258328
prisma,

packages/core/src/v3/schemas/api.ts

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,14 @@ export const IdempotencyKeyOptionsSchema = z.object({
157157

158158
export type IdempotencyKeyOptionsSchema = z.infer<typeof IdempotencyKeyOptionsSchema>;
159159

160+
// Coerces user-supplied concurrencyKey values to string. The downstream Prisma
161+
// column is String?, so passing a number (a common foot-gun when callers do
162+
// `concurrencyKey: payload.userId`) used to fail at `prisma.taskRun.create`
163+
// with PrismaClientValidationError. Accept the intent and stringify here.
164+
const ConcurrencyKeySchema = z
165+
.union([z.string(), z.number()])
166+
.transform((value) => String(value));
167+
160168
export const TriggerTaskRequestBody = z.object({
161169
payload: z.any(),
162170
context: z.any(),
@@ -195,7 +203,7 @@ export const TriggerTaskRequestBody = z.object({
195203
concurrencyLimit: z.number().int().optional(),
196204
})
197205
.optional(),
198-
concurrencyKey: z.string().optional(),
206+
concurrencyKey: ConcurrencyKeySchema.optional(),
199207
delay: z.string().or(z.coerce.date()).optional(),
200208
idempotencyKey: z
201209
.string()
@@ -253,7 +261,7 @@ export const BatchTriggerTaskItem = z.object({
253261
context: z.any(),
254262
options: z
255263
.object({
256-
concurrencyKey: z.string().optional(),
264+
concurrencyKey: ConcurrencyKeySchema.optional(),
257265
delay: z.string().or(z.coerce.date()).optional(),
258266
idempotencyKey: z
259267
.string()
@@ -401,7 +409,12 @@ export type CreateBatchResponse = z.infer<typeof CreateBatchResponse>;
401409

402410
/**
403411
* Phase 2: Individual item in the NDJSON stream
404-
* Each line in the NDJSON body should match this schema
412+
* Each line in the NDJSON body should match this schema.
413+
*
414+
* `options` reuses the strict shape from BatchTriggerTaskItem so that the
415+
* Phase-2 streaming path validates option fields identically to the V2/V3
416+
* batch trigger endpoints — historically this used z.record(z.unknown()) and
417+
* let invalid values (e.g. numeric concurrencyKey) reach Prisma.
405418
*/
406419
export const BatchItemNDJSON = z.object({
407420
/** Zero-based index of this item (used for idempotency and ordering) */
@@ -411,7 +424,7 @@ export const BatchItemNDJSON = z.object({
411424
/** The payload for this task run */
412425
payload: z.unknown().optional(),
413426
/** Options for this specific item */
414-
options: z.record(z.unknown()).optional(),
427+
options: BatchTriggerTaskItem.shape.options,
415428
});
416429

417430
export type BatchItemNDJSON = z.infer<typeof BatchItemNDJSON>;
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
import { describe, it, expect } from "vitest";
2+
import { BatchItemNDJSON, BatchTriggerTaskItem, TriggerTaskRequestBody } from "./api.js";
3+
4+
describe("concurrencyKey coercion", () => {
5+
// Phase-2 NDJSON used to accept arbitrary shapes for `options`, so a numeric
6+
// concurrencyKey (a common foot-gun when callers pass
7+
// `concurrencyKey: payload.userId`) reached Prisma untouched and failed
8+
// there with PrismaClientValidationError. The schema now coerces
9+
// number → string at the API boundary across every trigger path.
10+
describe("BatchItemNDJSON", () => {
11+
it("coerces a numeric concurrencyKey to a string", () => {
12+
const result = BatchItemNDJSON.safeParse({
13+
index: 0,
14+
task: "user-workflow-tick",
15+
payload: { json: { userId: 51262 } },
16+
options: { concurrencyKey: 51262 },
17+
});
18+
19+
expect(result.success).toBe(true);
20+
if (result.success) {
21+
expect(result.data.options?.concurrencyKey).toBe("51262");
22+
}
23+
});
24+
25+
it("accepts a string concurrencyKey unchanged", () => {
26+
const result = BatchItemNDJSON.safeParse({
27+
index: 0,
28+
task: "user-workflow-tick",
29+
payload: { json: { userId: 51262 } },
30+
options: { concurrencyKey: "user-51262" },
31+
});
32+
33+
expect(result.success).toBe(true);
34+
if (result.success) {
35+
expect(result.data.options?.concurrencyKey).toBe("user-51262");
36+
}
37+
});
38+
39+
it("accepts an item with no options", () => {
40+
const result = BatchItemNDJSON.safeParse({
41+
index: 0,
42+
task: "user-workflow-tick",
43+
payload: { json: { userId: 51262 } },
44+
});
45+
46+
expect(result.success).toBe(true);
47+
});
48+
49+
it("rejects a non-numeric, non-string concurrencyKey", () => {
50+
const result = BatchItemNDJSON.safeParse({
51+
index: 0,
52+
task: "user-workflow-tick",
53+
options: { concurrencyKey: { nested: "object" } },
54+
});
55+
56+
expect(result.success).toBe(false);
57+
});
58+
});
59+
60+
describe("BatchTriggerTaskItem", () => {
61+
it("coerces a numeric concurrencyKey to a string", () => {
62+
const result = BatchTriggerTaskItem.safeParse({
63+
task: "user-workflow-tick",
64+
payload: { userId: 51262 },
65+
options: { concurrencyKey: 51262 },
66+
});
67+
68+
expect(result.success).toBe(true);
69+
if (result.success) {
70+
expect(result.data.options?.concurrencyKey).toBe("51262");
71+
}
72+
});
73+
});
74+
75+
describe("TriggerTaskRequestBody", () => {
76+
it("coerces a numeric concurrencyKey to a string", () => {
77+
const result = TriggerTaskRequestBody.safeParse({
78+
payload: { userId: 51262 },
79+
options: { concurrencyKey: 51262 },
80+
});
81+
82+
expect(result.success).toBe(true);
83+
if (result.success) {
84+
expect(result.data.options?.concurrencyKey).toBe("51262");
85+
}
86+
});
87+
});
88+
});

0 commit comments

Comments
 (0)