|
| 1 | +import logging |
| 2 | +import requests |
| 3 | +from requests.compat import urljoin |
| 4 | +from typing import Any, Optional, Dict, List, Set, Tuple |
| 5 | + |
| 6 | +from ..app import app, cache |
| 7 | +from .sri_node_normalizer import SriNodeNormalizer |
| 8 | + |
| 9 | + |
| 10 | +def _bypass_cache(f, *args, **kwargs): |
| 11 | + return kwargs.get('bypass', False) |
| 12 | + |
| 13 | + |
class Ubergraph:
    """ Client for the Automat-Ubergraph TRAPI service

    Provides access to the service's meta_knowledge_graph and a descendant lookup via biolink:subclass_of edges.
    All methods are static; the base URL is selected once from the app's DEPLOYMENT_ENV setting when the class
    body is executed.
    """
    base_url_default = 'https://automat.transltr.io/ubergraph/1.4/'
    base_urls = {
        'dev': 'https://automat.renci.org/ubergraph/1.4/',
        'ITRB-CI': 'https://automat.ci.transltr.io/ubergraph/1.4/',
        'ITRB-TEST': 'https://automat.test.transltr.io/ubergraph/1.4/',
        'ITRB-PROD': 'https://automat.transltr.io/ubergraph/1.4/'
    }
    endpoint_query = 'query'
    endpoint_meta_kg = 'meta_knowledge_graph'
    INFORES_ID = 'infores:automat-ubergraph'
    _TIMEOUT = 10  # Query timeout (seconds)

    # Unknown deployment environments fall back to base_url_default
    deployment_env = app.config.get('DEPLOYMENT_ENV', 'dev')
    base_url = base_urls.get(deployment_env, base_url_default)
    logging.info(f'Deployment environment "{deployment_env}" --> using Ubergraph @ {base_url}')

    @staticmethod
    @cache.memoize(timeout=86400, cache_none=False)
    def get_meta_kg():
        """ Get Ontology KP meta_knowledge_graph

        Returns
        -------
        The decoded JSON body of the meta_knowledge_graph endpoint, or None if the request failed, the service
        answered with a non-200 status code, or the body was not valid JSON.
        """
        # NOTE(review): cache_none=False is presumably what keeps failed (None) lookups out of the cache - confirm
        url = urljoin(Ubergraph.base_url, Ubergraph.endpoint_meta_kg)
        try:
            resp = requests.get(url, timeout=Ubergraph._TIMEOUT)
            if resp.status_code == 200:
                return resp.json()
        except requests.RequestException:
            # Return None, indicating an error occurred
            logging.warning(f'Encountered an RequestException when querying Ontology KP meta_kg: {url}')
            return None
        except ValueError:
            # Body could not be decoded as JSON. Return None, indicating an error occurred
            logging.warning(f'Received a response that is not valid JSON from Ontology KP meta_kg: {url}')
            return None

        # Return None, indicating an error occurred
        logging.warning(f'Received a non-200 status response code from Ontology KP meta_kg ({url}): '
                        f'{(resp.status_code, resp.text)}')
        return None

    @staticmethod
    def get_allowed_prefixes(categories: List[str]) -> Optional[Set[str]]:
        """ Get the set of id_prefixes for categories from meta_knowledge_graph

        Parameters
        ----------
        categories - list of biolink categories to look up in the meta_knowledge_graph "nodes" section

        Returns
        -------
        Set[str] with the union of the id_prefixes of every listed category that is present in the
        meta_knowledge_graph (possibly empty), or None if the meta_knowledge_graph could not be retrieved or has
        no "nodes" entry.
        """
        meta_kg = Ubergraph.get_meta_kg()
        if meta_kg is None:
            return None
        nodes = meta_kg.get('nodes')
        if nodes is None:
            logging.warning('Ontology KP meta_kg has missing "nodes"')
            return None

        allowed_prefixes = set()
        for cat in categories:
            node_info = nodes.get(cat)
            # Skip categories that are unknown to the KP or that carry no (or a null) id_prefixes list
            if isinstance(node_info, dict) and node_info.get('id_prefixes'):
                allowed_prefixes.update(node_info['id_prefixes'])

        return allowed_prefixes

    @staticmethod
    def convert_to_preferred(curies: List[str], categories: List[str]) -> Dict[str, str]:
        """ Converts the input CURIEs into the prefixes preferred by Ontology KP

        Parameters
        ----------
        curies - List[str]
        categories - List[str]

        Returns
        -------
        Dict mapping every input CURIE to the CURIE that should be sent to the KP. A CURIE maps to itself when
        its prefix is already allowed, when no equivalent identifier with an allowed prefix is known, or when the
        allowed prefixes or the node normalizer results could not be obtained.
        """
        unaltered = {c: c for c in curies}

        allowed_prefixes = Ubergraph.get_allowed_prefixes(categories)
        if not allowed_prefixes:
            # Either no valid response from meta_knowledge_graph (None), or no prefixes are known for these
            # categories, in which case nothing could be converted anyway. Don't alter the input CURIEs
            return unaltered

        # Only CURIEs with prefixes that are not in the allowed set need to be normalized
        curies_to_convert = [c for c in curies if c.split(':')[0] not in allowed_prefixes]
        if not curies_to_convert:
            # Everything is already allowed. Avoid a needless node normalizer request
            return unaltered

        norm_nodes = SriNodeNormalizer.get_normalized_nodes(curies_to_convert)
        if norm_nodes is None:
            # Failed node normalizer. Return the original curies
            return unaltered

        needs_conversion = set(curies_to_convert)
        preferred_curies = dict()
        for curie in curies:
            # Default to the CURIE as is: used when it is already allowed, when there is no node normalizer
            # info for it, and when no equivalent identifier has an allowed prefix
            preferred_curie = curie
            norm_node = norm_nodes.get(curie) if curie in needs_conversion else None
            if norm_node is not None:
                # allowed_prefixes is a set and has no meaningful order, so take the first equivalent
                # identifier (in the order given by the node normalizer) that has an allowed prefix. This
                # keeps the choice deterministic
                for equivalent in norm_node.equivalent_identifiers:
                    if equivalent.id.split(':')[0] in allowed_prefixes:
                        preferred_curie = equivalent.id
                        break
            preferred_curies[curie] = preferred_curie
        return preferred_curies

    @staticmethod
    def _build_descendants_query(ids: List[str], categories: List[str]) -> Dict[str, Any]:
        """ Build the TRAPI message asking for nodes "b" (of categories) that are subclass_of nodes "a" (ids) """
        return {
            "message": {
                "query_graph": {
                    "nodes": {
                        "a": {
                            "ids": ids
                        },
                        "b": {
                            "categories": categories
                        }
                    },
                    "edges": {
                        "ab": {
                            "subject": "b",
                            "object": "a",
                            "predicates": ["biolink:subclass_of"]
                        }
                    }
                }
            }
        }

    @staticmethod
    @cache.memoize(timeout=3600, cache_none=False, unless=_bypass_cache)
    def get_descendants(curies: List[str], categories: Optional[List[str]] = None, timeout: int = _TIMEOUT,
                        bypass: bool = False) -> Optional[Tuple[Dict[str, Any], Dict[str, Any]]]:
        """ Get descendant CURIEs from Ontology KP

        Parameters
        ----------
        curies - list of curies
        categories - list of biolink categories, or None
        timeout - timeout (seconds) of the query request
        bypass - not used in the body; it is read by the `unless` callable given to the memoize decorator

        Returns
        -------
        A tuple (nodes, ancestor_dict):
          nodes - all knowledge graph nodes returned by the Ontology KP, keyed by the originally queried CURIE
                  where a preferred CURIE was substituted for the query
          ancestor_dict - maps the subject of every biolink:subclass_of edge to its object (ancestor), using the
                  originally queried CURIEs where applicable
        Two empty dicts are returned if the knowledge graph has no nodes or edges. None is returned if an error
        occurred (request failure, timeout, non-200 status code, or malformed response).
        """
        # Ontology KP doesn't seem to like it when categories is null. Replace it with NamedThing for functionally
        # equivalent TRAPI
        if categories is None:
            categories = ['biolink:NamedThing']

        preferred_curies = Ubergraph.convert_to_preferred(curies, categories)
        # Reverse mapping to get original CURIE from preferred CURIE
        original_curies = {v: k for (k, v) in preferred_curies.items()}

        # Query Ontology KP for descendants. dict.fromkeys drops duplicate IDs while keeping their order
        m = Ubergraph._build_descendants_query(list(dict.fromkeys(preferred_curies.values())), categories)
        logging.debug(m)
        url = urljoin(Ubergraph.base_url, Ubergraph.endpoint_query)

        try:
            response = requests.post(url=url, json=m, timeout=timeout)
            if response.status_code != 200:
                logging.warning(f'Automat-Ubergraph returned status code {response.status_code}: {response.content}')
                return None
            j = response.json()
        except requests.Timeout:
            logging.warning(f'Automat-Ubergraph timed out when querying for descendants ({timeout} sec)')
            return None
        except requests.RequestException:
            # Return None, indicating an error occurred
            logging.warning('Encountered an RequestException when querying descendants from Automat-Ubergraph')
            return None
        except ValueError:
            # Body could not be decoded as JSON. Return None, indicating an error occurred
            logging.warning('Automat-Ubergraph returned a response that is not valid JSON')
            return None

        message = j.get('message') if isinstance(j, dict) else None
        if not isinstance(message, dict) or 'knowledge_graph' not in message:
            # Return None, indicating an error occurred
            logging.warning('Automat-Ubergraph response has missing "message" or "knowledge_graph"')
            return None

        # A null knowledge_graph is handled like one without nodes and edges
        kg = message['knowledge_graph'] or {}
        nodes = kg.get('nodes')
        edges = kg.get('edges')
        if nodes is None or edges is None:
            # Return empty dicts, indicating no descendants found
            return dict(), dict()

        # Replace preferred CURIEs with the original queried CURIE. Removal is deferred until every original
        # CURIE has its node, so that several originals sharing one preferred CURIE are all served
        replaced = set()
        for curie, pc in preferred_curies.items():
            if pc in nodes and curie != pc:
                nodes[curie] = nodes[pc]
                replaced.add(pc)
        for pc in replaced:
            # Keep the node if the preferred CURIE was itself one of the queried CURIEs
            if pc not in preferred_curies:
                del nodes[pc]

        # Also return a dictionary indicating the QNode IDs that are ancestors of each descendant
        ancestor_dict = dict()
        for e in edges.values():
            if e.get('predicate') != 'biolink:subclass_of':
                continue
            subject, ancestor = e.get('subject'), e.get('object')
            if subject is None or ancestor is None:
                continue
            # Fall back to the ID as returned by the KP when it does not map back to a queried CURIE
            ancestor_dict[original_curies.get(subject, subject)] = original_curies.get(ancestor, ancestor)

        return nodes, ancestor_dict
0 commit comments