Skip to content

Commit 0cf0119

Browse files
Ekaterina BulatovaEkaterina Bulatova
authored andcommitted
feat(sdk,core): offload large trigger payloads via object storage
Upload oversized trigger payloads before the API request and send an application/store pointer instead of embedding large JSON in the trigger body. Validate pointer payloads in TriggerTaskRequestBody.
1 parent 9cb6fd1 commit 0cf0119

4 files changed

Lines changed: 85 additions & 13 deletions

File tree

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
---
2+
"@trigger.dev/core": patch
3+
"@trigger.dev/sdk": patch
4+
---
5+
6+
Offload large trigger payloads to object storage before sending the trigger API request. The SDK uploads packets at or above the existing 128KB limit and sends an `application/store` pointer instead of embedding large JSON in the request body. `TriggerTaskRequestBody` now validates that `application/store` payloads are non-empty storage paths.

packages/core/src/v3/schemas/api-type.test.ts

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { describe, it, expect } from "vitest";
2-
import { InitializeDeploymentRequestBody } from "./api.js";
2+
import { InitializeDeploymentRequestBody, TriggerTaskRequestBody } from "./api.js";
33
import type { InitializeDeploymentRequestBody as InitializeDeploymentRequestBodyType } from "./api.js";
44

55
describe("InitializeDeploymentRequestBody", () => {
@@ -139,3 +139,41 @@ describe("InitializeDeploymentRequestBody", () => {
139139
});
140140
});
141141
});
142+
143+
describe("TriggerTaskRequestBody", () => {
144+
it("accepts application/store payload as a non-empty string", () => {
145+
const result = TriggerTaskRequestBody.safeParse({
146+
payload: "packets/payloads/file.json",
147+
context: {},
148+
options: {
149+
payloadType: "application/store",
150+
},
151+
});
152+
153+
expect(result.success).toBe(true);
154+
});
155+
156+
it("rejects application/store payload when payload is not a string", () => {
157+
const result = TriggerTaskRequestBody.safeParse({
158+
payload: { foo: "bar" },
159+
context: {},
160+
options: {
161+
payloadType: "application/store",
162+
},
163+
});
164+
165+
expect(result.success).toBe(false);
166+
});
167+
168+
it("rejects application/store payload when payload is an empty string", () => {
169+
const result = TriggerTaskRequestBody.safeParse({
170+
payload: "",
171+
context: {},
172+
options: {
173+
payloadType: "application/store",
174+
},
175+
});
176+
177+
expect(result.success).toBe(false);
178+
});
179+
});

