Skip to content

Commit 69f12e5

Browse files
committed
Reformatted with black
1 parent f94b52e commit 69f12e5

27 files changed

Lines changed: 942 additions & 748 deletions

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
# Changelog
22
## v1.1.6 6/26/24
3-
- Generalized Avro functions and separated encoding/decoding behavior.
3+
- Generalized Avro functions and separated encoding/decoding behavior
44

55
## v1.1.5 6/6/24
66
- Use executemany instead of execute when appropriate in RedshiftClient.execute_transaction

src/nypl_py_utils/classes/avro_client.py

Lines changed: 54 additions & 43 deletions
Original file line number | Diff line number | Diff line change
@@ -9,46 +9,53 @@
99
from nypl_py_utils.functions.log_helper import create_log
1010
from requests.exceptions import JSONDecodeError, RequestException
1111

12+
1213
class AvroClient:
1314
"""
1415
Base class for Avro schema interaction. Takes as input the
1516
Platform API endpoint from which to fetch the schema in JSON format.
1617
"""
17-
18+
1819
def __init__(self, platform_schema_url):
19-
self.logger = create_log('avro_encoder')
20-
self.schema = avro.schema.parse(
21-
self.get_json_schema(platform_schema_url))
22-
20+
self.logger = create_log("avro_encoder")
21+
self.schema = avro.schema.parse(self.get_json_schema(platform_schema_url))
22+
2323
def get_json_schema(self, platform_schema_url):
2424
"""
2525
Fetches a JSON response from the input Platform API endpoint and
2626
interprets it as an Avro schema.
2727
"""
28-
self.logger.info('Fetching Avro schema from {}'.format(
29-
platform_schema_url))
28+
self.logger.info("Fetching Avro schema from {}".format(platform_schema_url))
3029
try:
3130
response = requests.get(platform_schema_url)
3231
response.raise_for_status()
3332
except RequestException as e:
3433
self.logger.error(
35-
'Failed to retrieve schema from {url}: {error}'.format(
36-
url=platform_schema_url, error=e))
34+
"Failed to retrieve schema from {url}: {error}".format(
35+
url=platform_schema_url, error=e
36+
)
37+
)
3738
raise AvroClientError(
38-
'Failed to retrieve schema from {url}: {error}'.format(
39-
url=platform_schema_url, error=e)) from None
39+
"Failed to retrieve schema from {url}: {error}".format(
40+
url=platform_schema_url, error=e
41+
)
42+
) from None
4043

4144
try:
4245
json_response = response.json()
43-
return json_response['data']['schema']
46+
return json_response["data"]["schema"]
4447
except (JSONDecodeError, KeyError) as e:
4548
self.logger.error(
46-
'Retrieved schema is malformed: {errorType} {errorMessage}'
47-
.format(errorType=type(e), errorMessage=e))
49+
"Retrieved schema is malformed: {errorType} {errorMessage}".format(
50+
errorType=type(e), errorMessage=e
51+
)
52+
)
4853
raise AvroClientError(
49-
'Retrieved schema is malformed: {errorType} {errorMessage}'
50-
.format(errorType=type(e), errorMessage=e)) from None
51-
54+
"Retrieved schema is malformed: {errorType} {errorMessage}".format(
55+
errorType=type(e), errorMessage=e
56+
)
57+
) from None
58+
5259

5360
class AvroEncoder(AvroClient):
5461
"""
@@ -63,18 +70,17 @@ def encode_record(self, record):
6370
Returns the encoded record as a byte string.
6471
"""
6572
self.logger.debug(
66-
'Encoding record using {schema} schema'.format(
67-
schema=self.schema.name))
73+
"Encoding record using {schema} schema".format(schema=self.schema.name)
74+
)
6875
datum_writer = DatumWriter(self.schema)
6976
with BytesIO() as output_stream:
7077
encoder = BinaryEncoder(output_stream)
7178
try:
7279
datum_writer.write(record, encoder)
7380
return output_stream.getvalue()
7481
except AvroException as e:
75-
self.logger.error('Failed to encode record: {}'.format(e))
76-
raise AvroClientError(
77-
'Failed to encode record: {}'.format(e)) from None
82+
self.logger.error("Failed to encode record: {}".format(e))
83+
raise AvroClientError("Failed to encode record: {}".format(e)) from None
7884

7985
def encode_batch(self, record_list):
8086
"""
@@ -83,8 +89,10 @@ def encode_batch(self, record_list):
8389
Returns a list of byte strings where each string is an encoded record.
8490
"""
8591
self.logger.info(
86-
'Encoding ({num_rec}) records using {schema} schema'.format(
87-
num_rec=len(record_list), schema=self.schema.name))
92+
"Encoding ({num_rec}) records using {schema} schema".format(
93+
num_rec=len(record_list), schema=self.schema.name
94+
)
95+
)
8896
encoded_records = []
8997
datum_writer = DatumWriter(self.schema)
9098
with BytesIO() as output_stream:
@@ -96,9 +104,10 @@ def encode_batch(self, record_list):
96104
output_stream.seek(0)
97105
output_stream.truncate(0)
98106
except AvroException as e:
99-
self.logger.error('Failed to encode record: {}'.format(e))
107+
self.logger.error("Failed to encode record: {}".format(e))
100108
raise AvroClientError(
101-
'Failed to encode record: {}'.format(e)) from None
109+
"Failed to encode record: {}".format(e)
110+
) from None
102111
return encoded_records
103112

