Skip to content

Commit ebc84f4

Browse files
authored
Merge branch 'main' into feature/validate_fixes_and_uniqueness_in_array_elements
2 parents 8d6ccd9 + 6a7f805 commit ebc84f4

5 files changed

Lines changed: 81 additions & 20 deletions

File tree

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
### Unreleased changes
2+
* bugfix: `validate` and `send` could report an incorrect number of total records
3+
14
### v0.1.6
25
<details>
36
<summary>Released 2024-11-15</summary>

lightbeam/__main__.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,11 @@ def main(argv=None):
108108
type=str,
109109
help='produces a JSON output file with structured information about run results'
110110
)
111+
parser.add_argument("--set",
112+
type=str,
113+
nargs="*",
114+
help='overrides a setting in the config YAML; example: --set fetch.page_size 1000'
115+
)
111116

112117
defaults = { "selector":"*", "params": "", "older_than": "", "newer_than": "", "resend_status_codes": "", "results_file": "" }
113118
parser.set_defaults(**defaults)
@@ -143,6 +148,12 @@ def main(argv=None):
143148

144149
if not args.config_file:
145150
logger.error("config file not specified with `-c` flag, and no default {" + ", ".join(DEFAULT_CONFIG_FILES) + "} found")
151+
152+
if args.set and len(args.set)%2 != 0: # odd number of overrides
153+
logger.error("overrides specified with --set must be followed by an even number of strings (key value key value ...)")
154+
overrides = None
155+
if args.set:
156+
overrides = dict(zip(args.set[::2], args.set[1::2]))
146157

147158
lb = Lightbeam(
148159
config_file=args.config_file,
@@ -158,7 +169,8 @@ def main(argv=None):
158169
older_than=args.older_than,
159170
newer_than=args.newer_than,
160171
resend_status_codes=args.resend_status_codes,
161-
results_file=args.results_file
172+
results_file=args.results_file,
173+
overrides=overrides,
162174
)
163175
try:
164176
logger.info("starting...")

lightbeam/lightbeam.py

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class Lightbeam:
5555
MAX_STATUS_REASONS_TO_DISPLAY = 10
5656
DATA_FILE_EXTENSIONS = ['json', 'jsonl', 'ndjson']
5757

