Skip to content

Commit ac30f57

Browse files
Ekaterina BulatovaEkaterina Bulatova
authored andcommitted
feat(sdk,core): offload large trigger payloads via object storage
Upload oversized trigger payloads before the API request and send an application/store pointer instead of embedding large JSON in the trigger body. Validate pointer payloads in TriggerTaskRequestBody.
1 parent 8ab5b48 commit ac30f57

6 files changed

Lines changed: 341 additions & 105 deletions

File tree

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
---
2+
"@trigger.dev/core": patch
3+
"@trigger.dev/sdk": patch
4+
---
5+
6+
Offload large trigger payloads to object storage before sending the trigger API request. The SDK uploads packets at or above the existing 128KB limit and sends an `application/store` pointer instead of embedding large JSON in the request body. `TriggerTaskRequestBody` now validates that `application/store` payloads are non-empty storage paths.
7+
8+
Payload uploads use the same resolved `ApiClient` as the trigger call (including `requestOptions.clientConfig`), not only the global `apiClientManager.client` — so custom `baseURL`, access token, and preview branch apply to both presign and trigger.

packages/core/src/v3/schemas/api-type.test.ts

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import { describe, it, expect } from "vitest";
2-
import { InitializeDeploymentRequestBody } from "./api.js";
2+
import { InitializeDeploymentRequestBody, TriggerTaskRequestBody } from "./api.js";
33
import type { InitializeDeploymentRequestBody as InitializeDeploymentRequestBodyType } from "./api.js";
44

55
describe("InitializeDeploymentRequestBody", () => {
@@ -139,3 +139,41 @@ describe("InitializeDeploymentRequestBody", () => {
139139
});
140140
});
141141
});
142+
143+
describe("TriggerTaskRequestBody", () => {
144+
it("accepts application/store payload as a non-empty string", () => {
145+
const result = TriggerTaskRequestBody.safeParse({
146+
payload: "packets/payloads/file.json",
147+
context: {},
148+
options: {
149+
payloadType: "application/store",
150+
},
151+
});
152+
153+
expect(result.success).toBe(true);
154+
});
155+
156+
it("rejects application/store payload when payload is not a string", () => {
157+
const result = TriggerTaskRequestBody.safeParse({
158+
payload: { foo: "bar" },
159+
context: {},
160+
options: {
161+
payloadType: "application/store",
162+
},
163+
});
164+
165+
expect(result.success).toBe(false);
166+
});
167+
168+
it("rejects application/store payload when payload is an empty string", () => {
169+
const result = TriggerTaskRequestBody.safeParse({
170+
payload: "",
171+
context: {},
172+
options: {
173+
payloadType: "application/store",
174+
},
175+
});
176+
177+
expect(result.success).toBe(false);
178+
});
179+
});

packages/core/src/v3/schemas/api.ts

Lines changed: 95 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -161,83 +161,95 @@ export type IdempotencyKeyOptionsSchema = z.infer<typeof IdempotencyKeyOptionsSc
161161
// column is String?, so passing a number (a common foot-gun when callers do
162162
// `concurrencyKey: payload.userId`) used to fail at `prisma.taskRun.create`
163163
// with PrismaClientValidationError. Accept the intent and stringify here.
164-
const ConcurrencyKeySchema = z
165-
.union([z.string(), z.number()])
166-
.transform((value) => String(value));
164+
const ConcurrencyKeySchema = z.union([z.string(), z.number()]).transform((value) => String(value));
167165

