-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathbase.py
More file actions
266 lines (224 loc) · 9.05 KB
/
base.py
File metadata and controls
266 lines (224 loc) · 9.05 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
"""
This module contains base classes that define interfaces
and contain code common to sub-classes, to avoid code duplication.
# Copyright (c) 2023 openMetadataInitiative
"""
from __future__ import annotations
from datetime import date, datetime
from collections import defaultdict
import json
from typing import Union
import rfc3987
from .registry import Registry
class Node(metaclass=Registry):
    """
    Base class for a metadata node

    The methods below read ``type_`` (written to / compared with "@type") and
    ``properties`` (an iterable of objects exposing ``name``, ``path``, ``multiple``,
    ``deserialize()`` and ``validate()``). Neither is assigned in this class body.
    """

    @property
    def uuid(self):
        """Return the last "/"-separated segment of ``id``, or None if there is no id."""
        # Not every node has an ``id`` attribute (to_jsonld() guards for this too),
        # so a missing attribute is treated the same as ``id is None``.
        node_id = getattr(self, "id", None)
        if node_id is None:
            return None
        return node_id.split("/")[-1]

    def has_property(self, name):
        """Return True if one of this node's properties is called `name`."""
        return any(prop.name == name for prop in self.properties)

    def __eq__(self, other: Node) -> bool:
        """
        Compare property by property; attributes missing on either side count as None.

        Only the attributes named in ``self.properties`` are compared.
        """
        for prop in self.properties:
            if getattr(other, prop.name, None) != getattr(self, prop.name, None):
                return False
        return True

    def to_jsonld(self, include_empty_properties=True, embed_linked_nodes=True, with_context=True):
        """
        Return a representation of this metadata node as a dictionary that can be directly serialized to JSON-LD.

        Arguments:
            include_empty_properties: if False, properties whose value is None or an
                empty string/list/tuple/dict are left out. Falsy but meaningful
                values such as 0 and False are always kept.
            embed_linked_nodes: if True, linked nodes are serialized in full;
                if False they are replaced by a {"@id": ...} reference.
            with_context: if True, add an "@context" entry to the result.

        Returns a dict whose keys are in sorted order.
        Raises ValueError if embed_linked_nodes is False and a linked node has id None.
        """

        def is_empty(value):
            # Only "nothing there" counts as empty; 0 / False / 0.0 are real data.
            if value is None:
                return True
            return isinstance(value, (str, list, tuple, dict)) and len(value) == 0

        def value_to_jsonld(value):
            if isinstance(value, LinkedMetadata):
                if embed_linked_nodes:
                    return value.to_jsonld(
                        with_context=False,
                        include_empty_properties=include_empty_properties,
                        embed_linked_nodes=embed_linked_nodes,
                    )
                if hasattr(value, "id") and value.id is None:
                    raise ValueError("Exporting as a stand-alone JSON-LD document requires @id to be defined.")
                return {"@id": value.id}
            if isinstance(value, EmbeddedMetadata):
                return value.to_jsonld(
                    with_context=False,
                    include_empty_properties=include_empty_properties,
                    embed_linked_nodes=embed_linked_nodes,
                )
            if hasattr(value, "to_jsonld"):  # e.g. IRI
                return value.to_jsonld()
            if isinstance(value, (date, datetime)):
                return value.isoformat()
            return value

        data = {"@type": self.type_}
        if with_context:
            data["@context"] = {"@vocab": "https://openminds.ebrains.eu/vocab/"}
        node_id = getattr(self, "id", None)
        if node_id:
            data["@id"] = node_id
        for prop in self.__class__.properties:
            # Unset attributes are treated as None, as validate() and __eq__ do.
            value = getattr(self, prop.name, None)
            if is_empty(value) and not include_empty_properties:
                continue
            if not prop.multiple:
                data[prop.path] = value_to_jsonld(value)
            elif value is None:
                data[prop.path] = None
            else:
                # A single value given for a multi-valued property is wrapped in a list.
                items = value if isinstance(value, (tuple, list)) else [value]
                data[prop.path] = [value_to_jsonld(item) for item in items]
        return {key: data[key] for key in sorted(data)}

    @classmethod
    def from_jsonld(cls, data):
        """
        Create a Python object representing a metadata node from a JSON-LD-compatible dictionary

        The input dictionary is not modified (a shallow copy is consumed instead).

        Raises:
            TypeError: if "@type" is present, non-empty and different from ``cls.type_``.
            NameError: if keys remain that do not match any property path.
        """
        data_copy = data.copy()
        data_copy.pop("@context", None)  # the context is currently discarded
        # "@type" is optional: the check below already tolerates an empty type.
        type_ = data_copy.pop("@type", None)
        if type_ and type_ != cls.type_:
            raise TypeError(f"Mismatched types. Data has '{type_}' but trying to create '{cls.type_}'")
        deserialized_data = {}
        if issubclass(cls, LinkedMetadata):
            deserialized_data["id"] = data_copy.pop("@id", None)
        for prop in cls.properties:
            if prop.path in data_copy:  # todo: use context to resolve uris
                value = data_copy.pop(prop.path)
                # Falsy values (None, empty list, ...) are passed through untouched.
                deserialized_data[prop.name] = prop.deserialize(value) if value else value
        if len(data_copy) > 0:
            raise NameError(f"Unexpected arguments for {cls}: {tuple(data_copy.keys())}")
        return cls(**deserialized_data)

    def validate(self, ignore=None):
        """
        Check whether all constraints are satisfied.

        Arguments:
            ignore: an optional list of check types that should be ignored
                    ("required", "type", "multiplicity")

        Returns a dict containing information about any validation failures.
        """
        failures = defaultdict(list)
        for prop in self.properties:
            value = getattr(self, prop.name, None)
            for key, values in prop.validate(value, ignore=ignore).items():
                failures[key] += values
        return failures

    @property
    def is_valid(self):
        """True if validate() reports no failures."""
        return len(self.validate()) == 0

    @property
    def links(self):
        """
        Return a list of the linked metadata nodes referenced by this node

        Each LinkedMetadata value is added to the list, and the ``links`` of every
        value that has such an attribute (linked or embedded) are appended after it.
        """
        _links = []
        for prop in self.__class__.properties:
            value = getattr(self, prop.name, None)
            # Treat single- and multi-valued properties uniformly as a list of items.
            if prop.multiple and isinstance(value, (tuple, list)):
                items = value
            else:
                items = [value]
            for item in items:
                if isinstance(item, LinkedMetadata):
                    _links.append(item)
                if hasattr(item, "links"):
                    _links.extend(item.links)
        return _links

    def _resolve_links(self, node_lookup):
        """
        Replace `Link` attributes with typed Nodes where possible

        `node_lookup` is indexed with each Link's ``identifier``; a failed lookup
        propagates whatever ``node_lookup[...]`` raises (KeyError for a dict).
        List/tuple values are replaced by a new list.
        """
        for prop in self.__class__.properties:
            value = getattr(self, prop.name, None)
            if isinstance(value, Link):
                setattr(self, prop.name, node_lookup[value.identifier])
            elif hasattr(value, "_resolve_links"):
                # Already a node: resolve the links nested inside it.
                value._resolve_links(node_lookup)
            elif isinstance(value, (tuple, list)):
                resolved_values = []
                for item in value:
                    if isinstance(item, Link):
                        resolved_values.append(node_lookup[item.identifier])
                    else:
                        resolved_values.append(item)
                        if hasattr(item, "_resolve_links"):
                            item._resolve_links(node_lookup)
                setattr(self, prop.name, resolved_values)
