|
1 | 1 | #!/usr/bin/env python3 |
2 | 2 |
|
3 | | -import requests |
4 | | -import json |
5 | | -import re |
6 | | -import os |
7 | | -from jsonschema import validate |
8 | | -from jsonschema.exceptions import ValidationError, SchemaError |
9 | | -from typing import Union, Any, Optional, List |
10 | | - |
11 | | -# -- # |
12 | | -# Hardcoded values |
13 | | -# -- # |
# Base URLs of the BioSamples "samples" resource, keyed by environment.
# BiosamplesRecord appends "<accession>.json" directly to these, so the
# trailing slash is significant.
biosamples_endpoints = {
    "prod": "https://www.ebi.ac.uk/biosamples/samples/",
    "dev": "https://wwwdev.ebi.ac.uk/biosamples/samples/",
}
# Relative path to the JSON Schema file for the input document.
input_json_schema_filepath = "./mars_lib/input-schema.json"
19 | | - |
20 | | - |
21 | | -# -- # |
22 | | -# Code blocks |
23 | | -# -- # |
def load_json_file(file: str) -> Any:
    """
    Function to load a JSON file and return its decoded content.

    Args:
        file (str): Path to the file to be loaded. Must exist, be a regular
            file and carry a ".json" extension.

    Returns:
        Any: The decoded JSON document (whatever top-level type it holds).

    Raises:
        FileNotFoundError: If the path does not exist.
        ValueError: If the path is not a file, does not end in ".json", or
            its content cannot be decoded as JSON.
    """
    if not os.path.exists(file):
        raise FileNotFoundError(f"The file '{file}' does not exist.")
    if not os.path.isfile(file):
        raise ValueError(f"The path '{file}' is not a file.")
    if not file.endswith(".json"):
        raise ValueError(
            f"The given file '{file}' is not a JSON file based on its extension."
        )

    try:
        # JSON text is UTF-8; do not depend on the platform's default encoding.
        with open(file, "r", encoding="utf-8") as f:
            return json.load(f)
    except (json.JSONDecodeError, UnicodeDecodeError) as err:
        # Keep the original error as the cause so the offending position is
        # still visible in the traceback.
        raise ValueError(
            f"The file content of the given file '{file}' is not valid JSON."
        ) from err
from typing import Any

from requests import HTTPError, Response

from mars_lib.isa_json import detect_target_repo_comment
from mars_lib.models.isa_json import Assay, IsaJson, Material, Sample
from mars_lib.target_repo import TargetRepository
| 10 | + |
| 11 | + |
def _extract_accessions(isa_json: IsaJson) -> list[dict[str, Any]]:
    """
    Summarise every study of the investigation together with its assays.

    Args:
        isa_json (IsaJson): The ISA-JSON document to walk.

    Returns:
        list[dict[str, Any]]: One dict per study with the keys 'title', 'id'
        and 'assays'. Each assay entry holds 'id', 'title', 'accession',
        'target_repo' and 'biosamples'.
    """
    summaries = []

    for study in isa_json.investigation.studies:
        samples_in_study = study.materials.samples
        summary = {'title': study.title, 'id': study.id}

        assay_entries = []
        for assay in study.assays:
            assay_entries.append(
                {
                    'id': assay.id,
                    'title': assay.title,
                    'accession': _fetch_assay_accession(assay),
                    'target_repo': _fetch_assay_target_repo(assay),
                    'biosamples': _biosamples_used_in_assay(samples_in_study, assay),
                }
            )

        summary['assays'] = assay_entries
        summaries.append(summary)

    return summaries
| 32 | + |
def _biosamples_used_in_assay(all_biosamples: list[Sample], assay: Assay) -> list[Sample]:
    """
    Select the study samples that the given assay refers to.

    NOTE(review): the original filter predicate was left empty. Matching on
    the ids listed under `assay.materials.samples` (the ISA-JSON place where
    an assay references its samples) is the assumed intent -- confirm.

    Args:
        all_biosamples (list[Sample]): Samples of the study owning the assay.
        assay (Assay): The assay whose samples are wanted.

    Returns:
        list[Sample]: The entries of `all_biosamples` whose id is referenced
        by the assay, in their original order. Empty when the assay lists no
        materials.
    """
    materials = assay.materials
    assay_samples = materials.samples if materials is not None else []
    # A set keeps the membership test O(1) per study sample.
    used_ids = {sample.id for sample in assay_samples}
    return [sample for sample in all_biosamples if sample.id in used_ids]
