Skip to content

Commit 41ef2df

Browse files
authored
Merge pull request #29 from NYPL/de-101/python-rewrite
Rename and segment avro_client functionality
2 parents 07927f5 + e06efef commit 41ef2df

8 files changed

Lines changed: 301 additions & 212 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ __pycache__/
55
*env/
66
*.py[cod]
77
*$py.class
8+
*.pytest_cache

.python-version

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
3.9.16
1+
3.9.16

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
11
# Changelog
2+
## v1.2.0 7/17/24
3+
- Generalized Avro functions and separated encoding/decoding behavior.
4+
25
## v1.1.6 7/12/24
36
- Add put functionality to Oauth2 Client
47
- Update pyproject version

pyproject.toml

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "hatchling.build"
44

55
[project]
66
name = "nypl_py_utils"
7-
version = "1.1.6"
7+
version = "1.2.0"
88
authors = [
99
{ name="Aaron Friedman", email="aaronfriedman@nypl.org" },
1010
]
@@ -23,7 +23,7 @@ dependencies = []
2323
"Bug Tracker" = "https://github.com/NYPL/python-utils/issues"
2424

2525
[project.optional-dependencies]
26-
avro-encoder = [
26+
avro-client = [
2727
"avro>=1.11.1",
2828
"requests>=2.28.1"
2929
]
@@ -67,11 +67,19 @@ research-catalog-identifier-helper = [
6767
"requests>=2.28.1"
6868
]
6969
development = [
70-
"nypl_py_utils[avro-encoder,kinesis-client,kms-client,mysql-client,oauth2-api-client,postgresql-client,postgresql-pool-client,redshift-client,s3-client,config-helper,obfuscation-helper,research-catalog-identifier-helper]",
70+
"nypl_py_utils[avro-client,kinesis-client,kms-client,mysql-client,oauth2-api-client,postgresql-client,postgresql-pool-client,redshift-client,s3-client,config-helper,obfuscation-helper,research-catalog-identifier-helper]",
7171
"flake8>=6.0.0",
7272
"freezegun>=1.2.2",
7373
"mock>=4.0.3",
7474
"pytest>=7.2.0",
7575
"pytest-mock>=3.10.0",
7676
"requests-mock>=1.10.0"
7777
]
78+
79+
[tool.pytest.ini_options]
80+
minversion = "8.0"
81+
addopts = "-ra -q"
82+
pythonpath = "src"
83+
testpaths = [
84+
"tests"
85+
]
src/nypl_py_utils/classes/avro_client.py

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
import avro.schema
2+
import requests
3+
4+
from avro.errors import AvroException
5+
from avro.io import BinaryDecoder, BinaryEncoder, DatumReader, DatumWriter
6+
from io import BytesIO
7+
from nypl_py_utils.functions.log_helper import create_log
8+
from requests.exceptions import JSONDecodeError, RequestException
9+
10+
11+
class AvroClient:
    """
    Base class for Avro schema interaction. Takes as input the
    Platform API endpoint from which to fetch the schema in JSON format.
    """

    def __init__(self, platform_schema_url):
        # Logger named after this module. Previously "avro_encoder" --
        # stale after the class was renamed from AvroEncoder to AvroClient.
        self.logger = create_log("avro_client")
        self.schema = avro.schema.parse(
            self.get_json_schema(platform_schema_url))

    def get_json_schema(self, platform_schema_url):
        """
        Fetches a JSON response from the input Platform API endpoint and
        interprets it as an Avro schema.

        Returns the schema as a JSON string taken from the response's
        data.schema field.

        Raises:
            AvroClientError: if the request fails, or the response body
                is not JSON, or it lacks the expected
                {"data": {"schema": ...}} shape.
        """
        self.logger.info(
            "Fetching Avro schema from {}".format(platform_schema_url))
        try:
            response = requests.get(platform_schema_url)
            response.raise_for_status()
        except RequestException as e:
            # Build the message once so the log entry and the raised
            # error are guaranteed to match.
            error_message = (
                "Failed to retrieve schema from {url}: {error}".format(
                    url=platform_schema_url, error=e))
            self.logger.error(error_message)
            raise AvroClientError(error_message) from None

        try:
            json_response = response.json()
            return json_response["data"]["schema"]
        except (JSONDecodeError, KeyError) as e:
            error_message = (
                "Retrieved schema is malformed: {errorType} {errorMessage}"
                .format(errorType=type(e), errorMessage=e))
            self.logger.error(error_message)
            raise AvroClientError(error_message) from None
57+
58+
class AvroEncoder(AvroClient):
    """
    Class for encoding records using an Avro schema. Takes as input the
    Platform API endpoint from which to fetch the schema in JSON format.
    """

    def encode_record(self, record):
        """
        Encodes a single JSON record using the given Avro schema.

        Returns the encoded record as a byte string.
        """
        self.logger.debug(
            f"Encoding record using {self.schema.name} schema")
        writer = DatumWriter(self.schema)
        with BytesIO() as buffer:
            binary_encoder = BinaryEncoder(buffer)
            try:
                writer.write(record, binary_encoder)
            except AvroException as e:
                self.logger.error(f"Failed to encode record: {e}")
                raise AvroClientError(
                    f"Failed to encode record: {e}") from None
            return buffer.getvalue()

    def encode_batch(self, record_list):
        """
        Encodes a list of JSON records using the given Avro schema.

        Returns a list of byte strings where each string is an encoded record.
        """
        self.logger.info(
            f"Encoding ({len(record_list)}) records using "
            f"{self.schema.name} schema")
        encoded = []
        writer = DatumWriter(self.schema)
        with BytesIO() as buffer:
            binary_encoder = BinaryEncoder(buffer)
            for rec in record_list:
                try:
                    writer.write(rec, binary_encoder)
                except AvroException as e:
                    self.logger.error(f"Failed to encode record: {e}")
                    raise AvroClientError(
                        f"Failed to encode record: {e}") from None
                encoded.append(buffer.getvalue())
                # Rewind and clear the shared buffer so each record is
                # captured on its own rather than appended to the last.
                buffer.seek(0)
                buffer.truncate(0)
        return encoded
113+
114+
class AvroDecoder(AvroClient):
    """
    Class for decoding records using an Avro schema. Takes as input the
    Platform API endpoint from which to fetch the schema in JSON format.
    """

    def decode_record(self, record):
        """
        Decodes a single record represented using the given Avro
        schema. Input must be a bytes-like object.

        Returns a dictionary where each key is a field in the schema.
        """
        self.logger.info(
            f"Decoding {record} using {self.schema.name} schema")
        reader = DatumReader(self.schema)
        with BytesIO(record) as input_stream:
            binary_decoder = BinaryDecoder(input_stream)
            try:
                return reader.read(binary_decoder)
            except Exception as e:
                # Broad catch is deliberate: malformed input can surface
                # assorted error types from deep inside the avro reader,
                # not just AvroException.
                self.logger.error(f"Failed to decode record: {e}")
                raise AvroClientError(
                    f"Failed to decode record: {e}") from None

    def decode_batch(self, record_list):
        """
        Decodes a list of records using the given Avro schema. Input
        must be a list of bytes-like objects.

        Returns a list where each element is a decoded record.
        """
        self.logger.info(
            f"Decoding ({len(record_list)}) records using "
            f"{self.schema.name} schema")
        return [self.decode_record(rec) for rec in record_list]
159+
160+
161+
class AvroClientError(Exception):
    """Raised when fetching, parsing, encoding, or decoding against an
    Avro schema fails."""

    def __init__(self, message=None):
        # Forward the message to Exception so args/str()/pickling behave
        # like a standard exception; omit it entirely when absent so
        # str(AvroClientError()) stays "".
        if message is None:
            super().__init__()
        else:
            super().__init__(message)
        self.message = message

src/nypl_py_utils/classes/avro_encoder.py

Lines changed: 0 additions & 118 deletions
This file was deleted.

0 commit comments

Comments
 (0)