88import logging
99import threading
1010import logging .config
11- from Queue import Queue
11+ try :
12+ from Queue import Queue
13+ except ImportError :
14+ from queue import Queue # pylint: disable=import-error
1215
1316import yaml
1417from CMSMonitoring .StompAMQ import StompAMQ
@@ -99,14 +102,7 @@ def getCompletedWorkflowsFromDb(configPath):
99102 """
100103 Get completed workflow list from local status db (setup to avoid unnecessary caching)
101104
102- Workflows whose status is one of
103-
104- - *running-closed*
105- - *completed*
106- - *aborted-archived*
107- - *rejected-arcived*
108-
109- are removed from further caching.
105+ Workflows whose status ends with *archived* are removed from further caching.
110106
111107 :param str configPath: location of config file
112108 :returns: list of workflow (str)
@@ -127,7 +123,7 @@ def getCompletedWorkflowsFromDb(configPath):
127123 status TEXT,
128124 failurerate REAL
129125 );"""
130- DB_QUERY_CMD = """SELECT * FROM workflowStatuses WHERE status IN ('running-closed', 'completed', 'aborted-archived', 'rejected-archived') """
126+ DB_QUERY_CMD = """SELECT * FROM workflowStatuses WHERE status LIKE '%archived'"""
131127
132128 res = []
133129 conn = sqlite3 .connect (dbPath )
@@ -205,20 +201,21 @@ def buildDoc(configpath):
205201
206202 wc .invalidate_caches ('/tmp/wsi/workflowinfo' )
207203
208- q = TimeoutQueue ()
209- num_threads = min (150 , len (wkfs ))
210- for wf in wkfs :
211- q .put (wf )
212-
213204 results = list ()
214- for _ in range (num_threads ):
215- t = threading .Thread (target = worker , args = (results , q , completedWfs , ))
216- t .daemon = True
217- t .start ()
218- try :
219- q .join_with_timeout (30 * 60 ) # timeout 30min
220- except NotFinished :
221- pass
205+
206+ q = TimeoutQueue (maxsize = 500 )
207+ num_threads = 500
208+ for i , wf in enumerate (wkfs , 1 ):
209+ q .put (wf )
210+ if q .full () or i == len (wkfs ):
211+ for _ in range (num_threads ):
212+ t = threading .Thread (target = worker , args = (results , q , completedWfs , ))
213+ t .daemon = True
214+ t .start ()
215+ try :
216+ q .join_with_timeout (30 * 60 ) # timeout 30min
217+ except NotFinished :
218+ pass
222219
223220 updateWorkflowStatusToDb (configpath , results )
224221 logger .info ('Number of updated workflows: {}' .format (len (results )))
@@ -255,7 +252,7 @@ def sendDoc(cred, docs):
255252
256253 doctype = 'workflowmonit_{}' .format (cred ['producer' ])
257254 notifications = [amq .make_notification (
258- payload = doc , docType = doctype ) for doc in docs ]
255+ payload = doc , docType = doctype )[ 0 ] for doc in docs ]
259256 failures = amq .send (notifications )
260257
261258 logger .info ("{}/{} docs successfully sent to AMQ." .format (
@@ -265,6 +262,7 @@ def sendDoc(cred, docs):
265262 except Exception as e :
266263 logger .exception (
267264 "Failed to send data to StompAMQ. Error: {}" .format (str (e )))
265+ raise
268266
269267
270268def main ():
@@ -291,17 +289,18 @@ def main():
291289
292290 doc_bkp = os .path .join (LOGDIR , 'toSendDoc_{}' .format (
293291 time .strftime ('%y%m%d-%H%M%S' )))
294- wc .save_json (docs , doc_bkp )
295- logger .info ('Document saved at: {}.json ' .format (doc_bkp ))
292+ docfn = wc .save_json (docs , filename = doc_bkp , gzipped = True )
293+ logger .info ('Document saved at: {}' .format (docfn ))
296294
297295 failures = sendDoc (cred = cred , docs = docs )
298296
299297 failedDocs_bkp = os .path .join (
300298 LOGDIR , 'amqFailedMsg_{}' .format (time .strftime ('%y%m%d-%H%M%S' )))
301299 if len (failures ):
302- wc .save_json (failures , failedDocs_bkp )
303- logger .info ('Failed message saved at: {}.json ' .format (failedDocs_bkp ))
300+ failedDocFn = wc .save_json (failures , filename = failedDocs_bkp , gzipped = True )
301+ logger .info ('Failed message saved at: {}' .format (failedDocFn ))
304302 except Exception as e :
303+ logger .exception ("Exception encounted, sending emails to {}" .format (str (recipients )))
305304 ad .errorEmailShooter (str (e ), recipients )
306305
307306
0 commit comments