From dd79388f81f84ba46c989ba025a98c92f8d83843 Mon Sep 17 00:00:00 2001 From: mkovalua Date: Tue, 5 May 2026 17:18:56 +0300 Subject: [PATCH 1/6] add nodes and preprints files reinding on account merging --- osf/models/user.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/osf/models/user.py b/osf/models/user.py index 04cc0df6662..c44565beaab 100644 --- a/osf/models/user.py +++ b/osf/models/user.py @@ -877,6 +877,30 @@ def merge_user(self, user): except Exception as e: logger.exception(f'Failed to SHARE reindex preprint {preprint._id} during user merge: {e}') + from django.contrib.contenttypes.models import ContentType + from osf.models import AbstractNode, Preprint + from addons.osfstorage.models import OsfStorageFile + node_ctype = ContentType.objects.get_for_model(AbstractNode) + preprint_ctype = ContentType.objects.get_for_model(Preprint) + nodes_files_to_reindex = OsfStorageFile.objects.filter( + target_object_id__in=user.contributed.values_list('id', flat=True), target_content_type=node_ctype, + guids__isnull=False + ) + preprints_files_to_reindex = OsfStorageFile.objects.filter( + target_object_id__in=user.preprints.values_list('id', flat=True), target_content_type=preprint_ctype, + guids__isnull=False + ) + for file in nodes_files_to_reindex.iterator(chunk_size=100): + try: + update_share(file) + except Exception as e: + logger.exception(f'Failed to SHARE reindex file {file._id} during user merge: {e}') + for file in preprints_files_to_reindex.iterator(chunk_size=100): + try: + update_share(file) + except Exception as e: + logger.exception(f'Failed to SHARE reindex preprints file {file._id} during user merge: {e}') + def _merge_users_preprints(self, user): """ Preprints use guardian. The PreprintContributor table stores order and bibliographic information. From de265752491d1e67f151a7e977fc438cc8c2b2a3 Mon Sep 17 00:00:00 2001 From: mkovalua Date: Fri, 8 May 2026 17:04:51 +0300 Subject: [PATCH 2/6] resolve CR --- osf/models/user.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/osf/models/user.py b/osf/models/user.py index c44565beaab..23eae1a91c5 100644 --- a/osf/models/user.py +++ b/osf/models/user.py @@ -18,6 +18,7 @@ from django.contrib.auth.base_user import AbstractBaseUser, BaseUserManager from django.contrib.auth.hashers import check_password from django.contrib.auth.models import PermissionsMixin +from django.contrib.contenttypes.models import ContentType from django.core.exceptions import FieldDoesNotExist from django.dispatch import receiver from django.db import models @@ -877,7 +878,6 @@ def merge_user(self, user): except Exception as e: logger.exception(f'Failed to SHARE reindex preprint {preprint._id} during user merge: {e}') - from django.contrib.contenttypes.models import ContentType from osf.models import AbstractNode, Preprint from addons.osfstorage.models import OsfStorageFile node_ctype = ContentType.objects.get_for_model(AbstractNode) From 9d15292bebdeda9c258b475987685d23e1a6ab27 Mon Sep 17 00:00:00 2001 From: mkovalua Date: Wed, 3 Jun 2026 23:01:28 +0300 Subject: [PATCH 3/6] update unittests to cover files on merging --- osf/models/user.py | 6 ++++-- osf_tests/test_user.py | 30 ++++++++++++++++++++++-------- 2 files changed, 26 insertions(+), 10 deletions(-) diff --git a/osf/models/user.py b/osf/models/user.py index 23eae1a91c5..4d75f93d8e3 100644 --- a/osf/models/user.py +++ b/osf/models/user.py @@ -733,7 +733,9 @@ def merge_user(self, user): # Capture content to SHARE reindex BEFORE merge transfers contributors # After merge, user.contributed and user.preprints will be empty nodes_to_reindex = list(user.contributed) + node_ids_to_reindex = [node.id for node in nodes_to_reindex] preprints_to_reindex = list(user.preprints.all()) + preprint_ids_to_reindex = [preprint.id for preprint in preprints_to_reindex] # Move over the other user's attributes # TODO: confirm @@ -883,11 +885,11 @@ def merge_user(self, user): node_ctype = ContentType.objects.get_for_model(AbstractNode) preprint_ctype = ContentType.objects.get_for_model(Preprint) nodes_files_to_reindex = OsfStorageFile.objects.filter( - target_object_id__in=user.contributed.values_list('id', flat=True), target_content_type=node_ctype, + target_object_id__in=node_ids_to_reindex, target_content_type=node_ctype, guids__isnull=False ) preprints_files_to_reindex = OsfStorageFile.objects.filter( - target_object_id__in=user.preprints.values_list('id', flat=True), target_content_type=preprint_ctype, + target_object_id__in=preprint_ids_to_reindex, target_content_type=preprint_ctype, guids__isnull=False ) for file in nodes_files_to_reindex.iterator(chunk_size=100): diff --git a/osf_tests/test_user.py b/osf_tests/test_user.py index 2e5d0631cd4..b681b1966f0 100644 --- a/osf_tests/test_user.py +++ b/osf_tests/test_user.py @@ -445,6 +445,7 @@ def test_merge_drafts(self, user): @mock.patch('api.share.utils.update_share') def test_merge_user_triggers_share_reindex(self, mock_update_share): from osf.models import Preprint + from addons.osfstorage.models import OsfStorageFile user = AuthUserFactory() user2 = AuthUserFactory() @@ -457,26 +458,39 @@ def test_merge_user_triggers_share_reindex(self, mock_update_share): preprint_two = PreprintFactory(title='preprint_two') preprint_two.add_contributor(user2) + node_file = OsfStorageFile.create( + target=node_one, path='/node_file.txt', name='node_file.txt', materialized_path='/node_file.txt' + ) + node_file.save(skip_search=True) + node_file.get_guid(create=True) + + preprint_file = OsfStorageFile.create( + target=preprint_one, path='/preprint_file.txt', name='preprint_file.txt', + materialized_path='/preprint_file.txt' + ) + preprint_file.save(skip_search=True) + preprint_file.get_guid(create=True) + user.merge_user(user2) + all_reindexed = [call[0][0] for call in mock_update_share.call_args_list] # Verify update_share was called for both nodes - nodes_reindexed = [ - call[0][0] for call in mock_update_share.call_args_list - if isinstance(call[0][0], AbstractNode) - ] + nodes_reindexed = [node_reindexed for node_reindexed in all_reindexed if isinstance(node_reindexed, AbstractNode)] assert len(nodes_reindexed) == 2 assert node_one in nodes_reindexed assert node_two in nodes_reindexed # Verify update_share was called for both preprints - preprints_reindexed = [ - call[0][0] for call in mock_update_share.call_args_list - if isinstance(call[0][0], Preprint) - ] + preprints_reindexed = [preprint_reindexed for preprint_reindexed in all_reindexed if isinstance(preprint_reindexed, Preprint)] assert len(preprints_reindexed) == 2 assert preprint_one in preprints_reindexed assert preprint_two in preprints_reindexed + # Verify update_share was called for files belonging to user2's nodes and preprints + files_reindexed = [file_reindexed for file_reindexed in all_reindexed if isinstance(file_reindexed, OsfStorageFile)] + assert node_file in files_reindexed + assert preprint_file in files_reindexed + def test_cant_create_user_without_username(self): u = OSFUser() # No username given with pytest.raises(ValidationError): From 39e1f5b4e5698c3aec5c2260569ce2f72dba2961 Mon Sep 17 00:00:00 2001 From: mkovalua Date: Thu, 4 Jun 2026 22:14:06 +0300 Subject: [PATCH 4/6] add files share reindexing tasks to low priority queue --- api/share/utils.py | 10 +++++----- framework/celery_tasks/routers.py | 2 ++ osf/models/user.py | 4 ++-- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/api/share/utils.py b/api/share/utils.py index d843c7e146a..17b02ca24ee 100644 --- a/api/share/utils.py +++ b/api/share/utils.py @@ -50,21 +50,21 @@ def is_qa_resource(resource): return has_qa_tags or has_qa_title -def update_share(resource): +def update_share(resource, urgent=True): if not settings.SHARE_ENABLED: return if not hasattr(resource, 'guids'): logger.error(f'update_share called on non-guid resource: {resource}') return - _enqueue_update_share(resource) + _enqueue_update_share(resource, urgent) -def _enqueue_update_share(osfresource): +def _enqueue_update_share(osfresource, urgent): _osfguid_value = osfresource.guids.values_list('_id', flat=True).first() if not _osfguid_value: logger.warning(f'update_share skipping resource that has no guids: {osfresource}') return - enqueue_task(task__update_share.s(_osfguid_value)) + enqueue_task(task__update_share.s(_osfguid_value, urgent=urgent)) @celery_app.task( @@ -73,7 +73,7 @@ def _enqueue_update_share(osfresource): max_retries=4, retry_backoff=True, ) -def task__update_share(self, guid: str, is_backfill=False, osfmap_partition_name='MAIN'): +def task__update_share(self, guid: str, is_backfill=False, osfmap_partition_name='MAIN', urgent=True): """ Send SHARE/trove current metadata record(s) for the osf-guid-identified object """ diff --git a/framework/celery_tasks/routers.py b/framework/celery_tasks/routers.py index d9d6e335286..9120b159e30 100644 --- a/framework/celery_tasks/routers.py +++ b/framework/celery_tasks/routers.py @@ -32,6 +32,8 @@ def route_for_task(self, task, args=None, kwargs=None): :param str task: Of the form 'full.module.path.to.class.function' :returns dict: Tells celery into which queue to route this task. """ + if kwargs and (kwargs.get('urgent') is False): + return {'queue': CeleryConfig.task_low_queue} return { 'queue': match_by_module(task) } diff --git a/osf/models/user.py b/osf/models/user.py index 4d75f93d8e3..3508aa5c462 100644 --- a/osf/models/user.py +++ b/osf/models/user.py @@ -894,12 +894,12 @@ def merge_user(self, user): ) for file in nodes_files_to_reindex.iterator(chunk_size=100): try: - update_share(file) + update_share(file, urgent=False) except Exception as e: logger.exception(f'Failed to SHARE reindex file {file._id} during user merge: {e}') for file in preprints_files_to_reindex.iterator(chunk_size=100): try: - update_share(file) + update_share(file, urgent=False) except Exception as e: logger.exception(f'Failed to SHARE reindex preprints file {file._id} during user merge: {e}') From bda7a43dd8ee4964ac7f6d7db3d3ab25d434e29d Mon Sep 17 00:00:00 2001 From: mkovalua Date: Thu, 4 Jun 2026 22:53:19 +0300 Subject: [PATCH 5/6] code updates --- api/share/utils.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/api/share/utils.py b/api/share/utils.py index 17b02ca24ee..15fc06456e7 100644 --- a/api/share/utils.py +++ b/api/share/utils.py @@ -56,10 +56,13 @@ def update_share(resource, urgent=True): if not hasattr(resource, 'guids'): logger.error(f'update_share called on non-guid resource: {resource}') return - _enqueue_update_share(resource, urgent) + if urgent: + _enqueue_update_share(resource) + else: + _enqueue_update_share(resource, urgent=False) -def _enqueue_update_share(osfresource, urgent): +def _enqueue_update_share(osfresource, urgent=True): _osfguid_value = osfresource.guids.values_list('_id', flat=True).first() if not _osfguid_value: logger.warning(f'update_share skipping resource that has no guids: {osfresource}') From d980170796e804630177c9bf0d99b5075da4a60e Mon Sep 17 00:00:00 2001 From: mkovalua Date: Mon, 8 Jun 2026 16:26:36 +0300 Subject: [PATCH 6/6] use more explicit name approach to mark 'target_queue' for specific celery queue tasks --- api/share/utils.py | 21 ++++++++++++++------- framework/celery_tasks/routers.py | 4 ++-- osf/models/user.py | 5 +++-- 3 files changed, 19 insertions(+), 11 deletions(-) diff --git a/api/share/utils.py b/api/share/utils.py index 15fc06456e7..42ee4ea7ce4 100644 --- a/api/share/utils.py +++ b/api/share/utils.py @@ -50,24 +50,31 @@ def is_qa_resource(resource): return has_qa_tags or has_qa_title -def update_share(resource, urgent=True): +def update_share(resource, target_queue=None): + """ + By default, tasks are routed to queue based on module routing in CeleryRouter, + :param resource: osf resource that is needed to be reindexed + :param target_queue: should be task queue attribute of CeleryConfig f.e 'task_low_queue' for bulk background + passing 'target_queue' allows low-level queue task run (reindexing files after a user merge) even though + related module path may be marked to work with task_high_queue. + """ if not settings.SHARE_ENABLED: return if not hasattr(resource, 'guids'): logger.error(f'update_share called on non-guid resource: {resource}') return - if urgent: - _enqueue_update_share(resource) + if target_queue is not None: + _enqueue_update_share(resource, target_queue) else: - _enqueue_update_share(resource, urgent=False) + _enqueue_update_share(resource) -def _enqueue_update_share(osfresource, urgent=True): +def _enqueue_update_share(osfresource, target_queue=None): _osfguid_value = osfresource.guids.values_list('_id', flat=True).first() if not _osfguid_value: logger.warning(f'update_share skipping resource that has no guids: {osfresource}') return - enqueue_task(task__update_share.s(_osfguid_value, urgent=urgent)) + enqueue_task(task__update_share.s(_osfguid_value, target_queue=target_queue)) @celery_app.task( @@ -76,7 +83,7 @@ def _enqueue_update_share(osfresource, urgent=True): max_retries=4, retry_backoff=True, ) -def task__update_share(self, guid: str, is_backfill=False, osfmap_partition_name='MAIN', urgent=True): +def task__update_share(self, guid: str, is_backfill=False, osfmap_partition_name='MAIN', target_queue=None): """ Send SHARE/trove current metadata record(s) for the osf-guid-identified object """ diff --git a/framework/celery_tasks/routers.py b/framework/celery_tasks/routers.py index 9120b159e30..4f08d8b256e 100644 --- a/framework/celery_tasks/routers.py +++ b/framework/celery_tasks/routers.py @@ -32,8 +32,8 @@ def route_for_task(self, task, args=None, kwargs=None): :param str task: Of the form 'full.module.path.to.class.function' :returns dict: Tells celery into which queue to route this task. """ - if kwargs and (kwargs.get('urgent') is False): - return {'queue': CeleryConfig.task_low_queue} + if kwargs and (target_queue := kwargs.get('target_queue')): + return {'queue': target_queue} return { 'queue': match_by_module(task) } diff --git a/osf/models/user.py b/osf/models/user.py index 3508aa5c462..fadf7dc14a6 100644 --- a/osf/models/user.py +++ b/osf/models/user.py @@ -68,6 +68,7 @@ from website import filters from website.project import new_bookmark_collection from website.util.metrics import OsfSourceTags, unregistered_created_source_tag +from website.settings import CeleryConfig from importlib import import_module from osf.models.notification_type import NotificationTypeEnum from osf.utils.requests import string_type_request_headers @@ -894,12 +895,12 @@ def merge_user(self, user): ) for file in nodes_files_to_reindex.iterator(chunk_size=100): try: - update_share(file, urgent=False) + update_share(file, target_queue=CeleryConfig.task_low_queue) except Exception as e: logger.exception(f'Failed to SHARE reindex file {file._id} during user merge: {e}') for file in preprints_files_to_reindex.iterator(chunk_size=100): try: - update_share(file, urgent=False) + update_share(file, target_queue=CeleryConfig.task_low_queue) except Exception as e: logger.exception(f'Failed to SHARE reindex preprints file {file._id} during user merge: {e}')