1+ import re
12import os
23import time
34import json
@@ -56,20 +57,16 @@ def send(self):
5657 "total_records_skipped" : sum (item ['records_skipped' ] for item in self .metadata ["resources" ].values ()),
5758 "total_records_failed" : sum (item ['records_failed' ] for item in self .metadata ["resources" ].values ())
5859 })
59- # total up counts by message and status
60- for resource , resource_metadata in self .metadata ["resources" ].items ():
61- if "failed_statuses" in resource_metadata .keys ():
62- for status , status_metadata in resource_metadata ["failed_statuses" ].items ():
63- total_num_errs = 0
64- for message , message_metadata in status_metadata .items ():
65- for file , file_metadata in message_metadata ["files" ].items ():
66- num_errs = len (file_metadata ["line_numbers" ])
67- file_metadata .update ({
68- "count" : num_errs ,
69- "line_numbers" : "," .join (str (x ) for x in file_metadata ["line_numbers" ])
70- })
71- total_num_errs += num_errs
72- status_metadata .update ({"count" : total_num_errs })
60+ # sort failing line numbers
61+ for resource in self .metadata ["resources" ].keys ():
62+ if "failures" in self .metadata ["resources" ][resource ].keys ():
63+ for idx , _ in enumerate (self .metadata ["resources" ][resource ]["failures" ]):
64+ self .metadata ["resources" ][resource ]["failures" ][idx ]["line_numbers" ].sort ()
65+
66+
67+ # helper function used below
def repl(m):
    """Return the text matched by *m* with every run of whitespace removed.

    Intended as the replacement callback for ``re.sub``: it receives the
    match object and returns the compacted string to substitute in its place.
    """
    matched_text = m[0]  # whole match, same as m.group(0)
    return re.sub(r"\s+", "", matched_text)
7370
7471 ### Create structured output results_file if necessary
7572 if self .lightbeam .results_file :
@@ -78,7 +75,10 @@ def send(self):
7875 os .makedirs (os .path .dirname (self .lightbeam .results_file ), exist_ok = True )
7976
8077 with open (self .lightbeam .results_file , 'w' ) as fp :
81- fp .write (json .dumps (self .metadata , indent = 4 ))
78+ content = json .dumps (self .metadata , indent = 4 )
79+ # json.dumps(indent=4) puts each element of failures.line_numbers on its own line; here we collapse each list onto a single line
80+ content = re .sub (r'"line_numbers": \[(\d|,|\s|\n)*\]' , repl , content )
81+ fp .write (content )
8282
8383 if self .metadata ["total_records_processed" ] == self .metadata ["total_records_skipped" ]:
8484 self .logger .info ("all payloads skipped" )
@@ -112,7 +112,7 @@ async def do_send(self, endpoint):
112112 for file_name in data_files :
113113 with open (file_name ) as file :
114114 # process each line
115- for line in file :
115+ for line_counter , line in enumerate ( file ) :
116116 total_counter += 1
117117 data = line .strip ()
118118 # compute hash of current row
@@ -123,15 +123,15 @@ async def do_send(self, endpoint):
123123 if self .lightbeam .meets_process_criteria (self .hashlog_data [hash ]):
124124 # yes, we need to (re)post it; append to task queue
125125 tasks .append (asyncio .create_task (
126- self .do_post (endpoint , file_name , data , total_counter , hash )))
126+ self .do_post (endpoint , file_name , data , line_counter , hash )))
127127 else :
128128 # no, do not (re)post
129129 self .lightbeam .num_skipped += 1
130130 continue
131131 else :
132132 # new, never-before-seen payload! append it to task queue
133133 tasks .append (asyncio .create_task (
134- self .do_post (endpoint , file_name , data , total_counter , hash )))
134+ self .do_post (endpoint , file_name , data , line_counter , hash )))
135135
136136 if total_counter % self .lightbeam .MAX_TASK_QUEUE_SIZE == 0 :
137137 await self .lightbeam .do_tasks (tasks , total_counter )
@@ -176,19 +176,23 @@ async def do_post(self, endpoint, file_name, data, line, hash):
176176 message = str (response .status ) + ": " + util .linearize (json .loads (body ).get ("message" ))
177177
178178 # update run metadata...
179- failed_statuses_dict = self .metadata ["resources" ][endpoint ].get ("failed_statuses" , {})
180- if response .status not in failed_statuses_dict .keys ():
181- failed_statuses_dict .update ({response .status : {}})
182- if message not in failed_statuses_dict [response .status ].keys ():
183- failed_statuses_dict [response .status ].update ({message : {}})
184- if "files" not in failed_statuses_dict [response .status ][message ].keys ():
185- failed_statuses_dict [response .status ][message ].update ({"files" : {}})
186- if file_name not in failed_statuses_dict [response .status ][message ]["files" ].keys ():
187- failed_statuses_dict [response .status ][message ]["files" ].update ({file_name : {}})
188- if "line_numbers" not in failed_statuses_dict [response .status ][message ]["files" ][file_name ].keys ():
189- failed_statuses_dict [response .status ][message ]["files" ][file_name ].update ({"line_numbers" : []})
190- failed_statuses_dict [response .status ][message ]["files" ][file_name ]["line_numbers" ].append (line )
191- self .metadata ["resources" ][endpoint ]["failed_statuses" ] = failed_statuses_dict
179+ failures = self .metadata ["resources" ][endpoint ].get ("failures" , [])
180+ do_append = True
181+ for index , item in enumerate (failures ):
182+ if item ["status_code" ]== response .status and item ["message" ]== message and item ["file" ]== file_name :
183+ failures [index ]["line_numbers" ].append (line )
184+ failures [index ]["count" ] += 1
185+ do_append = False
186+ if do_append :
187+ failure = {
188+ 'status_code' : response .status ,
189+ 'message' : message ,
190+ 'file' : file_name ,
191+ 'line_numbers' : [line ],
192+ 'count' : 1
193+ }
194+ failures .append (failure )
195+ self .metadata ["resources" ][endpoint ]["failures" ] = failures
192196
193197 # update output and counters
194198 self .lightbeam .increment_status_reason (message )
0 commit comments