| 36 | + |
def _fetch_assay_target_repo(assay: Assay) -> str:
    """
    Return the value of the target-repository comment attached to an assay.

    Args:
        assay (Assay): The assay whose comments are inspected.

    Returns:
        str: The `value` of the comment found by `detect_target_repo_comment`.
    """
    target_repo_comment = detect_target_repo_comment(assay.comments)
    return target_repo_comment.value
| 39 | + |
def _fetch_assay_accession(assay: Assay) -> str | ValueError:
    """
    Look up the accession stored in an assay's comments.

    The first comment whose lower-cased name contains "assay_accession" wins.

    Args:
        assay (Assay): The assay whose comments are searched.

    Returns:
        str | ValueError: The comment's value, or a `ValueError` instance
        (returned, not raised) when no such comment exists.
    """
    for comment in assay.comments:
        if "assay_accession" in comment.name.lower():
            return comment.value

    # The error object is handed back to the caller rather than raised, as
    # declared by the return annotation.
    return ValueError(f"Accession characteristic not found in assay '[{assay.id}] - {assay.title}'!")
| 52 | + |
def _get_accession_comment_from_a(material: Material) -> str:
    """
    Return the value of a material's 'accession' characteristic.

    A characteristic qualifies when the annotation value of its category's
    `characteristicType` equals "accession" (case-insensitive). The first
    match wins.

    Args:
        material (Material): The material whose characteristics are searched.

    Returns:
        str: The `value` of the matching characteristic.

    Raises:
        ValueError: If the material has no 'accession' characteristic.
    """
    for characteristic in material.characteristics:
        # ISA-JSON spells the field `characteristicType`; the previous
        # `characteristcType` spelling could never resolve.
        category_type = characteristic.category.characteristicType
        if category_type.annotationValue.lower() == 'accession':
            return characteristic.value

    # Fail with an explicit error instead of leaking StopIteration from next().
    raise ValueError(f"No 'accession' characteristic found in material {material!r}.")
| 61 | + |
def update_external_references(isa_json: IsaJson, urls_dict: dict) -> Response | HTTPError | ValueError:
    """
    Resolve, per assay, the external-reference URL of its target repository.

    NOTE(review): this function looks unfinished -- it only validates that
    every assay's target repository has an 'EXTERNAL-REF-URL' configured and
    falls through (implicitly returning None) when all of them do. No request
    is sent yet.

    Args:
        isa_json (IsaJson): The ISA-JSON document whose assays are inspected.
        urls_dict (dict): Per-repository URL configuration. Looked up with
            "BIOSAMPLES" -> "SERVICE" and with
            `TargetRepository(<target_repo>)` -> 'EXTERNAL-REF-URL'.

    Returns:
        Response | HTTPError | ValueError: Currently only a `ValueError`
        instance (returned, not raised) when a target repository has no
        'EXTERNAL-REF-URL'.
    """
    accessions_dict = _extract_accessions(isa_json)
    # NOTE(review): not consumed yet; presumably the BioSamples endpoint the
    # references will be sent to -- confirm once the update call is written.
    biosamples_url = urls_dict.get("BIOSAMPLES", {}).get("SERVICE", {})
    for study in accessions_dict:
        # `_extract_accessions` builds plain dicts, so the assays must be read
        # by key; attribute access (`study.assays`) raises AttributeError.
        for assay in study['assays']:
            url = urls_dict.get(TargetRepository(assay['target_repo']), {}).get('EXTERNAL-REF-URL', None)
            if url is None:
                return ValueError(f"No 'EXTERNAL-REF-URL' found for target repository '{assay['target_repo']}' in assay '{assay['title']}'.")
47 | 70 |
|
48 | | - |
def handle_input_dict(input: dict[str, str]) -> Optional[dict[str, str]]:
    """
    Function to handle the input: assert that it's either a dictionary or
    the filepath to an existing file containing the dictionary

    Args:
        input (dict or str): Dictionary or filepath to the dictionary JSON.

    Returns:
        dict: The given dictionary itself, or the dictionary loaded from the
        given file.

    Raises:
        FileNotFoundError: If a filepath is given and it does not exist.
        ValueError: If the file is not valid JSON or its top-level value is
            not a dictionary.
    """
    if isinstance(input, dict):
        return input

    # load_json_file already turns JSON decoding problems into ValueError, so
    # no json.JSONDecodeError can reach this point.
    loaded_dict = load_json_file(input)

    if not isinstance(loaded_dict, dict):
        raise ValueError(
            f"The file '{input}' does not contain a valid dictionary."
        )

    return loaded_dict
