Skip to content

Commit f284f9e

Browse files
committed
It's working
1 parent e20e33c commit f284f9e

5 files changed

Lines changed: 125 additions & 332 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,3 +5,4 @@ __pycache__/
55
*env/
66
*.py[cod]
77
*$py.class
8+
*.pytest_cache

pyproject.toml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,3 +75,11 @@ development = [
7575
"pytest-mock>=3.10.0",
7676
"requests-mock>=1.10.0"
7777
]
78+
79+
[tool.pytest.ini_options]
80+
minversion = "8.0"
81+
addopts = "-ra -q"
82+
pythonpath = "src"
83+
testpaths = [
84+
"tests"
85+
]

src/nypl_py_utils/classes/avro_client.py

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import avro.schema
22
import base64
3+
import json
34
import requests
45

56
from avro.schema import AvroException
@@ -114,8 +115,8 @@ def decode_record(self, record, encoding="binary"):
114115
115116
Returns a dictionary where each key is a field in the schema.
116117
"""
117-
self.logger.debug('Decoding {rec} using {schema} schema'.format(
118-
rec=record, schema=self.schema.name))
118+
self.logger.info('Decoding {rec} of type {type} using {schema} schema'.format(
119+
rec=record, type=encoding, schema=self.schema.name))
119120

120121
if encoding == "base64":
121122
return self._decode_base64(record)
@@ -125,9 +126,18 @@ def decode_record(self, record, encoding="binary"):
125126
self.logger.error('Failed to decode record due to encoding type: {}'.format(encoding))
126127
raise AvroClientError(
127128
'Invalid encoding type: {}'.format(encoding))
128-
129+
129130
def _decode_base64(self, record):
130-
return base64.b64decode(record).decode('utf-8')
131+
decoded_data = base64.b64decode(record).decode("utf-8")
132+
try:
133+
return json.loads(decoded_data)
134+
except Exception as e:
135+
if isinstance(decoded_data, bytes):
136+
self._decode_binary(decoded_data)
137+
else:
138+
self.logger.error('Failed to decode record: {}'.format(e))
139+
raise AvroClientError(
140+
'Failed to decode record: {}'.format(e)) from None
131141

132142
def _decode_binary(self, record):
133143
datum_reader = DatumReader(self.schema)

tests/test_avro_client.py

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import json
2+
import pytest
3+
4+
from nypl_py_utils.classes.avro_client import AvroDecoder, AvroEncoder, AvroClientError
5+
from requests.exceptions import ConnectTimeout
6+
7+
_TEST_SCHEMA = {'data': {'schema': json.dumps({
8+
'name': 'TestSchema',
9+
'type': 'record',
10+
'fields': [
11+
{
12+
'name': 'patron_id',
13+
'type': 'int'
14+
},
15+
{
16+
'name': 'library_branch',
17+
'type': ['null', 'string']
18+
}
19+
]
20+
})}}
21+
22+
class TestAvroClient:
23+
24+
@pytest.fixture
25+
def test_avro_encoder_instance(self, requests_mock):
26+
requests_mock.get(
27+
'https://test_schema_url', text=json.dumps(_TEST_SCHEMA))
28+
return AvroEncoder('https://test_schema_url')
29+
30+
@pytest.fixture
31+
def test_avro_decoder_instance(self, requests_mock):
32+
requests_mock.get(
33+
'https://test_schema_url', text=json.dumps(_TEST_SCHEMA))
34+
return AvroDecoder('https://test_schema_url')
35+
36+
def test_get_json_schema(self, test_avro_encoder_instance, test_avro_decoder_instance):
37+
assert test_avro_encoder_instance.schema == _TEST_SCHEMA['data']['schema']
38+
assert test_avro_decoder_instance.schema == _TEST_SCHEMA['data']['schema']
39+
40+
def test_request_error(self, requests_mock):
41+
requests_mock.get('https://test_schema_url', exc=ConnectTimeout)
42+
with pytest.raises(AvroClientError):
43+
AvroEncoder('https://test_schema_url')
44+
45+
def test_bad_json_error(self, requests_mock):
46+
requests_mock.get(
47+
'https://test_schema_url', text='bad json')
48+
with pytest.raises(AvroClientError):
49+
AvroEncoder('https://test_schema_url')
50+
51+
def test_missing_key_error(self, requests_mock):
52+
requests_mock.get(
53+
'https://test_schema_url', text=json.dumps({'field': 'value'}))
54+
with pytest.raises(AvroClientError):
55+
AvroEncoder('https://test_schema_url')
56+
57+
def test_encode_record(self, test_avro_encoder_instance, test_avro_decoder_instance):
58+
TEST_RECORD = {'patron_id': 123, 'library_branch': 'aa'}
59+
encoded_record = test_avro_encoder_instance.encode_record(TEST_RECORD)
60+
assert type(encoded_record) is bytes
61+
assert test_avro_decoder_instance.decode_record(encoded_record) == TEST_RECORD
62+
63+
def test_encode_record_error(self, test_avro_encoder_instance):
64+
TEST_RECORD = {'patron_id': 123, 'bad_field': 'bad'}
65+
with pytest.raises(AvroClientError):
66+
test_avro_encoder_instance.encode_record(TEST_RECORD)
67+
68+
def test_encode_batch(self, test_avro_encoder_instance, test_avro_decoder_instance):
69+
TEST_BATCH = [
70+
{'patron_id': 123, 'library_branch': 'aa'},
71+
{'patron_id': 456, 'library_branch': None},
72+
{'patron_id': 789, 'library_branch': 'bb'}]
73+
encoded_records = test_avro_encoder_instance.encode_batch(TEST_BATCH)
74+
assert len(encoded_records) == len(TEST_BATCH)
75+
for i in range(3):
76+
assert type(encoded_records[i]) is bytes
77+
assert test_avro_decoder_instance.decode_record(
78+
encoded_records[i]) == TEST_BATCH[i]
79+
80+
def test_encode_batch_error(self, test_avro_encoder_instance):
81+
BAD_BATCH = [
82+
{'patron_id': 123, 'library_branch': 'aa'},
83+
{'patron_id': 456, 'bad_field': 'bad'}]
84+
with pytest.raises(AvroClientError):
85+
test_avro_encoder_instance.encode_batch(BAD_BATCH)
86+
87+
def test_decode_record_binary(self, test_avro_decoder_instance):
88+
TEST_DECODED_RECORD = {"patron_id": 123, "library_branch": "aa"}
89+
TEST_ENCODED_RECORD = b'\xf6\x01\x02\x04aa'
90+
assert test_avro_decoder_instance.decode_record(
91+
TEST_ENCODED_RECORD) == TEST_DECODED_RECORD
92+
93+
def test_decode_record_b64(self, test_avro_decoder_instance):
94+
TEST_DECODED_RECORD = {"patron_id'": 123, "library_branch": "aa"}
95+
TEST_ENCODED_RECORD = "eyJwYXRyb25faWQnIjogMTIzLCAibGlicmFyeV9icmFuY2giOiAiYWEifQ=="
96+
assert test_avro_decoder_instance.decode_record(
97+
TEST_ENCODED_RECORD, "base64") == TEST_DECODED_RECORD
98+
99+
def test_decode_record_error(self, test_avro_decoder_instance):
100+
TEST_ENCODED_RECORD = b'bad-encoding'
101+
with pytest.raises(AvroClientError):
102+
test_avro_decoder_instance.decode_record(TEST_ENCODED_RECORD)

0 commit comments

Comments
 (0)