Skip to content

Commit 4ca7651

Browse files
authored
improvement(knowledge): eliminate N+1 on tag definitions in bulk upload (#4681)
* improvement(knowledge): eliminate N+1 on tag definitions in bulk upload

  createDocumentRecords previously called processDocumentTags per-doc, each running a SELECT against knowledge_base_tag_definitions — N queries that all returned the same kbId-scoped rows. Worse, those reads used the global db pool while the tx held a FOR UPDATE lock on the KB row, risking pool contention on large bulk uploads.

  Split the helper into loadTagDefinitions (single query, accepts the tx as executor) and resolveDocumentTags (pure, takes the pre-loaded Map). The bulk path loads once inside the transaction; createSingleDocument loads once outside its tx. Same throw-on-validation-error semantics preserved.

* improvement(knowledge): fold processDocumentAsync prefetches into one JOIN

  processDocumentAsync was issuing three separate SELECTs per processed document: knowledge_base (config), workspace (billing settings), and document (tag values). For a typical Trigger.dev fleet processing thousands of docs, that's thousands of redundant pool checkouts.

  Collapsed into a single JOIN at the top of processDocumentAsync that fetches kb config + billed account user + document tag values in one roundtrip. The post-embedding tag SELECT (which previously held tags through the full embedding-generation wait) is gone; tags from the initial prefetch are reused.

  Behavior:
  - Missing/archived/deleted document or KB → same 'failed' status outcome as before, single consolidated error message.
  - Missing billed account → preserves existing error.
  - All 208 KB tests pass (test mock extended for innerJoin/leftJoin).

* improvement(knowledge): skip tag-definitions load when no doc carries tags

  Trim verbose comments in the same pass.

* lint
1 parent b6679a9 commit 4ca7651

2 files changed

Lines changed: 108 additions & 66 deletions

File tree

apps/sim/app/api/knowledge/utils.test.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,32 @@ vi.mock('@sim/db', async () => {
116116
},
117117
}
118118
},
119+
innerJoin() {
120+
// document × knowledge_base context JOIN — return the first kb and
121+
// doc row merged (covers processDocumentAsync's prefetch).
122+
return {
123+
leftJoin: () => ({
124+
where: () => ({
125+
limit: (n: number) =>
126+
Promise.resolve(
127+
kbRows.length > 0 && docRows.length > 0
128+
? [
129+
{ ...kbRows[0], ...docRows[0], billedAccountUserId: 'billing-user-1' },
130+
].slice(0, n)
131+
: []
132+
),
133+
}),
134+
}),
135+
where: () => ({
136+
limit: (n: number) =>
137+
Promise.resolve(
138+
kbRows.length > 0 && docRows.length > 0
139+
? [{ ...kbRows[0], ...docRows[0] }].slice(0, n)
140+
: []
141+
),
142+
}),
143+
}
144+
},
119145
}
120146
},
121147
}

apps/sim/lib/knowledge/documents/service.ts