104113

@@ -110,34 +119,37 @@ class AvroDecoder(AvroClient):
110119

111120
def decode_record(self, record, encoding="binary"):
112121
"""
113-
Decodes a single record represented either as a byte or
122+
Decodes a single record represented either as a byte or
114123
base64 string, using the given Avro schema.
115124
116125
Returns a dictionary where each key is a field in the schema.
117126
"""
118-
self.logger.info('Decoding {rec} of type {type} using {schema} schema'.format(
119-
rec=record, type=encoding, schema=self.schema.name))
120-
127+
self.logger.info(
128+
"Decoding {rec} of type {type} using {schema} schema".format(
129+
rec=record, type=encoding, schema=self.schema.name
130+
)
131+
)
132+
121133
if encoding == "base64":
122134
return self._decode_base64(record)
123135
elif encoding == "binary":
124136
return self._decode_binary(record)
125137
else:
126-
self.logger.error('Failed to decode record due to encoding type: {}'.format(encoding))
127-
raise AvroClientError(
128-
'Invalid encoding type: {}'.format(encoding))
129-
138+
self.logger.error(
139+
"Failed to decode record due to encoding type: {}".format(encoding)
140+
)
141+
raise AvroClientError("Invalid encoding type: {}".format(encoding))
142+
130143
def _decode_base64(self, record):
131-
decoded_data = base64.b64decode(record).decode("utf-8")
144+
decoded_data = base64.b64decode(record)
132145
try:
133146
return json.loads(decoded_data)
134147
except Exception as e:
135148
if isinstance(decoded_data, bytes):
136-
self._decode_binary(decoded_data)
149+
return self._decode_binary(decoded_data)
137150
else:
138-
self.logger.error('Failed to decode record: {}'.format(e))
139-
raise AvroClientError(
140-
'Failed to decode record: {}'.format(e)) from None
151+
self.logger.error("Failed to decode record: {}".format(e))
152+
raise AvroClientError("Failed to decode record: {}".format(e)) from None
141153

142154
def _decode_binary(self, record):
143155
datum_reader = DatumReader(self.schema)
@@ -146,11 +158,10 @@ def _decode_binary(self, record):
146158
try:
147159
return datum_reader.read(decoder)
148160
except Exception as e:
149-
self.logger.error('Failed to decode record: {}'.format(e))
150-
raise AvroClientError(
151-
'Failed to decode record: {}'.format(e)) from None
161+
self.logger.error("Failed to decode record: {}".format(e))
162+
raise AvroClientError("Failed to decode record: {}".format(e)) from None
152163

153164

154165
class AvroClientError(Exception):
155166
def __init__(self, message=None):
156-
self.message = message
167+
self.message = message

src/nypl_py_utils/classes/kinesis_client.py

Lines changed: 35 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,19 @@ class KinesisClient:
1717
"""
1818

1919
def __init__(self, stream_arn, batch_size, max_retries=5):
20-
self.logger = create_log('kinesis_client')
20+
self.logger = create_log("kinesis_client")
2121
self.stream_arn = stream_arn
2222
self.batch_size = batch_size
2323
self.max_retries = max_retries
2424

2525
try:
2626
self.kinesis_client = boto3.client(
27-
'kinesis', region_name=os.environ.get('AWS_REGION',
28-
'us-east-1'))
27+
"kinesis", region_name=os.environ.get("AWS_REGION", "us-east-1")
28+
)
2929
except ClientError as e:
30-
self.logger.error(
31-
'Could not create Kinesis client: {err}'.format(err=e))
30+
self.logger.error("Could not create Kinesis client: {err}".format(err=e))
3231
raise KinesisClientError(
33-
'Could not create Kinesis client: {err}'.format(err=e)
32+
"Could not create Kinesis client: {err}".format(err=e)
3433
) from None
3534

3635
def close(self):
@@ -45,10 +44,11 @@ def send_records(self, records):
4544
"""
4645
records_sent_since_pause = 0
4746
for i in range(0, len(records), self.batch_size):
48-
encoded_batch = records[i:i + self.batch_size]
49-
kinesis_records = [{'Data': record, 'PartitionKey':
50-
str(int(time.time() * 1000000000))}
51-
for record in encoded_batch]
47+
encoded_batch = records[i : i + self.batch_size]
48+
kinesis_records = [
49+
{"Data": record, "PartitionKey": str(int(time.time() * 1000000000))}
50+
for record in encoded_batch
51+
]
5252