packages/core/src/v3/schemas/api.ts

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -157,11 +157,12 @@ export const IdempotencyKeyOptionsSchema = z.object({
157157

158158
export type IdempotencyKeyOptionsSchema = z.infer<typeof IdempotencyKeyOptionsSchema>;
159159

160-
export const TriggerTaskRequestBody = z.object({
161-
payload: z.any(),
162-
context: z.any(),
163-
options: z
164-
.object({
160+
export const TriggerTaskRequestBody = z
161+
.object({
162+
payload: z.any(),
163+
context: z.any(),
164+
options: z
165+
.object({
165166
/** @deprecated engine v1 only */
166167
dependentAttempt: z.string().optional(),
167168
/** @deprecated engine v1 only */
@@ -227,9 +228,22 @@ export const TriggerTaskRequestBody = z.object({
227228
maxDelay: z.string().optional(),
228229
})
229230
.optional(),
230-
})
231-
.optional(),
232-
});
231+
})
232+
.optional(),
233+
})
234+
.superRefine((value, ctx) => {
235+
if (value.options?.payloadType !== "application/store") {
236+
return;
237+
}
238+
239+
if (typeof value.payload !== "string" || value.payload.length === 0) {
240+
ctx.addIssue({
241+
code: z.ZodIssueCode.custom,
242+
message: "payload must be a non-empty string when options.payloadType is application/store",
243+
path: ["payload"],
244+
});
245+
}
246+
});
233247

234248
export type TriggerTaskRequestBody = z.infer<typeof TriggerTaskRequestBody>;
235249

packages/trigger-sdk/src/v3/shared.ts

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
ApiError,
66
apiClientManager,
77
ApiRequestOptions,
8+
conditionallyExportPacket,
89
conditionallyImportPacket,
910
convertToolParametersToSchema,
1011
createErrorTaskError,
@@ -2214,6 +2215,10 @@ async function trigger_internal<TRunTypes extends AnyRunTypes>(
22142215
const parsedPayload = parsePayload ? await parsePayload(payload) : payload;
22152216

22162217
const payloadPacket = await stringifyIO(parsedPayload);
2218+
const triggerPayloadPacket = await conditionallyExportPacket(
2219+
payloadPacket,
2220+
createTriggerPayloadPathPrefix(id)
2221+
);
22172222

22182223
// Process idempotency key and extract options for storage
22192224
const processedIdempotencyKey = await makeIdempotencyKey(options?.idempotencyKey);
@@ -2224,12 +2229,12 @@ async function trigger_internal<TRunTypes extends AnyRunTypes>(
22242229
const handle = await apiClient.triggerTask(
22252230
id,
22262231
{
2227-
payload: payloadPacket.data,
2232+
payload: triggerPayloadPacket.data,
22282233
options: {
22292234
queue: options?.queue ? { name: options.queue } : undefined,
22302235
concurrencyKey: options?.concurrencyKey,
22312236
test: taskContext.ctx?.run.isTest,
2232-
payloadType: payloadPacket.dataType,
2237+
payloadType: triggerPayloadPacket.dataType,
22332238
idempotencyKey: processedIdempotencyKey?.toString(),
22342239
idempotencyKeyTTL: options?.idempotencyKeyTTL,
22352240
idempotencyKeyOptions,
@@ -2470,6 +2475,10 @@ async function triggerAndWait_internal<TIdentifier extends string, TPayload, TOu
24702475
const parsedPayload = parsePayload ? await parsePayload(payload) : payload;
24712476

24722477
const payloadPacket = await stringifyIO(parsedPayload);
2478+
const triggerPayloadPacket = await conditionallyExportPacket(
2479+
payloadPacket,
2480+
createTriggerPayloadPathPrefix(id)
2481+
);
24732482

24742483
// Process idempotency key and extract options for storage
24752484
const processedIdempotencyKey = await makeIdempotencyKey(options?.idempotencyKey);
@@ -2483,13 +2492,13 @@ async function triggerAndWait_internal<TIdentifier extends string, TPayload, TOu
24832492
const response = await apiClient.triggerTask(
24842493
id,
24852494
{
2486-
payload: payloadPacket.data,
2495+
payload: triggerPayloadPacket.data,
24872496
options: {
24882497
lockToVersion: taskContext.worker?.version, // Lock to current version because we're waiting for it to finish
24892498
queue: options?.queue ? { name: options.queue } : undefined,
24902499
concurrencyKey: options?.concurrencyKey,
24912500
test: taskContext.ctx?.run.isTest,
2492-
payloadType: payloadPacket.dataType,
2501+
payloadType: triggerPayloadPacket.dataType,
24932502
delay: options?.delay,
24942503
ttl: options?.ttl,
24952504
tags: options?.tags,
@@ -3074,3 +3083,8 @@ function registerTaskLifecycleHooks<
30743083
});
30753084
}
30763085
}
3086+
3087+
function createTriggerPayloadPathPrefix(taskId: string): string {
3088+
const safeTaskId = encodeURIComponent(taskId);
3089+
return `trigger/${safeTaskId}/${Date.now()}-${Math.random().toString(36).slice(2)}/payload`;
3090+
}

0 commit comments

Comments
 (0)