Skip to content

Commit dcf9599

Browse files
Merge branch 'qa' into production
2 parents 0fe24f9 + 10550cf commit dcf9599

14 files changed

Lines changed: 458 additions & 216 deletions

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ __pycache__/
55
*env/
66
*.py[cod]
77
*$py.class
8+
*.pytest_cache

.python-version

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
3.9.16
1+
3.9.16

CHANGELOG.md

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,17 @@
11
# Changelog
2+
## v1.3.0 7/30/24
3+
- Added SecretsManager client
4+
5+
## v1.2.1 7/25/24
6+
- Add retry for fetching Avro schemas
7+
8+
## v1.2.0 7/17/24
9+
- Generalized Avro functions and separated encoding/decoding behavior
10+
11+
## v1.1.6 7/12/24
12+
- Add put functionality to Oauth2 Client
13+
- Update pyproject version
14+
215
## v1.1.5 6/6/24
316
- Use executemany instead of execute when appropriate in RedshiftClient.execute_transaction
417

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ This package contains common Python utility classes and functions.
77
* Setting and retrieving a resource in S3
88
* Decrypting values with KMS
99
* Encoding and decoding records using a given Avro schema
10+
* Retrieving secrets from AWS Secrets Manager
1011
* Connecting to and querying a MySQL database
1112
* Connecting to and querying a PostgreSQL database
1213
* Connecting to and querying a PostgreSQL database using a connection pool
@@ -35,7 +36,7 @@ kinesis_client = KinesisClient(...)
3536
# Do not use any version below 1.0.0
3637
# All available optional dependencies can be found in pyproject.toml.
3738
# See the "Managing dependencies" section below for more details.
38-
nypl-py-utils[kinesis-client,config-helper]==1.1.2
39+
nypl-py-utils[kinesis-client,config-helper]==1.3.0
3940
```
4041

4142
## Developing locally

pyproject.toml

Lines changed: 15 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
44

55
[project]
66
name = "nypl_py_utils"
7-
version = "1.1.5"
7+
version = "1.3.0"
88
authors = [
99
{ name="Aaron Friedman", email="aaronfriedman@nypl.org" },
1010
]
@@ -23,7 +23,7 @@ dependencies = []
2323
"Bug Tracker" = "https://github.com/NYPL/python-utils/issues"
2424

2525
[project.optional-dependencies]
26-
avro-encoder = [
26+
avro-client = [
2727
"avro>=1.11.1",
2828
"requests>=2.28.1"
2929
]
@@ -56,6 +56,10 @@ s3-client = [
5656
"boto3>=1.26.5",
5757
"botocore>=1.29.5"
5858
]
59+
secrets-manager-client = [
60+
"boto3>=1.26.5",
61+
"botocore>=1.29.5"
62+
]
5963
config-helper = [
6064
"nypl_py_utils[kms-client]",
6165
"PyYAML>=6.0"
@@ -67,11 +71,19 @@ research-catalog-identifier-helper = [
6771
"requests>=2.28.1"
6872
]
6973
development = [
70-
"nypl_py_utils[avro-encoder,kinesis-client,kms-client,mysql-client,oauth2-api-client,postgresql-client,postgresql-pool-client,redshift-client,s3-client,config-helper,obfuscation-helper,research-catalog-identifier-helper]",
74+
"nypl_py_utils[avro-client,kinesis-client,kms-client,mysql-client,oauth2-api-client,postgresql-client,postgresql-pool-client,redshift-client,s3-client,secrets-manager-client,config-helper,obfuscation-helper,research-catalog-identifier-helper]",
7175
"flake8>=6.0.0",
7276
"freezegun>=1.2.2",
7377
"mock>=4.0.3",
7478
"pytest>=7.2.0",
7579
"pytest-mock>=3.10.0",
7680
"requests-mock>=1.10.0"
7781
]
82+
83+
[tool.pytest.ini_options]
84+
minversion = "8.0"
85+
addopts = "-ra -q"
86+
pythonpath = "src"
87+
testpaths = [
88+
"tests"
89+
]
Lines changed: 172 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,172 @@
1+
import avro.schema
2+
import requests
3+
4+
from avro.errors import AvroException
5+
from avro.io import BinaryDecoder, BinaryEncoder, DatumReader, DatumWriter
6+
from io import BytesIO
7+
from nypl_py_utils.functions.log_helper import create_log
8+
from requests.adapters import HTTPAdapter, Retry
9+
from requests.exceptions import JSONDecodeError
10+
11+
12+
class AvroClient:
    """
    Base class for Avro schema interaction. Takes as input the
    Platform API endpoint from which to fetch the schema in JSON format.
    """

    def __init__(self, platform_schema_url):
        self.logger = create_log("avro_client")
        # Retry transient server errors (5xx) on GET only, with backoff,
        # so a briefly unavailable Platform API doesn't fail the client.
        retry_policy = Retry(total=3, backoff_factor=45,
                             status_forcelist=[500, 502, 503, 504],
                             allowed_methods=frozenset(['GET']))
        self.session = requests.Session()
        self.session.mount("https://",
                           HTTPAdapter(max_retries=retry_policy))
        self.schema = avro.schema.parse(
            self.get_json_schema(platform_schema_url))

    def get_json_schema(self, platform_schema_url):
        """
        Fetches a JSON response from the input Platform API endpoint and
        interprets it as an Avro schema.

        Raises AvroClientError if the request fails or if the response is
        not JSON in the expected {"data": {"schema": ...}} shape.
        """
        self.logger.info(
            "Fetching Avro schema from {}".format(platform_schema_url))
        try:
            response = self.session.get(url=platform_schema_url,
                                        timeout=60)
            response.raise_for_status()
        except Exception as e:
            # Build the message once so the log entry and the raised
            # error can never drift apart.
            error_message = \
                "Failed to retrieve schema from {url}: {error}".format(
                    url=platform_schema_url, error=e
                )
            self.logger.error(error_message)
            # Chain explicitly so the underlying request failure is
            # preserved in the traceback.
            raise AvroClientError(error_message) from e

        try:
            json_response = response.json()
            return json_response["data"]["schema"]
        except (JSONDecodeError, KeyError) as e:
            error_message = \
                "Retrieved schema is malformed: {errorType} {errorMessage}" \
                .format(errorType=type(e), errorMessage=e)
            self.logger.error(error_message)
            # The original error type/message is already embedded in the
            # message, so suppress the redundant chained traceback.
            raise AvroClientError(error_message) from None
65+
66+
67+
class AvroEncoder(AvroClient):
    """
    Class for encoding records using an Avro schema. Takes as input the
    Platform API endpoint from which to fetch the schema in JSON format.
    """

    def encode_record(self, record):
        """
        Encodes a single JSON record using the given Avro schema.

        Returns the encoded record as a byte string.
        """
        self.logger.debug(
            "Encoding record using {schema} schema".format(
                schema=self.schema.name)
        )
        writer = DatumWriter(self.schema)
        with BytesIO() as buffer:
            binary_encoder = BinaryEncoder(buffer)
            try:
                writer.write(record, binary_encoder)
            except AvroException as e:
                self.logger.error("Failed to encode record: {}".format(e))
                raise AvroClientError(
                    "Failed to encode record: {}".format(e)) from None
            return buffer.getvalue()

    def encode_batch(self, record_list):
        """
        Encodes a list of JSON records using the given Avro schema.

        Returns a list of byte strings where each string is an encoded record.
        """
        self.logger.info(
            "Encoding ({num_rec}) records using {schema} schema".format(
                num_rec=len(record_list), schema=self.schema.name
            )
        )
        writer = DatumWriter(self.schema)
        encoded_records = []
        with BytesIO() as buffer:
            binary_encoder = BinaryEncoder(buffer)
            for record in record_list:
                try:
                    writer.write(record, binary_encoder)
                except AvroException as e:
                    self.logger.error(
                        "Failed to encode record: {}".format(e))
                    raise AvroClientError(
                        "Failed to encode record: {}".format(e)) from None
                encoded_records.append(buffer.getvalue())
                # Rewind and empty the shared buffer so each record is
                # captured on its own rather than appended to the last.
                buffer.seek(0)
                buffer.truncate(0)
        return encoded_records
121+
122+
123+
class AvroDecoder(AvroClient):
    """
    Class for decoding records using an Avro schema. Takes as input the
    Platform API endpoint from which to fetch the schema in JSON format.
    """

    def decode_record(self, record):
        """
        Decodes a single record represented using the given Avro
        schema. Input must be a bytes-like object.

        Returns a dictionary where each key is a field in the schema.

        Raises AvroClientError if the input cannot be decoded.
        """
        self.logger.info(
            "Decoding {rec} using {schema} schema".format(
                rec=record, schema=self.schema.name
            )
        )
        datum_reader = DatumReader(self.schema)
        with BytesIO(record) as input_stream:
            decoder = BinaryDecoder(input_stream)
            try:
                return datum_reader.read(decoder)
            except Exception as e:
                self.logger.error("Failed to decode record: {}".format(e))
                raise AvroClientError(
                    "Failed to decode record: {}".format(e)) from None

    def decode_batch(self, record_list):
        """
        Decodes a list of Avro-encoded records using the given Avro
        schema. Input must be a list of bytes-like objects.

        Returns a list of dictionaries where each dictionary is a decoded
        record.
        """
        self.logger.info(
            "Decoding ({num_rec}) records using {schema} schema".format(
                num_rec=len(record_list), schema=self.schema.name
            )
        )
        # Delegate to decode_record so per-record error handling and
        # logging stay uniform between the single and batch paths.
        return [self.decode_record(record) for record in record_list]
168+
169+
170+
class AvroClientError(Exception):
    """
    Error raised when fetching a schema or encoding/decoding records
    fails in any of the Avro client classes.
    """

    def __init__(self, message=None):
        # Forward the message to Exception so str(error) and error.args
        # behave conventionally; the original only stored it on .message,
        # which left str(error) empty when the error was logged or shown.
        if message is None:
            super().__init__()
        else:
            super().__init__(message)
        self.message = message

src/nypl_py_utils/classes/avro_encoder.py

Lines changed: 0 additions & 118 deletions
This file was deleted.

0 commit comments

Comments
 (0)