33import time
44import json
55import asyncio
6- import datetime
76
87from lightbeam import util
98from lightbeam import hashlog
@@ -16,22 +15,10 @@ def __init__(self, lightbeam=None):
1615 self .lightbeam .reset_counters ()
1716 self .logger = self .lightbeam .logger
1817 self .hashlog_data = {}
19- self .start_timestamp = datetime .datetime .now ()
2018
2119 # Sends all (selected) endpoints
2220 def send (self ):
2321
24- # Initialize a dictionary for tracking run metadata (for structured output)
25- self .metadata = {
26- "started_at" : self .start_timestamp .isoformat (timespec = 'microseconds' ),
27- "working_dir" : os .getcwd (),
28- "config_file" : self .lightbeam .config_file ,
29- "data_dir" : self .lightbeam .config ["data_dir" ],
30- "api_url" : self .lightbeam .config ["edfi_api" ]["base_url" ],
31- "namespace" : self .lightbeam .config ["namespace" ],
32- "resources" : {}
33- }
34-
3522 # get token with which to send requests
3623 self .lightbeam .api .do_oauth ()
3724
@@ -47,43 +34,15 @@ def send(self):
4734 self .logger .info ("finished processing endpoint {0}!" .format (endpoint ))
4835 self .logger .info (" (final status counts: {0}) " .format (self .lightbeam .status_counts ))
4936 self .lightbeam .log_status_reasons ()
37+
38+ # write structured output (if needed)
39+ self .lightbeam .write_structured_output ("send" )
5040
51- ### Create structured output results_file if necessary
52- self .end_timestamp = datetime .datetime .now ()
53- self .metadata .update ({
54- "completed_at" : self .end_timestamp .isoformat (timespec = 'microseconds' ),
55- "runtime_sec" : (self .end_timestamp - self .start_timestamp ).total_seconds (),
56- "total_records_processed" : sum (item ['records_processed' ] for item in self .metadata ["resources" ].values ()),
57- "total_records_skipped" : sum (item ['records_skipped' ] for item in self .metadata ["resources" ].values ()),
58- "total_records_failed" : sum (item ['records_failed' ] for item in self .metadata ["resources" ].values ())
59- })
60- # sort failing line numbers
61- for resource in self .metadata ["resources" ].keys ():
62- if "failures" in self .metadata ["resources" ][resource ].keys ():
63- for idx , _ in enumerate (self .metadata ["resources" ][resource ]["failures" ]):
64- self .metadata ["resources" ][resource ]["failures" ][idx ]["line_numbers" ].sort ()
65-
66- # helper function used below
67- def repl (m ):
68- return re .sub (r"\s+" , '' , m .group (0 ))
69-
70- ### Create structured output results_file if necessary
71- if self .lightbeam .results_file :
72-
73- # create directory if not exists
74- os .makedirs (os .path .dirname (self .lightbeam .results_file ), exist_ok = True )
75-
76- with open (self .lightbeam .results_file , 'w' ) as fp :
77- content = json .dumps (self .metadata , indent = 4 )
78- # failures.line_numbers are split each on their own line; here we remove those line breaks
79- content = re .sub (r'"line_numbers": \[(\d|,|\s|\n)*\]' , repl , content )
80- fp .write (content )
81-
82- if self .metadata ["total_records_processed" ] == self .metadata ["total_records_skipped" ]:
41+ if self .lightbeam .metadata ["total_records_processed" ] == self .lightbeam .metadata ["total_records_skipped" ]:
8342 self .logger .info ("all payloads skipped" )
8443 exit (99 ) # signal to downstream tasks (in Airflow) all payloads skipped
8544
86- if self .metadata ["total_records_processed" ] == self .metadata ["total_records_failed" ]:
45+ if self .lightbeam . metadata ["total_records_processed" ] == self . lightbeam .metadata ["total_records_failed" ]:
8746 self .logger .info ("all payloads failed" )
8847 exit (1 ) # signal to downstream tasks (in Airflow) all payloads failed
8948
@@ -100,7 +59,7 @@ async def do_send(self, endpoint):
10059 hashlog_file = os .path .join (self .lightbeam .config ["state_dir" ], f"{ endpoint } .dat" )
10160 self .hashlog_data = hashlog .load (hashlog_file )
10261
103- self .metadata ["resources" ].update ({endpoint : {}})
62+ self .lightbeam . metadata ["resources" ].update ({endpoint : {}})
10463 self .lightbeam .reset_counters ()
10564
10665 # process each file
@@ -169,8 +128,8 @@ async def do_send(self, endpoint):
169128 if status >= 200 and status < 300 :
170129 successes .append ({"status_code" : status , "count" : self .lightbeam .status_counts [status ]})
171130 if len (successes )> 0 :
172- self .metadata ["resources" ][endpoint ].update ({"successes" : successes })
173- self .metadata ["resources" ][endpoint ].update ({
131+ self .lightbeam . metadata ["resources" ][endpoint ].update ({"successes" : successes })
132+ self .lightbeam . metadata ["resources" ][endpoint ].update ({
174133 "records_processed" : total_counter ,
175134 "records_skipped" : self .lightbeam .num_skipped ,
176135 "records_failed" : self .lightbeam .num_errors
@@ -199,7 +158,7 @@ async def do_post(self, endpoint, file_name, data, line, data_hash):
199158 message = str (response .status ) + ": " + util .linearize (json .loads (body ).get ("message" ))
200159
201160 # update run metadata...
202- failures = self .metadata ["resources" ][endpoint ].get ("failures" , [])
161+ failures = self .lightbeam . metadata ["resources" ][endpoint ].get ("failures" , [])
203162 do_append = True
204163 for index , item in enumerate (failures ):
205164 if item ["status_code" ]== response .status and item ["message" ]== message and item ["file" ]== file_name :
@@ -215,7 +174,7 @@ async def do_post(self, endpoint, file_name, data, line, data_hash):
215174 'count' : 1
216175 }
217176 failures .append (failure )
218- self .metadata ["resources" ][endpoint ]["failures" ] = failures
177+ self .lightbeam . metadata ["resources" ][endpoint ]["failures" ] = failures
219178
220179 # update output and counters
221180 self .lightbeam .increment_status_reason (message )
0 commit comments