Skip to content

Commit f7bd739

Browse files
author
Tom Reitz
committed
bugfixes and performance improvements to reference validation
1 parent 0aa59f4 commit f7bd739

2 files changed

Lines changed: 70 additions & 54 deletions

File tree

lightbeam/lightbeam.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -160,7 +160,7 @@ def get_data_files_for_endpoint(self, endpoint):
160160
return file_list
161161

162162
# Prunes the list of endpoints down to those for which .jsonl files exist in the config.data_dir
163-
def get_endpoints_with_data(self, endpoints):
163+
def get_endpoints_with_data(self):
164164
self.logger.debug("discovering data...")
165165
endpoints_with_data = []
166166
data_dir_list = os.listdir(self.config["data_dir"])

lightbeam/validate.py

Lines changed: 69 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,11 @@
1212
class Validator:
1313

1414
MAX_VALIDATION_ERRORS_TO_DISPLAY = 10
15-
MAX_VALIDATE_TASK_QUEUE_SIZE = 50
15+
MAX_VALIDATE_TASK_QUEUE_SIZE = 100
1616
DEFAULT_VALIDATION_METHODS = ["schema", "descriptors", "uniqueness"]
1717

1818
EDFI_GENERICS_TO_RESOURCES_MAPPING = {
1919
"educationOrganizations": ["localEducationAgencies", "stateEducationAgencies", "schools"],
20-
"objectiveAssessment": [""]
2120
}
2221
EDFI_GENERIC_REFS_TO_PROPERTIES_MAPPING = {
2322
"educationOrganizationId": {
@@ -45,28 +44,39 @@ def validate(self):
4544
self.lightbeam.reset_counters()
4645
self.load_local_descriptors()
4746

48-
endpoints = self.lightbeam.endpoints
49-
for endpoint in endpoints:
47+
endpoints_with_data = self.lightbeam.get_endpoints_with_data()
48+
self.lightbeam.endpoints = self.lightbeam.api.apply_filters(endpoints_with_data)
49+
50+
# structures for local and remote reference lookups to prevent repeated lookups for the same thing
51+
self.remote_reference_cache = {}
52+
self.local_reference_cache = {}
53+
54+
for endpoint in self.lightbeam.endpoints:
5055
if "references" in validation_methods and "Descriptor" not in endpoint: # Descriptors have no references:
5156
# We don't want every `do_validate_payload()` to separately have to open and scan
5257
# local files looking for a matching payload; this pre-loads local data that
5358
# might resolve references from within payloads of this endpoint.
5459
# We assume that the data fits in memory; the largest Ed-Fi endpoints
5560
# (studentSectionAssociations, studentSchoolAttendanceEvents, etc.) contain references
5661
# to comparatively small datasets (sections, schools, students).
57-
self.load_local_reference_data(endpoint)
58-
# create a structure which remote reference lookups can populate to prevent repeated lookups for the same thing
59-
self.remote_reference_cache = {}
62+
self.build_local_reference_cache(endpoint)
6063
asyncio.run(self.validate_endpoint(endpoint))
6164

62-
def load_local_reference_data(self, endpoint):
63-
self.local_references = {}
65+
def build_local_reference_cache(self, endpoint):
    """Pre-load local .jsonl data that may resolve references within payloads of `endpoint`.

    Populates self.local_reference_cache[resource] with a list of cache-key
    strings (built by references_data_to_cache()) for every resource that
    payloads of `endpoint` may reference. Resources already cached while
    validating an earlier endpoint are not reloaded.
    """
    swagger = self.lightbeam.api.resources_swagger
    definition = self.get_swagger_definition_for_endpoint(endpoint)
    references_structure = self.load_references_structure(swagger, definition)
    references_structure = self.rebalance_local_references_structure(references_structure)
    # more memory-efficient to load local data and populate the cache for one
    # endpoint at a time:
    for original_endpoint in references_structure.keys():
        # a generic reference (e.g. educationOrganizations) may be satisfied
        # by any of several concrete resources:
        endpoints_to_check = self.EDFI_GENERICS_TO_RESOURCES_MAPPING.get(original_endpoint, [original_endpoint])
        # NOTE(review): distinct loop variable here — the original re-used
        # `endpoint`, shadowing this method's parameter.
        for ref_endpoint in endpoints_to_check:
            if ref_endpoint in self.local_reference_cache:
                # already loaded (when validating another endpoint); no need to reload
                continue
            self.logger.debug(f"(discovering any local data for {ref_endpoint}...)")
            # NOTE(review): load_references_data/references_data_to_cache index
            # references_structure by the concrete resource name; confirm the
            # structure also carries concrete keys for generic mappings.
            endpoint_data = self.load_references_data(ref_endpoint, references_structure)
            self.local_reference_cache[ref_endpoint] = self.references_data_to_cache(ref_endpoint, endpoint_data, references_structure)
7080

7181
# this is (unfortunately) necessary to allow lookup of nested references in local payload
7282
# (for remote reference lookup, a flat dict of keys is passed to the Ed-Fi API and it takes care of nesting)
@@ -80,29 +90,37 @@ def rebalance_local_references_structure(self, references_structure):
8090
references_structure["objectiveAssessments"].append("assessmentReference.namespace")
8191
return references_structure
8292

83-
def load_references_data(self, references_structure):
84-
data = {}
85-
for original_endpoint in references_structure.keys():
86-
endpoints_to_check = self.EDFI_GENERICS_TO_RESOURCES_MAPPING.get(original_endpoint, [original_endpoint])
87-
for endpoint in endpoints_to_check:
88-
data[endpoint] = []
89-
data_files = self.lightbeam.get_data_files_for_endpoint(endpoint)
90-
for file_name in data_files:
91-
with open(file_name) as file:
92-
for line_counter, line in enumerate(file):
93-
line = line.strip()
94-
try:
95-
payload = json.loads(line)
96-
except Exception as e:
97-
self.logger.warning(f"... (ignoring invalid JSON payload at {line_counter} of {file_name})")
98-
ref_payload = {}
99-
for key in references_structure[endpoint]:
100-
key = self.EDFI_GENERIC_REFS_TO_PROPERTIES_MAPPING.get(key, {}).get(endpoint, key)
101-
tmpdata = payload
102-
for subkey in key.split("."):
103-
tmpdata = tmpdata[subkey]
104-
ref_payload[key] = tmpdata
105-
data[endpoint].append(ref_payload)
93+
def references_data_to_cache(self, endpoint, endpoint_data, references_structure):
    """Convert extracted reference payloads for `endpoint` into cache-key strings.

    Each payload dict is flattened to a single string of its values joined by
    "~~~", in a canonical order (keys sorted by their final dotted segment) so
    membership tests against the cache are order-independent.

    Returns a list of cache-key strings, one per payload in `endpoint_data`.
    """
    # the key order is identical for every payload, so compute it once
    # (the original copied and re-sorted the list inside the loop)
    sorted_keys = sorted(references_structure[endpoint], key=lambda x: x.split(".")[-1])
    cache = []
    for payload in endpoint_data:
        # NOTE(review): keys remapped via EDFI_GENERIC_REFS_TO_PROPERTIES_MAPPING
        # in load_references_data() may not appear verbatim in
        # references_structure — confirm the two stay in sync, else KeyError.
        cache.append("".join(f"{payload[key]}~~~" for key in sorted_keys))
    return cache
104+
105+
def load_references_data(self, endpoint, references_structure):
    """Load every local .jsonl payload for `endpoint`, reduced to its reference keys.

    Returns a list of dicts mapping each (possibly remapped) reference key in
    references_structure[endpoint] to that key's value in the payload.
    Invalid JSON lines are logged and skipped.
    """
    data = []
    data_files = self.lightbeam.get_data_files_for_endpoint(endpoint)
    for file_name in data_files:
        with open(file_name) as file:
            # start=1 so the warning reports human (1-based) line numbers
            for line_counter, line in enumerate(file, start=1):
                line = line.strip()
                if not line:
                    continue  # quietly ignore blank lines
                try:
                    payload = json.loads(line)
                except Exception:
                    self.logger.warning(f"... (ignoring invalid JSON payload at {line_counter} of {file_name})")
                    # bugfix: the original fell through here and re-used the
                    # previous iteration's `payload` (or raised NameError on
                    # the first line)
                    continue
                ref_payload = {}
                for key in references_structure[endpoint]:
                    # generic refs (e.g. educationOrganizationId) resolve to a
                    # concrete property name for this endpoint:
                    key = self.EDFI_GENERIC_REFS_TO_PROPERTIES_MAPPING.get(key, {}).get(endpoint, key)
                    # walk the dotted path down into the nested payload
                    tmpdata = payload
                    for subkey in key.split("."):
                        tmpdata = tmpdata[subkey]
                    ref_payload[key] = tmpdata
                data.append(ref_payload)
    return data
107125

108126
def load_references_structure(self, swagger, definition):
@@ -171,6 +189,8 @@ async def validate_endpoint(self, endpoint):
171189
if total_counter%self.MAX_VALIDATE_TASK_QUEUE_SIZE==0:
172190
await self.lightbeam.do_tasks(tasks, total_counter, log_status_counts=False)
173191
tasks = []
192+
if total_counter%1000==0:
193+
self.logger.info(f"(processed {total_counter}...)")
174194

175195
# implement "fail fast" feature:
176196
if self.lightbeam.num_errors >= fail_fast_threshold:
@@ -322,21 +342,11 @@ def has_invalid_references(self, payload, path=""):
322342
endpoints_to_check = self.EDFI_GENERICS_TO_RESOURCES_MAPPING.get(original_endpoint, [original_endpoint])
323343
for endpoint in endpoints_to_check:
324344
# check if it's a local reference:
325-
if endpoint not in self.local_references.keys(): break
326-
for local_payload in self.local_references[endpoint]:
327-
instance_matches = True
328-
for key,value in local_payload.items():
329-
key = self.EDFI_GENERIC_REFS_TO_PROPERTIES_MAPPING.get(key, {}).get(endpoint, key)
330-
local_key = key
331-
if "." in key:
332-
local_key = key.split(".")[-1]
333-
if payload[k][local_key]!=value:
334-
instance_matches = False
335-
break
336-
if instance_matches:
337-
is_valid_reference = True
338-
break
339-
if is_valid_reference:
345+
if endpoint not in self.local_reference_cache.keys(): break
346+
# construct cache_key for reference
347+
cache_key = self.get_cache_key(payload[k])
348+
if cache_key in self.local_reference_cache[endpoint]:
349+
is_valid_reference = True
340350
break
341351
if not is_valid_reference: # not found in local data...
342352
for endpoint in endpoints_to_check:
@@ -357,15 +367,19 @@ def is_valid_descriptor_value(self, namespace, codeValue):
357367
if row[1]==namespace and row[2]==codeValue:
358368
return True
359369
return False
370+
371+
@staticmethod
def get_cache_key(payload):
    """Build a canonical cache-key string from a flat reference dict.

    Keys are sorted so the same reference always yields the same key string
    regardless of dict insertion order — the pre-refactor remote-lookup code
    sorted keys for exactly this reason, and references_data_to_cache() builds
    the local cache in sorted (last-dotted-segment) order; without sorting
    here, lookups silently miss the cache.
    """
    return "".join(f"{payload[k]}~~~" for k in sorted(payload))
360377

361378
def remote_reference_exists(self, endpoint, params):
362379
# check cache:
363-
if endpoint=='students' and 'studentUniqueId' in params.keys(): return True
364380
if endpoint not in self.remote_reference_cache.keys():
365381
self.remote_reference_cache[endpoint] = []
366-
cache_key = ''
367-
for k in sorted(params.keys()):
368-
cache_key += f"{params[k]}-"
382+
cache_key = self.get_cache_key(params)
369383
if cache_key in self.remote_reference_cache[endpoint]:
370384
return True
371385
# do remote lookup
@@ -381,6 +395,8 @@ def remote_reference_exists(self, endpoint, params):
381395
)
382396
body = response.text
383397
status = str(response.status_code)
398+
if status!='401':
399+
self.lightbeam.increment_status_counts(status)
384400
if status=='401':
385401
# this could be broken out to a separate function call,
386402
# but not doing so should help keep the critical section small

0 commit comments

Comments
 (0)