-
Notifications
You must be signed in to change notification settings - Fork 25
Multiple doc types #732
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
kpsherva
wants to merge
12
commits into
CERNDocumentServer:master
Choose a base branch
from
kpsherva:multiple-doc-types
base: master
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
+9,204
−1,282
Open
Multiple doc types #732
Changes from all commits
Commits
Show all changes
12 commits
Select commit
Hold shift + click to select a range
0264013
change(harvester): improve writer interface
kpsherva 186851a
chore(resolver): return always the same type
kpsherva 6894c7d
feat(harvester): introduce notion of versions in the writer
kpsherva a48eb5d
feat(harvester): add specialized mappers per res type
kpsherva 0d1b370
tests: add multiple resource type case
kpsherva 49f4fa5
chore(harvester): fix tests
kpsherva 56637e9
wip: adjust file field update
kpsherva b813559
change(harvester): when to update files policy
kpsherva fa55b09
change(harvester): splitting metadata assignment by resource type
kpsherva a2ba8ac
chore(harvester): formatting
kpsherva 916d066
feat(harvester): add ROR assignment in mapping
kpsherva 0d84f9d
fix(tests): resource type assert
kpsherva File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,8 @@ | ||
| # -*- coding: utf-8 -*- | ||
| # | ||
| # Copyright (C) 2026 CERN. | ||
| # | ||
| # CDS-RDM is free software; you can redistribute it and/or modify it under | ||
| # the terms of the MIT License; see LICENSE file for more details. | ||
|
|
||
| """INSPIRE to CDS harvester context module.""" |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,76 @@ | ||
| # -*- coding: utf-8 -*- | ||
| # | ||
| # Copyright (C) 2026 CERN. | ||
| # | ||
| # CDS-RDM is free software; you can redistribute it and/or modify it under | ||
| # the terms of the MIT License; see LICENSE file for more details. | ||
|
|
||
| """Draft lifecycle management module.""" | ||
|
|
||
| from flask import current_app | ||
| from invenio_access.permissions import system_identity | ||
| from invenio_db import db | ||
| from invenio_rdm_records.proxies import current_rdm_records_service | ||
| from invenio_rdm_records.services.errors import ValidationErrorWithMessageAsList | ||
| from invenio_vocabularies.datastreams.errors import WriterError | ||
| from marshmallow import ValidationError | ||
|
|
||
|
|
||
| class DraftLifecycleManager: | ||
| """Manages draft creation, editing, versioning, and publishing.""" | ||
|
|
||
| def create(self, entry): | ||
| """Create a new draft from entry data.""" | ||
| return current_rdm_records_service.create(system_identity, data=entry) | ||
|
|
||
| def edit(self, record_pid): | ||
| """Open an edit draft for an existing published record.""" | ||
| return current_rdm_records_service.edit(system_identity, record_pid) | ||
|
|
||
| def update(self, draft, metadata): | ||
| """Update draft.""" | ||
| return current_rdm_records_service.update_draft( | ||
| system_identity, draft.id, metadata | ||
| ) | ||
|
|
||
| def new_version(self, record_pid): | ||
| """Create a new-version draft from an existing published record.""" | ||
| return current_rdm_records_service.new_version(system_identity, record_pid) | ||
|
|
||
| def add_community(self, draft): | ||
| """Add the CERN Scientific Community to the draft.""" | ||
| with db.session.begin_nested(): | ||
| community_id = current_app.config["CDS_CERN_SCIENTIFIC_COMMUNITY_ID"] | ||
| draft_obj = current_rdm_records_service.draft_cls.pid.resolve( | ||
| draft.id, registered_only=False | ||
| ) | ||
| draft_obj.parent.communities.add(community_id) | ||
| draft_obj.parent.communities.default = community_id | ||
| draft_obj.parent.commit() | ||
|
|
||
| def publish(self, draft_id, logger): | ||
| """Publish a draft. Deletes the draft on any failure, then raises WriterError.""" | ||
| try: | ||
| logger.debug(f"Publishing draft {draft_id}") | ||
| current_rdm_records_service.publish(system_identity, draft_id) | ||
| logger.info(f"Draft {draft_id} published successfully.") | ||
| except ValidationError as e: | ||
| logger.error( | ||
| f"Failure: draft {draft_id} not published, validation errors: {e}." | ||
| ) | ||
| current_rdm_records_service.delete_draft(system_identity, draft_id) | ||
| raise WriterError( | ||
| f"Failure: draft {draft_id} not published, validation errors: {e}." | ||
| ) | ||
| except ValidationErrorWithMessageAsList as e: | ||
| current_rdm_records_service.delete_draft(system_identity, draft_id) | ||
| raise WriterError( | ||
| f"Failure: draft {draft_id} not published," | ||
| f" validation errors: {e.messages}." | ||
| ) | ||
| except Exception as e: | ||
| current_rdm_records_service.delete_draft(system_identity, draft_id) | ||
| raise WriterError( | ||
| f"Draft {draft_id} failed publishing" | ||
| f" because of an unexpected error: {str(e)}." | ||
| ) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,210 @@ | ||||||
| # -*- coding: utf-8 -*- | ||||||
| # | ||||||
| # Copyright (C) 2026 CERN. | ||||||
| # | ||||||
| # CDS-RDM is free software; you can redistribute it and/or modify it under | ||||||
| # the terms of the MIT License; see LICENSE file for more details. | ||||||
|
|
||||||
| """File synchronization module.""" | ||||||
|
|
||||||
| import time | ||||||
| from dataclasses import dataclass | ||||||
| from io import BytesIO | ||||||
| from typing import List | ||||||
|
|
||||||
| import requests | ||||||
| from invenio_access.permissions import system_identity | ||||||
| from invenio_rdm_records.proxies import current_rdm_records_service | ||||||
| from invenio_records_resources.services.errors import FileKeyNotFoundError | ||||||
| from invenio_vocabularies.datastreams.errors import WriterError | ||||||
|
|
||||||
|
|
||||||
| @dataclass | ||||||
| class RetryConfig: | ||||||
| """Configuration for file fetch retries.""" | ||||||
|
|
||||||
| max_retries: int = 3 | ||||||
| retry_delay: int = 60 # seconds; only applied on network exceptions | ||||||
|
|
||||||
|
|
||||||
| @dataclass | ||||||
| class FileDiff: | ||||||
| """Diff between existing and new file sets, keyed by checksum.""" | ||||||
|
|
||||||
| to_add: List[str] # checksums of new files to upload | ||||||
| to_delete: List[str] # checksums of files to remove | ||||||
| existing: List[str] | ||||||
|
|
||||||
|
|
||||||
| class FileSynchronizer: | ||||||
| """Handles file I/O, diffing, uploading, and deletion for draft records.""" | ||||||
|
|
||||||
| def __init__(self, retry_config: RetryConfig = None): | ||||||
| """Constructor.""" | ||||||
| self.retry_config = retry_config or RetryConfig() | ||||||
|
|
||||||
| def compute_diff(self, existing_files, new_files) -> FileDiff: | ||||||
| """Return the set difference between existing and new file checksums.""" | ||||||
| existing_checksums = [value["checksum"] for value in existing_files.values()] | ||||||
| new_checksums = [value["checksum"] for value in new_files.values()] | ||||||
|
|
||||||
| return FileDiff( | ||||||
| to_add=list(set(new_checksums) - set(existing_checksums)), | ||||||
| to_delete=list(set(existing_checksums) - set(new_checksums)), | ||||||
| existing=list(set(existing_checksums)), | ||||||
| ) | ||||||
|
|
||||||
| def fetch(self, url, logger) -> BytesIO: | ||||||
| """Fetch file content from URL. | ||||||
|
|
||||||
| Raises WriterError after exhausting retries. | ||||||
| """ | ||||||
| max_retries = self.retry_config.max_retries | ||||||
| retry_delay = self.retry_config.retry_delay | ||||||
|
|
||||||
| logger.debug(f"File URL: {url}") | ||||||
| attempt = 0 | ||||||
| while attempt < max_retries: | ||||||
| attempt += 1 | ||||||
| try: | ||||||
| logger.debug( | ||||||
| f"Attempt {attempt}/{max_retries} - HEAD request to: {url}" | ||||||
| ) | ||||||
| head = requests.head(url, allow_redirects=True) | ||||||
| resolved_url = head.url | ||||||
| logger.info(f"Get file, URL: {resolved_url}.") | ||||||
| response = requests.get(resolved_url, stream=True) | ||||||
| logger.debug(f"Response status code: {response.status_code}") | ||||||
| if response.status_code == 200: | ||||||
| logger.debug("Success: File retrieved.") | ||||||
| return BytesIO(response.content) | ||||||
| else: | ||||||
| logger.warning( | ||||||
| f"Retrieving file request failed. " | ||||||
| f"Attempt {attempt}/{max_retries} " | ||||||
| f"Error {response.status_code}." | ||||||
| f" URL: {resolved_url}." | ||||||
| ) | ||||||
| except Exception as e: | ||||||
| logger.warning( | ||||||
| f"Attempt {attempt}/{max_retries} failed with exception: {e}" | ||||||
| ) | ||||||
| logger.debug("Retrying in 1 minute...") | ||||||
| time.sleep(retry_delay) | ||||||
|
|
||||||
| logger.error( | ||||||
| f"Retrieving file request failed. Max retries {max_retries} reached." | ||||||
| f" URL: {url}." | ||||||
| ) | ||||||
| raise WriterError( | ||||||
| f"Failed to fetch file from {url} after {max_retries} retries." | ||||||
| ) | ||||||
|
|
||||||
| def check_files_should_update(self, record, incoming_record, logger): | ||||||
| """Check if files should be updated.""" | ||||||
| if not record: | ||||||
| return True | ||||||
| record_dict = record.to_dict() | ||||||
| existing_files = record_dict["files"]["entries"] | ||||||
| new_files = incoming_record["files"].get("entries", {}) | ||||||
| logger.info( | ||||||
| f"Existing files count: {len(existing_files)}," | ||||||
| f" New files count: {len(new_files)}" | ||||||
| ) | ||||||
|
|
||||||
| diff = self.compute_diff(existing_files, new_files) | ||||||
| logger.debug(f"Existing files' checksums: {diff.existing}.") | ||||||
| logger.debug(f"New files' checksums: {diff.to_add}.") | ||||||
| should_update_files = bool(new_files) and bool(diff.to_add or diff.to_delete) | ||||||
|
|
||||||
| return should_update_files | ||||||
|
|
||||||
| def sync(self, draft, record, incoming_record, logger, import_files=True): | ||||||
| """Sync files on a draft: delete removed files, upload added files.""" | ||||||
| existing_files = {} | ||||||
| if not record: | ||||||
| should_import_files = False | ||||||
| else: | ||||||
| should_import_files = ( | ||||||
| record | ||||||
| and import_files | ||||||
| and record.data.get("files", {}).get("enabled", False) | ||||||
| ) | ||||||
| record_dict = record.to_dict() | ||||||
| existing_files = record_dict["files"]["entries"] | ||||||
| if should_import_files: | ||||||
| current_rdm_records_service.import_files(system_identity, draft.id) | ||||||
| logger.debug( | ||||||
| f"Imported files to {draft.id} from previous version: {record.id}" | ||||||
| ) | ||||||
|
|
||||||
| new_files = incoming_record["files"].get("entries", {}) | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. was it not |
||||||
| diff = self.compute_diff(existing_files, new_files) | ||||||
| should_update_files = bool(new_files) and bool(diff.to_add or diff.to_delete) | ||||||
| if should_update_files: | ||||||
| for filename, file_data in existing_files.items(): | ||||||
| if file_data["checksum"] in diff.to_delete: | ||||||
| logger.debug(f"Delete file: {filename}") | ||||||
| current_rdm_records_service.draft_files.delete_file( | ||||||
| system_identity, draft.id, filename | ||||||
| ) | ||||||
|
|
||||||
| logger.info(f"{len(diff.to_delete)} files successfully deleted.") | ||||||
|
|
||||||
| logger.debug("Creating new files") | ||||||
| for key, file in new_files.items(): | ||||||
| if file["checksum"] in diff.to_add: | ||||||
| logger.debug(f"Processing new file: {key}") | ||||||
| inspire_url = file.get("source_url") | ||||||
| file_content = self.fetch(inspire_url, logger) | ||||||
| self._upload_file(draft, file, file_content, logger) | ||||||
| logger.info(f"{len(new_files)} files successfully created.") | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
|
|
||||||
| def _upload_file(self, draft, file_data, file_content, logger): | ||||||
| """Initialize, upload, and commit a single file to the draft.""" | ||||||
| logger.debug(f"Filename: '{file_data['key']}'.") | ||||||
| service = current_rdm_records_service | ||||||
| inspire_checksum = file_data["checksum"] | ||||||
| new_checksum = None | ||||||
|
|
||||||
| try: | ||||||
| if inspire_checksum is None: | ||||||
| # this can happen when we get the file directly from arxiv. | ||||||
| # unfortunately, arxiv does not expose checksums | ||||||
| del file_data["checksum"] | ||||||
| file_data_to_init = { | ||||||
| k: v for k, v in file_data.items() if k != "source_url" | ||||||
| } | ||||||
| service.draft_files.init_files( | ||||||
| system_identity, draft.id, [file_data_to_init] | ||||||
| ) | ||||||
| logger.debug(f"Filename: '{file_data['key']}' initialized successfully.") | ||||||
|
|
||||||
| service.draft_files.set_file_content( | ||||||
| system_identity, draft.id, file_data["key"], file_content | ||||||
| ) | ||||||
| logger.debug( | ||||||
| f"Filename: '{file_data['key']}' content set successfully. Commit file..." | ||||||
| ) | ||||||
|
|
||||||
| result = service.draft_files.commit_file( | ||||||
| system_identity, draft.id, file_data["key"] | ||||||
| ) | ||||||
| new_checksum = result.data["checksum"] | ||||||
| logger.debug( | ||||||
| f"Filename: '{file_data['key']}' committed." | ||||||
| f" File checksum: {result.data['checksum']}." | ||||||
| ) | ||||||
|
|
||||||
| if inspire_checksum: | ||||||
| assert inspire_checksum == new_checksum | ||||||
| except AssertionError: | ||||||
| logger.error( | ||||||
| f"Files checksums don't match." | ||||||
| f" Delete file: '{file_data['key']}' from draft." | ||||||
| ) | ||||||
| service.draft_files.delete_file(system_identity, draft.id, file_data["key"]) | ||||||
| raise WriterError( | ||||||
| f"File {file_data['key']} checksum mismatch." | ||||||
| f" Expected: {inspire_checksum}, got: {new_checksum}." | ||||||
| ) | ||||||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since arxiv files dont have a checksum here and we generate the checksum when file uploaded, if we receive the same arxiv file it'll have None in the new_checksums if I understand correctly. Could this cause the same file to be treated as changed?