Skip to content

Commit fa92d91

Browse files
committed
Use fastavro for avro encoding/decoding
1 parent 8e4e968 commit fa92d91

3 files changed

Lines changed: 66 additions & 51 deletions

File tree

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ dependencies = []
2424

2525
[project.optional-dependencies]
2626
avro-client = [
27-
"avro>=1.11.1",
27+
"fastavro>=1.11.1",
2828
"requests>=2.28.1"
2929
]
3030
cloudlibrary-client = [

src/nypl_py_utils/classes/avro_client.py

Lines changed: 27 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,7 @@
1-
import avro.schema
1+
import json
22
import requests
33

4-
from avro.errors import AvroException
5-
from avro.io import BinaryDecoder, BinaryEncoder, DatumReader, DatumWriter
4+
from fastavro import schemaless_writer, schemaless_reader, parse_schema
65
from io import BytesIO
76
from nypl_py_utils.functions.log_helper import create_log
87
from requests.adapters import HTTPAdapter, Retry
@@ -23,7 +22,7 @@ def __init__(self, platform_schema_url):
2322
self.session = requests.Session()
2423
self.session.mount("https://",
2524
HTTPAdapter(max_retries=retry_policy))
26-
self.schema = avro.schema.parse(
25+
self.schema = parse_schema(
2726
self.get_json_schema(platform_schema_url))
2827

2928
def get_json_schema(self, platform_schema_url):
@@ -52,7 +51,7 @@ def get_json_schema(self, platform_schema_url):
5251

5352
try:
5453
json_response = response.json()
55-
return json_response["data"]["schema"]
54+
return json.loads(json_response["data"]["schema"])
5655
except (JSONDecodeError, KeyError) as e:
5756
self.logger.error(
5857
"Retrieved schema is malformed: {errorType} {errorMessage}"
@@ -70,26 +69,27 @@ class AvroEncoder(AvroClient):
7069
Platform API endpoint from which to fetch the schema in JSON format.
7170
"""
7271

73-
def encode_record(self, record):
72+
def encode_record(self, record, silent=False):
7473
"""
7574
Encodes a single JSON record using the given Avro schema.
7675
7776
Returns the encoded record as a byte string.
7877
"""
79-
self.logger.debug(
80-
"Encoding record using {schema} schema".format(
81-
schema=self.schema.name)
82-
)
83-
datum_writer = DatumWriter(self.schema)
78+
if not silent:
79+
self.logger.info(
80+
"Encoding record using {schema} schema".format(
81+
schema=self.schema['name']
82+
)
83+
)
8484
with BytesIO() as output_stream:
85-
encoder = BinaryEncoder(output_stream)
8685
try:
87-
datum_writer.write(record, encoder)
86+
schemaless_writer(output_stream, self.schema, record, strict_allow_default=True)
8887
return output_stream.getvalue()
89-
except AvroException as e:
88+
except Exception as e:
9089
self.logger.error("Failed to encode record: {}".format(e))
9190
raise AvroClientError(
92-
"Failed to encode record: {}".format(e)) from None
91+
"Failed to encode record: {}".format(e)
92+
) from None
9393

9494
def encode_batch(self, record_list):
9595
"""
@@ -99,25 +99,10 @@ def encode_batch(self, record_list):
9999
"""
100100
self.logger.info(
101101
"Encoding ({num_rec}) records using {schema} schema".format(
102-
num_rec=len(record_list), schema=self.schema.name
102+
num_rec=len(record_list), schema=self.schema['name']
103103
)
104104
)
105-
encoded_records = []
106-
datum_writer = DatumWriter(self.schema)
107-
with BytesIO() as output_stream:
108-
encoder = BinaryEncoder(output_stream)
109-
for record in record_list:
110-
try:
111-
datum_writer.write(record, encoder)
112-
encoded_records.append(output_stream.getvalue())
113-
output_stream.seek(0)
114-
output_stream.truncate(0)
115-
except AvroException as e:
116-
self.logger.error("Failed to encode record: {}".format(e))
117-
raise AvroClientError(
118-
"Failed to encode record: {}".format(e)
119-
) from None
120-
return encoded_records
105+
return [self.encode_record(record, silent=True) for record in record_list]
121106

122107

123108
class AvroDecoder(AvroClient):
@@ -126,28 +111,28 @@ class AvroDecoder(AvroClient):
126111
Platform API endpoint from which to fetch the schema in JSON format.
127112
"""
128113

129-
def decode_record(self, record):
114+
def decode_record(self, record, silent=False):
130115
"""
131116
Decodes a single record represented using the given Avro
132117
schema. Input must be a bytes-like object.
133118
134119
Returns a dictionary where each key is a field in the schema.
135120
"""
136-
self.logger.debug(
137-
"Decoding {rec} using {schema} schema".format(
138-
rec=record, schema=self.schema.name
121+
if not silent:
122+
self.logger.info(
123+
"Decoding record using {schema} schema".format(
124+
schema=self.schema['name']
125+
)
139126
)
140-
)
141-
datum_reader = DatumReader(self.schema)
142127
with BytesIO(record) as input_stream:
143-
decoder = BinaryDecoder(input_stream)
144128
try:
145-
return datum_reader.read(decoder)
129+
return schemaless_reader(input_stream, self.schema)
146130
except Exception as e:
147131
self.logger.error("Failed to decode record: {}".format(e))
148132
raise AvroClientError(
149133
"Failed to decode record: {}".format(e)) from None
150134

135+
151136
def decode_batch(self, record_list):
152137
"""
153138
Decodes a list of JSON records using the given Avro schema. Input
@@ -157,14 +142,10 @@ def decode_batch(self, record_list):
157142
"""
158143
self.logger.info(
159144
"Decoding ({num_rec}) records using {schema} schema".format(
160-
num_rec=len(record_list), schema=self.schema.name
145+
num_rec=len(record_list), schema=self.schema['name']
161146
)
162147
)
163-
decoded_records = []
164-
for record in record_list:
165-
decoded_record = self.decode_record(record)
166-
decoded_records.append(decoded_record)
167-
return decoded_records
148+
return [self.decode_record(record, silent=True) for record in record_list]
168149

169150

170151
class AvroClientError(Exception):

tests/test_avro_client.py

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,43 @@
2020
]
2121
})}}
2222

