11import avro .schema
22import base64
3- import json
43import requests
54
65from avro .errors import AvroException
@@ -109,39 +108,18 @@ class AvroDecoder(AvroClient):
109108 Platform API endpoint from which to fetch the schema in JSON format.
110109 """
111110
112- def decode_record (self , record , encoding = "binary" ):
111+ def decode_record (self , record ):
113112 """
114113 Decodes a single record represented either as a byte or
115114 base64 string, using the given Avro schema.
116115
117116 Returns a dictionary where each key is a field in the schema.
118117 """
119- self .logger .info ('Decoding {rec} of type {type} using {schema} schema'
120- .format (rec = record , type = encoding ,
121- schema = self .schema .name ))
122-
123- if encoding == "base64" :
124- return self ._decode_base64 (record )
125- elif encoding == "binary" :
126- return self ._decode_binary (record )
127- else :
128- self .logger .error (
129- 'Failed to decode record due to encoding type: {}'
130- .format (encoding ))
131- raise AvroClientError (
132- 'Invalid encoding type: {}' .format (encoding ))
133-
134- def _decode_base64 (self , record ):
135- decoded_data = base64 .b64decode (record ).decode ("utf-8" )
136- try :
137- return json .loads (decoded_data )
138- except Exception as e :
139- if isinstance (decoded_data , bytes ):
140- self ._decode_binary (decoded_data )
141- else :
142- self .logger .error ('Failed to decode record: {}' .format (e ))
143- raise AvroClientError (
144- 'Failed to decode record: {}' .format (e )) from None
118+ self .logger .info ('Decoding {rec} using {schema} schema'
119+ .format (rec = record , schema = self .schema .name ))
120+ bytes_input = base64 .b64decode (record ) if (
121+ isinstance (record , str )) else record
122+ return self ._decode_binary (bytes_input )
145123
146124 def _decode_binary (self , record ):
147125 datum_reader = DatumReader (self .schema )
@@ -154,6 +132,21 @@ def _decode_binary(self, record):
154132 raise AvroClientError (
155133 'Failed to decode record: {}' .format (e )) from None
156134
135+ def decode_batch (self , record_list ):
136+ """
137+ Decodes a list of JSON records using the given Avro schema.
138+
139+ Returns a list of strings where each string is an decoded record.
140+ """
141+ self .logger .info (
142+ 'Encoding ({num_rec}) records using {schema} schema' .format (
143+ num_rec = len (record_list ), schema = self .schema .name ))
144+ decoded_records = []
145+ for record in record_list :
146+ decoded_record = self ._decode_binary (record )
147+ decoded_records .append (decoded_record )
148+ return decoded_records
149+
157150
158151class AvroClientError (Exception ):
159152 def __init__ (self , message = None ):
0 commit comments