Skip to content

Commit 15ff359

Browse files
authored
OpenConceptLab/ocl_issues#2446 | Partial ES Index updates (#836)
* OpenConceptLab/ocl_issues#2446 | partial index update * OpenConceptLab/ocl_issues#2446 | New indexes for concepts/mappings sources M2M relation * OpenConceptLab/ocl_issues#2444 | review feedbacks * OpenConceptLab/ocl_issues#2444 | review feedback | added tests for partial index update failure to trigger full index
1 parent fabade4 commit 15ff359

7 files changed

Lines changed: 315 additions & 64 deletions

File tree

core/common/models.py

Lines changed: 64 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -199,22 +199,68 @@ def get_exact_or_criteria(attr, values, decode=False):
199199
return criteria
200200

201201
@staticmethod
202-
def batch_index(queryset, document, single_batch=False, prefetch=None):
203-
if not get(settings, 'TEST_MODE'):
204-
doc = document()
205-
if prefetch:
206-
queryset = queryset.prefetch_related(*prefetch)
207-
if single_batch:
208-
doc.update(queryset.all(), parallel=True)
209-
else:
210-
batch_size = 500
211-
start = 0
212-
while True:
213-
batch = list(queryset.order_by('-id')[start:start + batch_size])
214-
if not batch:
215-
break
216-
doc.update(batch, parallel=True)
217-
start += batch_size
202+
def batch_index(queryset, document, single_batch=False, prefetch=None, partial_doc=None):
203+
if partial_doc:
204+
BaseModel.batch_index_partial(queryset, document, single_batch, partial_doc)
205+
return
206+
BaseModel.batch_index_full(single_batch, queryset, document, prefetch)
207+
208+
@staticmethod
209+
def batch_index_full(single_batch: bool, queryset, document, prefetch):
210+
if get(settings, 'TEST_MODE', False):
211+
return
212+
213+
doc = document()
214+
215+
if prefetch:
216+
queryset = queryset.prefetch_related(*prefetch)
217+
218+
if single_batch:
219+
doc.update(queryset.all(), parallel=True)
220+
else:
221+
batch_size = 500
222+
start = 0
223+
while True:
224+
batch = list(queryset.order_by('-id')[start:start + batch_size])
225+
if not batch:
226+
break
227+
doc.update(batch, parallel=True)
228+
start += batch_size
229+
230+
@staticmethod
231+
def batch_index_partial(queryset, document, single_batch, partial_doc):
232+
if get(settings, 'TEST_MODE', False):
233+
return
234+
235+
doc = document()
236+
237+
kwargs = {}
238+
if doc.django.auto_refresh:
239+
kwargs['refresh'] = doc.django.auto_refresh
240+
241+
def get_actions(ids):
242+
for object_id in ids:
243+
yield {
244+
'_op_type': 'update',
245+
'_index': doc._index._name, # pylint: disable=protected-access
246+
'_id': object_id,
247+
'doc': partial_doc,
248+
'doc_as_upsert': True
249+
}
250+
251+
if single_batch:
252+
ids = queryset.all().values_list('id', flat=True)
253+
doc._bulk(get_actions(ids), parallel=True, **kwargs) # pylint: disable=protected-access
254+
else:
255+
batch_size = 500
256+
start = 0
257+
queryset = queryset.order_by('-id').values_list('id', flat=True)
258+
while True:
259+
batch = list(queryset[start:start + batch_size])
260+
if not batch:
261+
break
262+
doc._bulk(get_actions(batch), parallel=True, **kwargs) # pylint: disable=protected-access
263+
start += batch_size
218264

219265
@staticmethod
220266
@transaction.atomic
@@ -758,7 +804,7 @@ def persist_new_version(cls, obj, user=None, **kwargs):
758804

759805
return errors
760806

761-
def index_resources_for_self_as_latest_released(self):
807+
def index_resources_for_self_as_latest_released(self, only_update=False): # pylint: disable=unused-argument
762808
pass
763809

764810
@classmethod
@@ -804,7 +850,7 @@ def persist_changes(cls, obj, updated_by, original_schema, **kwargs): # pylint:
804850
(obj.app_name, obj.id, target_schema), task_id=task.id, queue='default')
805851
if should_reindex_resources:
806852
if obj.released:
807-
obj.index_resources_for_self_as_latest_released()
853+
obj.index_resources_for_self_as_latest_released(only_update=True)
808854
else:
809855
obj.index_resources_for_self_as_unreleased()
810856
elif should_reindex_concepts_only:

