1+ import re
12import os
23import time
34import json
@@ -56,20 +57,11 @@ def send(self):
5657 "total_records_skipped" : sum (item ['records_skipped' ] for item in self .metadata ["resources" ].values ()),
5758 "total_records_failed" : sum (item ['records_failed' ] for item in self .metadata ["resources" ].values ())
5859 })
59- # total up counts by message and status
60- for resource , resource_metadata in self .metadata ["resources" ].items ():
61- if "failed_statuses" in resource_metadata .keys ():
62- for status , status_metadata in resource_metadata ["failed_statuses" ].items ():
63- total_num_errs = 0
64- for message , message_metadata in status_metadata .items ():
65- for file , file_metadata in message_metadata ["files" ].items ():
66- num_errs = len (file_metadata ["line_numbers" ])
67- file_metadata .update ({
68- "count" : num_errs ,
69- "line_numbers" : "," .join (str (x ) for x in file_metadata ["line_numbers" ])
70- })
71- total_num_errs += num_errs
72- status_metadata .update ({"count" : total_num_errs })
60+ # sort failing line numbers
61+ for resource in self .metadata ["resources" ].keys ():
62+ if "failures" in self .metadata ["resources" ][resource ].keys ():
63+ for idx , _ in enumerate (self .metadata ["resources" ][resource ]["failures" ]):
64+ self .metadata ["resources" ][resource ]["failures" ][idx ]["line_numbers" ].sort ()
7365
7466 ### Create structured output results_file if necessary
7567 if self .lightbeam .results_file :
@@ -78,7 +70,12 @@ def send(self):
7870 os .makedirs (os .path .dirname (self .lightbeam .results_file ), exist_ok = True )
7971
8072 with open (self .lightbeam .results_file , 'w' ) as fp :
81- fp .write (json .dumps (self .metadata , indent = 4 ))
73+ content = json .dumps (self .metadata , indent = 4 )
74+ # failures.line_numbers are split each on their own line; here we remove those line breaks
75+ def repl (m ):
76+ return m .group (0 ).replace (' ' , '' ).replace ('\t ' , '' ).replace ('\n ' , '' )
77+ content = re .sub (r'"line_numbers": \[(\d|,|\s|\n)*\]' , repl , content )
78+ fp .write (content )
8279
8380 if self .metadata ["total_records_processed" ] == self .metadata ["total_records_skipped" ]:
8481 self .logger .info ("all payloads skipped" )
@@ -112,8 +109,10 @@ async def do_send(self, endpoint):
112109 for file_name in data_files :
113110 with open (file_name ) as file :
114111 # process each line
112+ line_counter = 0
115113 for line in file :
116114 total_counter += 1
115+ line_counter += 1
117116 data = line .strip ()
118117 # compute hash of current row
119118 hash = hashlog .get_hash (data )
@@ -123,15 +122,15 @@ async def do_send(self, endpoint):
123122 if self .lightbeam .meets_process_criteria (self .hashlog_data [hash ]):
124123 # yes, we need to (re)post it; append to task queue
125124 tasks .append (asyncio .create_task (
126- self .do_post (endpoint , file_name , data , total_counter , hash )))
125+ self .do_post (endpoint , file_name , data , line_counter , hash )))
127126 else :
128127 # no, do not (re)post
129128 self .lightbeam .num_skipped += 1
130129 continue
131130 else :
132131 # new, never-before-seen payload! append it to task queue
133132 tasks .append (asyncio .create_task (
134- self .do_post (endpoint , file_name , data , total_counter , hash )))
133+ self .do_post (endpoint , file_name , data , line_counter , hash )))
135134
136135 if total_counter % self .lightbeam .MAX_TASK_QUEUE_SIZE == 0 :
137136 await self .lightbeam .do_tasks (tasks , total_counter )
@@ -176,19 +175,26 @@ async def do_post(self, endpoint, file_name, data, line, hash):
176175 message = str (response .status ) + ": " + util .linearize (json .loads (body ).get ("message" ))
177176
178177 # update run metadata...
179- failed_statuses_dict = self .metadata ["resources" ][endpoint ].get ("failed_statuses" , {})
180- if response .status not in failed_statuses_dict .keys ():
181- failed_statuses_dict .update ({response .status : {}})
182- if message not in failed_statuses_dict [response .status ].keys ():
183- failed_statuses_dict [response .status ].update ({message : {}})
184- if "files" not in failed_statuses_dict [response .status ][message ].keys ():
185- failed_statuses_dict [response .status ][message ].update ({"files" : {}})
186- if file_name not in failed_statuses_dict [response .status ][message ]["files" ].keys ():
187- failed_statuses_dict [response .status ][message ]["files" ].update ({file_name : {}})
188- if "line_numbers" not in failed_statuses_dict [response .status ][message ]["files" ][file_name ].keys ():
189- failed_statuses_dict [response .status ][message ]["files" ][file_name ].update ({"line_numbers" : []})
190- failed_statuses_dict [response .status ][message ]["files" ][file_name ]["line_numbers" ].append (line )
191- self .metadata ["resources" ][endpoint ]["failed_statuses" ] = failed_statuses_dict
178+ status_code = response .status
179+ file = file_name
180+ line_number = line
181+ failures = self .metadata ["resources" ][endpoint ].get ("failures" , [])
182+ do_append = True
183+ for index , item in enumerate (failures ):
184+ if item ["status_code" ]== status_code and item ["message" ]== message and item ["file" ]== file :
185+ failures [index ]["line_numbers" ].append (line_number )
186+ failures [index ]["count" ] += 1
187+ do_append = False
188+ if do_append :
189+ failure = {
190+ 'status_code' : status_code ,
191+ 'message' : message ,
192+ 'file' : file ,
193+ 'line_numbers' : [line ],
194+ 'count' : 1
195+ }
196+ failures .append (failure )
197+ self .metadata ["resources" ][endpoint ]["failures" ] = failures
192198
193199 # update output and counters
194200 self .lightbeam .increment_status_reason (message )
0 commit comments