11import avro .schema
2+ import base64
23import requests
34
4- from avro .errors import AvroException
5+ from avro .schema import AvroException
56from avro .io import BinaryDecoder , BinaryEncoder , DatumReader , DatumWriter
67from io import BytesIO
78from nypl_py_utils .functions .log_helper import create_log
89from requests .exceptions import JSONDecodeError , RequestException
910
10- class AvroInterpreter :
11+ class AvroClient :
1112 """
1213 Base class for Avro schema interaction. Takes as input the
1314 Platform API endpoint from which to fetch the schema in JSON format.
@@ -32,7 +33,7 @@ def get_json_schema(self, platform_schema_url):
3233 self .logger .error (
3334 'Failed to retrieve schema from {url}: {error}' .format (
3435 url = platform_schema_url , error = e ))
35- raise AvroInterpreterError (
36+ raise AvroClientError (
3637 'Failed to retrieve schema from {url}: {error}' .format (
3738 url = platform_schema_url , error = e )) from None
3839
@@ -43,12 +44,12 @@ def get_json_schema(self, platform_schema_url):
4344 self .logger .error (
4445 'Retrieved schema is malformed: {errorType} {errorMessage}'
4546 .format (errorType = type (e ), errorMessage = e ))
46- raise AvroInterpreterError (
47+ raise AvroClientError (
4748 'Retrieved schema is malformed: {errorType} {errorMessage}'
4849 .format (errorType = type (e ), errorMessage = e )) from None
4950
5051
51- class AvroEncoder (AvroInterpreter ):
52+ class AvroEncoder (AvroClient ):
5253 """
5354 Class for encoding records using an Avro schema. Takes as input the
5455 Platform API endpoint from which to fetch the schema in JSON format.
@@ -71,7 +72,7 @@ def encode_record(self, record):
7172 return output_stream .getvalue ()
7273 except AvroException as e :
7374 self .logger .error ('Failed to encode record: {}' .format (e ))
74- raise AvroInterpreterError (
75+ raise AvroClientError (
7576 'Failed to encode record: {}' .format (e )) from None
7677
7778 def encode_batch (self , record_list ):
@@ -95,37 +96,51 @@ def encode_batch(self, record_list):
9596 output_stream .truncate (0 )
9697 except AvroException as e :
9798 self .logger .error ('Failed to encode record: {}' .format (e ))
98- raise AvroInterpreterError (
99+ raise AvroClientError (
99100 'Failed to encode record: {}' .format (e )) from None
100101 return encoded_records
101102
102103
class AvroDecoder(AvroClient):
    """
    Class for decoding records using an Avro schema. Takes as input the
    Platform API endpoint from which to fetch the schema in JSON format.
    """

    def decode_record(self, record, encoding="binary"):
        """
        Decodes a single record using the given Avro schema.

        The record is either Avro binary (encoding="binary", the default)
        or a base64 encoding of that same binary (encoding="base64").

        Returns a dictionary where each key is a field in the schema.

        Raises AvroClientError if the encoding is not recognized, if a
        base64 record cannot be base64-decoded, or if the binary payload
        cannot be read with the schema.
        """
        self.logger.debug('Decoding {rec} using {schema} schema'.format(
            rec=record, schema=self.schema.name))

        if encoding == "base64":
            return self._decode_base64(record)
        elif encoding == "binary":
            return self._decode_binary(record)
        else:
            self.logger.error(
                'Failed to decode record due to encoding type: {}'.format(
                    encoding))
            raise AvroClientError(
                'Invalid encoding type: {}'.format(encoding))

    def _decode_base64(self, record):
        """
        Strips the base64 layer from the record and decodes the resulting
        bytes with the Avro schema, so both encodings yield the same kind
        of result.
        """
        try:
            binary_record = base64.b64decode(record)
        except (TypeError, ValueError) as e:
            # binascii.Error (bad padding) is a ValueError; TypeError covers
            # inputs that are neither str nor bytes-like.
            self.logger.error('Failed to decode record: {}'.format(e))
            raise AvroClientError(
                'Failed to decode record: {}'.format(e)) from None
        return self._decode_binary(binary_record)

    def _decode_binary(self, record):
        """
        Reads one datum from the given bytes using the Avro schema.

        Raises AvroClientError if the reader fails for any reason.
        """
        datum_reader = DatumReader(self.schema)
        with BytesIO(record) as input_stream:
            decoder = BinaryDecoder(input_stream)
            try:
                return datum_reader.read(decoder)
            except Exception as e:
                self.logger.error('Failed to decode record: {}'.format(e))
                raise AvroClientError(
                    'Failed to decode record: {}'.format(e)) from None
127142
128143
class AvroClientError(Exception):
    """
    Error raised by the Avro client classes when a schema cannot be
    retrieved or a record cannot be encoded or decoded.

    The text passed in is available as the `message` attribute.
    """

    def __init__(self, message=None):
        # Forward the message to Exception so that str(error) and error.args
        # carry it even when it is passed by keyword. Skip forwarding None so
        # a bare AvroClientError() still stringifies to ''.
        if message is None:
            super().__init__()
        else:
            super().__init__(message)
        self.message = message
0 commit comments