23+
FASTAVRO_SCHEMA = {
24+
"type": "record",
25+
"name": "TestSchema",
26+
"fields": [
27+
{
28+
"name": "patron_id",
29+
"type": "int"
30+
},
31+
{
32+
"name": "library_branch",
33+
"type": [
34+
"null",
35+
"string"
36+
]
37+
}
38+
],
39+
"__fastavro_parsed": True,
40+
"__named_schemas": {
41+
"TestSchema": {
42+
"type": "record",
43+
"name": "TestSchema",
44+
"fields": [
45+
{
46+
"name": "patron_id",
47+
"type": "int"
48+
},
49+
{
50+
"name": "library_branch",
51+
"type": [
52+
"null",
53+
"string"
54+
]
55+
}
56+
]
57+
}
58+
}
59+
}
2360

2461
class TestAvroClient:
2562
@pytest.fixture
@@ -36,10 +73,7 @@ def test_avro_decoder_instance(self, requests_mock):
3673

3774
def test_get_json_schema_success(self, test_avro_encoder_instance,
3875
test_avro_decoder_instance):
39-
assert test_avro_encoder_instance.schema == _TEST_SCHEMA["data"][
40-
"schema"]
41-
assert test_avro_decoder_instance.schema == _TEST_SCHEMA["data"][
42-
"schema"]
76+
assert test_avro_encoder_instance.schema == FASTAVRO_SCHEMA
4377

4478
def test_get_json_schema_error(self, requests_mock):
4579
requests_mock.get("https://test_schema_url", exc=ConnectTimeout)

0 commit comments

Comments (0)