Skip to content

Commit d7e2fab

Browse files
committed
Merge branch 'main' of github.com:edanalytics/lightbeam into fix/command_list
2 parents 1f455af + 0d8caf9 commit d7e2fab

7 files changed

Lines changed: 116 additions & 97 deletions

File tree

CHANGELOG.md

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,12 @@
1+
### v0.1.2
2+
<details>
3+
<summary>Released 2024-04-19</summary>
4+
5+
* feature: [Add ability for fetch `--keep-keys` and `--drop-keys` flags to allow wildcard matching](https://github.com/edanalytics/lightbeam/pull/23)
6+
* feature: [Update structured logging to be flatter, per recent team discussion](https://github.com/edanalytics/lightbeam/pull/24)
7+
* bugfix: [Support for `definitions` being renamed to `components.schemas` in Ed-Fi 7.1 Swagger](https://github.com/edanalytics/lightbeam/pull/25)
8+
</details>
9+
110
### v0.1.1
211
<details>
312
<summary>Released 2024-02-16</summary>

README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -120,6 +120,8 @@ Optionally specify `--keep-keys id` or `-k id` to keep only specific keys from e
120120

121121
Optionally specify `--drop-keys id,_etag,_lastModified` or `-d id` to remove specific keys from every payload. This can be useful if you want to `fetch` data from one Ed-Fi API and then turn around and `send` it to another.
122122

123+
Like [selectors](#selectors), `keep-keys` and `drop-keys` are comma-separated lists of values, each of which may begin or end with an asterisk (`*`) for wildcard matching. Example: `-d _*` would remove properties beginning with an underscore (`_`) character from any `fetch`ed payloads.
124+
123125
## `validate`
124126
```bash
125127
lightbeam validate -c path/to/config.yaml

lightbeam/api.py

Lines changed: 24 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -53,67 +53,24 @@ def prepare(self):
5353

5454

5555
def apply_filters(self, endpoints=[]):
56-
selected_endpoints = self.parse_endpoint_string(self.lightbeam.selector, endpoints=endpoints, all_on_empty=True)
57-
58-
# make sure all selectors resolve to an endpoint
59-
unknown_endpoints = list(set(selected_endpoints).difference(endpoints))
60-
if unknown_endpoints:
61-
self.logger.critical("no match for selector(s) [{0}] to any endpoint in your API; check for typos?".format(", ".join(unknown_endpoints)))
62-
63-
excluded_endpoints = self.parse_endpoint_string(self.lightbeam.exclude, endpoints=selected_endpoints)
56+
# apply filters
57+
my_endpoints = util.apply_selections(endpoints, self.lightbeam.selector, self.lightbeam.exclude)
6458

6559
# make sure we have some endpoints to process
66-
my_endpoints = list(set(selected_endpoints).difference(excluded_endpoints))
6760
if not my_endpoints:
6861
self.logger.critical("selector filtering left no endpoints to process; check your selector for typos?")
6962

63+
# make sure all selectors resolve to an endpoint
64+
unknown_endpoints = set(my_endpoints).difference(endpoints)
65+
if unknown_endpoints:
66+
self.logger.critical("no match for selector(s) [{0}] to any endpoint in your API; check for typos?".format(", ".join(unknown_endpoints)))
67+
7068
# all the list(set()) stuff above can mess up the ordering of the endpoints (which must be in dependency-order)... this puts them back in dependency-order
7169
final_endpoints = [x for x in endpoints if x in my_endpoints]
7270

7371
return final_endpoints
7472

7573

76-
@staticmethod
77-
def parse_endpoint_string(full_endpoint_string: str, endpoints=[], all_on_empty=False):
78-
"""
79-
Possible endpoint strings:
80-
- "students"
81-
- "students,schools"
82-
- "student*"
83-
- "student*,schools"
84-
- "*Associations"
85-
- "*Associations,schools"
86-
"""
87-
# If no string is provided, return all or no endpoints, depending on use-case.
88-
if not full_endpoint_string:
89-
if all_on_empty:
90-
return endpoints
91-
else:
92-
return []
93-
94-
# Asterisk wildcards to all endpoints.
95-
if full_endpoint_string == "*":
96-
return endpoints
97-
98-
# Otherwise, a comma-separated list of endpoints is expected.
99-
return_endpoints = set()
100-
101-
for endpoint_string in full_endpoint_string.split(","):
102-
103-
if endpoint_string.startswith("*"): # left wildcard: "*Associations"
104-
return_endpoints.update(
105-
filter(lambda endpoint: endpoint.endswith(endpoint_string.lstrip("*")), endpoints)
106-
)
107-
elif endpoint_string.endswith("*"): # right wildcard: "student*"
108-
return_endpoints.update(
109-
filter(lambda endpoint: endpoint.startswith(endpoint_string.rstrip("*")), endpoints)
110-
)
111-
else: # no wildcard: "students"
112-
return_endpoints.add(endpoint_string)
113-
114-
return list(return_endpoints)
115-
116-
11774
# Returns a client object with exponential retry and other parameters per configs
11875
def get_retry_client(self):
11976
return RetryClient(
@@ -344,12 +301,25 @@ def get_params_for_endpoint(self, endpoint):
344301

345302
def get_required_params_from_swagger(self, swagger, definition, prefix=""):
346303
params = {}
347-
for requiredProperty in swagger["definitions"][definition]["required"]:
348-
if "$ref" in swagger["definitions"][definition]["properties"][requiredProperty].keys():
349-
sub_definition = swagger["definitions"][definition]["properties"][requiredProperty]["$ref"].replace("#/definitions/", "")
304+
use_definitions = False
305+
if "definitions" in swagger.keys():
306+
schema = swagger["definitions"][definition]
307+
use_definitions = True
308+
elif "components" in swagger.keys() and "schemas" in swagger["components"].keys():
309+
schema = swagger["components"]["schemas"][definition]
310+
else:
311+
self.logger.critical(f"Swagger contains neither `definitions` nor `components.schemas` - check that the Swagger is valid.")
312+
313+
for requiredProperty in schema["required"]:
314+
if "$ref" in schema["properties"][requiredProperty].keys():
315+
sub_definition = schema["properties"][requiredProperty]["$ref"]
316+
if use_definitions:
317+
sub_definition = sub_definition.replace("#/definitions/", "")
318+
else:
319+
sub_definition = sub_definition.replace("#/components/schemas/", "")
350320
sub_params = self.get_required_params_from_swagger(swagger, sub_definition, prefix=requiredProperty+".")
351321
for k,v in sub_params.items():
352322
params[k] = v
353-
elif swagger["definitions"][definition]["properties"][requiredProperty]["type"]!="array":
323+
elif schema["properties"][requiredProperty]["type"]!="array":
354324
params[requiredProperty] = prefix + requiredProperty
355325
return params

lightbeam/fetch.py

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -86,18 +86,14 @@ async def get_endpoint_records(self, endpoint, limit, offset, file_handle=None):
8686
if type(values) != list:
8787
self.logger.warn(f"Unable to load records for {endpoint}... API JSON response was not a list of records.")
8888
else:
89+
payload_keys = list(values[0].keys())
90+
final_keys = util.apply_selections(payload_keys, self.lightbeam.keep_keys, self.lightbeam.drop_keys)
91+
do_key_filtering = len(payload_keys) != len(final_keys)
8992
for v in values:
90-
if self.lightbeam.keep_keys!="":
91-
row = {}
92-
for key in self.lightbeam.keep_keys.split(','):
93-
row.update({key: v[key]})
93+
if do_key_filtering: row = {k: v[k] for k in final_keys}
9494
else: row = v
95-
# delete_keys (id, _etag, _lastModifiedDate)
96-
for key in self.lightbeam.drop_keys.split(','):
97-
if key in row.keys():
98-
del row[key]
9995
if file_handle: file_handle.write(json.dumps(row)+"\n")
100-
else: self.lightbeam.results.append(v)
96+
else: self.lightbeam.results.append(row)
10197
self.lightbeam.increment_status_counts(status)
10298
break
10399
else:

lightbeam/send.py

Lines changed: 35 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import re
12
import os
23
import time
34
import json
@@ -56,20 +57,16 @@ def send(self):
5657
"total_records_skipped": sum(item['records_skipped'] for item in self.metadata["resources"].values()),
5758
"total_records_failed": sum(item['records_failed'] for item in self.metadata["resources"].values())
5859
})
59-
# total up counts by message and status
60-
for resource, resource_metadata in self.metadata["resources"].items():
61-
if "failed_statuses" in resource_metadata.keys():
62-
for status, status_metadata in resource_metadata["failed_statuses"].items():
63-
total_num_errs = 0
64-
for message, message_metadata in status_metadata.items():
65-
for file, file_metadata in message_metadata["files"].items():
66-
num_errs = len(file_metadata["line_numbers"])
67-
file_metadata.update({
68-
"count": num_errs,
69-
"line_numbers": ",".join(str(x) for x in file_metadata["line_numbers"])
70-
})
71-
total_num_errs += num_errs
72-
status_metadata.update({"count": total_num_errs})
60+
# sort failing line numbers
61+
for resource in self.metadata["resources"].keys():
62+
if "failures" in self.metadata["resources"][resource].keys():
63+
for idx, _ in enumerate(self.metadata["resources"][resource]["failures"]):
64+
self.metadata["resources"][resource]["failures"][idx]["line_numbers"].sort()
65+
66+
67+
# helper function used below
68+
def repl(m):
69+
return re.sub(r"\s+", '', m.group(0))
7370

7471
### Create structured output results_file if necessary
7572
if self.lightbeam.results_file:
@@ -78,7 +75,10 @@ def send(self):
7875
os.makedirs(os.path.dirname(self.lightbeam.results_file), exist_ok=True)
7976

8077
with open(self.lightbeam.results_file, 'w') as fp:
81-
fp.write(json.dumps(self.metadata, indent=4))
78+
content = json.dumps(self.metadata, indent=4)
79+
# json.dumps prints each failures.line_numbers entry on its own line; here we collapse each list onto a single line
80+
content = re.sub(r'"line_numbers": \[(\d|,|\s|\n)*\]', repl, content)
81+
fp.write(content)
8282

8383
if self.metadata["total_records_processed"] == self.metadata["total_records_skipped"]:
8484
self.logger.info("all payloads skipped")
@@ -112,7 +112,7 @@ async def do_send(self, endpoint):
112112
for file_name in data_files:
113113
with open(file_name) as file:
114114
# process each line
115-
for line in file:
115+
for line_counter, line in enumerate(file):
116116
total_counter += 1
117117
data = line.strip()
118118
# compute hash of current row
@@ -123,15 +123,15 @@ async def do_send(self, endpoint):
123123
if self.lightbeam.meets_process_criteria(self.hashlog_data[hash]):
124124
# yes, we need to (re)post it; append to task queue
125125
tasks.append(asyncio.create_task(
126-
self.do_post(endpoint, file_name, data, total_counter, hash)))
126+
self.do_post(endpoint, file_name, data, line_counter, hash)))
127127
else:
128128
# no, do not (re)post
129129
self.lightbeam.num_skipped += 1
130130
continue
131131
else:
132132
# new, never-before-seen payload! append it to task queue
133133
tasks.append(asyncio.create_task(
134-
self.do_post(endpoint, file_name, data, total_counter, hash)))
134+
self.do_post(endpoint, file_name, data, line_counter, hash)))
135135

136136
if total_counter%self.lightbeam.MAX_TASK_QUEUE_SIZE==0:
137137
await self.lightbeam.do_tasks(tasks, total_counter)
@@ -176,19 +176,23 @@ async def do_post(self, endpoint, file_name, data, line, hash):
176176
message = str(response.status) + ": " + util.linearize(json.loads(body).get("message"))
177177

178178
# update run metadata...
179-
failed_statuses_dict = self.metadata["resources"][endpoint].get("failed_statuses", {})
180-
if response.status not in failed_statuses_dict.keys():
181-
failed_statuses_dict.update({response.status: {}})
182-
if message not in failed_statuses_dict[response.status].keys():
183-
failed_statuses_dict[response.status].update({message: {}})
184-
if "files" not in failed_statuses_dict[response.status][message].keys():
185-
failed_statuses_dict[response.status][message].update({"files": {}})
186-
if file_name not in failed_statuses_dict[response.status][message]["files"].keys():
187-
failed_statuses_dict[response.status][message]["files"].update({file_name: {}})
188-
if "line_numbers" not in failed_statuses_dict[response.status][message]["files"][file_name].keys():
189-
failed_statuses_dict[response.status][message]["files"][file_name].update({"line_numbers": []})
190-
failed_statuses_dict[response.status][message]["files"][file_name]["line_numbers"].append(line)
191-
self.metadata["resources"][endpoint]["failed_statuses"] = failed_statuses_dict
179+
failures = self.metadata["resources"][endpoint].get("failures", [])
180+
do_append = True
181+
for index, item in enumerate(failures):
182+
if item["status_code"]==response.status and item["message"]==message and item["file"]==file_name:
183+
failures[index]["line_numbers"].append(line)
184+
failures[index]["count"] += 1
185+
do_append = False
186+
if do_append:
187+
failure = {
188+
'status_code': response.status,
189+
'message': message,
190+
'file': file_name,
191+
'line_numbers': [line],
192+
'count': 1
193+
}
194+
failures.append(failure)
195+
self.metadata["resources"][endpoint]["failures"] = failures
192196

193197
# update output and counters
194198
self.lightbeam.increment_status_reason(message)

lightbeam/util.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import re
22
import json
3+
import itertools
34

45
# Strips newlines from a string
56
# Replace single-quotes with backticks
@@ -40,4 +41,36 @@ def interpolate_params(params_structure, payload):
4041
def url_join(*args):
4142
return '/'.join(
4243
map(lambda x: str(x).rstrip('/'), filter(lambda x: x is not None, args))
43-
)
44+
)
45+
46+
# Returns the subset of `keys` that match the `keep` and `drop` criteria, importantly
47+
# respecting wildcards! (so keep=["*Association", "student*"] matches anything beginning
48+
# with "student" or ending with "Association")
49+
# This function is used for both the endpoint selection in apply_filters() of api.py and
50+
# the keep-keys and drop-keys filtering in fetch.py
51+
def apply_selections(keys, keep, drop):
52+
# `keep` and `drop` _should_ be arrays, but in case they're strings, we split them
53+
if isinstance(keep, str): keep = keep.split(",")
54+
if isinstance(drop, str): drop = drop.split(",")
55+
# this will be the filtered set of keys
56+
final_keys = []
57+
# populate `final_keys` with `keys` that match `keep`
58+
if keep and keep != ["*"]:
59+
for payload_key, keep_key in list(itertools.product(keys, keep)):
60+
if (keys_match(payload_key, keep_key)):
61+
final_keys.append(payload_key)
62+
else: final_keys = keys
63+
# remove from `final_keys` keys that match `drop`
64+
if drop and drop != [""]:
65+
for payload_key, drop_key in list(itertools.product(keys, drop)):
66+
if (keys_match(payload_key, drop_key)):
67+
if payload_key in final_keys: final_keys.remove(payload_key)
68+
return final_keys
69+
70+
# Compares a key like "stateAbbreviationDescriptors" with a (potentially wildcard) expression
71+
# like "*Descriptors" for match.
72+
def keys_match(key, wildcard_key):
73+
if key==wildcard_key: return True
74+
if wildcard_key.startswith("*") and key.endswith(wildcard_key.lstrip("*")): return True
75+
if wildcard_key.endswith("*") and key.startswith(wildcard_key.rstrip("*")): return True
76+
return False

lightbeam/validate.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,13 @@ def validate(self):
3131
# Validates a single endpoint based on the Swagger docs
3232
def validate_endpoint(self, swagger, endpoint, local_descriptors=[]):
3333
definition = util.camel_case(self.lightbeam.config["namespace"]) + "_" + util.singularize_endpoint(endpoint)
34-
resource_schema = swagger["definitions"][definition]
35-
34+
if "definitions" in swagger.keys():
35+
resource_schema = swagger["definitions"][definition]
36+
elif "components" in swagger.keys() and "schemas" in swagger["components"].keys():
37+
resource_schema = swagger["components"]["schemas"][definition]
38+
else:
39+
self.logger.critical(f"Swagger contains neither `definitions` nor `components.schemas` - check that the Swagger is valid.")
40+
3641
resolver = RefResolver("test", swagger, swagger)
3742
validator = Draft4Validator(resource_schema, resolver=resolver)
3843
params_structure = self.lightbeam.api.get_params_for_endpoint(endpoint)

0 commit comments

Comments
 (0)