core/common/tasks.py

Lines changed: 26 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -634,31 +634,46 @@ def make_hierarchy(concept_map): # pragma: no cover
634634
ignore_result=True, autoretry_for=(Exception, WorkerLostError, ), retry_kwargs={'max_retries': 2, 'countdown': 2},
635635
acks_late=True, reject_on_worker_lost=True, base=QueueOnceCustomTask
636636
)
637-
def index_source_concepts(source_id):
637+
def index_source_concepts(source_id, partial_doc=None):
638+
"""
639+
Index source concepts, or partially update existing ES documents when `partial_doc` is supplied.
640+
"""
638641
from core.sources.models import Source
639642
source = Source.objects.filter(id=source_id).first()
640643
if source:
641644
from core.concepts.documents import ConceptDocument
642-
source.batch_index(
643-
source.concepts, ConceptDocument,
644-
prefetch=['sources', 'names', 'descriptions',
645-
'expansion_set', 'expansion_set__collection_version']
646-
)
645+
prefetch = ['sources', 'names', 'descriptions', 'expansion_set', 'expansion_set__collection_version']
646+
try:
647+
kwargs = {'partial_doc': partial_doc} if partial_doc else {'prefetch': prefetch}
648+
source.batch_index(source.concepts, ConceptDocument, **kwargs)
649+
except Exception: # pragma: no cover
650+
if not partial_doc:
651+
raise
652+
logger.exception('Falling back to full concept reindex for source %s', source_id)
653+
source.batch_index(source.concepts, ConceptDocument, prefetch=prefetch)
647654

648655

649656
@app.task(
650657
ignore_result=True, autoretry_for=(Exception, WorkerLostError, ), retry_kwargs={'max_retries': 2, 'countdown': 2},
651658
acks_late=True, reject_on_worker_lost=True, base=QueueOnceCustomTask
652659
)
653-
def index_source_mappings(source_id):
660+
def index_source_mappings(source_id, partial_doc=None):
661+
"""
662+
Index source mappings, or partially update existing ES documents when `partial_doc` is supplied.
663+
"""
654664
from core.sources.models import Source
655665
source = Source.objects.filter(id=source_id).first()
656666
if source:
657667
from core.mappings.documents import MappingDocument
658-
source.batch_index(
659-
source.mappings, MappingDocument,
660-
prefetch=['sources', 'expansion_set', 'expansion_set__collection_version']
661-
)
668+
prefetch = ['sources', 'expansion_set', 'expansion_set__collection_version']
669+
try:
670+
kwargs = {'partial_doc': partial_doc} if partial_doc else {'prefetch': prefetch}
671+
source.batch_index(source.mappings, MappingDocument, **kwargs)
672+
except Exception: # pragma: no cover
673+
if not partial_doc:
674+
raise
675+
logger.exception('Falling back to full mapping reindex for source %s', source_id)
676+
source.batch_index(source.mappings, MappingDocument, prefetch=prefetch)
662677

663678

664679
@app.task(base=QueueOnceCustomTask)
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from django.db import migrations
2+
3+
4+
class Migration(migrations.Migration):
5+
6+
dependencies = [
7+
('concepts', '0078_auto_20250909_1351'),
8+
]
9+
10+
operations = [
11+
migrations.RunSQL(
12+
sql=(
13+
'CREATE INDEX IF NOT EXISTS concepts_sources_source_concept_idx '
14+
'ON concepts_sources (source_id, concept_id);'
15+
),
16+
reverse_sql=(
17+
'DROP INDEX IF EXISTS concepts_sources_source_concept_idx;'
18+
),
19+
),
20+
]

