Skip to content

Commit 5ed3a74

Browse files
authored
Merge pull request #65 from edanalytics/fix/total_count_logged
Fix: Validation error message shows the wrong number of records
2 parents d28a925 + 1919563 commit 5ed3a74

3 files changed

Lines changed: 24 additions & 18 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
### Unreleased changes
2+
* bugfix: `validate` and `send` could report an incorrect number of total records
3+
14
### v0.1.6
25
<details>
36
<summary>Released 2024-11-15</summary>

lightbeam/send.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ async def do_send(self, endpoint):
6969
for file_name in data_files:
7070
with open(file_name) as file:
7171
# process each line
72-
for line_counter, line in enumerate(file):
72+
for i, line in enumerate(file):
73+
line_number = i + 1
7374
total_counter += 1
7475
data = line.strip()
7576
# compute hash of current row
@@ -90,7 +91,7 @@ async def do_send(self, endpoint):
9091
endpoint,
9192
file_name,
9293
data,
93-
line_counter,
94+
line_number,
9495
data_hash,
9596
)
9697
)
@@ -104,7 +105,7 @@ async def do_send(self, endpoint):
104105
tasks.append(
105106
asyncio.create_task(
106107
self.do_post(
107-
endpoint, file_name, data, line_counter, data_hash
108+
endpoint, file_name, data, line_number, data_hash
108109
)
109110
)
110111
)
@@ -136,7 +137,7 @@ async def do_send(self, endpoint):
136137
})
137138

138139
# Posts a single data payload to a single endpoint
139-
async def do_post(self, endpoint, file_name, data, line, data_hash):
140+
async def do_post(self, endpoint, file_name, data, line_number, data_hash):
140141
curr_token_version = int(str(self.lightbeam.token_version))
141142
while True: # this is not great practice, but an effective way (along with the `break` below) to achieve a do:while loop
142143
try:
@@ -162,15 +163,15 @@ async def do_post(self, endpoint, file_name, data, line, data_hash):
162163
do_append = True
163164
for index, item in enumerate(failures):
164165
if item["status_code"]==response.status and item["message"]==message and item["file"]==file_name:
165-
failures[index]["line_numbers"].append(line)
166+
failures[index]["line_numbers"].append(line_number)
166167
failures[index]["count"] += 1
167168
do_append = False
168169
if do_append:
169170
failure = {
170171
'status_code': response.status,
171172
'message': message,
172173
'file': file_name,
173-
'line_numbers': [line],
174+
'line_numbers': [line_number],
174175
'count': 1
175176
}
176177
failures.append(failure)
@@ -208,5 +209,5 @@ async def do_post(self, endpoint, file_name, data, line, data_hash):
208209
except Exception as e:
209210
status = 400
210211
self.lightbeam.num_errors += 1
211-
self.logger.warn("{0} (at line {1} of {2} )".format(str(e), line, file_name))
212+
self.logger.warn("{0} (at line {1} of {2} )".format(str(e), line_number, file_name))
212213
break

lightbeam/validate.py

Lines changed: 13 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -118,12 +118,13 @@ def load_references_data(self, endpoint, references_structure):
118118
data_files = self.lightbeam.get_data_files_for_endpoint(endpoint)
119119
for file_name in data_files:
120120
with open(file_name) as file:
121-
for line_counter, line in enumerate(file):
121+
for i, line in enumerate(file):
122+
line_number = i + 1
122123
line = line.strip()
123124
try:
124125
payload = json.loads(line)
125126
except Exception as e:
126-
self.logger.warning(f"... (ignoring invalid JSON payload at {line_counter} of {file_name})")
127+
self.logger.warning(f"... (ignoring invalid JSON payload at {line_number} of {file_name})")
127128
ref_payload = {}
128129
for key in references_structure[endpoint]:
129130
key = self.EDFI_GENERIC_REFS_TO_PROPERTIES_MAPPING.get(key, {}).get(endpoint, key)
@@ -199,12 +200,13 @@ async def validate_endpoint(self, endpoint):
199200
for file_name in data_files:
200201
self.logger.info(f"validating {file_name} against {definition} schema...")
201202
with open(file_name) as file:
202-
for line_counter, line in enumerate(file):
203+
for i, line in enumerate(file):
204+
line_number = i + 1
203205
total_counter += 1
204206
data = line.strip()
205207