Lines changed: 82 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
knowledgeBase,
66
knowledgeBaseTagDefinitions,
77
knowledgeConnector,
8+
workspace as workspaceTable,
89
} from '@sim/db/schema'
910
import { createLogger } from '@sim/logger'
1011
import { sha256Hex } from '@sim/security/hash'
@@ -47,7 +48,6 @@ import type { ProcessedDocumentTags } from '@/lib/knowledge/types'
4748
import { estimateTokenCount } from '@/lib/tokenization/estimators'
4849
import { deleteFile } from '@/lib/uploads/core/storage-service'
4950
import { extractStorageKey } from '@/lib/uploads/utils/file-utils'
50-
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
5151
import type {
5252
DocumentProcessingPayload,
5353
processDocument as processDocumentTask,
@@ -111,11 +111,26 @@ interface DocumentTagData {
111111
value: string
112112
}
113113

114-
async function processDocumentTags(
114+
type TagDefinition = typeof knowledgeBaseTagDefinitions.$inferSelect
115+
type TagDefinitionsByName = Map<string, TagDefinition>
116+
type DbExecutor = Pick<typeof db, 'select'>
117+
118+
async function loadTagDefinitions(
115119
knowledgeBaseId: string,
120+
executor: DbExecutor = db
121+
): Promise<TagDefinitionsByName> {
122+
const defs = await executor
123+
.select()
124+
.from(knowledgeBaseTagDefinitions)
125+
.where(eq(knowledgeBaseTagDefinitions.knowledgeBaseId, knowledgeBaseId))
126+
return new Map(defs.map((def) => [def.displayName, def]))
127+
}
128+
129+
function resolveDocumentTags(
116130
tagData: DocumentTagData[],
131+
tagDefinitions: TagDefinitionsByName,
117132
requestId: string
118-
): Promise<ProcessedDocumentTags> {
133+
): ProcessedDocumentTags {
119134
const setTagValue = (
120135
tags: ProcessedDocumentTags,
121136
slot: string,
@@ -200,13 +215,6 @@ async function processDocumentTags(
200215
return result
201216
}
202217

203-
const existingDefinitions = await db
204-
.select()
205-
.from(knowledgeBaseTagDefinitions)
206-
.where(eq(knowledgeBaseTagDefinitions.knowledgeBaseId, knowledgeBaseId))
207-
208-
const existingByName = new Map(existingDefinitions.map((def) => [def.displayName, def]))
209-
210218
const undefinedTags: string[] = []
211219
const typeErrors: string[] = []
212220

@@ -223,7 +231,7 @@ async function processDocumentTags(
223231

224232
if (!hasValue) continue
225233

226-
const existingDef = existingByName.get(tagName)
234+
const existingDef = tagDefinitions.get(tagName)
227235
if (!existingDef) {
228236
undefinedTags.push(tagName)
229237
continue
@@ -264,7 +272,7 @@ async function processDocumentTags(
264272

265273
if (!hasValue) continue
266274

267-
const existingDef = existingByName.get(tagName)
275+
const existingDef = tagDefinitions.get(tagName)
268276
if (!existingDef) continue
269277

270278
const targetSlot = existingDef.tagSlot
@@ -418,34 +426,66 @@ export async function processDocumentAsync(
418426
try {
419427
logger.info(`[${documentId}] Starting document processing: ${docData.filename}`)
420428

421-
const kb = await db
429+
// KB config + workspace billing + doc tags in one JOIN (was 3 SELECTs).
430+
const contextRows = await db
422431
.select({
423432
userId: knowledgeBase.userId,
424433
workspaceId: knowledgeBase.workspaceId,
425434
chunkingConfig: knowledgeBase.chunkingConfig,
426435
embeddingModel: knowledgeBase.embeddingModel,
436+
billedAccountUserId: workspaceTable.billedAccountUserId,
437+
tag1: document.tag1,
438+
tag2: document.tag2,
439+
tag3: document.tag3,
440+
tag4: document.tag4,
441+
tag5: document.tag5,
442+
tag6: document.tag6,
443+
tag7: document.tag7,
444+
number1: document.number1,
445+
number2: document.number2,
446+
number3: document.number3,
447+
number4: document.number4,
448+
number5: document.number5,
449+
date1: document.date1,
450+
date2: document.date2,
451+
boolean1: document.boolean1,
452+
boolean2: document.boolean2,
453+
boolean3: document.boolean3,
427454
})
428-
.from(knowledgeBase)
429-
.where(and(eq(knowledgeBase.id, knowledgeBaseId), isNull(knowledgeBase.deletedAt)))
455+
.from(document)
456+
.innerJoin(knowledgeBase, eq(knowledgeBase.id, document.knowledgeBaseId))
457+
.leftJoin(
458+
workspaceTable,
459+
and(eq(workspaceTable.id, knowledgeBase.workspaceId), isNull(workspaceTable.archivedAt))
460+
)
461+
.where(
462+
and(
463+
eq(document.id, documentId),
464+
eq(knowledgeBase.id, knowledgeBaseId),
465+
isNull(document.archivedAt),
466+
isNull(document.deletedAt),
467+
isNull(knowledgeBase.deletedAt)
468+
)
469+
)
430470
.limit(1)
431471

432-
if (kb.length === 0) {
472+
if (contextRows.length === 0) {
433473
logger.warn(
434-
`[${documentId}] Skipping document processing: knowledge base ${knowledgeBaseId} is deleted`
474+
`[${documentId}] Skipping document processing: document or knowledge base ${knowledgeBaseId} no longer exists`
435475
)
436476
await db
437477
.update(document)
438478
.set({
439479
processingStatus: 'failed',
440-
processingError: 'Knowledge base deleted',
480+
processingError: 'Document or knowledge base no longer exists',
441481
processingCompletedAt: new Date(),
442482
})
443-
.where(
444-
and(eq(document.id, documentId), isNull(document.archivedAt), isNull(document.deletedAt))
445-
)
483+
.where(eq(document.id, documentId))
446484
return
447485
}
448486

487+
const ctx = contextRows[0]
488+
449489
await db
450490
.update(document)
451491
.set({
@@ -460,7 +500,7 @@ export async function processDocumentAsync(
460500

461501
logger.info(`[${documentId}] Status updated to 'processing', starting document processor`)
462502

463-
const rawConfig = kb[0].chunkingConfig as {
503+
const rawConfig = ctx.chunkingConfig as {
464504
maxSize?: number
465505
minSize?: number
466506
overlap?: number
@@ -473,13 +513,13 @@ export async function processDocumentAsync(
473513
overlap: rawConfig?.overlap ?? 200,
474514
}
475515

476-
const kbEmbeddingModel = kb[0].embeddingModel
477-
if (!kb[0].workspaceId) {
516+
const kbEmbeddingModel = ctx.embeddingModel
517+
if (!ctx.workspaceId) {
478518
throw new Error(`Knowledge base ${knowledgeBaseId} is missing workspace billing context`)
479519
}
480-
const billingUserId = await getWorkspaceBilledAccountUserId(kb[0].workspaceId)
520+
const billingUserId = ctx.billedAccountUserId
481521
if (!billingUserId) {
482-
throw new Error(`Workspace ${kb[0].workspaceId} is missing billed account`)
522+
throw new Error(`Workspace ${ctx.workspaceId} is missing billed account`)
483523
}
484524
let totalEmbeddingTokens = 0
485525
let embeddingIsBYOK = false
@@ -495,8 +535,8 @@ export async function processDocumentAsync(
495535
kbConfig.maxSize,
496536
kbConfig.overlap,
497537
kbConfig.minSize,
498-
kb[0].userId,
499-
kb[0].workspaceId,
538+
ctx.userId,
539+
ctx.workspaceId,
500540
rawConfig?.strategy,
501541
rawConfig?.strategyOptions
502542
)
@@ -534,7 +574,7 @@ export async function processDocumentAsync(
534574
isBYOK,
535575
modelName,
536576
pricingId,
537-
} = await generateEmbeddings(batch, kbEmbeddingModel, kb[0].workspaceId)
577+
} = await generateEmbeddings(batch, kbEmbeddingModel, ctx.workspaceId)
538578
for (const emb of batchEmbeddings) {
539579
embeddings.push(emb)
540580
}
@@ -547,41 +587,10 @@ export async function processDocumentAsync(
547587
}
548588
}
549589

550-
logger.info(`[${documentId}] Embeddings generated, fetching document tags`)
551-
552-
const documentRecord = await db
553-
.select({
554-
tag1: document.tag1,
555-
tag2: document.tag2,
556-
tag3: document.tag3,
557-
tag4: document.tag4,
558-
tag5: document.tag5,
559-
tag6: document.tag6,
560-
tag7: document.tag7,
561-
number1: document.number1,
562-
number2: document.number2,
563-
number3: document.number3,
564-
number4: document.number4,
565-
number5: document.number5,
566-
date1: document.date1,
567-
date2: document.date2,
568-
boolean1: document.boolean1,
569-
boolean2: document.boolean2,
570-
boolean3: document.boolean3,
571-
})
572-
.from(document)
573-
.where(
574-
and(
575-
eq(document.id, documentId),
576-
isNull(document.archivedAt),
577-
isNull(document.deletedAt)
578-
)
579-
)
580-
.limit(1)
581-
582-
const documentTags = documentRecord[0] || {}
590+
// Tag values prefetched above; reuse for the embedding rows.
591+
const documentTags = ctx
583592

584-
logger.info(`[${documentId}] Creating embedding records with tags`)
593+
logger.info(`[${documentId}] Embeddings generated, creating embedding records with tags`)
585594

586595
const tokenizerProvider = getEmbeddingModelInfo(kbEmbeddingModel).tokenizerProvider
587596

@@ -686,7 +695,7 @@ export async function processDocumentAsync(
686695
if (cost > 0) {
687696
await recordUsage({
688697
userId: billingUserId,
689-
workspaceId: kb[0].workspaceId ?? undefined,
698+
workspaceId: ctx.workspaceId ?? undefined,
690699
entries: [
691700
{
692701
category: 'model',
@@ -770,6 +779,12 @@ export async function createDocumentRecords(
770779
throw new Error('Knowledge base not found')
771780
}
772781

782+
// One load per batch (was N+1); skip entirely if no doc carries tags.
783+
const hasTaggedDocs = documents.some((d) => d.documentTagsData)
784+
const tagDefinitions = hasTaggedDocs
785+
? await loadTagDefinitions(knowledgeBaseId, tx)
786+
: (new Map() as TagDefinitionsByName)
787+
773788
const now = new Date()
774789
const documentRecords = []
775790
const returnData: DocumentData[] = []
@@ -783,7 +798,7 @@ export async function createDocumentRecords(
783798
try {
784799
const tagData = JSON.parse(docData.documentTagsData)
785800
if (Array.isArray(tagData)) {
786-
processedTags = await processDocumentTags(knowledgeBaseId, tagData, requestId)
801+
processedTags = resolveDocumentTags(tagData, tagDefinitions, requestId)
787802
}
788803
} catch (error) {
789804
if (error instanceof SyntaxError) {
@@ -1277,7 +1292,8 @@ export async function createSingleDocument(
12771292
try {
12781293
const tagData = JSON.parse(documentData.documentTagsData)
12791294
if (Array.isArray(tagData)) {
1280-
processedTags = await processDocumentTags(knowledgeBaseId, tagData, requestId)
1295+
const tagDefinitions = await loadTagDefinitions(knowledgeBaseId)
1296+
processedTags = resolveDocumentTags(tagData, tagDefinitions, requestId)
12811297
}
12821298
} catch (error) {
12831299
if (error instanceof SyntaxError) {

0 commit comments

Comments
 (0)