74 | | - |
75 | | - |
def get_header(token: str) -> dict[str, str]:
    """
    Build the HTTP header carrying the given token as a Bearer credential.

    Args:
        token (str): The Webin auth token.

    Returns:
        dict: The header with its "Content-Type", "Accept" and
        "Authorization" entries.
    """
    bearer = f"Bearer {token}"

    header: dict[str, str] = {}
    header["Content-Type"] = "application/json;charset=UTF-8"
    header["Accept"] = "application/hal+json"
    header["Authorization"] = bearer
    return header
91 | | - |
92 | | - |
def validate_bs_accession(accession_str: str) -> None:
    """
    Validates that the given accession string conforms to the specified regex format.
    See: https://registry.identifiers.org/registry/biosample

    Args:
        accession_str (str): The accession string to be validated.

    Raises:
        ValueError: If the string does not match the accession pattern.
    """

    pattern = r"^SAM[NED](\w)?\d+$"

    # fullmatch, not match: with match() the trailing "$" also succeeds just
    # before a final newline, so "SAMEA1\n" would be accepted.
    if not re.fullmatch(pattern, accession_str):
        raise ValueError(
            f"The provided accession string '{accession_str}' does not match the required format."
        )
108 | | - |
109 | | - |
def validate_json_against_schema(
    json_doc: Union[dict[str, List[str]], str], json_schema: Union[dict[str, str], str]
) -> Optional[bool]:
    """
    Validates a JSON document against a given JSON Schema.

    Args:
        json_doc (dict, str): JSON document or the filepath to it.
        json_schema (dict, str): JSON schema or the filepath to it.

    Returns:
        bool: True when the document validates against the schema.

    Raises:
        ValidationError: If the document does not conform to the schema.
        SchemaError: If the schema itself is invalid.
    """
    # Load both files if needed
    json_data = json_doc if isinstance(json_doc, dict) else load_json_file(json_doc)
    schema_data = (
        json_schema if isinstance(json_schema, dict) else load_json_file(json_schema)
    )

    # Validating JSON against the schema
    try:
        validate(instance=json_data, schema=schema_data)
    except ValidationError as e:
        # The two sentences used to be glued together ("...invalid.JSON ...");
        # the trailing space keeps the message readable.
        raise ValidationError(
            "Found an error when validating the JSON document with its JSON Schema. This may mean that the given input is invalid. "
            f"JSON validation error: {e.message}"
        ) from e
    except SchemaError as e:
        raise SchemaError(f"Schema error: {e.message}") from e

    return True