206208
tasks.append(asyncio.create_task(
207-
self.do_validate_payload(endpoint, file_name, data, line_counter)))
209+
self.do_validate_payload(endpoint, file_name, data, line_number)))
208210

209211
if len(tasks) >= self.MAX_VALIDATE_TASK_QUEUE_SIZE:
210212
await self.lightbeam.do_tasks(tasks, total_counter, log_status_counts=False)
@@ -235,10 +237,10 @@ async def validate_endpoint(self, endpoint):
235237
num_others = self.lightbeam.num_errors - self.MAX_VALIDATION_ERRORS_TO_DISPLAY
236238
if self.lightbeam.num_errors > self.MAX_VALIDATION_ERRORS_TO_DISPLAY:
237239
self.logger.warn(f"... and {num_others} others!")
238-
self.logger.warn(f"... VALIDATION ERRORS on {self.lightbeam.num_errors} of {line_counter} lines in {file_name}; see details above.")
240+
self.logger.warn(f"... VALIDATION ERRORS on {self.lightbeam.num_errors} of {line_number} lines in {file_name}; see details above.")
239241

240242

241-
async def do_validate_payload(self, endpoint, file_name, data, line_counter):
243+
async def do_validate_payload(self, endpoint, file_name, data, line_number):
242244
if self.fail_fast_threshold is not None and self.lightbeam.num_errors >= self.fail_fast_threshold: return
243245
definition = self.get_swagger_definition_for_endpoint(endpoint)
244246
if "Descriptor" in endpoint:
@@ -262,7 +264,7 @@ async def do_validate_payload(self, endpoint, file_name, data, line_counter):
262264
try:
263265
payload = json.loads(data)
264266
except Exception as e:
265-
self.log_validation_error(endpoint, file_name, line_counter, "json", f"invalid JSON {str(e).replace(' line 1','')}")
267+
self.log_validation_error(endpoint, file_name, line_number, "json", f"invalid JSON {str(e).replace(' line 1','')}")
266268
return
267269

268270
# check payload obeys Swagger schema
@@ -273,22 +275,22 @@ async def do_validate_payload(self, endpoint, file_name, data, line_counter):
273275
e_path = [str(x) for x in list(e.path)]
274276
context = ""
275277
if len(e_path)>0: context = " in " + " -> ".join(e_path)
276-
self.log_validation_error(endpoint, file_name, line_counter, "schema", f"{str(e.message)} {context}")
278+
self.log_validation_error(endpoint, file_name, line_number, "schema", f"{str(e.message)} {context}")
277279
return
278280

279281
# check descriptor values are valid
280282
if "descriptors" in self.validation_methods:
281283
error_message = self.has_invalid_descriptor_values(payload, path="")
282284
if error_message != "":
283-
self.log_validation_error(endpoint, file_name, line_counter, "descriptors", error_message)
285+
self.log_validation_error(endpoint, file_name, line_number, "descriptors", error_message)
284286
return
285287

286288
# check natural keys are unique
287289
if "uniqueness" in self.validation_methods:
288290
params = json.dumps(util.interpolate_params(identity_params_structure, payload))
289291
params_hash = hashlog.get_hash(params)
290292
if params_hash in distinct_params:
291-
self.log_validation_error(endpoint, file_name, line_counter, "uniqueness", "duplicate value(s) for natural key(s): {params}")
293+
self.log_validation_error(endpoint, file_name, line_number, "uniqueness", "duplicate value(s) for natural key(s): {params}")
292294
return
293295
else: distinct_params.append(params_hash)
294296

@@ -297,7 +299,7 @@ async def do_validate_payload(self, endpoint, file_name, data, line_counter):
297299
self.lightbeam.api.do_oauth()
298300
error_message = self.has_invalid_references(payload, path="")
299301
if error_message != "":
300-
self.log_validation_error(endpoint, file_name, line_counter, "references", error_message)
302+
self.log_validation_error(endpoint, file_name, line_number, "references", error_message)
301303

302304

303305
def log_validation_error(self, endpoint, file_name, line_number, method, message):

0 commit comments

Comments
 (0)