@@ -17,7 +17,7 @@ def __init__(self, lightbeam=None):
         self.logger = self.lightbeam.logger
         self.hashlog_data = {}
         self.start_timestamp = datetime.datetime.now()
-
+
     # Sends all (selected) endpoints
     def send(self):
 
@@ -39,15 +39,15 @@ def send(self):
         endpoints = self.lightbeam.get_endpoints_with_data(self.lightbeam.endpoints)
         if len(endpoints) == 0:
             self.logger.critical("`data_dir` {0} has no *.jsonl files".format(self.lightbeam.config["data_dir"]) + " for selected endpoints")
-
+
         # send each endpoint
         for endpoint in endpoints:
             self.logger.info("sending endpoint {0} ...".format(endpoint))
             asyncio.run(self.do_send(endpoint))
             self.logger.info("finished processing endpoint {0}!".format(endpoint))
             self.logger.info(" (final status counts: {0})".format(self.lightbeam.status_counts))
             self.lightbeam.log_status_reasons()
-
+
         ### Create structured output results_file if necessary
         self.end_timestamp = datetime.datetime.now()
         self.metadata.update({
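
Note: `send()` drives endpoints strictly in sequence; each iteration calls
`asyncio.run()`, which builds and tears down a fresh event loop, so concurrency
exists only within one endpoint's POSTs. A minimal sketch of the same pattern
(names here are illustrative, not lightbeam's API):

    import asyncio

    async def process_endpoint(endpoint: str) -> None:
        # all concurrent work for one endpoint lives inside this coroutine
        await asyncio.sleep(0.1)

    def send_all(endpoints: list[str]) -> None:
        for endpoint in endpoints:
            # a fresh event loop per endpoint; endpoints never overlap
            asyncio.run(process_endpoint(endpoint))

    send_all(["students", "studentSchoolAssociations"])
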
@@ -62,18 +62,17 @@ def send(self):
             if "failures" in self.metadata["resources"][resource].keys():
                 for idx, _ in enumerate(self.metadata["resources"][resource]["failures"]):
                     self.metadata["resources"][resource]["failures"][idx]["line_numbers"].sort()
-
-
+
         # helper function used below
         def repl(m):
             return re.sub(r"\s+", '', m.group(0))
-
+
         ### Create structured output results_file if necessary
         if self.lightbeam.results_file:
-
+
             # create directory if not exists
             os.makedirs(os.path.dirname(self.lightbeam.results_file), exist_ok=True)
-
+
             with open(self.lightbeam.results_file, 'w') as fp:
                 content = json.dumps(self.metadata, indent=4)
                 # failures.line_numbers are split each on their own line; here we remove those line breaks
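
Note: `repl()` deletes every whitespace character inside whatever match the
enclosing `re.sub()` hands it; per the comment above, that is used to fold the
pretty-printed `line_numbers` arrays in the results file back onto one line.
The pattern itself sits in unchanged lines outside this hunk, so the one below
is a plausible stand-in, not necessarily the one in the file:

    import json
    import re

    def repl(m):
        return re.sub(r"\s+", '', m.group(0))

    metadata = {"failures": [{"line_numbers": [3, 7, 12]}]}
    content = json.dumps(metadata, indent=4)
    # collapse any bracketed list of integers onto a single line
    content = re.sub(r"\[\s*[\d,\s]*\]", repl, content)
    print(content)  # the line_numbers array now prints as [3,7,12]
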
@@ -83,12 +82,11 @@ def repl(m):
         if self.metadata["total_records_processed"] == self.metadata["total_records_skipped"]:
             self.logger.info("all payloads skipped")
             exit(99) # signal to downstream tasks (in Airflow) all payloads skipped
-
+
         if self.metadata["total_records_processed"] == self.metadata["total_records_failed"]:
             self.logger.info("all payloads failed")
             exit(1) # signal to downstream tasks (in Airflow) all payloads failed
 
-
     # Sends a single endpoint
     async def do_send(self, endpoint):
         # We try to avoid re-POSTing JSON we've already (successfully) sent.
@@ -101,7 +99,7 @@ async def do_send(self, endpoint):
         if self.lightbeam.track_state:
             hashlog_file = os.path.join(self.lightbeam.config["state_dir"], f"{endpoint}.dat")
             self.hashlog_data = hashlog.load(hashlog_file)
-
+
         self.metadata["resources"].update({endpoint: {}})
         self.lightbeam.reset_counters()
 
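
Note: state is tracked per endpoint; each one gets its own `{endpoint}.dat`
file under `state_dir`, loaded into a plain dict before the data files are
streamed. The `hashlog` module is not part of this diff, so the sketch below
is only a guess at the load/save contract it appears to satisfy:

    import hashlib
    import os
    import pickle

    def get_hash(data: str) -> str:
        # a stable digest of one JSONL row (the real module may hash differently)
        return hashlib.md5(data.encode("utf-8")).hexdigest()

    def load(path: str) -> dict:
        # a missing file simply means nothing has been sent yet
        if not os.path.isfile(path):
            return {}
        with open(path, "rb") as f:
            return pickle.load(f)

    def save(path: str, data: dict) -> None:
        with open(path, "wb") as f:
            pickle.dump(data, f)
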
@@ -116,45 +114,63 @@ async def do_send(self, endpoint):
                     total_counter += 1
                     data = line.strip()
                     # compute hash of current row
-                    hash = hashlog.get_hash(data)
+                    data_hash = hashlog.get_hash(data)
                     # check if we've posted this data before
-                    if self.lightbeam.track_state and hash in self.hashlog_data.keys():
+                    if (
+                        self.lightbeam.track_state
+                        and data_hash in self.hashlog_data.keys()
+                    ):
                         # check if the last post meets criteria for a resend
-                        if self.lightbeam.meets_process_criteria(self.hashlog_data[hash]):
+                        if self.lightbeam.meets_process_criteria(
+                            self.hashlog_data[data_hash]
+                        ):
                             # yes, we need to (re)post it; append to task queue
-                            tasks.append(asyncio.create_task(
-                                self.do_post(endpoint, file_name, data, line_counter, hash)))
+                            tasks.append(
+                                asyncio.create_task(
+                                    self.do_post(
+                                        endpoint,
+                                        file_name,
+                                        data,
+                                        line_counter,
+                                        data_hash,
+                                    )
+                                )
+                            )
                         else:
                             # no, do not (re)post
                             self.lightbeam.num_skipped += 1
                             continue
                     else:
                         # new, never-before-seen payload! append it to task queue
-                        tasks.append(asyncio.create_task(
-                            self.do_post(endpoint, file_name, data, line_counter, hash)))
-
+                        tasks.append(
+                            asyncio.create_task(
+                                self.do_post(
+                                    endpoint, file_name, data, line_counter, data_hash
+                                )
+                            )
+                        )
+
                     if total_counter % self.lightbeam.MAX_TASK_QUEUE_SIZE == 0:
                         await self.lightbeam.do_tasks(tasks, total_counter)
                         tasks = []
-
+
         if self.lightbeam.num_skipped > 0:
             self.logger.info("skipped {0} of {1} payloads because they were previously processed and did not match any resend criteria".format(self.lightbeam.num_skipped, total_counter))
         if len(tasks) > 0: await self.lightbeam.do_tasks(tasks, total_counter)
-
+
         # any task may have updated the hashlog, so we need to re-save it out to disk
         if self.lightbeam.track_state:
             hashlog.save(hashlog_file, self.hashlog_data)
-
+
         # update metadata counts for this endpoint
         self.metadata["resources"][endpoint].update({
             "records_processed": total_counter,
             "records_skipped": self.lightbeam.num_skipped,
             "records_failed": self.lightbeam.num_errors
         })
-
-
+
     # Posts a single data payload to a single endpoint
-    async def do_post(self, endpoint, file_name, data, line, hash):
+    async def do_post(self, endpoint, file_name, data, line, data_hash):
         curr_token_version = int(str(self.lightbeam.token_version))
         while True: # this is not great practice, but an effective way (along with the `break` below) to achieve a do:while loop
             try:
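
Note: the reflowed loop above is the heart of `do_send()`: every row is hashed,
previously seen rows are only re-queued when `meets_process_criteria()` allows
it, and the task list is flushed each time it reaches `MAX_TASK_QUEUE_SIZE` so
memory stays bounded on large files. The same control flow, distilled (the
helper bodies here are stand-ins, not lightbeam's implementations):

    import asyncio
    import hashlib

    MAX_TASK_QUEUE_SIZE = 100
    seen: dict[str, tuple[int, int]] = {}  # digest -> (last_sent_at, last_status)

    def digest(row: str) -> str:  # stand-in for hashlog.get_hash
        return hashlib.md5(row.encode()).hexdigest()

    def should_resend(entry: tuple[int, int]) -> bool:  # stand-in for meets_process_criteria
        return entry[1] not in (200, 201)  # e.g. retry anything that failed before

    async def post_row(row: str) -> None:  # stand-in for do_post
        await asyncio.sleep(0)

    async def send_rows(rows: list[str]) -> None:
        tasks, total = [], 0
        for row in rows:
            total += 1
            h = digest(row)
            if h in seen and not should_resend(seen[h]):
                continue  # previously sent and no resend criteria matched
            tasks.append(asyncio.create_task(post_row(row)))
            if total % MAX_TASK_QUEUE_SIZE == 0:
                await asyncio.gather(*tasks)  # flush a full batch
                tasks = []
        if tasks:
            await asyncio.gather(*tasks)  # flush the remainder

    asyncio.run(send_rows(['{"id": 1}', '{"id": 2}']))
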
@@ -170,7 +186,7 @@ async def do_post(self, endpoint, file_name, data, line, hash):
                         # update status_counts (for every-second status update)
                         self.lightbeam.increment_status_counts(status)
                         self.lightbeam.num_finished += 1
-
+
                         # warn about errors
                         if response.status not in [200, 201]:
                             message = str(response.status) + ": " + util.linearize(json.loads(body).get("message"))
@@ -198,12 +214,16 @@ async def do_post(self, endpoint, file_name, data, line, hash):
                             self.lightbeam.increment_status_reason(message)
                             if response.status == 400:
                                 raise Exception(message)
-                            else: self.lightbeam.num_errors += 1
-
+                            else:
+                                self.lightbeam.num_errors += 1
+
                         # update hashlog
                         if self.lightbeam.track_state:
-                            self.hashlog_data[hash] = (round(time.time()), response.status)
-
+                            self.hashlog_data[data_hash] = (
+                                round(time.time()),
+                                response.status,
+                            )
+
                         break # (out of while loop)
 
                     else: # 401 status
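
Note: on any terminal (non-401) response the payload's hashlog entry becomes a
`(unix_timestamp, http_status)` tuple, which is exactly the value that
`meets_process_criteria()` inspected back in `do_send()`. Below is a hedged
guess at how such a check might read that tuple; the real rules come from
lightbeam's configuration, not from this diff:

    import time

    # hypothetical resend rules, for illustration only
    RETRY_STATUSES = {400, 500}
    MAX_AGE_SECONDS = 7 * 24 * 3600

    def meets_process_criteria(entry: tuple[int, int]) -> bool:
        last_sent_at, last_status = entry
        if last_status in RETRY_STATUSES:
            return True  # the last attempt failed, so try again
        # optionally refresh successes older than some window
        return time.time() - last_sent_at > MAX_AGE_SECONDS
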
@@ -216,12 +236,11 @@ async def do_post(self, endpoint, file_name, data, line, hash):
                         else:
                             await asyncio.sleep(1)
                             curr_token_version = int(str(self.lightbeam.token_version))
-
+
             except RuntimeError as e:
                 await asyncio.sleep(1)
             except Exception as e:
                 status = 400
                 self.lightbeam.num_errors += 1
                 self.logger.warn("{0} (at line {1} of {2})".format(str(e), line, file_name))
                 break
-
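
Note: the `while True:` in `do_post()` emulates a do-while: each attempt
snapshots `token_version`, and on a 401 the coroutine either refreshes the
token itself or sleeps while another task finishes refreshing, then retries
the POST under the new version. A distilled sketch of that coordination
(class and method names here are illustrative):

    import asyncio

    class TokenClient:
        def __init__(self) -> None:
            self.token_version = 0
            self.refreshing = False

        async def refresh_token(self) -> None:
            self.refreshing = True
            await asyncio.sleep(0.1)  # stand-in for the real OAuth token fetch
            self.token_version += 1
            self.refreshing = False

        async def post_with_retry(self, do_post) -> int:
            curr_version = self.token_version
            while True:  # do-while: always POST at least once
                status = await do_post()
                if status != 401:
                    return status
                if curr_version == self.token_version and not self.refreshing:
                    await self.refresh_token()  # we hit the stale token first
                else:
                    await asyncio.sleep(1)  # another task is already refreshing
                curr_version = self.token_version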