|
| 1 | +import jsonschema |
| 2 | +from jsonschema.exceptions import ValidationError |
| 3 | + |
| 4 | +from esdlvalidator.core.esdl import utils as esdlUtils |
| 5 | + |
| 6 | +from esdlvalidator.validation.functions import utils |
| 7 | +from esdlvalidator.validation.functions.function import ( |
| 8 | + FunctionFactory, |
| 9 | + FunctionCheck, |
| 10 | + FunctionDefinition, |
| 11 | + ArgDefinition, |
| 12 | + FunctionType, |
| 13 | + CheckResult, |
| 14 | +) |
| 15 | + |
| 16 | +from pyecore.ecore import EOrderedSet |
| 17 | + |
| 18 | +# TODO: check ports are connected to 1 hydraulically coupled network |
| 19 | + |
| 20 | +@FunctionFactory.register(FunctionType.CHECK, "compare_reference_attributes") |
| 21 | +class CompareRefAttributes(FunctionCheck): |
| 22 | + |
| 23 | + def get_function_definition(self): |
| 24 | + return FunctionDefinition( |
| 25 | + "compare_reference_attributes", |
| 26 | + "TODO", |
| 27 | + [ |
| 28 | + ArgDefinition( |
| 29 | + "ref", |
| 30 | + "TODO", |
| 31 | + True, |
| 32 | + ), |
| 33 | + ArgDefinition( |
| 34 | + "operator", |
| 35 | + "TODO", |
| 36 | + True, |
| 37 | + ), |
| 38 | + ArgDefinition( |
| 39 | + "left", |
| 40 | + "TODO", |
| 41 | + True, |
| 42 | + ), |
| 43 | + ArgDefinition( |
| 44 | + "right", |
| 45 | + "TODO", |
| 46 | + True, |
| 47 | + ), |
| 48 | + ArgDefinition("resultMsgJSON", "Display output in JSON format", False), |
| 49 | + ], |
| 50 | + ) |
| 51 | + |
| 52 | + # Supported operator types. Can be extended in future if required. |
| 53 | + OPERATOR_ENUM = ["greater_than", "equal", "less_than"] |
| 54 | + |
| 55 | + args_schema = { |
| 56 | + "type": "object", |
| 57 | + "required": ["ref", "operator", "left", "right"], |
| 58 | + "properties": { |
| 59 | + "ref": {"type": "string"}, |
| 60 | + "operator": {"type": "string", "enum": OPERATOR_ENUM}, |
| 61 | + "left": { |
| 62 | + "type": "object", |
| 63 | + "required": ["ref_filter", "attribute"], |
| 64 | + "properties": { |
| 65 | + "ref_filter": { |
| 66 | + "type": "object", |
| 67 | + "required": ["is_type", "match"], |
| 68 | + "properties": {"is_type": {"type": "string"}, "match": {"type": "object"}}, |
| 69 | + }, |
| 70 | + "attribute": {"type": "string"}, |
| 71 | + }, |
| 72 | + }, |
| 73 | + "right": { |
| 74 | + "type": "object", |
| 75 | + "required": ["ref_filter", "attribute"], |
| 76 | + "properties": { |
| 77 | + "ref_filter": { |
| 78 | + "type": "object", |
| 79 | + "required": ["is_type", "match"], |
| 80 | + "properties": {"is_type": {"type": "string"}, "match": {"type": "object"}}, |
| 81 | + }, |
| 82 | + "attribute": {"type": "string"}, |
| 83 | + }, |
| 84 | + }, |
| 85 | + "resultMsgJSON": {"type": "boolean"}, |
| 86 | + }, |
| 87 | + } |
| 88 | + |
| 89 | + NOT_FOUND = "Not found" |
| 90 | + UNSET = "Unset" |
| 91 | + |
| 92 | + def execute(self): |
| 93 | + entity = self.value |
| 94 | + args_dict = self.args |
| 95 | + |
| 96 | + results = [] |
| 97 | + |
| 98 | + try: |
| 99 | + jsonschema.validate(instance=args_dict, schema=self.args_schema) |
| 100 | + except ValidationError as e: |
| 101 | + raise ValueError(f"Schema validation failed at {list(e.path)}: {e.message}") |
| 102 | + |
| 103 | + ref = utils.get_attribute(self.args, "ref") |
| 104 | + operator = utils.get_attribute(self.args, "operator") |
| 105 | + # TODO: rename to ref_args? |
| 106 | + left = utils.get_attribute(self.args, "left") |
| 107 | + right = utils.get_attribute(self.args, "right") |
| 108 | + resultMsgJSON = utils.get_attribute(self.args, "resultMsgJSON") |
| 109 | + |
| 110 | + # Check if the retrieved asset references are valid |
| 111 | + references = utils.get_attr_or_ref_attr(entity, ref) |
| 112 | + if references == self.NOT_FOUND: |
| 113 | + raise ValueError(f"[{ref}] not found.") |
| 114 | + if references == self.UNSET: |
| 115 | + results.append(f"[{ref}] should be defined, but is unset.") |
| 116 | + elif not isinstance(references, EOrderedSet): |
| 117 | + raise TypeError(f"Expect [{ref}] to be of EOrderedSet type, found {type(references)}") |
| 118 | + else: |
| 119 | + left_entity = self.get_ref(references, left["ref_filter"]) |
| 120 | + right_entity = self.get_ref(references, right["ref_filter"]) |
| 121 | + |
| 122 | + for entity_ref, side_args in [(left_entity, left), (right_entity, right)]: |
| 123 | + if entity_ref is None: |
| 124 | + match_str = ", ".join(f"[{k}] = '{v}'" for k, v in side_args["ref_filter"]["match"].items()) |
| 125 | + results.append( |
| 126 | + f"[{ref}] should contain a [{side_args['ref_filter']['is_type']}] ({match_str}), but not found." |
| 127 | + ) |
| 128 | + |
| 129 | + if left_entity and right_entity: |
| 130 | + left_val = self.get_attribute_value(left_entity, left, results) |
| 131 | + right_val = self.get_attribute_value(right_entity, right, results) |
| 132 | + |
| 133 | + if left_val is not None and right_val is not None: |
| 134 | + try: |
| 135 | + left_num = float(left_val) |
| 136 | + right_num = float(right_val) |
| 137 | + except (TypeError, ValueError): |
| 138 | + # TODO: message |
| 139 | + results.append(f"Cannot compare non-numeric values: left='{left_val}', right='{right_val}'") |
| 140 | + else: |
| 141 | + if operator == "greater_than" and left_num <= right_num: |
| 142 | + results.append( |
| 143 | + f"[{left['attribute']}] (vale: {left_num}) of [{left['ref_filter']['is_type']}] should be greater than [{right['attribute']}] (value: {right_val}) of [{right['ref_filter']['is_type']}]." |
| 144 | + ) |
| 145 | + elif operator == "less_than" and left_num >= right_num: |
| 146 | + results.append( |
| 147 | + f"[{left['attribute']}] (vale: {left_num}) of [{left['ref_filter']['is_type']}] should be less than [{right['attribute']}] (value: {right_val}) of [{right['ref_filter']['is_type']}]." |
| 148 | + ) |
| 149 | + elif operator == "equal" and left_num != right_num: |
| 150 | + results.append( |
| 151 | + f"[{left['attribute']}] (vale: {left_num}) of [{left['ref_filter']['is_type']}] should be equal to [{right['attribute']}] (value: {right_val}) of [{right['ref_filter']['is_type']}]." |
| 152 | + ) |
| 153 | + |
| 154 | + if len(results) > 0: |
| 155 | + if resultMsgJSON: |
| 156 | + msg = {"offending_asset": utils.get_attribute(entity, "id"), "message": results} |
| 157 | + return CheckResult(False, msg) |
| 158 | + else: |
| 159 | + return CheckResult(False, results) |
| 160 | + else: |
| 161 | + return CheckResult(True) |
| 162 | + |
| 163 | + def get_ref(self, references: EOrderedSet, ref_filter: dict): |
| 164 | + """ |
| 165 | + Filters a set of ESDL references to find the first entity matching the given type and attribute criteria. |
| 166 | +
|
| 167 | + Args: |
| 168 | + references: A collection of ESDL entities. |
| 169 | + ref_filter: A dictionary with keys: |
| 170 | + - 'is_type': the expected ESDL type as a string. |
| 171 | + - 'match': a dictionary of attribute names and expected values. |
| 172 | +
|
| 173 | + Returns: |
| 174 | + The first matching entity, or None if no match is found. |
| 175 | + """ |
| 176 | + ref_type = ref_filter["is_type"] |
| 177 | + match_dict = ref_filter["match"] |
| 178 | + esdlType = esdlUtils.get_esdl_class_from_string(ref_type)[0] |
| 179 | + |
| 180 | + for entity in references: |
| 181 | + if not isinstance(entity, esdlType): |
| 182 | + continue |
| 183 | + if all(utils.get_attr_or_ref_attr(entity, key) == value for key, value in match_dict.items()): |
| 184 | + return entity |
| 185 | + |
| 186 | + return None |
| 187 | + |
| 188 | + def get_attribute_value(self, entity, args: dict, results): |
| 189 | + filter = args["ref_filter"] |
| 190 | + attr_path = args["attribute"] |
| 191 | + value = utils.get_attr_or_ref_attr(entity, attr_path) |
| 192 | + |
| 193 | + if value == self.NOT_FOUND: |
| 194 | + raise ValueError(f"[{attr_path}] not found.") |
| 195 | + if value == self.UNSET: |
| 196 | + # TODO: rename variable |
| 197 | + # msg_head = f"[{attr_path}]" |
| 198 | + |
| 199 | + match_str = ", ".join(f"{k}={v}" for k, v in filter["match"].items()) |
| 200 | + msg_head = f"[{filter['is_type']}] ({match_str})" |
| 201 | + # if len(attr_path.split(".")) > 1: |
| 202 | + # parent_path = attr_path.split(".")[-2] |
| 203 | + # parent_value = utils.get_attr_or_ref_attr(entity, parent_path) |
| 204 | + # if parent_path and parent_value.name: |
| 205 | + # msg_head = f"[{attr_path.split('.')[-1]}] of [{parent_path}] ({parent_value.name})" |
| 206 | + results.append(f"{msg_head} should have [{attr_path}] defined, but is unset.") |
| 207 | + return None |
| 208 | + return value |
0 commit comments