99from nypl_py_utils .functions .log_helper import create_log
1010from requests .exceptions import JSONDecodeError , RequestException
1111
12-
1312class AvroClient :
1413 """
1514 Base class for Avro schema interaction. Takes as input the
1615 Platform API endpoint from which to fetch the schema in JSON format.
1716 """
18-
17+
1918 def __init__ (self , platform_schema_url ):
20- self .logger = create_log ("avro_encoder" )
21- self .schema = avro .schema .parse (self .get_json_schema (platform_schema_url ))
22-
19+ self .logger = create_log ('avro_encoder' )
20+ self .schema = avro .schema .parse (
21+ self .get_json_schema (platform_schema_url ))
22+
2323 def get_json_schema (self , platform_schema_url ):
2424 """
2525 Fetches a JSON response from the input Platform API endpoint and
2626 interprets it as an Avro schema.
2727 """
28- self .logger .info ("Fetching Avro schema from {}" .format (platform_schema_url ))
28+ self .logger .info ('Fetching Avro schema from {}' .format (
29+ platform_schema_url ))
2930 try :
3031 response = requests .get (platform_schema_url )
3132 response .raise_for_status ()
3233 except RequestException as e :
3334 self .logger .error (
34- "Failed to retrieve schema from {url}: {error}" .format (
35- url = platform_schema_url , error = e
36- )
37- )
35+ 'Failed to retrieve schema from {url}: {error}' .format (
36+ url = platform_schema_url , error = e ))
3837 raise AvroClientError (
39- "Failed to retrieve schema from {url}: {error}" .format (
40- url = platform_schema_url , error = e
41- )
42- ) from None
38+ 'Failed to retrieve schema from {url}: {error}' .format (
39+ url = platform_schema_url , error = e )) from None
4340
4441 try :
4542 json_response = response .json ()
46- return json_response [" data" ][ " schema" ]
43+ return json_response [' data' ][ ' schema' ]
4744 except (JSONDecodeError , KeyError ) as e :
4845 self .logger .error (
49- "Retrieved schema is malformed: {errorType} {errorMessage}" .format (
50- errorType = type (e ), errorMessage = e
51- )
52- )
46+ 'Retrieved schema is malformed: {errorType} {errorMessage}'
47+ .format (errorType = type (e ), errorMessage = e ))
5348 raise AvroClientError (
54- "Retrieved schema is malformed: {errorType} {errorMessage}" .format (
55- errorType = type (e ), errorMessage = e
56- )
57- ) from None
58-
49+ 'Retrieved schema is malformed: {errorType} {errorMessage}'
50+ .format (errorType = type (e ), errorMessage = e )) from None
51+
5952
6053class AvroEncoder (AvroClient ):
6154 """
@@ -70,17 +63,18 @@ def encode_record(self, record):
7063 Returns the encoded record as a byte string.
7164 """
7265 self .logger .debug (
73- " Encoding record using {schema} schema" .format (schema = self . schema . name )
74- )
66+ ' Encoding record using {schema} schema' .format (
67+ schema = self . schema . name ) )
7568 datum_writer = DatumWriter (self .schema )
7669 with BytesIO () as output_stream :
7770 encoder = BinaryEncoder (output_stream )
7871 try :
7972 datum_writer .write (record , encoder )
8073 return output_stream .getvalue ()
8174 except AvroException as e :
82- self .logger .error ("Failed to encode record: {}" .format (e ))
83- raise AvroClientError ("Failed to encode record: {}" .format (e )) from None
75+ self .logger .error ('Failed to encode record: {}' .format (e ))
76+ raise AvroClientError (
77+ 'Failed to encode record: {}' .format (e )) from None
8478
8579 def encode_batch (self , record_list ):
8680 """
@@ -89,10 +83,8 @@ def encode_batch(self, record_list):
8983 Returns a list of byte strings where each string is an encoded record.
9084 """
9185 self .logger .info (
92- "Encoding ({num_rec}) records using {schema} schema" .format (
93- num_rec = len (record_list ), schema = self .schema .name
94- )
95- )
86+ 'Encoding ({num_rec}) records using {schema} schema' .format (
87+ num_rec = len (record_list ), schema = self .schema .name ))
9688 encoded_records = []
9789 datum_writer = DatumWriter (self .schema )
9890 with BytesIO () as output_stream :
@@ -104,10 +96,9 @@ def encode_batch(self, record_list):
10496 output_stream .seek (0 )
10597 output_stream .truncate (0 )
10698 except AvroException as e :
107- self .logger .error (" Failed to encode record: {}" .format (e ))
99+ self .logger .error (' Failed to encode record: {}' .format (e ))
108100 raise AvroClientError (
109- "Failed to encode record: {}" .format (e )
110- ) from None
101+ 'Failed to encode record: {}' .format (e )) from None
111102 return encoded_records
112103
113104
@@ -119,37 +110,34 @@ class AvroDecoder(AvroClient):
119110
120111 def decode_record (self , record , encoding = "binary" ):
121112 """
122- Decodes a single record represented either as a byte or
113+ Decodes a single record represented either as a byte or
123114 base64 string, using the given Avro schema.
124115
125116 Returns a dictionary where each key is a field in the schema.
126117 """
127- self .logger .info (
128- "Decoding {rec} of type {type} using {schema} schema" .format (
129- rec = record , type = encoding , schema = self .schema .name
130- )
131- )
132-
118+ self .logger .info ('Decoding {rec} of type {type} using {schema} schema' .format (
119+ rec = record , type = encoding , schema = self .schema .name ))
120+
133121 if encoding == "base64" :
134122 return self ._decode_base64 (record )
135123 elif encoding == "binary" :
136124 return self ._decode_binary (record )
137125 else :
138- self .logger .error (
139- "Failed to decode record due to encoding type: {}" .format (encoding )
140- )
141- raise AvroClientError ("Invalid encoding type: {}" .format (encoding ))
142-
126+ self .logger .error ('Failed to decode record due to encoding type: {}' .format (encoding ))
127+ raise AvroClientError (
128+ 'Invalid encoding type: {}' .format (encoding ))
129+
143130 def _decode_base64 (self , record ):
144- decoded_data = base64 .b64decode (record )
131+ decoded_data = base64 .b64decode (record ). decode ( "utf-8" )
145132 try :
146133 return json .loads (decoded_data )
147134 except Exception as e :
148135 if isinstance (decoded_data , bytes ):
149- return self ._decode_binary (decoded_data )
136+ self ._decode_binary (decoded_data )
150137 else :
151- self .logger .error ("Failed to decode record: {}" .format (e ))
152- raise AvroClientError ("Failed to decode record: {}" .format (e )) from None
138+ self .logger .error ('Failed to decode record: {}' .format (e ))
139+ raise AvroClientError (
140+ 'Failed to decode record: {}' .format (e )) from None
153141
154142 def _decode_binary (self , record ):
155143 datum_reader = DatumReader (self .schema )
@@ -158,10 +146,11 @@ def _decode_binary(self, record):
158146 try :
159147 return datum_reader .read (decoder )
160148 except Exception as e :
161- self .logger .error ("Failed to decode record: {}" .format (e ))
162- raise AvroClientError ("Failed to decode record: {}" .format (e )) from None
149+ self .logger .error ('Failed to decode record: {}' .format (e ))
150+ raise AvroClientError (
151+ 'Failed to decode record: {}' .format (e )) from None
163152
164153
165154class AvroClientError (Exception ):
166155 def __init__ (self , message = None ):
167- self .message = message
156+ self .message = message
0 commit comments