Skip to content

Commit 4ca9807

Browse files
matt-aitkenclaude
andcommitted
fix(webapp): address CodeRabbit review on streaming batch ingest
- Enforce positive STREAMING_BATCH_INGEST_CONCURRENCY in the env schema (.int().positive()) — p-map requires concurrency >= 1, so 0/negative would throw at runtime. - Apply the same out-of-range index guard to oversized-item markers as normal items, so an oversized item with index >= runCount returns a 4xx instead of creating a stray pre-failed run. Covered by a new test. Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
1 parent d5daeb8 commit 4ca9807

3 files changed

Lines changed: 57 additions & 1 deletion

File tree

apps/webapp/app/env.server.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -677,7 +677,7 @@ const EnvironmentSchema = z
677677
// Number of streamed batch items ingested concurrently in Phase 2. Peak
678678
// in-flight memory per request ≈ this × STREAMING_BATCH_ITEM_MAXIMUM_SIZE,
679679
// so raise with care. Set to 1 for fully sequential ingestion.
680-
STREAMING_BATCH_INGEST_CONCURRENCY: z.coerce.number().int().default(10),
680+
STREAMING_BATCH_INGEST_CONCURRENCY: z.coerce.number().int().positive().default(10),
681681
BATCH_RATE_LIMIT_REFILL_RATE: z.coerce.number().int().default(100),
682682
BATCH_RATE_LIMIT_MAX: z.coerce.number().int().default(1200),
683683
BATCH_RATE_LIMIT_REFILL_INTERVAL: z.string().default("10s"),

apps/webapp/app/runEngine/services/streamBatchItems.server.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,14 @@ export class StreamBatchItemsService extends WithRunEngine {
406406
if (rawItem && typeof rawItem === "object" && "__batchItemError" in rawItem) {
407407
const marker = rawItem as OversizedItemMarker;
408408

409+
// Same out-of-range guard as normal items: an oversized item with an
410+
// out-of-range index must 4xx rather than create a stray pre-failed run.
411+
if (marker.index >= runCount) {
412+
throw new ServiceValidationError(
413+
`Item index ${marker.index} exceeds batch runCount ${runCount}`
414+
);
415+
}
416+
409417
const errorMessage = `Batch item payload is too large (${(marker.actualSize / 1024).toFixed(
410418
1
411419
)} KB). Maximum allowed size is ${(marker.maxSize / 1024).toFixed(

apps/webapp/test/engine/streamBatchItems.test.ts

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1753,6 +1753,54 @@ describe("StreamBatchItemsService", () => {
17531753
await engine.quit();
17541754
}
17551755
);
1756+
1757+
containerTest(
1758+
"rejects an oversized item whose index exceeds runCount",
1759+
async ({ prisma, redisOptions }) => {
1760+
const engine = buildEngine(prisma, redisOptions);
1761+
const environment = await setupAuthenticatedEnvironment(prisma, "PRODUCTION");
1762+
1763+
const batch = await createBatch(prisma, environment.id, {
1764+
runCount: 2,
1765+
status: "PENDING",
1766+
sealed: false,
1767+
});
1768+
1769+
await engine.initializeBatch({
1770+
batchId: batch.id,
1771+
friendlyId: batch.friendlyId,
1772+
environmentId: environment.id,
1773+
environmentType: environment.type,
1774+
organizationId: environment.organizationId,
1775+
projectId: environment.projectId,
1776+
runCount: 2,
1777+
processingConcurrency: 10,
1778+
});
1779+
1780+
const service = new StreamBatchItemsService({ prisma, engine });
1781+
1782+
// An oversized marker must hit the same out-of-range guard as a normal
1783+
// item rather than slipping through to create a stray pre-failed run.
1784+
async function* oversizedOutOfRange() {
1785+
yield {
1786+
__batchItemError: "OVERSIZED",
1787+
index: 5,
1788+
task: "test-task",
1789+
actualSize: 9999,
1790+
maxSize: 1000,
1791+
};
1792+
}
1793+
1794+
await expect(
1795+
service.call(environment, batch.friendlyId, oversizedOutOfRange(), {
1796+
maxItemBytes: 1024,
1797+
concurrency: 4,
1798+
})
1799+
).rejects.toThrow(ServiceValidationError);
1800+
1801+
await engine.quit();
1802+
}
1803+
);
17561804
});
17571805

17581806
describe("createNdjsonParserStream", () => {

0 commit comments

Comments
 (0)