44} from "@trigger.dev/core/v3" ;
55import { BatchId } from "@trigger.dev/core/v3/isomorphic" ;
66import type { BatchItem , RunEngine } from "@internal/run-engine" ;
7+ import pMap from "p-map" ;
78import type { BatchTaskRunStatus } from "@trigger.dev/database" ;
89import { prisma , type PrismaClientOrTransaction } from "~/db.server" ;
910import type { AuthenticatedEnvironment } from "~/services/apiAuth.server" ;
@@ -55,6 +56,8 @@ export function isIdempotentRetrySuccess(
5556
/**
 * Options supplied when streaming items into a batch.
 */
export type StreamBatchItemsServiceOptions = {
  /**
   * Size limit, in bytes, for a single streamed item.
   * NOTE(review): presumably forwarded to the NDJSON parser's per-line limit —
   * confirm at the call site.
   */
  maxItemBytes: number;
  /** Max items processed concurrently. The route wires this to STREAMING_BATCH_INGEST_CONCURRENCY. */
  concurrency: number;
};
5962
6063export type OversizedItemMarker = {
@@ -68,6 +71,8 @@ export type OversizedItemMarker = {
/**
 * Optional collaborators for StreamBatchItemsService. Every field may be
 * omitted; the constructor falls back to a default for `prisma` and
 * `payloadProcessor`.
 */
export type StreamBatchItemsServiceConstructorOptions = {
  /** Database client or transaction. The constructor uses the shared `prisma` client when omitted. */
  prisma?: PrismaClientOrTransaction;
  /** Run engine instance, passed through to the base class constructor. */
  engine?: RunEngine;
  /** Override the payload processor (used in tests to observe ingest concurrency). */
  payloadProcessor?: BatchPayloadProcessor;
};
7277
7378/**
@@ -88,7 +93,7 @@ export class StreamBatchItemsService extends WithRunEngine {
8893
8994 constructor ( opts : StreamBatchItemsServiceConstructorOptions = { } ) {
9095 super ( { prisma : opts . prisma ?? prisma , engine : opts . engine } ) ;
91- this . payloadProcessor = new BatchPayloadProcessor ( ) ;
96+ this . payloadProcessor = opts . payloadProcessor ?? new BatchPayloadProcessor ( ) ;
9297 }
9398
9499 /**
@@ -170,94 +175,28 @@ export class StreamBatchItemsService extends WithRunEngine {
170175 ) ;
171176 }
172177
178+ // Process items from the stream with bounded concurrency.
179+ //
180+ // Ordering and idempotency do NOT depend on processing order:
181+ // - The BatchQueue derives run order from each item's index
182+ // (enqueue timestamp = batch.createdAt + itemIndex), not enqueue order.
183+ // - enqueueBatchItem() dedups atomically per index.
184+ // We cap concurrency to bound peak in-flight memory (≈ concurrency ×
185+ // maxItemBytes) and to keep backpressure on the request body stream.
186+ // p-map pulls lazily from the async iterator — at most `concurrency`
187+ // items are read and in flight at once. stopOnError aborts ingestion on
188+ // the first failure (the batch is left unsealed; the SDK's retry
189+ // re-streams and dedups already-enqueued items).
190+ const outcomes = await pMap (
191+ itemsIterator ,
192+ ( rawItem ) => this . #processItem( rawItem , batchId , environment , batch . runCount ) ,
193+ { concurrency : options . concurrency , stopOnError : true }
194+ ) ;
195+
173196 let itemsAccepted = 0 ;
174197 let itemsDeduplicated = 0 ;
175- let lastIndex = - 1 ;
176-
177- // Process items from the stream
178- for await ( const rawItem of itemsIterator ) {
179- // Check for oversized item markers from the NDJSON parser
180- if ( rawItem && typeof rawItem === "object" && "__batchItemError" in rawItem ) {
181- const marker = rawItem as OversizedItemMarker ;
182- const itemIndex = marker . index >= 0 ? marker . index : lastIndex + 1 ;
183-
184- const errorMessage = `Batch item payload is too large (${ ( marker . actualSize / 1024 ) . toFixed ( 1 ) } KB). Maximum allowed size is ${ ( marker . maxSize / 1024 ) . toFixed ( 1 ) } KB. Reduce the payload size or offload large data to external storage.` ;
185-
186- // Enqueue with __error metadata - processItemCallback will detect this
187- // and use TriggerFailedTaskService to create a pre-failed run
188- const batchItem : BatchItem = {
189- task : marker . task ,
190- payload : "{}" ,
191- payloadType : "application/json" ,
192- options : {
193- __error : errorMessage ,
194- __errorCode : "PAYLOAD_TOO_LARGE" ,
195- } ,
196- } ;
197-
198- const result = await this . _engine . enqueueBatchItem (
199- batchId ,
200- environment . id ,
201- itemIndex ,
202- batchItem
203- ) ;
204-
205- if ( result . enqueued ) {
206- itemsAccepted ++ ;
207- } else {
208- itemsDeduplicated ++ ;
209- }
210- lastIndex = itemIndex ;
211- continue ;
212- }
213-
214- // Parse and validate the item
215- const parseResult = BatchItemNDJSONSchema . safeParse ( rawItem ) ;
216- if ( ! parseResult . success ) {
217- throw new ServiceValidationError (
218- `Invalid item at index ${ lastIndex + 1 } : ${ parseResult . error . message } `
219- ) ;
220- }
221-
222- const item = parseResult . data ;
223- lastIndex = item . index ;
224-
225- // Validate index is within expected range
226- if ( item . index >= batch . runCount ) {
227- throw new ServiceValidationError (
228- `Item index ${ item . index } exceeds batch runCount ${ batch . runCount } `
229- ) ;
230- }
231-
232- // Get the original payload type
233- const originalPayloadType = ( item . options ?. payloadType as string ) ?? "application/json" ;
234-
235- // Process payload - offload to R2 if it exceeds threshold
236- const processedPayload = await this . payloadProcessor . process (
237- item . payload ,
238- originalPayloadType ,
239- batchId ,
240- item . index ,
241- environment
242- ) ;
243-
244- // Convert to BatchItem format with potentially offloaded payload
245- const batchItem : BatchItem = {
246- task : item . task ,
247- payload : processedPayload . payload ,
248- payloadType : processedPayload . payloadType ,
249- options : item . options ,
250- } ;
251-
252- // Enqueue the item
253- const result = await this . _engine . enqueueBatchItem (
254- batchId ,
255- environment . id ,
256- item . index ,
257- batchItem
258- ) ;
259-
260- if ( result . enqueued ) {
198+ for ( const outcome of outcomes ) {
199+ if ( outcome === "accepted" ) {
261200 itemsAccepted ++ ;
262201 } else {
263202 itemsDeduplicated ++ ;
@@ -446,6 +385,104 @@ export class StreamBatchItemsService extends WithRunEngine {
446385 }
447386 ) ;
448387 }
388+
389+ /**
390+ * Process a single streamed batch item: validate it, offload its payload to
391+ * object storage if oversized, and enqueue it. Returns whether the item was
392+ * newly enqueued ("accepted") or was a duplicate ("deduplicated"). Throws
393+ * ServiceValidationError for invalid items, which aborts the stream.
394+ *
395+ * Safe to run concurrently: enqueueBatchItem() is atomic and order-independent
396+ * per item index, and each item carries its own index (real items from the
397+ * SDK; oversized markers are stamped by the NDJSON parser).
398+ */
399+ async #processItem(
400+ rawItem : unknown ,
401+ batchId : string ,
402+ environment : AuthenticatedEnvironment ,
403+ runCount : number
404+ ) : Promise < "accepted" | "deduplicated" > {
405+ // Oversized item marker emitted by the NDJSON parser
406+ if ( rawItem && typeof rawItem === "object" && "__batchItemError" in rawItem ) {
407+ const marker = rawItem as OversizedItemMarker ;
408+
409+ const errorMessage = `Batch item payload is too large (${ ( marker . actualSize / 1024 ) . toFixed (
410+ 1
411+ ) } KB). Maximum allowed size is ${ ( marker . maxSize / 1024 ) . toFixed (
412+ 1
413+ ) } KB. Reduce the payload size or offload large data to external storage.`;
414+
415+ // Enqueue with __error metadata - processItemCallback will detect this
416+ // and use TriggerFailedTaskService to create a pre-failed run
417+ const batchItem : BatchItem = {
418+ task : marker . task ,
419+ payload : "{}" ,
420+ payloadType : "application/json" ,
421+ options : {
422+ __error : errorMessage ,
423+ __errorCode : "PAYLOAD_TOO_LARGE" ,
424+ } ,
425+ } ;
426+
427+ const result = await this . _engine . enqueueBatchItem (
428+ batchId ,
429+ environment . id ,
430+ marker . index ,
431+ batchItem
432+ ) ;
433+
434+ return result . enqueued ? "accepted" : "deduplicated" ;
435+ }
436+
437+ // Parse and validate the item
438+ const parseResult = BatchItemNDJSONSchema . safeParse ( rawItem ) ;
439+ if ( ! parseResult . success ) {
440+ const rawIndex = ( rawItem as { index ?: unknown } | null ) ?. index ;
441+ const where = typeof rawIndex === "number" ? `index ${ rawIndex } ` : "unknown index" ;
442+ throw new ServiceValidationError (
443+ `Invalid item at ${ where } : ${ parseResult . error . message } `
444+ ) ;
445+ }
446+
447+ const item = parseResult . data ;
448+
449+ // Validate index is within expected range
450+ if ( item . index >= runCount ) {
451+ throw new ServiceValidationError (
452+ `Item index ${ item . index } exceeds batch runCount ${ runCount } `
453+ ) ;
454+ }
455+
456+ // Get the original payload type
457+ const originalPayloadType = ( item . options ?. payloadType as string ) ?? "application/json" ;
458+
459+ // Process payload - offload to object storage if it exceeds threshold
460+ const processedPayload = await this . payloadProcessor . process (
461+ item . payload ,
462+ originalPayloadType ,
463+ batchId ,
464+ item . index ,
465+ environment
466+ ) ;
467+
468+ // Convert to BatchItem format with potentially offloaded payload
469+ const batchItem : BatchItem = {
470+ task : item . task ,
471+ payload : processedPayload . payload ,
472+ payloadType : processedPayload . payloadType ,
473+ options : item . options ,
474+ } ;
475+
476+ // Enqueue the item
477+ const result = await this . _engine . enqueueBatchItem (
478+ batchId ,
479+ environment . id ,
480+ item . index ,
481+ batchItem
482+ ) ;
483+
484+ return result . enqueued ? "accepted" : "deduplicated" ;
485+ }
449486}
450487
451488/**
@@ -587,12 +624,29 @@ export function createNdjsonParserStream(
587624 let chunks : Uint8Array [ ] = [ ] ;
588625 let totalBytes = 0 ;
589626 let lineNumber = 0 ;
627+ // 0-based position of the next object we emit (parsed item or oversized
628+ // marker). The parser is the single sequential point in the pipeline, so this
629+ // is the authoritative source of item ordering — downstream consumers can
630+ // process items concurrently and must not rely on processing order to derive
631+ // an item's index. Used to back-fill an oversized marker's index when it
632+ // couldn't be extracted from the (truncated) raw bytes.
633+ let emittedCount = 0 ;
590634 // When an oversized incomplete line is detected (Case 2), we must discard
591635 // all remaining bytes of that line until the next newline delimiter.
592636 let skipUntilNewline = false ;
593637
594638 const NEWLINE_BYTE = 0x0a ; // '\n'
595639
640+ /**
641+ * Emit a parsed object or marker downstream and advance the emit position.
642+ * Every emitted object MUST go through here so `emittedCount` stays aligned
643+ * with item position (empty/skipped lines never emit, so they don't count).
644+ */
645+ function emit ( controller : TransformStreamDefaultController < unknown > , obj : unknown ) : void {
646+ controller . enqueue ( obj ) ;
647+ emittedCount ++ ;
648+ }
649+
596650 /**
597651 * Concatenate all chunks into a single Uint8Array
598652 */
@@ -675,7 +729,7 @@ export function createNdjsonParserStream(
675729
676730 try {
677731 const obj = JSON . parse ( trimmed ) ;
678- controller . enqueue ( obj ) ;
732+ emit ( controller , obj ) ;
679733 } catch ( err ) {
680734 throw new Error ( `Invalid JSON at line ${ lineNumber } : ${ ( err as Error ) . message } ` ) ;
681735 }
@@ -715,12 +769,12 @@ export function createNdjsonParserStream(
715769 const extracted = extractIndexAndTask ( lineBytes ) ;
716770 const marker : OversizedItemMarker = {
717771 __batchItemError : "OVERSIZED" ,
718- index : extracted . index ,
772+ index : extracted . index >= 0 ? extracted . index : emittedCount ,
719773 task : extracted . task ,
720774 actualSize : newlineIndex ,
721775 maxSize : maxItemBytes ,
722776 } ;
723- controller . enqueue ( marker ) ;
777+ emit ( controller , marker ) ;
724778 lineNumber ++ ;
725779 continue ;
726780 }
@@ -736,12 +790,12 @@ export function createNdjsonParserStream(
736790 const extracted = extractIndexAndTask ( concatenateChunks ( ) ) ;
737791 const marker : OversizedItemMarker = {
738792 __batchItemError : "OVERSIZED" ,
739- index : extracted . index ,
793+ index : extracted . index >= 0 ? extracted . index : emittedCount ,
740794 task : extracted . task ,
741795 actualSize : totalBytes ,
742796 maxSize : maxItemBytes ,
743797 } ;
744- controller . enqueue ( marker ) ;
798+ emit ( controller , marker ) ;
745799 lineNumber ++ ;
746800 // Clear buffer and skip remaining bytes of this oversized line
747801 // until the next newline delimiter is found in a subsequent chunk
@@ -768,12 +822,12 @@ export function createNdjsonParserStream(
768822 const extracted = extractIndexAndTask ( concatenateChunks ( ) ) ;
769823 const marker : OversizedItemMarker = {
770824 __batchItemError : "OVERSIZED" ,
771- index : extracted . index ,
825+ index : extracted . index >= 0 ? extracted . index : emittedCount ,
772826 task : extracted . task ,
773827 actualSize : totalBytes ,
774828 maxSize : maxItemBytes ,
775829 } ;
776- controller . enqueue ( marker ) ;
830+ emit ( controller , marker ) ;
777831 return ;
778832 }
779833
0 commit comments