168-
export const TriggerTaskRequestBody = z.object({
169-
payload: z.any(),
170-
context: z.any(),
171-
options: z
172-
.object({
173-
/** @deprecated engine v1 only */
174-
dependentAttempt: z.string().optional(),
175-
/** @deprecated engine v1 only */
176-
parentAttempt: z.string().optional(),
177-
/** @deprecated engine v1 only */
178-
dependentBatch: z.string().optional(),
179-
/**
180-
* If triggered in a batch, this is the BatchTaskRun id
181-
*/
182-
parentBatch: z.string().optional(),
183-
/**
184-
* RunEngine v2
185-
* If triggered inside another run, the parentRunId is the friendly ID of the parent run.
186-
*/
187-
parentRunId: z.string().optional(),
188-
/**
189-
* RunEngine v2
190-
* Should be `true` if `triggerAndWait` or `batchTriggerAndWait`
191-
*/
192-
resumeParentOnCompletion: z.boolean().optional(),
193-
/**
194-
* Locks the version to the passed value.
195-
* Automatically set when using `triggerAndWait` or `batchTriggerAndWait`
196-
*/
197-
lockToVersion: z.string().optional(),
166+
export const TriggerTaskRequestBody = z
167+
.object({
168+
payload: z.any(),
169+
context: z.any(),
170+
options: z
171+
.object({
172+
/** @deprecated engine v1 only */
173+
dependentAttempt: z.string().optional(),
174+
/** @deprecated engine v1 only */
175+
parentAttempt: z.string().optional(),
176+
/** @deprecated engine v1 only */
177+
dependentBatch: z.string().optional(),
178+
/**
179+
* If triggered in a batch, this is the BatchTaskRun id
180+
*/
181+
parentBatch: z.string().optional(),
182+
/**
183+
* RunEngine v2
184+
* If triggered inside another run, the parentRunId is the friendly ID of the parent run.
185+
*/
186+
parentRunId: z.string().optional(),
187+
/**
188+
* RunEngine v2
189+
* Should be `true` if `triggerAndWait` or `batchTriggerAndWait`
190+
*/
191+
resumeParentOnCompletion: z.boolean().optional(),
192+
/**
193+
* Locks the version to the passed value.
194+
* Automatically set when using `triggerAndWait` or `batchTriggerAndWait`
195+
*/
196+
lockToVersion: z.string().optional(),
197+
198+
queue: z
199+
.object({
200+
name: z.string(),
201+
// @deprecated, this is now specified on the queue
202+
concurrencyLimit: z.number().int().optional(),
203+
})
204+
.optional(),
205+
concurrencyKey: ConcurrencyKeySchema.optional(),
206+
delay: z.string().or(z.coerce.date()).optional(),
207+
idempotencyKey: z
208+
.string()
209+
// Caps user-supplied keys before they reach the unique idempotency index
210+
// on the underlying table — values past this fail at the database layer
211+
// rather than returning a clean 400.
212+
.max(2048, "idempotencyKey must be 2048 characters or less")
213+
.optional(),
214+
idempotencyKeyTTL: z.string().optional(),
215+
/** The original user-provided idempotency key and scope */
216+
idempotencyKeyOptions: IdempotencyKeyOptionsSchema.optional(),
217+
machine: MachinePresetName.optional(),
218+
maxAttempts: z.number().int().optional(),
219+
maxDuration: z.number().optional(),
220+
metadata: z.any(),
221+
metadataType: z.string().optional(),
222+
payloadType: z.string().optional(),
223+
tags: RunTags.optional(),
224+
test: z.boolean().optional(),
225+
ttl: z.string().or(z.number().nonnegative().int()).optional(),
226+
priority: z.number().optional(),
227+
bulkActionId: z.string().optional(),
228+
region: z.string().optional(),
229+
debounce: z
230+
.object({
231+
key: z.string().max(512),
232+
delay: z.string(),
233+
mode: z.enum(["leading", "trailing"]).optional(),
234+
maxDelay: z.string().optional(),
235+
})
236+
.optional(),
237+
})
238+
.optional(),
239+
})
240+
.superRefine((value, ctx) => {
241+
if (value.options?.payloadType !== "application/store") {
242+
return;
243+
}
198244

199-
queue: z
200-
.object({
201-
name: z.string(),
202-
// @deprecated, this is now specified on the queue
203-
concurrencyLimit: z.number().int().optional(),
204-
})
205-
.optional(),
206-
concurrencyKey: ConcurrencyKeySchema.optional(),
207-
delay: z.string().or(z.coerce.date()).optional(),
208-
idempotencyKey: z
209-
.string()
210-
// Caps user-supplied keys before they reach the unique idempotency index
211-
// on the underlying table — values past this fail at the database layer
212-
// rather than returning a clean 400.
213-
.max(2048, "idempotencyKey must be 2048 characters or less")
214-
.optional(),
215-
idempotencyKeyTTL: z.string().optional(),
216-
/** The original user-provided idempotency key and scope */
217-
idempotencyKeyOptions: IdempotencyKeyOptionsSchema.optional(),
218-
machine: MachinePresetName.optional(),
219-
maxAttempts: z.number().int().optional(),
220-
maxDuration: z.number().optional(),
221-
metadata: z.any(),
222-
metadataType: z.string().optional(),
223-
payloadType: z.string().optional(),
224-
tags: RunTags.optional(),
225-
test: z.boolean().optional(),
226-
ttl: z.string().or(z.number().nonnegative().int()).optional(),
227-
priority: z.number().optional(),
228-
bulkActionId: z.string().optional(),
229-
region: z.string().optional(),
230-
debounce: z
231-
.object({
232-
key: z.string().max(512),
233-
delay: z.string(),
234-
mode: z.enum(["leading", "trailing"]).optional(),
235-
maxDelay: z.string().optional(),
236-
})
237-
.optional(),
238-
})
239-
.optional(),
240-
});
245+
if (typeof value.payload !== "string" || value.payload.length === 0) {
246+
ctx.addIssue({
247+
code: z.ZodIssueCode.custom,
248+
message: "payload must be a non-empty string when options.payloadType is application/store",
249+
path: ["payload"],
250+
});
251+
}
252+
});
241253