class LinkedMetadata(Node):
    """
    A Python representation of a metadata node that should have a unique identifier.
    """

    # Not read or written by any code in this class.
    _instance_lookup = None

    def __init__(self, id=None, **properties):
        """Store `id` and set every keyword argument as an attribute of the same name."""
        self.id = id  # todo: check this is a URI
        for name, value in properties.items():
            setattr(self, name, value)

    def save(self, file_path, indent=2):
        """
        Save this object to a file in JSON-LD format

        Arguments:
            file_path: path of the file to write (overwritten if it exists).
            indent: indentation passed on to json.dump.
        """
        # Explicit UTF-8 so the file does not depend on the platform's locale encoding.
        with open(file_path, "w", encoding="utf-8") as output_file:
            json.dump(self.to_jsonld(), output_file, indent=indent)

    @classmethod
    def load(cls, file_path):
        """
        Create a Python object representing a metadata node from a JSON-LD file

        The file is decoded as UTF-8 and its content is handed to from_jsonld().
        """
        # JSON text is UTF-8; the locale default can mis-decode non-ASCII characters.
        with open(file_path, "r", encoding="utf-8") as input_file:
            data = json.load(input_file)
        return cls.from_jsonld(data)
class EmbeddedMetadata(Node):
    """
    A Python representation of a metadata node that should only be embedded within another node,
    and should not have a unique identifier.
    """

    def __init__(self, **properties):
        """Set every keyword argument as an attribute of the same name."""
        for attribute_name in properties:
            setattr(self, attribute_name, properties[attribute_name])
class Link:
    """Representation of a metadata node for which only the identifier is currently known."""

    def __init__(self, identifier):
        """Remember `identifier`, the only thing known about the target node."""
        self.identifier = identifier

    def __repr__(self):
        # Show the target so unresolved links are recognisable in logs and debuggers.
        return f"Link({self.identifier!r})"
class IRI:
    """
    Representation of an International Resource Identifier
    """

    def __init__(self, value: Union[str, "IRI"]):
        """
        Wrap `value`, which may be a string or another IRI (whose value is copied).

        Raises ValueError if the string does not match the "IRI" rule of rfc3987.
        """
        iri = value.value if isinstance(value, IRI) else value
        if not rfc3987.match(iri, rule="IRI"):
            raise ValueError("Invalid IRI")
        self.value: str = iri

    def __eq__(self, other):
        """Two IRIs are equal when they have exactly the same class and value."""
        return self.__class__ == other.__class__ and self.value == other.value

    def __hash__(self):
        # Defining __eq__ alone would make instances unhashable; hash the same
        # pair that __eq__ compares so equal IRIs always hash alike.
        return hash((self.__class__, self.value))

    def __repr__(self):
        return f"IRI({self.value})"

    def __str__(self):
        return self.value

    def to_jsonld(self):
        """Return the IRI as a plain string."""
        return self.value

    def validate(self, ignore=None):
        """
        Check the IRI for problems.

        Arguments:
            ignore: an optional list of check types to skip; "value" is the only
                    one consulted here.

        Returns a dict mapping check type to a list of failure messages.
        """
        if ignore is None:
            ignore = []
        failures = defaultdict(list)
        if self.value.startswith("file") and "value" not in ignore:
            failures["value"].append("IRI points to a local file path")
        return failures