core/integration_tests/tests_sources.py

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -711,9 +711,9 @@ def test_version_updated_to_released_should_index_children(
711711
self.assertEqual(version.id, self.source_v1.id)
712712
self.assertTrue(version.is_latest_released)
713713
index_source_concepts_task_mock.apply_async.assert_called_once_with(
714-
(version.id,), queue='indexing', persist_args=True, task_id=ANY)
714+
(version.id, {'is_in_latest_source_version': True}), queue='indexing', persist_args=True, task_id=ANY)
715715
index_source_mappings_task_mock.apply_async.assert_called_once_with(
716-
(version.id,), queue='indexing', persist_args=True, task_id=ANY)
716+
(version.id, {'is_in_latest_source_version': True}), queue='indexing', persist_args=True, task_id=ANY)
717717

718718
@patch('core.sources.models.index_source_mappings')
719719
@patch('core.sources.models.index_source_concepts')
@@ -764,9 +764,11 @@ def test_released_version_updated_to_unreleased_should_reindex_children(
764764
self.source_v1.refresh_from_db()
765765
self.assertFalse(self.source_v1.released)
766766
index_source_concepts_task_mock.apply_async.assert_called_once_with(
767-
(self.source_v1.id,), queue='indexing', persist_args=True, task_id=ANY)
767+
(self.source_v1.id, {'is_in_latest_source_version': False}),
768+
queue='indexing', persist_args=True, task_id=ANY)
768769
index_source_mappings_task_mock.apply_async.assert_called_once_with(
769-
(self.source_v1.id,), queue='indexing', persist_args=True, task_id=ANY)
770+
(self.source_v1.id, {'is_in_latest_source_version': False}),
771+
queue='indexing', persist_args=True, task_id=ANY)
770772

771773
@patch('core.sources.models.index_source_mappings')
772774
@patch('core.sources.models.index_source_concepts')
@@ -807,15 +809,19 @@ def test_released_version_updated_to_unreleased_should_reindex_children_of_this_
807809
self.assertEqual(
808810
index_source_concepts_task_mock.apply_async.mock_calls,
809811
[
810-
call((source_v2.id,), queue='indexing', persist_args=True, task_id=ANY),
811-
call((self.source_v1.id,), queue='indexing', persist_args=True, task_id=ANY)
812+
call((source_v2.id, {'is_in_latest_source_version': False}),
813+
queue='indexing', persist_args=True, task_id=ANY),
814+
call((self.source_v1.id, {'is_in_latest_source_version': True}),
815+
queue='indexing', persist_args=True, task_id=ANY)
812816
]
813817
)
814818
self.assertEqual(
815819
index_source_mappings_task_mock.apply_async.mock_calls,
816820
[
817-
call((source_v2.id,), queue='indexing', persist_args=True, task_id=ANY),
818-
call((self.source_v1.id,), queue='indexing', persist_args=True, task_id=ANY)
821+
call((source_v2.id, {'is_in_latest_source_version': False}),
822+
queue='indexing', persist_args=True, task_id=ANY),
823+
call((self.source_v1.id, {'is_in_latest_source_version': True}),
824+
queue='indexing', persist_args=True, task_id=ANY)
819825
]
820826
)
821827

@@ -858,15 +864,21 @@ def test_unreleased_version_updated_to_released_should_reindex_children_of_this_
858864
self.assertEqual(
859865
index_source_concepts_task_mock.apply_async.mock_calls,
860866
[
861-
call((self.source_v1.id,), queue='indexing', persist_args=True, task_id=ANY),
862-
call((source_v2.id,), queue='indexing', persist_args=True, task_id=ANY)
867+
call(
868+
(self.source_v1.id, {'is_in_latest_source_version': False}),
869+
queue='indexing', persist_args=True, task_id=ANY),
870+
call(
871+
(source_v2.id, {'is_in_latest_source_version': True}),
872+
queue='indexing', persist_args=True, task_id=ANY)
863873
]
864874
)
865875
self.assertEqual(
866876
index_source_mappings_task_mock.apply_async.mock_calls,
867877
[
868-
call((self.source_v1.id,), queue='indexing', persist_args=True, task_id=ANY),
869-
call((source_v2.id,), queue='indexing', persist_args=True, task_id=ANY)
878+
call((self.source_v1.id, {'is_in_latest_source_version': False}),
879+
queue='indexing', persist_args=True, task_id=ANY),
880+
call((source_v2.id, {'is_in_latest_source_version': True}),
881+
queue='indexing', persist_args=True, task_id=ANY)
870882
]
871883
)
872884

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
from django.db import migrations
2+
3+
4+
class Migration(migrations.Migration):
5+
6+
dependencies = [
7+
('mappings', '0056_auto_20250226_0544'),
8+
]
9+
10+
operations = [
11+
migrations.RunSQL(
12+
sql=(
13+
'CREATE INDEX IF NOT EXISTS mappings_sources_source_mapping_idx '
14+
'ON mappings_sources (source_id, mapping_id);'
15+
),
16+
reverse_sql=(
17+
'DROP INDEX IF EXISTS mappings_sources_source_mapping_idx;'
18+
),
19+
),
20+
]