5353
if records_sent_since_pause + len(encoded_batch) > 1000:
5454
records_sent_since_pause = 0
@@ -63,32 +63,41 @@ def _send_kinesis_format_records(self, kinesis_records, call_count):
6363
"""
6464
if call_count > self.max_retries:
6565
self.logger.error(
66-
'Failed to send records to Kinesis {} times in a row'.format(
67-
call_count-1))
66+
"Failed to send records to Kinesis {} times in a row".format(
67+
call_count - 1
68+
)
69+
)
6870
raise KinesisClientError(
69-
'Failed to send records to Kinesis {} times in a row'.format(
70-
call_count-1)) from None
71+
"Failed to send records to Kinesis {} times in a row".format(
72+
call_count - 1
73+
)
74+
) from None
7175

7276
try:
7377
self.logger.info(
74-
'Sending ({count}) records to {arn} Kinesis stream'.format(
75-
count=len(kinesis_records), arn=self.stream_arn))
78+
"Sending ({count}) records to {arn} Kinesis stream".format(
79+
count=len(kinesis_records), arn=self.stream_arn
80+
)
81+
)
7682
response = self.kinesis_client.put_records(
77-
Records=kinesis_records, StreamARN=self.stream_arn)
78-
if response['FailedRecordCount'] > 0:
83+
Records=kinesis_records, StreamARN=self.stream_arn
84+
)
85+
if response["FailedRecordCount"] > 0:
7986
self.logger.warning(
80-
'Failed to send {} records to Kinesis'.format(
81-
response['FailedRecordCount']))
87+
"Failed to send {} records to Kinesis".format(
88+
response["FailedRecordCount"]
89+
)
90+
)
8291
failed_records = []
83-
for i in range(len(response['Records'])):
84-
if 'ErrorCode' in response['Records'][i]:
92+
for i in range(len(response["Records"])):
93+
if "ErrorCode" in response["Records"][i]:
8594
failed_records.append(kinesis_records[i])
86-
self._send_kinesis_format_records(failed_records, call_count+1)
95+
self._send_kinesis_format_records(failed_records, call_count + 1)
8796
except ClientError as e:
88-
self.logger.error(
89-
'Error sending records to Kinesis: {}'.format(e))
97+
self.logger.error("Error sending records to Kinesis: {}".format(e))
9098
raise KinesisClientError(
91-
'Error sending records to Kinesis: {}'.format(e)) from None
99+
"Error sending records to Kinesis: {}".format(e)
100+
) from None
92101

93102

94103
class KinesisClientError(Exception):

src/nypl_py_utils/classes/kms_client.py

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,16 +11,17 @@ class KmsClient:
1111
"""Client for interacting with a KMS client"""
1212

1313
def __init__(self):
14-
self.logger = create_log('kms_client')
14+
self.logger = create_log("kms_client")
1515

1616
try:
1717
self.kms_client = boto3.client(
18-
'kms', region_name=os.environ.get('AWS_REGION', 'us-east-1'))
18+
"kms", region_name=os.environ.get("AWS_REGION", "us-east-1")
19+
)
1920
except ClientError as e:
20-
self.logger.error(
21-
'Could not create KMS client: {err}'.format(err=e))
21+
self.logger.error("Could not create KMS client: {err}".format(err=e))
2222
raise KmsClientError(
23-
'Could not create KMS client: {err}'.format(err=e)) from None
23+
"Could not create KMS client: {err}".format(err=e)
24+
) from None
2425

2526
def close(self):
2627
self.kms_client.close()
@@ -30,16 +31,19 @@ def decrypt(self, encrypted_text):
3031
This method takes a base 64 KMS-encoded string and uses the KMS client
3132
to decrypt it into a usable string.
3233
"""
33-
self.logger.debug('Decrypting \'{}\''.format(encrypted_text))
34+
self.logger.debug("Decrypting '{}'".format(encrypted_text))
3435
try:
3536
decoded_text = b64decode(encrypted_text)
3637
return self.kms_client.decrypt(CiphertextBlob=decoded_text)[
37-
'Plaintext'].decode('utf-8')
38+
"Plaintext"
39+
].decode("utf-8")
3840
except (ClientError, base64Error, TypeError) as e:
39-
self.logger.error('Could not decrypt \'{val}\': {err}'.format(
40-
val=encrypted_text, err=e))
41-
raise KmsClientError('Could not decrypt \'{val}\': {err}'.format(
42-
val=encrypted_text, err=e)) from None
41+
self.logger.error(
42+
"Could not decrypt '{val}': {err}".format(val=encrypted_text, err=e)
43+
)
44+
raise KmsClientError(
45+
"Could not decrypt '{val}': {err}".format(val=encrypted_text, err=e)
46+
) from None
4347

4448

4549
class KmsClientError(Exception):

0 commit comments

Comments (0)