@@ -16,7 +16,7 @@ class AvroClient:
1616 """
1717
1818 def __init__ (self , platform_schema_url ):
19- self .logger = create_log (' avro_encoder' )
19+ self .logger = create_log (" avro_encoder" )
2020 self .schema = avro .schema .parse (
2121 self .get_json_schema (platform_schema_url ))
2222
@@ -25,29 +25,35 @@ def get_json_schema(self, platform_schema_url):
2525 Fetches a JSON response from the input Platform API endpoint and
2626 interprets it as an Avro schema.
2727 """
28- self .logger .info ('Fetching Avro schema from {}' . format (
29- platform_schema_url ))
28+ self .logger .info (
29+ "Fetching Avro schema from {}" . format ( platform_schema_url ))
3030 try :
3131 response = requests .get (platform_schema_url )
3232 response .raise_for_status ()
3333 except RequestException as e :
3434 self .logger .error (
35- 'Failed to retrieve schema from {url}: {error}' .format (
36- url = platform_schema_url , error = e ))
35+ "Failed to retrieve schema from {url}: {error}" .format (
36+ url = platform_schema_url , error = e
37+ )
38+ )
3739 raise AvroClientError (
38- 'Failed to retrieve schema from {url}: {error}' .format (
39- url = platform_schema_url , error = e )) from None
40+ "Failed to retrieve schema from {url}: {error}" .format (
41+ url = platform_schema_url , error = e
42+ )
43+ ) from None
4044
4145 try :
4246 json_response = response .json ()
43- return json_response [' data' ][ ' schema' ]
47+ return json_response [" data" ][ " schema" ]
4448 except (JSONDecodeError , KeyError ) as e :
4549 self .logger .error (
46- 'Retrieved schema is malformed: {errorType} {errorMessage}'
47- .format (errorType = type (e ), errorMessage = e ))
50+ "Retrieved schema is malformed: {errorType} {errorMessage}"
51+ .format (errorType = type (e ), errorMessage = e )
52+ )
4853 raise AvroClientError (
49- 'Retrieved schema is malformed: {errorType} {errorMessage}'
50- .format (errorType = type (e ), errorMessage = e )) from None
54+ "Retrieved schema is malformed: {errorType} {errorMessage}"
55+ .format (errorType = type (e ), errorMessage = e )
56+ ) from None
5157
5258
5359class AvroEncoder (AvroClient ):
@@ -63,18 +69,19 @@ def encode_record(self, record):
6369 Returns the encoded record as a byte string.
6470 """
6571 self .logger .debug (
66- 'Encoding record using {schema} schema' .format (
67- schema = self .schema .name ))
72+ "Encoding record using {schema} schema" .format (
73+ schema = self .schema .name )
74+ )
6875 datum_writer = DatumWriter (self .schema )
6976 with BytesIO () as output_stream :
7077 encoder = BinaryEncoder (output_stream )
7178 try :
7279 datum_writer .write (record , encoder )
7380 return output_stream .getvalue ()
7481 except AvroException as e :
75- self .logger .error (' Failed to encode record: {}' .format (e ))
82+ self .logger .error (" Failed to encode record: {}" .format (e ))
7683 raise AvroClientError (
77- ' Failed to encode record: {}' .format (e )) from None
84+ " Failed to encode record: {}" .format (e )) from None
7885
7986 def encode_batch (self , record_list ):
8087 """
@@ -83,8 +90,10 @@ def encode_batch(self, record_list):
8390 Returns a list of byte strings where each string is an encoded record.
8491 """
8592 self .logger .info (
86- 'Encoding ({num_rec}) records using {schema} schema' .format (
87- num_rec = len (record_list ), schema = self .schema .name ))
93+ "Encoding ({num_rec}) records using {schema} schema" .format (
94+ num_rec = len (record_list ), schema = self .schema .name
95+ )
96+ )
8897 encoded_records = []
8998 datum_writer = DatumWriter (self .schema )
9099 with BytesIO () as output_stream :
@@ -96,9 +105,10 @@ def encode_batch(self, record_list):
96105 output_stream .seek (0 )
97106 output_stream .truncate (0 )
98107 except AvroException as e :
99- self .logger .error (' Failed to encode record: {}' .format (e ))
108+ self .logger .error (" Failed to encode record: {}" .format (e ))
100109 raise AvroClientError (
101- 'Failed to encode record: {}' .format (e )) from None
110+ "Failed to encode record: {}" .format (e )
111+ ) from None
102112 return encoded_records
103113
104114
@@ -115,35 +125,37 @@ def decode_record(self, record):
115125
116126 Returns a dictionary where each key is a field in the schema.
117127 """
118- self .logger .info ('Decoding {rec} using {schema} schema'
119- .format (rec = record , schema = self .schema .name ))
128+ self .logger .info (
129+ "Decoding {rec} using {schema} schema" .format (
130+ rec = record , schema = self .schema .name
131+ )
132+ )
120133 bytes_input = base64 .b64decode (record ) if (
121134 isinstance (record , str )) else record
122- return self ._decode_binary (bytes_input )
123-
124- def _decode_binary (self , record ):
125135 datum_reader = DatumReader (self .schema )
126- with BytesIO (record ) as input_stream :
136+ with BytesIO (bytes_input ) as input_stream :
127137 decoder = BinaryDecoder (input_stream )
128138 try :
129139 return datum_reader .read (decoder )
130140 except Exception as e :
131- self .logger .error (' Failed to decode record: {}' .format (e ))
141+ self .logger .error (" Failed to decode record: {}" .format (e ))
132142 raise AvroClientError (
133- ' Failed to decode record: {}' .format (e )) from None
143+ " Failed to decode record: {}" .format (e )) from None
134144
135145 def decode_batch (self , record_list ):
136146 """
137147 Decodes a list of JSON records using the given Avro schema.
138148
139- Returns a list of strings where each string is an decoded record.
149+ Returns a list of strings where each string is a decoded record.
140150 """
141151 self .logger .info (
142- 'Encoding ({num_rec}) records using {schema} schema' .format (
143- num_rec = len (record_list ), schema = self .schema .name ))
152+ "Decoding ({num_rec}) records using {schema} schema" .format (
153+ num_rec = len (record_list ), schema = self .schema .name
154+ )
155+ )
144156 decoded_records = []
145157 for record in record_list :
146- decoded_record = self ._decode_binary (record )
158+ decoded_record = self .decode_record (record )
147159 decoded_records .append (decoded_record )
148160 return decoded_records
149161
0 commit comments