22"""
33This script creates the overall job reports for monitoring in SSB
44Should be set as a cronjob @15 min
5- Creates the following files: SSB_siteInfo.json, SSB_voBoxInfo.json, CondorJobs_Workflows.json, Running*.txt and Pending*.txt ( * in types )
5+ Creates the following files: SSB_siteInfo.json, SSB_voBoxInfo.json, Running*.txt and Pending*.txt ( * in types )
66"""
77
8- import sys ,os ,re ,urllib ,urllib2 ,subprocess ,time , smtplib , os
8+ import sys ,os ,re ,urllib ,urllib2 ,subprocess ,time
99from datetime import datetime
10- from email .MIMEMultipart import MIMEMultipart
11- from email .MIMEBase import MIMEBase
12- from email .MIMEText import MIMEText
13- from email .Utils import COMMASPACE , formatdate
14- from email import Encoders
1510try :
1611 import json
1712except ImportError :
3934overview_pending_vobox = {}# Pending per vobox
4035json_name_vobox = "SSBCERN_voBoxInfo.json" # Output json file name
4136
42- ##Counting Workflows
43- overview_workflows = {}
44- json_name_workflows = "CondorJobs_Workflows.json" # Output json file name
45-
4637##SSB plot links
4738site_link = "http://dashb-ssb.cern.ch/dashboard/templates/sitePendingRunningJobs.html?site="
4839overalls_link = "http://dashb-ssb-dev.cern.ch/dashboard/templates/sitePendingRunningJobs.html?site=All%20"
5142jobTypes = ['Processing' , 'Production' , 'Skim' , 'Harvest' , 'Merge' , 'LogCollect' , 'Cleanup' , 'RelVal' , 'T0' ]
5243t0Types = ['Repack' , 'Express' , 'Reco' ]
5344
54- # Mailing list for notifications
55- mailingList = ['cms-comp-ops-workflow-team@cern.ch' ]
56-
5745def createSiteList ():
5846 """
5947 Creates an initial site list with the data from site status in Dashboard
@@ -183,51 +171,6 @@ def increasePendingVoBox(sched,type):
183171 """
184172 overview_pending_vobox [sched ][type ] += 1
185173
186- def increaseRunningWorkflow (workflow ,siteToExtract ):
187- """
188- Increases the number of running jobs per workflow
189- """
190- if workflow not in overview_workflows .keys ():
191- addWorkflow (workflow )
192- if siteToExtract in overview_workflows [workflow ]['runningJobs' ].keys ():
193- overview_workflows [workflow ]['runningJobs' ][siteToExtract ] += 1
194- overview_workflows [workflow ]['condorJobs' ] += 1
195- else :
196- overview_workflows [workflow ]['runningJobs' ][siteToExtract ] = 1
197- overview_workflows [workflow ]['condorJobs' ] += 1
198- else :
199- if siteToExtract in overview_workflows [workflow ]['runningJobs' ].keys ():
200- overview_workflows [workflow ]['runningJobs' ][siteToExtract ] += 1
201- overview_workflows [workflow ]['condorJobs' ] += 1
202- else :
203- overview_workflows [workflow ]['runningJobs' ][siteToExtract ] = 1
204- overview_workflows [workflow ]['condorJobs' ] += 1
205-
206- def increasePendingWorkflow (workflow ,siteToExtract ):
207- """
208- Increases the number of pending jobs per workflow
209- """
210- if workflow not in overview_workflows .keys ():
211- addWorkflow (workflow )
212- overview_workflows [workflow ]['condorJobs' ] += 1
213- overview_workflows [workflow ]['pendingJobs' ] += 1
214- overview_workflows [workflow ]['desiredSites' ] = overview_workflows [workflow ]['desiredSites' ].union (set (siteToExtract ))
215- else :
216- overview_workflows [workflow ]['condorJobs' ] += 1
217- overview_workflows [workflow ]['pendingJobs' ] += 1
218- overview_workflows [workflow ]['desiredSites' ] = overview_workflows [workflow ]['desiredSites' ].union (set (siteToExtract ))
219-
220- def addWorkflow (workflow ):
221- """
222- Add a new workflow to overview_workflows
223- """
224- overview_workflows [workflow ] = {
225- 'condorJobs' : 0 ,
226- 'runningJobs' : {},
227- 'pendingJobs' : 0 ,
228- 'desiredSites' : set ()
229- }
230-
231174def findTask (id ,sched ,typeToExtract ):
232175 """
233176 This deduces job type from given info about scheduler and taskName
@@ -461,40 +404,6 @@ def jsonDict(json_name,currTime,date,hour,key):
461404 jsonfile .write (json .dumps (update ,sort_keys = True , indent = 3 ))
462405 jsonfile .close ()
463406
464- def send_mail (send_from , send_to , subject , text , files = [], server = "localhost" ):
465- """
466- Method to send emails
467- """
468- assert isinstance (send_to , list )
469- assert isinstance (files , list )
470-
471- msg = MIMEMultipart ()
472- msg ['From' ] = send_from
473- msg ['To' ] = COMMASPACE .join (send_to )
474- msg ['Date' ] = formatdate (localtime = True )
475- msg ['Subject' ] = subject
476-
477- msg .attach ( MIMEText (text ) )
478-
479- for f in files :
480- part = MIMEBase ('application' , "octet-stream" )
481- part .set_payload ( open (f ,"rb" ).read () )
482- Encoders .encode_base64 (part )
483- part .add_header ('Content-Disposition' , 'attachment; filename="%s"' % os .path .basename (f ))
484- msg .attach (part )
485-
486- smtp = smtplib .SMTP (server )
487- smtp .sendmail (send_from , send_to , msg .as_string ())
488- smtp .close ()
489-
490- def set_default (obj ):
491- """
492- JSON encoder doesn't support sets, parse them to lists
493- """
494- if isinstance (obj , set ):
495- return list (obj )
496- raise TypeError
497-
498407def main ():
499408 """
500409 Main algorithm
@@ -521,14 +430,10 @@ def main():
521430 out , err = proc .communicate ()
522431 for line in err .split ('\n ' ) :
523432 if 'Error' in line :
524- body_text = 'There is a problem with one of the collectors! The monitoring scripts will give false information:\n \n '
525- body_text += ' /afs/cern.ch/user/c/cmst1/scratch0/WFM_Input_DashBoard/WFMonDBShort.py\n \n '
526- body_text += 'See the log file in the same directory for the error output\n '
527- send_mail ('luis89@fnal.gov' ,
528- mailingList ,
529- '[Monitoring] Condor Collector %s Error' % col ,
530- body_text )
531- print 'ERROR: I find a problem while getting schedulers for collector %s, I will send an email to: %s' % (col , str (mailingList ))
433+ listcommand = "bash send_email.sh %s ; " % col
434+ proc = subprocess .Popen (listcommand , stderr = subprocess .PIPE ,stdout = subprocess .PIPE , shell = True )
435+ out2 , err2 = proc .communicate ()
436+ print 'ERROR: I find a problem while getting schedulers for collector %s, I will send an email' % col
532437 #print out2, '\n', "Error: ", '\n', err2
533438 break
534439 for line in out .split ('\n ' ) :
@@ -567,7 +472,6 @@ def main():
567472 # --> new software len(array) ={6,7} depending if the job is already running in a site
568473 id = array [0 ]
569474 status = array [1 ]
570- workflow = array [2 ].split ('/' )[1 ]
571475 task = array [2 ].split ('/' )[- 1 ]
572476 siteToExtract = array [3 ].replace (' ' , '' ).split ("," )
573477
@@ -589,11 +493,9 @@ def main():
589493 if status == "2" :
590494 increaseRunning (siteToExtract [0 ],type ) # I assume one job can only run at one site
591495 increaseRunningVoBox (sched .replace ("." ,"_" ).strip (),type )
592- increaseRunningWorkflow (workflow ,siteToExtract [0 ])
593496 elif status == "1" :
594497 temp_pending .append ([type ,siteToExtract ])
595498 increasePendingVoBox (sched .replace ("." ,"_" ).strip (),type )
596- increasePendingWorkflow (workflow ,siteToExtract )
597499 else : # We do not care about jobs in another status (condor job status: https://htcondor-wiki.cs.wisc.edu/index.cgi/wiki?p=MagicNumbers)
598500 continue
599501 print "INFO: Full condor status pooling is done"
@@ -620,15 +522,11 @@ def main():
620522 print "INFO: Smart pending site counting done \n "
621523
622524 # Handling jobs that failed task extraction logic
623- if jobs_failedTypeLogic != {}:
624- body_text = 'There is a problem with the logic to deduce job type from the condor data.n'
625- body_text += 'Please have a look to the following jobs:\n \n '
626- body_text += '%s' % str (jobs_failedTypeLogic )
627- send_mail ('luis89@fnal.gov' ,
628- mailingList ,
629- '[Monitoring] Failed task type logic problem' ,
630- body_text )
631- print 'ERROR: I find jobs that failed the type assignment logic, I will send an email to: %s' % str (mailingList )
525+ if jobs_failedTypeLogic != {}:
526+ command = "bash failedLogic_email.sh \" %s\" " % str (jobs_failedTypeLogic )
527+ proc = subprocess .Popen (command , stderr = subprocess .PIPE ,stdout = subprocess .PIPE , shell = True )
528+ out , err = proc .communicate ()
529+ print 'ERROR: I find jobs that failed the type assignment logic, I will send an email'
632530 #print out, '\n', "Error: ", '\n', err
633531
634532 # Adding sites not in either of running/pending overviews
@@ -651,11 +549,6 @@ def main():
651549 # Creates json file (This is needed for plots per site)
652550 jsonDict ( json_name_sites , currTime , date , hour , 'site' )
653551
654- # Creates json file for jobs per workflow
655- jsonfile = open (json_name_workflows ,'w+' )
656- jsonfile .write (json .dumps (overview_workflows , default = set_default , sort_keys = True , indent = 4 ))
657- jsonfile .close ()
658-
659552 print 'INFO: The script has finished after: ' , datetime .now ()- starttime
660553
661554if __name__ == "__main__" :
0 commit comments