
Commit abbd62b

paynejd and claude authored
Speed up source version creation pipeline (#833)
* Speed up source version creation pipeline

- Fix N+1 queries in ES indexing by adding prefetch_related to batch_index() for source/expansion concept and mapping indexing tasks
- Replace Paginator with manual slicing in batch_index() to avoid a COUNT(*) query
- Optimize export serialization: increase batch size 100->1000, use a single file handle, eliminate redundant .exists() checks, add prefetch on mappings
- Replace M2M .set() with bulk_create on through tables in seed_concepts() and seed_mappings() for faster version seeding
- Move snapshot serialization and checksum computation from the synchronous persist_new_version() to the async seed_children_to_new_version task

Benchmarked with PIH (8,427 concepts + 45,089 mappings):
- Seeding: ~2s (was minutes with .set())
- Export: ~4.5min with 10x larger batches
- API response: instant (snapshot/checksum now async)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* Use prefetch cache in ES document prepare methods

Change values_list() calls on prefetched relations (names, descriptions, sources) to iterate .all() instead. values_list() bypasses Django's prefetch cache and hits the DB, negating the prefetch_related added in the previous commit. This eliminates thousands of redundant queries during concept and mapping indexing.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent d8e540c commit abbd62b

6 files changed

Lines changed: 119 additions & 95 deletions
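For readers outside the codebase, the N+1 pattern the first commit removes looks roughly like this. This is a minimal Django ORM sketch: the model and relation names mirror the real ones, but the filter (parent_id) and the loop are illustrative, not the actual indexing code.

# Without prefetching, building one ES document per concept issues extra
# queries for every related table it touches:
for concept in Concept.objects.filter(parent_id=source_id):          # 1 query
    locales = [n.locale for n in concept.names.all()]                # +1 query per concept
    versions = [s.version for s in concept.sources.all()]            # +1 query per concept

# With prefetch_related, the related rows arrive in a few IN (...) queries
# and are cached on each instance, so the loop body no longer hits the DB:
queryset = Concept.objects.filter(parent_id=source_id).prefetch_related('names', 'sources')
for concept in queryset:
    locales = [n.locale for n in concept.names.all()]                # served from the prefetch cache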


core/common/models.py

Lines changed: 11 additions & 12 deletions
@@ -5,7 +5,6 @@
 from django.contrib.postgres.fields import ArrayField
 from django.core.cache import cache
 from django.core.exceptions import ValidationError
-from django.core.paginator import Paginator
 from django.core.validators import RegexValidator
 from django.db import models, IntegrityError, transaction
 from django.db.models import Value, Q, Count
@@ -200,16 +199,22 @@ def get_exact_or_criteria(attr, values, decode=False):
         return criteria

     @staticmethod
-    def batch_index(queryset, document, single_batch=False):
+    def batch_index(queryset, document, single_batch=False, prefetch=None):
         if not get(settings, 'TEST_MODE'):
             doc = document()
+            if prefetch:
+                queryset = queryset.prefetch_related(*prefetch)
             if single_batch:
                 doc.update(queryset.all(), parallel=True)
             else:
-                paginator = Paginator(queryset.order_by('-id'), 500)
-                for page_number in paginator.page_range:
-                    page = paginator.page(page_number)
-                    doc.update(page.object_list, parallel=True)
+                batch_size = 500
+                start = 0
+                while True:
+                    batch = list(queryset.order_by('-id')[start:start + batch_size])
+                    if not batch:
+                        break
+                    doc.update(batch, parallel=True)
+                    start += batch_size

     @staticmethod
     @transaction.atomic
@@ -720,9 +725,6 @@ def persist_new(cls, obj, created_by, **kwargs):

     @classmethod
     def persist_new_version(cls, obj, user=None, **kwargs):
-        from core.collections.serializers import CollectionDetailSerializer
-        from core.sources.serializers import SourceDetailSerializer
-
         errors = {}

         obj.is_active = True
@@ -731,17 +733,14 @@ def persist_new_version(cls, obj, user=None, **kwargs):
         obj.created_by = user
         obj.updated_by = user
         repo_resource_name = obj.__class__.__name__
-        serializer = SourceDetailSerializer if repo_resource_name == 'Source' else CollectionDetailSerializer
         head = obj.head
         if not head:
             errors[repo_resource_name.lower()] = 'Version Head not found.'
             return errors
-        obj.snapshot = serializer(head).data
         obj.update_version_data(head)
         obj.save(**kwargs)

         if obj.id:
-            obj.get_checksums(recalculate=True)
             obj.sibling_versions.update(is_latest_version=False)

         is_test_mode = get(settings, 'TEST_MODE', False)
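Dropping Paginator matters because Django's Paginator runs a COUNT(*) over the whole queryset up front just to compute page_range; the replacement loop simply slices with LIMIT/OFFSET until a slice comes back empty. A standalone sketch of the new batching shape (generic Django, not the exact model method above):

def iter_batches(queryset, batch_size=500):
    """Yield batches ordered by -id using LIMIT/OFFSET slices, with no COUNT(*) query."""
    start = 0
    while True:
        batch = list(queryset.order_by('-id')[start:start + batch_size])  # one LIMIT/OFFSET query
        if not batch:
            break
        yield batch
        start += batch_size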

core/common/tasks.py

Lines changed: 29 additions & 4 deletions
@@ -393,6 +393,17 @@ def seed_children_to_new_version(self, resource, obj_id, export=True, sync=False
     task_id = self.request.id
     try:
         instance.add_processing(task_id)
+        # Compute snapshot and checksums async (moved from persist_new_version for faster HTTP response)
+        head = instance.head
+        if head:
+            if is_source:
+                from core.sources.serializers import SourceDetailSerializer
+                instance.snapshot = SourceDetailSerializer(head).data
+            elif is_collection:
+                from core.collections.serializers import CollectionDetailSerializer
+                instance.snapshot = CollectionDetailSerializer(head).data
+            instance.save(update_fields=['snapshot'])
+            instance.get_checksums(recalculate=True)
         instance.seed_references()
         if is_source:
             instance.seed_concepts(index=False)
@@ -569,7 +580,11 @@ def index_expansion_concepts(expansion_id, count=None, concept_versioned_ids=None
             queryset = Concept.objects.filter(versioned_object_id__in=concept_versioned_ids)
         else:
             queryset = expansion.concepts
-        expansion.batch_index(queryset, ConceptDocument)
+        expansion.batch_index(
+            queryset, ConceptDocument,
+            prefetch=['sources', 'names', 'descriptions',
+                      'expansion_set', 'expansion_set__collection_version']
+        )


 @app.task(
@@ -586,7 +601,10 @@ def index_expansion_mappings(expansion_id, count=None, mapping_versioned_ids=None
             queryset = Mapping.objects.filter(versioned_object_id__in=mapping_versioned_ids)
         else:
             queryset = expansion.mappings
-        expansion.batch_index(queryset, MappingDocument)
+        expansion.batch_index(
+            queryset, MappingDocument,
+            prefetch=['sources', 'expansion_set', 'expansion_set__collection_version']
+        )


 @app.task
@@ -621,7 +639,11 @@ def index_source_concepts(source_id):
     source = Source.objects.filter(id=source_id).first()
     if source:
         from core.concepts.documents import ConceptDocument
-        source.batch_index(source.concepts, ConceptDocument)
+        source.batch_index(
+            source.concepts, ConceptDocument,
+            prefetch=['sources', 'names', 'descriptions',
+                      'expansion_set', 'expansion_set__collection_version']
+        )


 @app.task(
@@ -633,7 +655,10 @@ def index_source_mappings(source_id):
     source = Source.objects.filter(id=source_id).first()
     if source:
         from core.mappings.documents import MappingDocument
-        source.batch_index(source.mappings, MappingDocument)
+        source.batch_index(
+            source.mappings, MappingDocument,
+            prefetch=['sources', 'expansion_set', 'expansion_set__collection_version']
+        )


 @app.task(base=QueueOnceCustomTask)
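The seed_children_to_new_version change is what makes the version-creation API call return quickly: persist_new_version() now only saves the version row, and the head snapshot plus checksum recalculation run inside the Celery task. Condensed from the two hunks above (source case shown; the collection branch is analogous, and surrounding plumbing is omitted):

# Request path (persist_new_version): no snapshot or checksum work inline.
obj.update_version_data(head)
obj.save(**kwargs)

# Background path (seed_children_to_new_version task): build the snapshot from
# the head version and recalculate checksums before seeding concepts/mappings.
head = instance.head
if head:
    from core.sources.serializers import SourceDetailSerializer
    instance.snapshot = SourceDetailSerializer(head).data
    instance.save(update_fields=['snapshot'])
    instance.get_checksums(recalculate=True)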

core/common/utils.py

Lines changed: 59 additions & 68 deletions
@@ -213,7 +213,7 @@ def write_export_file(
     resource_string = json.dumps(data, cls=encoders.JSONEncoder)
     logger.info('Done serializing attributes.')

-    batch_size = 100
+    batch_size = 1000
     is_collection = resource_type == 'collection'

     concepts_qs = Concept.objects.none()
@@ -236,97 +236,88 @@ def write_export_file(
     if version.is_head:
         filters['is_latest_version'] = True

+    resource_name = resource_type.title()
+
     with open('export.json', 'w') as out:
         out.write(f'{resource_string[:-1]}, "concepts": [')

-    resource_name = resource_type.title()
-
-    if concepts_qs.exists():
-        logger.info(f'{resource_name} has concepts. Getting them in batches of {batch_size:d}...')
         concept_serializer_class = get_class('core.concepts.serializers.ConceptVersionExportSerializer')
+        written_concepts = False
         start = 0
-        end = batch_size
-        batch_queryset = concepts_qs.order_by('-concept_id')[start:end]
-
-        while batch_queryset.exists():
-            logger.info(f'Serializing concepts {start + 1:d} - {end:d}...')
+        while True:
+            batch_ids = list(
+                concepts_qs.order_by('-concept_id')[start:start + batch_size].values_list('concept_id', flat=True)
+            )
+            if not batch_ids:
+                break
+            logger.info(f'Serializing concepts {start + 1:d} - {start + len(batch_ids):d}...')
             queryset = Concept.objects.filter(
-                id__in=batch_queryset.values_list('concept_id')).filter(**filters).order_by('-id')
-            if queryset.exists():
-                if start > 0:
-                    with open('export.json', 'a') as out:
-                        out.write(', ')
-                concept_versions = queryset.prefetch_related('names', 'descriptions')
+                id__in=batch_ids).filter(**filters).prefetch_related('names', 'descriptions').order_by('-id')
+            concept_versions = list(queryset)
+            if concept_versions:
+                if written_concepts:
+                    out.write(', ')
                 data = concept_serializer_class(concept_versions, many=True).data
                 concept_string = json.dumps(data, cls=encoders.JSONEncoder)
-                concept_string = concept_string[1:-1]
-
-                with open('export.json', 'a') as out:
-                    out.write(concept_string)
-
+                out.write(concept_string[1:-1])
+                written_concepts = True
             start += batch_size
-            end += batch_size
-            batch_queryset = concepts_qs.order_by('-concept_id')[start:end]

-        logger.info('Done serializing concepts.')
+        if written_concepts:
+            logger.info('Done serializing concepts.')

-    if is_collection:
-        references_qs = version.references
-        total_references = references_qs.count()
+        if is_collection:
+            references_qs = version.references
+            total_references = references_qs.count()

-        with open('export.json', 'a') as out:
             out.write('], "references": [')
-            if total_references:
-                logger.info(
-                    f'{resource_name} has {total_references:d} references. Getting them in batches of {batch_size:d}...'
-                )
-                reference_serializer_class = get_class('core.collections.serializers.CollectionReferenceDetailSerializer')
-                for start in range(0, total_references, batch_size):
-                    end = min(start + batch_size, total_references)
-                    logger.info(f'Serializing references {start + 1:d} - {end:d}...')
-                    references = references_qs.order_by('-id').filter()[start:end]
-                    reference_serializer = reference_serializer_class(references, many=True)
-                    reference_string = json.dumps(reference_serializer.data, cls=encoders.JSONEncoder)
-                    reference_string = reference_string[1:-1]
-                    with open('export.json', 'a') as out:
-                        out.write(reference_string)
-                    if end != total_references:
+            if total_references:
+                logger.info(
+                    f'{resource_name} has {total_references:d} references. '
+                    f'Getting them in batches of {batch_size:d}...'
+                )
+                reference_serializer_class = get_class(
+                    'core.collections.serializers.CollectionReferenceDetailSerializer')
+                for ref_start in range(0, total_references, batch_size):
+                    ref_end = min(ref_start + batch_size, total_references)
+                    logger.info(f'Serializing references {ref_start + 1:d} - {ref_end:d}...')
+                    references = references_qs.order_by('-id').filter()[ref_start:ref_end]
+                    reference_serializer = reference_serializer_class(references, many=True)
+                    reference_string = json.dumps(reference_serializer.data, cls=encoders.JSONEncoder)
+                    out.write(reference_string[1:-1])
+                    if ref_end != total_references:
                         out.write(', ')
-                logger.info('Done serializing references.')
+                logger.info('Done serializing references.')

-    with open('export.json', 'a') as out:
         out.write('], "mappings": [')

-    if mappings_qs.exists():
-        logger.info(f'{resource_name} has mappings. Getting them in batches of {batch_size:d}...')
         mapping_serializer_class = get_class('core.mappings.serializers.MappingDetailSerializer')
+        written_mappings = False
         start = 0
-        end = batch_size
-        batch_queryset = mappings_qs.order_by('-mapping_id')[start:end]
-
-        while batch_queryset.exists():
-            logger.info(f'Serializing mappings {start + 1:d} - {start + batch_size:d}...')
+        while True:
+            batch_ids = list(
+                mappings_qs.order_by('-mapping_id')[start:start + batch_size].values_list('mapping_id', flat=True)
+            )
+            if not batch_ids:
+                break
+            logger.info(f'Serializing mappings {start + 1:d} - {start + len(batch_ids):d}...')
             queryset = Mapping.objects.filter(
-                id__in=batch_queryset.values_list('mapping_id')).filter(**filters).order_by('-id')
-            if queryset.exists():
-                if start > 0:
-                    with open('export.json', 'a') as out:
-                        out.write(', ')
-
-                data = mapping_serializer_class(queryset, many=True).data
+                id__in=batch_ids).filter(**filters).prefetch_related(
+                'from_concept', 'to_concept', 'from_source', 'to_source').order_by('-id')
+            mapping_versions = list(queryset)
+            if mapping_versions:
+                if written_mappings:
+                    out.write(', ')
+                data = mapping_serializer_class(mapping_versions, many=True).data
                 mapping_string = json.dumps(data, cls=encoders.JSONEncoder)
-                mapping_string = mapping_string[1:-1]
-                with open('export.json', 'a') as out:
-                    out.write(mapping_string)
-
+                out.write(mapping_string[1:-1])
+                written_mappings = True
             start += batch_size
-            end += batch_size
-            batch_queryset = mappings_qs.order_by('-mapping_id')[start:end]

-        logger.info('Done serializing mappings.')
+        if written_mappings:
+            logger.info('Done serializing mappings.')

-    end_time = str(round((time.time() - start_time) + 2, 2))
-    with open('export.json', 'a') as out:
+        end_time = str(round((time.time() - start_time) + 2, 2))
         out.write('], "export_time": ' + json.dumps(f"{end_time}secs", cls=encoders.JSONEncoder) + '}')

     version.update_extras('__export_time', end_time)
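The export rewrite keeps a single file handle open for the whole document and tracks whether a batch has been written yet, instead of reopening export.json in append mode for every chunk; it also pages by ID lists rather than re-evaluating .exists() on sliced querysets. The skeleton of the new concept loop, simplified: concepts_qs, batch_size, serializer_class, and the **filters step are assumed to be set up as in the function above.

import json

with open('export.json', 'w') as out:
    out.write('{"concepts": [')
    written = False
    start = 0
    while True:
        # Grab just the IDs for this slice, then load full rows with prefetching.
        batch_ids = list(
            concepts_qs.order_by('-concept_id')[start:start + batch_size]
            .values_list('concept_id', flat=True)
        )
        if not batch_ids:
            break
        rows = list(Concept.objects.filter(id__in=batch_ids).prefetch_related('names', 'descriptions'))
        if rows:
            if written:
                out.write(', ')                    # comma only between non-empty batches
            chunk = json.dumps(serializer_class(rows, many=True).data)
            out.write(chunk[1:-1])                 # strip the surrounding [ ]
            written = True
        start += batch_size
    out.write(']}')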

core/concepts/documents.py

Lines changed: 7 additions & 7 deletions
@@ -165,11 +165,11 @@ def prepare_numeric_id(instance):

     @staticmethod
     def prepare_locale(instance):
-        return compact(set(instance.names.values_list('locale', flat=True)))
+        return compact(set(n.locale for n in instance.names.all()))

     @staticmethod
     def prepare_source_version(instance):
-        return list(instance.sources.values_list('version', flat=True))
+        return [s.version for s in instance.sources.all()]

     @staticmethod
     def prepare_collection_version(instance):
@@ -218,15 +218,15 @@ def prepare_properties(instance):

     @staticmethod
     def prepare_name_types(instance):
-        return compact(set(instance.names.values_list('type', flat=True)))
+        return compact(set(n.type for n in instance.names.all()))

     @staticmethod
     def prepare_description_types(instance):
-        return compact(set(instance.descriptions.values_list('type', flat=True)))
+        return compact(set(d.type for d in instance.descriptions.all()))

     @staticmethod
     def prepare_description(instance):
-        return '. '.join(compact(set(instance.descriptions.values_list('name', flat=True))))
+        return '. '.join(compact(set(d.name for d in instance.descriptions.all())))

     def prepare(self, instance):
         data = super().prepare(instance)
@@ -239,8 +239,8 @@ def prepare(self, instance):
         name = get(preferred_locale, 'name') or ''
         data['_name'] = name.lower()
         data['name'] = name.replace('-', '_')
-        synonyms = instance.names.exclude(name=name).exclude(name='')
-        data['synonyms'] = compact(set(synonyms.values_list('name', flat=True)))
+        synonyms = [n for n in instance.names.all() if n.name and n.name != name]
+        data['synonyms'] = compact(set(n.name for n in synonyms))
         data['_synonyms'] = data['synonyms']

         if instance.parent.has_semantic_match_algorithm:
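These prepare_* changes are the second commit's point: values_list() always compiles a fresh query, even on a relation that prefetch_related() has already loaded, whereas iterating .all() on the related manager is answered from the prefetch cache. A minimal illustration (concept_id is a placeholder):

concept = Concept.objects.prefetch_related('names').get(id=concept_id)

# Hits the database again, ignoring the prefetched rows:
locales = list(concept.names.values_list('locale', flat=True))

# Reuses the rows loaded by prefetch_related(); no additional query:
locales = [n.locale for n in concept.names.all()]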

core/mappings/documents.py

Lines changed: 1 addition & 1 deletion
@@ -106,7 +106,7 @@ def prepare_to_concept(instance):

     @staticmethod
     def prepare_source_version(instance):
-        return list(instance.sources.values_list('version', flat=True))
+        return [s.version for s in instance.sources.all()]

     @staticmethod
     def prepare_collection_version(instance):

core/sources/models.py

Lines changed: 12 additions & 3 deletions
@@ -449,18 +449,27 @@ def index_resources_for_self_as_unreleased(self):
         latest_released.index_children_async(user)

     def seed_concepts(self, index=True):
+        from core.concepts.models import Concept
         head = self.head
         if head:
-            concepts = head.concepts.filter(is_latest_version=True)
-            self.concepts.set(concepts)
+            through_model = Concept.sources.through
+            through_model.objects.filter(source_id=self.id).delete()
+            concept_ids = list(head.concepts.filter(is_latest_version=True).values_list('id', flat=True))
+            through_objects = [through_model(source_id=self.id, concept_id=cid) for cid in concept_ids]
+            through_model.objects.bulk_create(through_objects, batch_size=5000)
         if index:
             from core.concepts.documents import ConceptDocument
             self.batch_index(self.concepts, ConceptDocument)

     def seed_mappings(self, index=True):
+        from core.mappings.models import Mapping
         head = self.head
         if head:
-            self.mappings.set(head.mappings.filter(is_latest_version=True))
+            through_model = Mapping.sources.through
+            through_model.objects.filter(source_id=self.id).delete()
+            mapping_ids = list(head.mappings.filter(is_latest_version=True).values_list('id', flat=True))
+            through_objects = [through_model(source_id=self.id, mapping_id=mid) for mid in mapping_ids]
+            through_model.objects.bulk_create(through_objects, batch_size=5000)
         if index:
             from core.mappings.documents import MappingDocument
             self.batch_index(self.mappings, MappingDocument)
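The reason .set() was slow here is that it has to evaluate the existing relation, diff it against the new ID list, and route every change through the ORM's m2m add/remove machinery and m2m_changed signals; writing the join rows directly needs only a DELETE plus a few multi-row INSERTs. The core of the new seeding, isolated from the method above (Concept.sources.through is Django's auto-generated join model for the M2M field):

through_model = Concept.sources.through                        # join model between Concept and Source
through_model.objects.filter(source_id=self.id).delete()       # clear this version's existing links
concept_ids = head.concepts.filter(is_latest_version=True).values_list('id', flat=True)
through_model.objects.bulk_create(
    [through_model(source_id=self.id, concept_id=cid) for cid in concept_ids],
    batch_size=5000,                                            # one INSERT per 5,000 rows
)

One trade-off of bypassing .set() is that m2m_changed signals are not emitted for these rows, which appears acceptable here since seeding is followed by explicit batch indexing.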
