|
1 | 1 | import json |
| 2 | +from typing import TypedDict, Optional |
2 | 3 |
|
3 | 4 | from flask import request, Response |
4 | 5 | from flask_restx import Resource |
|
12 | 13 | parser.add_argument("schemas", type=str, help="List of schema id's, comma separated", required=True) |
13 | 14 |
|
14 | 15 |
|
15 | | -@app.ns_validation_to_msgs.route('/') |
| 16 | +class ValidationResult(TypedDict): |
| 17 | + severity: str |
| 18 | + validation_message: str |
| 19 | + check_result_messages: list[str] |
| 20 | + |
| 21 | + |
| 22 | +class AssetMessage(TypedDict): |
| 23 | + """AssetMessage messages contains a list of all ValidationResult that are associated with the assetID""" |
| 24 | + |
| 25 | + assetID: str |
| 26 | + messages: list[ValidationResult] |
| 27 | + |
| 28 | + |
| 29 | +@app.ns_validation_to_msgs.route("/") |
16 | 30 | class ValidationToMessagesController(Resource): |
17 | 31 |
|
18 | | - @app.ns_validation_to_msgs.doc(description="Post a new validation schema", responses={ |
19 | | - 200: "Ok", |
20 | | - 404: "Schema not found", |
21 | | - 400: "Unknown filetype, Invalid ESDL"}) |
| 32 | + @app.ns_validation_to_msgs.doc( |
| 33 | + description="Post a new validation schema", |
| 34 | + responses={200: "Ok", 404: "Schema not found", 400: "Unknown filetype, Invalid ESDL"}, |
| 35 | + ) |
22 | 36 | @app.api.expect(parser, validate=True) |
23 | 37 | def post(self): |
24 | | - """Validate an ESDL file against one or more validation schemas and return JSON""" |
| 38 | + """Validate an ESDL file and return grouped validation messages per asset""" |
| 39 | + |
25 | 40 | if request.data: |
26 | 41 | # 'Contains the incoming request data as string in case it came with a mimetype Flask does not handle' |
27 | 42 | # > Happens with requests from the mapeditor |
28 | | - file = request.data.decode('utf-8') |
| 43 | + file = request.data.decode("utf-8") |
29 | 44 | else: |
30 | 45 | # with openapi |
31 | | - file = request.args['data'] |
| 46 | + file = request.args["data"] |
32 | 47 |
|
33 | 48 | if "schemas" not in request.args: |
34 | 49 | return "Bad Request: Required 'schemas' parameter missing", 400 |
35 | | - schema_list = [id for id in request.args['schemas'].split(',')] |
| 50 | + |
| 51 | + schema_list = [id for id in request.args["schemas"].split(",")] |
| 52 | + |
36 | 53 | try: |
37 | 54 | result = validationService.validateContents(file, schema_list) |
38 | 55 | except SchemaNotFound as e: |
39 | | - return e.message, 400 |
40 | | - |
41 | | - json_result = [] |
42 | | - |
43 | | - if 'schemas' in result: |
44 | | - for schema in result['schemas']: |
45 | | - if 'validations' in schema: |
46 | | - for validation in schema['validations']: |
47 | | - if 'errors' in validation: |
48 | | - for error in validation['errors']: |
49 | | - if 'offending_asset' in error: |
50 | | - asset = error['offending_asset'] |
51 | | - msgs = self.get_messages(json_result, asset) |
52 | | - msgs.append({'message': error['message'], 'severity': 'ERROR'}) |
53 | | - json_result = self.set_messages(json_result, asset, msgs) |
54 | | - if 'warnings' in validation: |
55 | | - for warning in validation['warnings']: |
56 | | - if 'offending_asset' in warning: |
57 | | - asset = warning['offending_asset'] |
58 | | - msgs = self.get_messages(json_result, asset) |
59 | | - msgs.append({'message': warning['message'], 'severity': 'WARNING'}) |
60 | | - json_result = self.set_messages(json_result, asset, msgs) |
61 | | - |
62 | | - return Response(response=json.dumps(json_result), status=200, mimetype='text/json') |
| 56 | + return e.message, e.statusCode |
63 | 57 |
|
64 | | - @staticmethod |
65 | | - def get_messages(result, asset): |
66 | | - for element in result: |
67 | | - if element["assetID"] == asset: |
68 | | - return element["messages"] |
69 | | - return [] |
| 58 | + json_result: list[AssetMessage] = [] |
| 59 | + |
| 60 | + for schema in result["schemas"]: |
| 61 | + if "validations" not in schema: |
| 62 | + continue |
| 63 | + |
| 64 | + for validation in schema["validations"]: |
| 65 | + for severity_key in ["errors", "warnings"]: |
| 66 | + severity = severity_key.rstrip("s").upper() |
| 67 | + |
| 68 | + if severity_key in validation: |
| 69 | + for issue in validation[severity_key]: |
| 70 | + asset_id = issue.get("offending_asset") |
| 71 | + |
| 72 | + if asset_id is None: |
| 73 | + raise KeyError( |
| 74 | + f"Can not associate message with an asset as 'offending_asset' key not found." |
| 75 | + ) |
| 76 | + |
| 77 | + message = issue.get("message") |
| 78 | + if isinstance(message, dict): |
| 79 | + validation_msg = message.get("validation_message", "") |
| 80 | + check_msgs = message.get("check_result_message", []) |
| 81 | + if isinstance(check_msgs, str): |
| 82 | + check_msgs = [check_msgs] |
| 83 | + else: |
| 84 | + validation_msg = "" |
| 85 | + check_msgs = [str(message)] |
| 86 | + |
| 87 | + asset_message = self.get_asset_message(asset_id, json_result) |
| 88 | + |
| 89 | + if asset_message is None: |
| 90 | + new_asset_message = AssetMessage( |
| 91 | + assetID=asset_id, |
| 92 | + messages=[ |
| 93 | + ValidationResult( |
| 94 | + severity=severity, |
| 95 | + validation_message=validation_msg, |
| 96 | + check_result_messages=check_msgs, |
| 97 | + ) |
| 98 | + ], |
| 99 | + ) |
| 100 | + json_result.append(new_asset_message) |
| 101 | + |
| 102 | + else: |
| 103 | + # Try to find existing validation_message entry and extend its check_result_messages |
| 104 | + matched = False |
| 105 | + for v_result in asset_message["messages"]: |
| 106 | + if v_result["validation_message"] == validation_msg: |
| 107 | + v_result["check_result_messages"].extend(check_msgs) |
| 108 | + matched = True |
| 109 | + break |
| 110 | + |
| 111 | + if not matched: |
| 112 | + asset_message["messages"].append( |
| 113 | + ValidationResult( |
| 114 | + severity=severity, |
| 115 | + validation_message=validation_msg, |
| 116 | + check_result_messages=check_msgs, |
| 117 | + ) |
| 118 | + ) |
| 119 | + |
| 120 | + return Response(response=json.dumps(json_result), status=200, mimetype="text/json") |
70 | 121 |
|
71 | 122 | @staticmethod |
72 | | - def set_messages(result, asset, msgs): |
73 | | - for element in result: |
74 | | - if element["assetID"] == asset: |
75 | | - element["messages"] = msgs |
76 | | - return result |
77 | | - result.append({"assetID": asset, "messages": msgs}) |
78 | | - return result |
| 123 | + def get_asset_message(asset_id: str, assets_messages: list[AssetMessage]) -> Optional[AssetMessage]: |
| 124 | + """Find and return the AssetMessage for a given asset ID""" |
| 125 | + return next((m for m in assets_messages if m["assetID"] == asset_id), None) |
0 commit comments