58-
def __init__(self, config_file, logger=None, selector="*", exclude="", keep_keys="*", drop_keys="", query="{}", params="", wipe=False, force=False, older_than="", newer_than="", resend_status_codes="", results_file=""):
58+
def __init__(self, config_file, logger=None, selector="*", exclude="", keep_keys="*", drop_keys="", query="{}", params="", wipe=False, force=False, older_than="", newer_than="", resend_status_codes="", results_file="", overrides={}):
5959
self.config_file = config_file
6060
self.logger = logger
6161
self.errors = 0
@@ -82,12 +82,16 @@ def __init__(self, config_file, logger=None, selector="*", exclude="", keep_keys
8282
self.token_version = 0
8383
self.results_file = os.path.abspath(results_file) if results_file else None
8484
self.start_timestamp = datetime.now()
85+
self.overrides = overrides
8586

8687
# load params and/or env vars for config YAML interpolation
8788
self.params = json.loads(params) if params else {}
8889
user_config = self.load_config_file()
89-
9090
self.config = util.merge_dicts(user_config, self.config_defaults)
91+
92+
# inject overrides into config
93+
if self.overrides: self.inject_cli_overrides()
94+
9195
if "state_dir" in self.config:
9296
self.track_state = True
9397
self.config["state_dir"] = os.path.expanduser(self.config["state_dir"])
@@ -129,6 +133,39 @@ def __init__(self, config_file, logger=None, selector="*", exclude="", keep_keys
129133
"resources": {}
130134
}
131135

136+
def inject_cli_overrides(self):
    """Apply every entry of ``self.overrides`` to ``self.config``.

    Each key is handed to ``Lightbeam.set_path`` together with its value;
    the dict returned by that call becomes the new ``self.config``.
    """
    for dotted_key in self.overrides:
        raw_value = self.overrides[dotted_key]
        self.config = Lightbeam.set_path(self.config, dotted_key, raw_value)
140+
141+
@staticmethod
142+
def set_path(my_dict, path, value):
    """Set a dot-separated ``path`` inside the nested dict ``my_dict``.

    Args:
        my_dict: dict to modify in place.
        path: key path with levels separated by ``.``, e.g.
            ``"fetch.page_size"``.
        value: value to store at the final key; it is passed through
            ``Lightbeam.autocast`` before being stored.

    Returns:
        The same ``my_dict`` object, after modification.
    """
    # str.split always yields at least one piece, so `leaf` is always bound.
    *parents, leaf = path.split(".")
    current = my_dict
    for piece in parents:
        # Create missing levels. Also replace an existing non-dict value
        # (None or a scalar) with a dict: descending into it would otherwise
        # fail with AttributeError/TypeError, and the caller explicitly asked
        # for a nested key under it.
        if not isinstance(current.get(piece), dict):
            current[piece] = {}
        current = current[piece]
    current[leaf] = Lightbeam.autocast(value)
    return my_dict
151+
152+
@staticmethod
153+
def autocast(value):
    """Convert an override string to a bool, float or int where possible.

    Rules, applied in order:
      * non-string values are returned unchanged (they are already typed);
      * ``true/yes/on/t/y`` (any case) -> ``True``;
      * ``false/no/off/f/n`` (any case) -> ``False``;
      * strings containing ``.`` are parsed with ``float``;
      * all other strings are parsed with ``int``;
      * if parsing fails, the original string is returned.

    Args:
        value: the raw override value, normally a string.

    Returns:
        The converted value, or ``value`` itself when no conversion applies.
    """
    # Only strings support .lower(); anything else is passed through as-is
    # instead of raising AttributeError.
    if not isinstance(value, str):
        return value

    lowered = value.lower()
    if lowered in ('true', 'yes', 'on', 't', 'y'):
        return True
    if lowered in ('false', 'no', 'off', 'f', 'n'):
        return False

    # A dot selects float parsing; everything else is tried as an int.
    caster = float if '.' in value else int
    try:
        return caster(value)
    except ValueError:
        return value
168+
132169
# this is intended to be called before any CRITICAL errors;
133170
# any cleanup tasks should go here:
134171
def shutdown(self, method):
@@ -255,9 +292,10 @@ def get_endpoints_with_data(self, filter_endpoints=None):
255292
if os.path.isfile(sub_dir_item_path):
256293
filename = os.path.basename(sub_dir_item)
257294
extension = filename.rsplit(".", 1)[-1]
295+
filename_without_extension = filename.rsplit(".", 1)[0]
258296
if (
259297
extension in self.DATA_FILE_EXTENSIONS # valid file extension
260-
and filename_without_extension in self.all_endpoints # valid endpoint
298+
and data_dir_item in self.all_endpoints # valid endpoint
261299
and data_dir_item in filter_endpoints # selected endpoint
262300
):
263301
has_data_file = True

lightbeam/send.py

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ async def do_send(self, endpoint):
6969
for file_name in data_files:
7070
with open(file_name) as file:
7171
# process each line
72-
for line_counter, line in enumerate(file):
72+
for i, line in enumerate(file):
73+
line_number = i + 1
7374
total_counter += 1
7475
data = line.strip()
7576
# compute hash of current row
@@ -90,7 +91,7 @@ async def do_send(self, endpoint):
9091
endpoint,
9192
file_name,
9293
data,
93-
line_counter,
94+
line_number,
9495
data_hash,
9596
)
9697
)
@@ -104,7 +105,7 @@ async def do_send(self, endpoint):
104105
tasks.append(
105106
asyncio.create_task(
106107
self.do_post(
107-
endpoint, file_name, data, line_counter, data_hash
108+
endpoint, file_name, data, line_number, data_hash
108109
)
109110
)
110111
)
@@ -136,7 +137,7 @@ async def do_send(self, endpoint):
136137
})
137138

138139
# Posts a single data payload to a single endpoint
139-
async def do_post(self, endpoint, file_name, data, line, data_hash):
140+
async def do_post(self, endpoint, file_name, data, line_number, data_hash):
140141
curr_token_version = int(str(self.lightbeam.token_version))
141142
while True: # this is not great practice, but an effective way (along with the `break` below) to achieve a do:while loop
142143
try:
@@ -162,15 +163,15 @@ async def do_post(self, endpoint, file_name, data, line, data_hash):
162163
do_append = True
163164
for index, item in enumerate(failures):
164165
if item["status_code"]==response.status and item["message"]==message and item["file"]==file_name:
165-
failures[index]["line_numbers"].append(line)
166+
failures[index]["line_numbers"].append(line_number)
166167
failures[index]["count"] += 1
167168
do_append = False
168169
if do_append:
169170
failure = {
170171
'status_code': response.status,
171172
'message': message,
172173
'file': file_name,
173-
'line_numbers': [line],
174+
'line_numbers': [line_number],
174175
'count': 1
175176
}
176177
failures.append(failure)
@@ -208,5 +209,5 @@ async def do_post(self, endpoint, file_name, data, line, data_hash):
208209
except Exception as e:
209210
status = 400
210211
self.lightbeam.num_errors += 1
211-
self.logger.warn("{0} (at line {1} of {2} )".format(str(e), line, file_name))
212+
self.logger.warn("{0} (at line {1} of {2} )".format(str(e), line_number, file_name))
212213
break