142 | | - |
143 | | - |
class BiosamplesRecord:
    """
    Class representing a record for biosamples to be extended.

    Attributes:
        bs_accession: the validated BioSamples accession of this record
        biosamples_credentials: credentials dict, None until set by the caller
        biosamples_externalReferences: list of external references
        production: boolean indicating environment mode
        bs_json: the record's JSON, None until fetched or loaded
        biosamples_url: URL of the record, None until it has been fetched
    """

    def __init__(self, bs_accession: str) -> None:
        """
        Initialize the BiosamplesRecord with provided arguments.

        Args:
            bs_accession: a valid Biosamples accession (e.g. SAMEA112654119)

        Raises:
            ValueError: If the accession does not have the expected format.
        """
        validate_bs_accession(bs_accession)
        self.bs_accession = bs_accession
        self.biosamples_credentials: Optional[dict[str, str]] = None
        self.biosamples_externalReferences: List[str] = []
        self.production: bool = False
        # Initialised here so the "not loaded yet" checks below work instead
        # of failing with AttributeError.
        self.bs_json: Optional[dict[str, Any]] = None
        self.biosamples_url: Optional[str] = None

    def _require_bs_json(self) -> dict[str, Any]:
        """Return the loaded record JSON, or raise ValueError when there is none."""
        if self.bs_json is None:
            raise ValueError(
                f"No BioSamples JSON has been fetched or loaded for '{self.bs_accession}'."
            )
        return self.bs_json

    def display(self) -> None:
        """
        Display the attributes for demonstration purposes.
        """
        print("Biosamples Credentials:", self.biosamples_credentials)
        print("Biosamples External References:", self.biosamples_externalReferences)
        print("Production Mode:", self.production)

    def fetch_bs_json(self, biosamples_endpoint: str) -> Optional[dict[str, str]]:
        """
        Fetches the BioSample's record (JSON) of the accession.

        Args:
            biosamples_endpoint (str): The endpoint to be used to fetch the record's JSON.

        Returns:
            dict: The fetched record, also stored on the instance as `bs_json`.

        Raises:
            RuntimeError: If the request fails or the status code is not 200.
            ValueError: If the response is not valid JSON or not a dictionary.
        """

        self.biosamples_url = f"{biosamples_endpoint}{self.bs_accession}.json"

        try:
            # No auth token needed, it's public info
            r = requests.get(self.biosamples_url)
        except requests.RequestException as e:
            raise RuntimeError(f"Error making the request. Details: {e}") from e

        if r.status_code != 200:
            raise RuntimeError(
                f"Expected status code 200, but received {r.status_code}. Used URL: '{self.biosamples_url}'. Response content: {r.text}"
            )

        # Decoding is handled apart from the request: the decode error of
        # recent `requests` versions is also a RequestException, which used to
        # turn invalid JSON into the "Error making the request" RuntimeError.
        try:
            response_json = r.json()
        except ValueError as e:
            raise ValueError(
                f"The server response is not valid JSON. Content: {r.text}"
            ) from e

        if not isinstance(response_json, dict):
            raise ValueError(
                f"The response content is not a valid dictionary. Content: {r.text}"
            )

        self.bs_json = response_json
        return self.bs_json

    def load_bs_json(
        self, bs_json: Union[str, dict[str, str]]
    ) -> Optional[dict[str, str]]:
        """
        Loads a given JSON, or the file containing it, as the BioSample's record (JSON) for this instance.
        It is an alternative to fetching it directly from BioSample.

        Args:
            bs_json Union[str, dict]: The already Biosamples JSON metadata of the accession either path to file or dictionary.

        Returns:
            dict: The loaded record, also stored on the instance as `bs_json`.

        Raises:
            ValueError: If `bs_json` is neither a dictionary nor a filepath.
        """
        if isinstance(bs_json, dict):
            self.bs_json = bs_json
        elif isinstance(bs_json, str):
            self.bs_json = load_json_file(bs_json)
        else:
            raise ValueError(
                "Neither the file containing the Biosamples JSON nor the Biosamples JSON itself were given to load it into the instance."
            )
        return self.bs_json

    def pop_links(self) -> dict[str, str]:
        """
        Removes "_links" array (which is added automatically after updating the biosamples on the BioSample's side).

        Returns:
            dict: The record JSON without its "_links" entry.

        Raises:
            ValueError: If no record JSON has been fetched or loaded yet.
        """
        record = self._require_bs_json()
        record.pop("_links", None)
        return record

    def extend_externalReferences(
        self, new_ext_refs_list: List[dict[str, str]]
    ) -> dict[str, str]:
        """
        Extends the JSON of the BioSample's record with new externalReferences.

        When no record JSON is available yet, it is fetched first from the
        "prod" or "dev" endpoint depending on `production`.

        Args:
            new_ext_refs_list: External references to add to the record.

        Returns:
            dict: The record JSON with existing and new references merged,
            duplicates removed and first-seen order preserved.
        """
        if not self.bs_json:
            endpoint = (
                biosamples_endpoints["prod"]
                if self.production
                else biosamples_endpoints["dev"]
            )
            self.fetch_bs_json(endpoint)
            self.pop_links()

        merged: List[dict[str, str]] = []
        seen: set[str] = set()
        for ref in [*self.bs_json.get("externalReferences", []), *new_ext_refs_list]:
            # Dicts are unhashable, so use their JSON text as the identity;
            # sort_keys makes key order irrelevant for the comparison.
            fingerprint = json.dumps(ref, sort_keys=True)
            if fingerprint not in seen:
                seen.add(fingerprint)
                merged.append(ref)

        self.bs_json["externalReferences"] = merged
        return self.bs_json

    def update_remote_record(
        self, header: dict[str, str], webin_auth: str = "?authProvider=WEBIN"
    ) -> Optional[str]:
        """
        Updates the remote record of the BioSample's accession with the current sample JSON.

        Args:
            header (dict): The HTTP headers to use in the request.
            webin_auth (str, optional): The authentication provider for WEBIN.

        Returns:
            str: The body of the server's response.

        Raises:
            ValueError: If no record JSON has been fetched or loaded yet.
            RuntimeError: If the record URL is unknown, the request fails or
                the status code is not 200.
        """
        updated_json = json.dumps(self._require_bs_json())
        if self.biosamples_url is None:
            # The URL is only set by fetch_bs_json; refuse to guess a target
            # for a write operation.
            raise RuntimeError(
                "The record URL is unknown: fetch the record with fetch_bs_json() before updating it."
            )
        update_url = f"{self.biosamples_url}{webin_auth}"

        try:
            r = requests.put(update_url, headers=header, data=updated_json)
        except requests.RequestException as e:
            raise RuntimeError(f"Error making the request. Details: {e}") from e

        # Only a plain 200 counts as success here.
        if r.status_code != 200:
            raise RuntimeError(
                f"Expected status code 200, but received {r.status_code}. Response content: {r.text}"
            )

        return r.text
0 commit comments