core/sources/models.py

Lines changed: 18 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -419,7 +419,7 @@ def set_active_mappings(self):
419419
queryset = self.mappings_set.filter(id=F('versioned_object_id'))
420420
self.active_mappings = queryset.filter(retired=False, is_active=True).count()
421421

422-
def index_resources_for_self_as_latest_released(self):
422+
def index_resources_for_self_as_latest_released(self, only_update=False):
423423
"""
424424
1. Assumes self is the latest released version
425425
2. indexes prev released version's (if exists) concepts and mappings
@@ -430,9 +430,11 @@ def index_resources_for_self_as_latest_released(self):
430430

431431
prev_released_version = self.get_prev_released_version()
432432
if prev_released_version:
433-
prev_released_version.index_children_async(user)
433+
prev_released_version.index_children_async(user, {'is_in_latest_source_version': False})
434434

435-
self.index_children_async(user)
435+
self.index_children_async(
436+
user, {'is_in_latest_source_version': True} if only_update else None
437+
)
436438

437439
def index_resources_for_self_as_unreleased(self):
438440
"""
@@ -442,11 +444,11 @@ def index_resources_for_self_as_unreleased(self):
442444
"""
443445
if not self.released:
444446
user = self.updated_by
445-
self.index_children_async(user)
447+
self.index_children_async(user, {'is_in_latest_source_version': False})
446448

447449
latest_released = self.get_latest_released_version()
448450
if latest_released:
449-
latest_released.index_children_async(user)
451+
latest_released.index_children_async(user, {'is_in_latest_source_version': True})
450452

451453
def seed_concepts(self, index=True):
452454
head = self.head
@@ -483,27 +485,31 @@ def index_children(self, sync=True, user=None):
483485
else:
484486
self.index_children_async(user)
485487

486-
def index_children_async(self, user=None):
488+
def index_children_async(self, user=None, partial_doc=None):
487489
user = user or self.updated_by
488490

489-
self.index_concepts_async(user)
490-
self.index_mappings_async(user)
491+
self.index_concepts_async(user, partial_doc)
492+
self.index_mappings_async(user, partial_doc)
491493

492-
def index_mappings_async(self, user):
494+
def index_mappings_async(self, user, partial_doc=None):
493495
user = user or self.updated_by
494496

495497
task = Task.new(queue='indexing', user=user, name=index_source_mappings.__name__)
496498
try:
497-
index_source_mappings.apply_async((self.id,), queue='indexing', persist_args=True, task_id=task.id)
499+
index_source_mappings.apply_async(
500+
(self.id, partial_doc), queue='indexing', persist_args=True, task_id=task.id
501+
)
498502
except AlreadyQueued:
499503
pass
500504

501-
def index_concepts_async(self, user):
505+
def index_concepts_async(self, user, partial_doc=None):
502506
user = user or self.updated_by
503507

504508
task = Task.new(queue='indexing', user=user, name=index_source_concepts.__name__)
505509
try:
506-
index_source_concepts.apply_async((self.id,), queue='indexing', persist_args=True, task_id=task.id)
510+
index_source_concepts.apply_async(
511+
(self.id, partial_doc), queue='indexing', persist_args=True, task_id=task.id
512+
)
507513
except AlreadyQueued:
508514
pass
509515

@@ -519,7 +525,6 @@ def get_index_mappings_task(self):
519525
def get_seed_new_version_task(self):
520526
return Task.find(name__iendswith='seed_children_to_new_version', args__contains=['source', self.id])
521527

522-
523528
def __get_resource_db_sequence_prefix(self):
524529
return self.uri.replace('/', '_').replace('-', '_').replace('.', '_').replace('@', '_')
525530

0 commit comments

Comments
 (0)