lightbeam/validate.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -118,12 +118,13 @@ def load_references_data(self, endpoint, references_structure):
118118
data_files = self.lightbeam.get_data_files_for_endpoint(endpoint)
119119
for file_name in data_files:
120120
with open(file_name) as file:
121-
for line_counter, line in enumerate(file):
121+
for i, line in enumerate(file):
122+
line_number = i + 1
122123
line = line.strip()
123124
try:
124125
payload = json.loads(line)
125126
except Exception as e:
126-
self.logger.warning(f"... (ignoring invalid JSON payload at {line_counter} of {file_name})")
127+
self.logger.warning(f"... (ignoring invalid JSON payload at {line_number} of {file_name})")
127128
ref_payload = {}
128129
for key in references_structure[endpoint]:
129130
key = self.EDFI_GENERIC_REFS_TO_PROPERTIES_MAPPING.get(key, {}).get(endpoint, key)
@@ -217,12 +218,13 @@ async def validate_endpoint(self, endpoint):
217218
for file_name in data_files:
218219
self.logger.info(f"validating {file_name} against {definition} schema...")
219220
with open(file_name) as file:
220-
for line_counter, line in enumerate(file):
221+
for i, line in enumerate(file):
222+
line_number = i + 1
221223
total_counter += 1
222224
data = line.strip()
223225

224226
tasks.append(asyncio.create_task(
225-
self.do_validate_payload(endpoint, file_name, data, line_counter)))
227+
self.do_validate_payload(endpoint, file_name, data, line_number)))
226228

227229
if len(tasks) >= self.MAX_VALIDATE_TASK_QUEUE_SIZE:
228230
await self.lightbeam.do_tasks(tasks, total_counter, log_status_counts=False)
@@ -243,6 +245,11 @@ async def validate_endpoint(self, endpoint):
243245

244246
if len(tasks)>0: await self.lightbeam.do_tasks(tasks, total_counter, log_status_counts=False)
245247

248+
# update metadata counts
249+
self.lightbeam.metadata["resources"][endpoint]["records_processed"] = total_counter
250+
self.lightbeam.metadata["resources"][endpoint]["records_skipped"] = self.lightbeam.num_skipped
251+
self.lightbeam.metadata["resources"][endpoint]["records_failed"] = self.lightbeam.num_errors
252+
246253
if self.lightbeam.num_errors==0: self.logger.info(f"... all lines validate ok!")
247254
else:
248255
num_others = self.lightbeam.num_errors - self.MAX_VALIDATION_ERRORS_TO_DISPLAY
@@ -257,14 +264,14 @@ async def validate_endpoint(self, endpoint):
257264
self.schema_validator = None
258265

259266

260-
async def do_validate_payload(self, endpoint, file_name, data, line_counter):
267+
async def do_validate_payload(self, endpoint, file_name, data, line_number):
261268
if self.fail_fast_threshold is not None and self.lightbeam.num_errors >= self.fail_fast_threshold: return
262269

263270
# check payload is valid JSON
264271
try:
265272
payload = json.loads(data)
266273
except Exception as e:
267-
self.log_validation_error(endpoint, file_name, line_counter, "json", f"invalid JSON {str(e).replace(' line 1','')}")
274+
self.log_validation_error(endpoint, file_name, line_number, "json", f"invalid JSON {str(e).replace(' line 1','')}")
268275
return
269276

270277
# check payload obeys Swagger schema
@@ -275,14 +282,14 @@ async def do_validate_payload(self, endpoint, file_name, data, line_counter):
275282
e_path = [str(x) for x in list(e.path)]
276283
context = ""
277284
if len(e_path)>0: context = " in " + " -> ".join(e_path)
278-
self.log_validation_error(endpoint, file_name, line_counter, "schema", f"{str(e.message)} {context}")
285+
self.log_validation_error(endpoint, file_name, line_number, "schema", f"{str(e.message)} {context}")
279286
return
280287

281288
# check descriptor values are valid
282289
if "descriptors" in self.validation_methods:
283290
error_message = self.has_invalid_descriptor_values(payload, path="")
284291
if error_message != "":
285-
self.log_validation_error(endpoint, file_name, line_counter, "descriptors", error_message)
292+
self.log_validation_error(endpoint, file_name, line_number, "descriptors", error_message)
286293
return
287294

288295
# check natural keys are unique
@@ -298,7 +305,7 @@ async def do_validate_payload(self, endpoint, file_name, data, line_counter):
298305
self.lightbeam.api.do_oauth()
299306
error_message = self.has_invalid_references(payload, path="")
300307
if error_message != "":
301-
self.log_validation_error(endpoint, file_name, line_counter, "references", error_message)
308+
self.log_validation_error(endpoint, file_name, line_number, "references", error_message)
302309

303310

304311
def log_validation_error(self, endpoint, file_name, line_number, method, message):

0 commit comments

Comments
 (0)