242254
export type TriggerTaskRequestBody = z.infer<typeof TriggerTaskRequestBody>;
243255

@@ -1658,9 +1670,7 @@ export const EndAndContinueSessionResponseBody = z.object({
16581670
*/
16591671
swapped: z.boolean(),
16601672
});
1661-
export type EndAndContinueSessionResponseBody = z.infer<
1662-
typeof EndAndContinueSessionResponseBody
1663-
>;
1673+
export type EndAndContinueSessionResponseBody = z.infer<typeof EndAndContinueSessionResponseBody>;
16641674

16651675
export const UpdateSessionRequestBody = z.object({
16661676
tags: z.array(z.string().max(128)).max(10).optional(),
@@ -1712,13 +1722,10 @@ export const ListSessionsQueryParams = z
17121722
"filter[createdAt][from]": z.coerce.number().int().optional(),
17131723
"filter[createdAt][to]": z.coerce.number().int().optional(),
17141724
})
1715-
.refine(
1716-
(value) => !(value["page[after]"] && value["page[before]"]),
1717-
{
1718-
message: "Cannot pass both page[after] and page[before] on the same request",
1719-
path: ["page[before]"],
1720-
}
1721-
);
1725+
.refine((value) => !(value["page[after]"] && value["page[before]"]), {
1726+
message: "Cannot pass both page[after] and page[before] on the same request",
1727+
path: ["page[before]"],
1728+
});
17221729
export type ListSessionsQueryParams = z.infer<typeof ListSessionsQueryParams>;
17231730

17241731
/**
@@ -2111,7 +2118,9 @@ export type UpdatePromptOverrideRequestBody = z.infer<typeof UpdatePromptOverrid
21112118
export const ReactivatePromptOverrideRequestBody = z.object({
21122119
version: z.number().int().positive(),
21132120
});
2114-
export type ReactivatePromptOverrideRequestBody = z.infer<typeof ReactivatePromptOverrideRequestBody>;
2121+
export type ReactivatePromptOverrideRequestBody = z.infer<
2122+
typeof ReactivatePromptOverrideRequestBody
2123+
>;
21152124

21162125
export const PromptOkResponseBody = z.object({ ok: z.boolean() });
21172126
export type PromptOkResponseBody = z.infer<typeof PromptOkResponseBody>;

packages/core/src/v3/utils/ioSerialization.ts

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -100,22 +100,33 @@ export async function stringifyIO(value: any): Promise<IOPacket> {
100100
}
101101
}
102102

103+
/**
104+
* Offloads a packet to object storage when it exceeds the size limit.
105+
*
106+
* @param client - Optional API client to use for the upload presign request. When
107+
* omitted, falls back to the global `apiClientManager.client`. Pass an explicit client
108+
* (e.g. one built from a custom `clientConfig`) to ensure the payload is uploaded using
109+
* the same configuration as the accompanying trigger API call.
110+
*/
103111
export async function conditionallyExportPacket(
104112
packet: IOPacket,
105113
pathPrefix: string,
106-
tracer?: TriggerTracer
114+
tracer?: TriggerTracer,
115+
client?: ApiClient
107116
): Promise<IOPacket> {
108-
if (apiClientManager.client) {
117+
const $client = client ?? apiClientManager.client;
118+
119+
if ($client) {
109120
const { needsOffloading, size } = packetRequiresOffloading(packet);
110121

111122
if (needsOffloading) {
112123
if (!tracer) {
113-
return await exportPacket(packet, pathPrefix);
124+
return await exportPacket(packet, pathPrefix, $client);
114125
} else {
115126
const result = await tracer.startActiveSpan(
116127
"store.uploadOutput",
117128
async (span) => {
118-
return await exportPacket(packet, pathPrefix);
129+
return await exportPacket(packet, pathPrefix, $client);
119130
},
120131
{
121132
attributes: {
@@ -163,11 +174,15 @@ const ioRetryOptions = {
163174
randomize: true,
164175
} satisfies RetryOptions;
165176

166-
async function exportPacket(packet: IOPacket, pathPrefix: string): Promise<IOPacket> {
177+
async function exportPacket(
178+
packet: IOPacket,
179+
pathPrefix: string,
180+
client: ApiClient
181+
): Promise<IOPacket> {
167182
// Offload the output
168183
const filename = `${pathPrefix}.${getPacketExtension(packet.dataType)}`;
169184

170-
const presignedResponse = await apiClientManager.client!.createUploadPayloadUrl(filename);
185+
const presignedResponse = await client.createUploadPayloadUrl(filename);
171186

172187
if (!presignedResponse.storagePath) {
173188
throw new Error(

0 commit comments

Comments
 (0)