Skip to content

Commit d4a61dd

Browse files
author
Tom Reitz
committed
implementing structured results file output for validate
1 parent 5ad6915 commit d4a61dd

3 files changed

Lines changed: 108 additions & 72 deletions

File tree

lightbeam/lightbeam.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import os
2+
import re
23
import json
34
import yaml
45
import logging
@@ -80,6 +81,7 @@ def __init__(self, config_file, logger=None, selector="*", exclude="", keep_keys
8081
self.api = EdFiAPI(self)
8182
self.token_version = 0
8283
self.results_file = results_file
84+
self.start_timestamp = datetime.now()
8385

8486
# load params and/or env vars for config YAML interpolation
8587
self.params = json.loads(params) if params else {}
@@ -115,6 +117,51 @@ def __init__(self, config_file, logger=None, selector="*", exclude="", keep_keys
115117
if self.track_state and not os.path.isdir(self.config["state_dir"]):
116118
self.logger.debug("creating state dir {0}".format(self.config["state_dir"]))
117119
os.mkdir(self.config["state_dir"])
120+
121+
# Initialize a dictionary for tracking run metadata (for structured output)
122+
self.metadata = {
123+
"started_at": self.start_timestamp.isoformat(timespec='microseconds'),
124+
"working_dir": os.getcwd(),
125+
"config_file": self.config_file,
126+
"data_dir": self.config["data_dir"],
127+
"api_url": self.config["edfi_api"]["base_url"],
128+
"namespace": self.config["namespace"],
129+
"resources": {}
130+
}
131+
132+
# Callback for re.sub: collapse a match by deleting every whitespace
# character (spaces, tabs, newlines) inside it. Used when compacting the
# "line_numbers" arrays in the structured-output JSON.
def replace_linebreaks(self, m):
    matched_text = m.group(0)
    return re.sub(r"\s+", '', matched_text)
135+
def write_structured_output(self):
    """Finalize run metadata (completion time, runtime, record totals) and,
    if a results_file was configured, write the metadata to disk as JSON.

    Totals are summed across all endpoints in self.metadata["resources"];
    endpoints that never recorded counts (e.g. failed before processing)
    contribute 0 instead of raising KeyError.
    """
    # finalize run-level metadata
    self.end_timestamp = datetime.now()
    resources = self.metadata["resources"]
    self.metadata.update({
        "completed_at": self.end_timestamp.isoformat(timespec='microseconds'),
        "runtime_sec": (self.end_timestamp - self.start_timestamp).total_seconds(),
        # .get(..., 0) guards against endpoints that errored out before
        # their counts were recorded
        "total_records_processed": sum(item.get('records_processed', 0) for item in resources.values()),
        "total_records_skipped": sum(item.get('records_skipped', 0) for item in resources.values()),
        "total_records_failed": sum(item.get('records_failed', 0) for item in resources.values())
    })
    # sort failing line numbers so output is deterministic and readable
    for resource in resources.values():
        for failure in resource.get("failures", []):
            failure["line_numbers"].sort()

    ### Create structured output results_file if necessary
    if self.results_file:
        # create directory if not exists; os.path.dirname() returns "" for
        # a bare filename in the current directory, and os.makedirs("")
        # would raise FileNotFoundError, so only create when non-empty
        results_dir = os.path.dirname(self.results_file)
        if results_dir:
            os.makedirs(results_dir, exist_ok=True)

        with open(self.results_file, 'w') as fp:
            content = json.dumps(self.metadata, indent=4)
            # failures.line_numbers are split each on their own line; here we remove those line breaks
            content = re.sub(r'"line_numbers": \[(\d|,|\s|\n)*\]', self.replace_linebreaks, content)
            fp.write(content)
        self.logger.info(f"results written to {self.results_file}")
118165

119166
def load_config_file(self) -> dict:
120167
_env_backup = os.environ.copy()

lightbeam/send.py

Lines changed: 10 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
import time
44
import json
55
import asyncio
6-
import datetime
76

87
from lightbeam import util
98
from lightbeam import hashlog
@@ -16,22 +15,10 @@ def __init__(self, lightbeam=None):
1615
self.lightbeam.reset_counters()
1716
self.logger = self.lightbeam.logger
1817
self.hashlog_data = {}
19-
self.start_timestamp = datetime.datetime.now()
2018

2119
# Sends all (selected) endpoints
2220
def send(self):
2321

24-
# Initialize a dictionary for tracking run metadata (for structured output)
25-
self.metadata = {
26-
"started_at": self.start_timestamp.isoformat(timespec='microseconds'),
27-
"working_dir": os.getcwd(),
28-
"config_file": self.lightbeam.config_file,
29-
"data_dir": self.lightbeam.config["data_dir"],
30-
"api_url": self.lightbeam.config["edfi_api"]["base_url"],
31-
"namespace": self.lightbeam.config["namespace"],
32-
"resources": {}
33-
}
34-
3522
# get token with which to send requests
3623
self.lightbeam.api.do_oauth()
3724

@@ -47,43 +34,15 @@ def send(self):
4734
self.logger.info("finished processing endpoint {0}!".format(endpoint))
4835
self.logger.info(" (final status counts: {0}) ".format(self.lightbeam.status_counts))
4936
self.lightbeam.log_status_reasons()
37+
38+
# write structured output (if needed)
39+
self.lightbeam.write_structured_output()
5040

51-
### Create structured output results_file if necessary
52-
self.end_timestamp = datetime.datetime.now()
53-
self.metadata.update({
54-
"completed_at": self.end_timestamp.isoformat(timespec='microseconds'),
55-
"runtime_sec": (self.end_timestamp - self.start_timestamp).total_seconds(),
56-
"total_records_processed": sum(item['records_processed'] for item in self.metadata["resources"].values()),
57-
"total_records_skipped": sum(item['records_skipped'] for item in self.metadata["resources"].values()),
58-
"total_records_failed": sum(item['records_failed'] for item in self.metadata["resources"].values())
59-
})
60-
# sort failing line numbers
61-
for resource in self.metadata["resources"].keys():
62-
if "failures" in self.metadata["resources"][resource].keys():
63-
for idx, _ in enumerate(self.metadata["resources"][resource]["failures"]):
64-
self.metadata["resources"][resource]["failures"][idx]["line_numbers"].sort()
65-
66-
# helper function used below
67-
def repl(m):
68-
return re.sub(r"\s+", '', m.group(0))
69-
70-
### Create structured output results_file if necessary
71-
if self.lightbeam.results_file:
72-
73-
# create directory if not exists
74-
os.makedirs(os.path.dirname(self.lightbeam.results_file), exist_ok=True)
75-
76-
with open(self.lightbeam.results_file, 'w') as fp:
77-
content = json.dumps(self.metadata, indent=4)
78-
# failures.line_numbers are split each on their own line; here we remove those line breaks
79-
content = re.sub(r'"line_numbers": \[(\d|,|\s|\n)*\]', repl, content)
80-
fp.write(content)
81-
82-
if self.metadata["total_records_processed"] == self.metadata["total_records_skipped"]:
41+
if self.lightbeam.metadata["total_records_processed"] == self.lightbeam.metadata["total_records_skipped"]:
8342
self.logger.info("all payloads skipped")
8443
exit(99) # signal to downstream tasks (in Airflow) all payloads skipped
8544

86-
if self.metadata["total_records_processed"] == self.metadata["total_records_failed"]:
45+
if self.lightbeam.metadata["total_records_processed"] == self.lightbeam.metadata["total_records_failed"]:
8746
self.logger.info("all payloads failed")
8847
exit(1) # signal to downstream tasks (in Airflow) all payloads failed
8948

@@ -100,7 +59,7 @@ async def do_send(self, endpoint):
10059
hashlog_file = os.path.join(self.lightbeam.config["state_dir"], f"{endpoint}.dat")
10160
self.hashlog_data = hashlog.load(hashlog_file)
10261

103-
self.metadata["resources"].update({endpoint: {}})
62+
self.lightbeam.metadata["resources"].update({endpoint: {}})
10463
self.lightbeam.reset_counters()
10564

10665
# process each file
@@ -169,8 +128,8 @@ async def do_send(self, endpoint):
169128
if status>=200 and status<300:
170129
successes.append({"status_code": status, "count": self.lightbeam.status_counts[status]})
171130
if len(successes)>0:
172-
self.metadata["resources"][endpoint].update({"successes": successes})
173-
self.metadata["resources"][endpoint].update({
131+
self.lightbeam.metadata["resources"][endpoint].update({"successes": successes})
132+
self.lightbeam.metadata["resources"][endpoint].update({
174133
"records_processed": total_counter,
175134
"records_skipped": self.lightbeam.num_skipped,
176135
"records_failed": self.lightbeam.num_errors
@@ -199,7 +158,7 @@ async def do_post(self, endpoint, file_name, data, line, data_hash):
199158
message = str(response.status) + ": " + util.linearize(json.loads(body).get("message"))
200159

201160
# update run metadata...
202-
failures = self.metadata["resources"][endpoint].get("failures", [])
161+
failures = self.lightbeam.metadata["resources"][endpoint].get("failures", [])
203162
do_append = True
204163
for index, item in enumerate(failures):
205164
if item["status_code"]==response.status and item["message"]==message and item["file"]==file_name:
@@ -215,7 +174,7 @@ async def do_post(self, endpoint, file_name, data, line, data_hash):
215174
'count': 1
216175
}
217176
failures.append(failure)
218-
self.metadata["resources"][endpoint]["failures"] = failures
177+
self.lightbeam.metadata["resources"][endpoint]["failures"] = failures
219178

220179
# update output and counters
221180
self.lightbeam.increment_status_reason(message)

lightbeam/validate.py

Lines changed: 51 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ def __init__(self, lightbeam=None):
3333

3434
# Validates (selected) endpoints
3535
def validate(self):
36+
3637
# The below should go in __init__(), but rely on lightbeam.config which is not yet available there.
3738
self.fail_fast_threshold = self.lightbeam.config.get("validate",{}).get("references",{}).get("max_failures", self.DEFAULT_FAIL_FAST_THRESHOLD)
3839
self.validation_methods = self.lightbeam.config.get("validate",{}).get("methods",self.DEFAULT_VALIDATION_METHODS)
@@ -65,6 +66,13 @@ def validate(self):
6566
# to comparatively small datasets (sections, schools, students).
6667
self.build_local_reference_cache(endpoint)
6768
asyncio.run(self.validate_endpoint(endpoint))
69+
70+
# write structured output (if needed)
71+
self.lightbeam.write_structured_output()
72+
73+
if self.lightbeam.metadata["total_records_processed"] == self.lightbeam.metadata["total_records_failed"]:
74+
self.logger.info("all payloads failed")
75+
exit(1) # signal to downstream tasks (in Airflow) all payloads failed
6876

6977
def build_local_reference_cache(self, endpoint):
7078
swagger = self.lightbeam.api.resources_swagger
@@ -173,14 +181,15 @@ def get_swagger_definition_for_endpoint(self, endpoint):
173181

174182
# Validates a single endpoint based on the Swagger docs
175183
async def validate_endpoint(self, endpoint):
184+
self.lightbeam.metadata["resources"].update({endpoint: {}})
176185
definition = self.get_swagger_definition_for_endpoint(endpoint)
177186
data_files = self.lightbeam.get_data_files_for_endpoint(endpoint)
178187
tasks = []
179188
total_counter = 0
189+
self.lightbeam.num_errors = 0
180190
for file_name in data_files:
181191
self.logger.info(f"validating {file_name} against {definition} schema...")
182192
with open(file_name) as file:
183-
self.lightbeam.num_errors = 0
184193
for line_counter, line in enumerate(file):
185194
total_counter += 1
186195
data = line.strip()
@@ -200,13 +209,20 @@ async def validate_endpoint(self, endpoint):
200209
break
201210

202211
if len(tasks)>0: await self.lightbeam.do_tasks(tasks, total_counter, log_status_counts=False)
212+
213+
# update metadata counts for this endpoint
214+
self.lightbeam.metadata["resources"][endpoint].update({
215+
"records_processed": total_counter,
216+
"records_skipped": self.lightbeam.num_skipped,
217+
"records_failed": self.lightbeam.num_errors
218+
})
203219

204220
if self.lightbeam.num_errors==0: self.logger.info(f"... all lines validate ok!")
205221
else:
206222
num_others = self.lightbeam.num_errors - self.MAX_VALIDATION_ERRORS_TO_DISPLAY
207223
if self.lightbeam.num_errors > self.MAX_VALIDATION_ERRORS_TO_DISPLAY:
208-
self.logger.critical(f"... and {num_others} others!")
209-
self.logger.critical(f"... VALIDATION ERRORS on {self.lightbeam.num_errors} of {line_counter} lines in {file_name}; see details above.")
224+
self.logger.warning(f"... and {num_others} others!")
225+
self.logger.warning(f"... VALIDATION ERRORS on {self.lightbeam.num_errors} of {line_counter} lines in {file_name}; see details above.")
210226

211227

212228
async def do_validate_payload(self, endpoint, file_name, data, line_counter):
@@ -233,41 +249,33 @@ async def do_validate_payload(self, endpoint, file_name, data, line_counter):
233249
try:
234250
payload = json.loads(data)
235251
except Exception as e:
236-
if self.lightbeam.num_errors < self.MAX_VALIDATION_ERRORS_TO_DISPLAY:
237-
self.logger.warning(f"... VALIDATION ERROR (line {line_counter}): invalid JSON" + str(e).replace(" line 1",""))
238-
self.lightbeam.num_errors += 1
252+
self.log_validation_error(endpoint, file_name, line_counter, "json", f"invalid JSON {str(e).replace(' line 1','')}")
239253
return
240254

241255
# check payload obeys Swagger schema
242256
if "schema" in self.validation_methods:
243257
try:
244258
validator.validate(payload)
245259
except Exception as e:
246-
if self.lightbeam.num_errors < self.MAX_VALIDATION_ERRORS_TO_DISPLAY:
247-
e_path = [str(x) for x in list(e.path)]
248-
context = ""
249-
if len(e_path)>0: context = " in " + " -> ".join(e_path)
250-
self.logger.warning(f"... VALIDATION ERROR (line {line_counter}): " + str(e.message) + context)
251-
self.lightbeam.num_errors += 1
260+
e_path = [str(x) for x in list(e.path)]
261+
context = ""
262+
if len(e_path)>0: context = " in " + " -> ".join(e_path)
263+
self.log_validation_error(endpoint, file_name, line_counter, "schema", f"{str(e.message)} {context}")
252264
return
253265

254266
# check descriptor values are valid
255267
if "descriptors" in self.validation_methods:
256268
error_message = self.has_invalid_descriptor_values(payload, path="")
257269
if error_message != "":
258-
if self.lightbeam.num_errors < self.MAX_VALIDATION_ERRORS_TO_DISPLAY:
259-
self.logger.warning(f"... VALIDATION ERROR (line {line_counter}): " + error_message)
260-
self.lightbeam.num_errors += 1
270+
self.log_validation_error(endpoint, file_name, line_counter, "descriptors", error_message)
261271
return
262272

263273
# check natural keys are unique
264274
if "uniqueness" in self.validation_methods:
265275
params = json.dumps(util.interpolate_params(params_structure, payload))
266276
params_hash = hashlog.get_hash(params)
267277
if params_hash in distinct_params:
268-
if self.lightbeam.num_errors < self.MAX_VALIDATION_ERRORS_TO_DISPLAY:
269-
self.logger.warning(f"... VALIDATION ERROR (line {line_counter}): duplicate value(s) for natural key(s): {params}")
270-
self.lightbeam.num_errors += 1
278+
self.log_validation_error(endpoint, file_name, line_counter, "uniqueness", f"duplicate value(s) for natural key(s): {params}")
271279
return
272280
else: distinct_params.append(params_hash)
273281

@@ -276,11 +284,33 @@ async def do_validate_payload(self, endpoint, file_name, data, line_counter):
276284
self.lightbeam.api.do_oauth()
277285
error_message = self.has_invalid_references(payload, path="")
278286
if error_message != "":
279-
if self.lightbeam.num_errors < self.MAX_VALIDATION_ERRORS_TO_DISPLAY:
280-
self.logger.warning(f"... VALIDATION ERROR (line {line_counter}): " + error_message)
281-
self.lightbeam.num_errors += 1
287+
self.log_validation_error(endpoint, file_name, line_counter, "references", error_message)
282288

283289

290+
def log_validation_error(self, endpoint, file_name, line_number, method, message):
291+
if self.lightbeam.num_errors < self.MAX_VALIDATION_ERRORS_TO_DISPLAY:
292+
self.logger.warning(f"... VALIDATION ERROR (line {line_number}): {message}")
293+
self.lightbeam.num_errors += 1
294+
295+
# update run metadata...
296+
failures = self.lightbeam.metadata["resources"][endpoint].get("failures", [])
297+
do_append = True
298+
for index, item in enumerate(failures):
299+
if item["method"]==method and item["message"]==message and item["file"]==file_name:
300+
failures[index]["line_numbers"].append(line_number)
301+
failures[index]["count"] += 1
302+
do_append = False
303+
if do_append:
304+
failure = {
305+
'method': method,
306+
'message': message,
307+
'file': file_name,
308+
'line_numbers': [line_number],
309+
'count': 1
310+
}
311+
failures.append(failure)
312+
self.lightbeam.metadata["resources"][endpoint]["failures"] = failures
313+
284314
def load_local_descriptors(self):
285315
local_descriptors = []
286316
all_endpoints = self.lightbeam.api.get_sorted_endpoints()

0 commit comments

Comments
 (0)