22"""
33This script creates the overall job reports for monitoring in SSB
44Should be set as a cronjob @15 min
5- Creates the following files: SSB_siteInfo.json, SSB_voBoxInfo.json, Running*.txt and Pending*.txt ( * in types )
5+ Creates the following files: SSB_siteInfo.json, SSB_voBoxInfo.json, CondorJobs_Workflows.json, Running*.txt and Pending*.txt ( * in types )
66"""
77
8- import sys ,os ,re ,urllib ,urllib2 ,subprocess ,time
8+ import sys ,os ,re ,urllib ,urllib2 ,subprocess ,time , smtplib , os
99from datetime import datetime
10+ from email .MIMEMultipart import MIMEMultipart
11+ from email .MIMEBase import MIMEBase
12+ from email .MIMEText import MIMEText
13+ from email .Utils import COMMASPACE , formatdate
14+ from email import Encoders
1015try :
1116 import json
1217except ImportError :
3439overview_pending_vobox = {}# Pending per vobox
3540json_name_vobox = "SSBCERN_voBoxInfo.json" # Output json file name
3641
42+ ##Counting Workflows
43+ overview_workflows = {}
44+ json_name_workflows = "CondorJobs_Workflows.json" # Output json file name
45+
3746##SSB plot links
3847site_link = "http://dashb-ssb.cern.ch/dashboard/templates/sitePendingRunningJobs.html?site="
3948overalls_link = "http://dashb-ssb-dev.cern.ch/dashboard/templates/sitePendingRunningJobs.html?site=All%20"
4251jobTypes = ['Processing' , 'Production' , 'Skim' , 'Harvest' , 'Merge' , 'LogCollect' , 'Cleanup' , 'RelVal' , 'T0' ]
4352t0Types = ['Repack' , 'Express' , 'Reco' ]
4453
54+ # Mailing list for notifications
55+ mailingList = ['cms-comp-ops-workflow-team@cern.ch' ]
56+
4557def createSiteList ():
4658 """
4759 Creates an initial site list with the data from site status in Dashboard
@@ -171,6 +183,51 @@ def increasePendingVoBox(sched,type):
171183 """
172184 overview_pending_vobox [sched ][type ] += 1
173185
def increaseRunningWorkflow(workflow, siteToExtract):
    """
    Counts one more running job for the given workflow at the given site
    """
    # Register the workflow the first time it shows up
    if workflow not in overview_workflows:
        addWorkflow(workflow)

    entry = overview_workflows[workflow]
    runningPerSite = entry['runningJobs']
    # A site seen for the first time starts its counter at 1
    runningPerSite[siteToExtract] = runningPerSite.get(siteToExtract, 0) + 1
    entry['condorJobs'] += 1
205+
def increasePendingWorkflow(workflow, siteToExtract):
    """
    Counts one more pending job for the given workflow and merges the
    entries of siteToExtract into its 'desiredSites' set
    """
    # Register the workflow the first time it shows up
    if workflow not in overview_workflows:
        addWorkflow(workflow)

    entry = overview_workflows[workflow]
    entry['condorJobs'] += 1
    entry['pendingJobs'] += 1
    # Rebinds the key to a new set instead of updating the old one in place
    entry['desiredSites'] = entry['desiredSites'] | set(siteToExtract)
219+
def addWorkflow(workflow):
    """
    Registers a workflow in overview_workflows with zeroed counters.
    Any entry already stored under the same name is replaced.
    """
    emptyEntry = {}
    emptyEntry['condorJobs'] = 0
    emptyEntry['runningJobs'] = {}
    emptyEntry['pendingJobs'] = 0
    emptyEntry['desiredSites'] = set()
    overview_workflows[workflow] = emptyEntry
230+
174231def findTask (id ,sched ,typeToExtract ):
175232 """
176233 This deduces job type from given info about scheduler and taskName
@@ -404,6 +461,40 @@ def jsonDict(json_name,currTime,date,hour,key):
404461 jsonfile .write (json .dumps (update ,sort_keys = True , indent = 3 ))
405462 jsonfile .close ()
406463
def send_mail(send_from, send_to, subject, text, files=None, server="localhost"):
    """
    Method to send emails

    Builds a multipart message with 'text' as its body and every path in
    'files' attached as a base64-encoded 'application/octet-stream' part,
    then sends it through the given SMTP server.

    send_from -- address used for the From header and the SMTP envelope
    send_to   -- list of recipient addresses
    subject   -- content of the Subject header
    text      -- message body
    files     -- list of file paths to attach (default: no attachments)
    server    -- SMTP host to connect to (default: "localhost")

    Raises AssertionError when send_to or files is not a list. Errors from
    opening an attachment or from smtplib propagate to the caller.
    """
    # None as default avoids sharing a single mutable list between calls
    if files is None:
        files = []
    assert isinstance(send_to, list)
    assert isinstance(files, list)

    msg = MIMEMultipart()
    msg['From'] = send_from
    msg['To'] = COMMASPACE.join(send_to)
    msg['Date'] = formatdate(localtime=True)
    msg['Subject'] = subject

    msg.attach(MIMEText(text))

    for f in files:
        part = MIMEBase('application', "octet-stream")
        # try/finally instead of 'with' so the file handle is always
        # released without requiring a newer interpreter
        attachment = open(f, "rb")
        try:
            part.set_payload(attachment.read())
        finally:
            attachment.close()
        Encoders.encode_base64(part)
        part.add_header('Content-Disposition', 'attachment; filename="%s"' % os.path.basename(f))
        msg.attach(part)

    smtp = smtplib.SMTP(server)
    try:
        smtp.sendmail(send_from, send_to, msg.as_string())
    finally:
        # Close the connection even when sendmail raises
        smtp.close()
489+
def set_default(obj):
    """
    JSON encoder doesn't support sets, convert them to lists

    Meant to be passed as the 'default' argument of json.dumps.

    obj -- object the encoder could not serialize

    Returns a list with the elements of obj when it is a set or frozenset
    (element order is whatever iteration over the set yields).
    Raises TypeError for any other type.
    """
    if isinstance(obj, (set, frozenset)):
        return list(obj)
    # Name the offending object so a serialization failure can be traced
    raise TypeError("%r is not JSON serializable" % (obj,))
497+
407498def main ():
408499 """
409500 Main algorithm
@@ -430,10 +521,14 @@ def main():
430521 out , err = proc .communicate ()
431522 for line in err .split ('\n ' ) :
432523 if 'Error' in line :
433- listcommand = "bash send_email.sh %s ; " % col
434- proc = subprocess .Popen (listcommand , stderr = subprocess .PIPE ,stdout = subprocess .PIPE , shell = True )
435- out2 , err2 = proc .communicate ()
436- print 'ERROR: I find a problem while getting schedulers for collector %s, I will send an email' % col
524+ body_text = 'There is a problem with one of the collectors! The monitoring scripts will give false information:\n \n '
525+ body_text += ' /afs/cern.ch/user/c/cmst1/scratch0/WFM_Input_DashBoard/WFMonDBShort.py\n \n '
526+ body_text += 'See the log file in the same directory for the error output\n '
527+ send_mail ('luis89@fnal.gov' ,
528+ mailingList ,
529+ '[Monitoring] Condor Collector %s Error' % col ,
530+ body_text )
531+ print 'ERROR: I find a problem while getting schedulers for collector %s, I will send an email to: %s' % (col , str (mailingList ))
437532 #print out2, '\n', "Error: ", '\n', err2
438533 break
439534 for line in out .split ('\n ' ) :
@@ -472,6 +567,7 @@ def main():
472567 # --> new software len(array) ={6,7} depending if the job is already running in a site
473568 id = array [0 ]
474569 status = array [1 ]
570+ workflow = array [2 ].split ('/' )[1 ]
475571 task = array [2 ].split ('/' )[- 1 ]
476572 siteToExtract = array [3 ].replace (' ' , '' ).split ("," )
477573
@@ -493,9 +589,11 @@ def main():
493589 if status == "2" :
494590 increaseRunning (siteToExtract [0 ],type ) # I assume one job can only run at one site
495591 increaseRunningVoBox (sched .replace ("." ,"_" ).strip (),type )
592+ increaseRunningWorkflow (workflow ,siteToExtract [0 ])
496593 elif status == "1" :
497594 temp_pending .append ([type ,siteToExtract ])
498595 increasePendingVoBox (sched .replace ("." ,"_" ).strip (),type )
596+ increasePendingWorkflow (workflow ,siteToExtract )
499597 else : # We do not care about jobs in another status (condor job status: https://htcondor-wiki.cs.wisc.edu/index.cgi/wiki?p=MagicNumbers)
500598 continue
501599 print "INFO: Full condor status pooling is done"
@@ -522,11 +620,15 @@ def main():
522620 print "INFO: Smart pending site counting done \n "
523621
524622 # Handling jobs that failed task extraction logic
525- if jobs_failedTypeLogic != {}:
526- command = "bash failedLogic_email.sh \" %s\" " % str (jobs_failedTypeLogic )
527- proc = subprocess .Popen (command , stderr = subprocess .PIPE ,stdout = subprocess .PIPE , shell = True )
528- out , err = proc .communicate ()
529- print 'ERROR: I find jobs that failed the type assignment logic, I will send an email'
623+ if jobs_failedTypeLogic != {}:
624+ body_text = 'There is a problem with the logic to deduce job type from the condor data.n'
625+ body_text += 'Please have a look to the following jobs:\n \n '
626+ body_text += '%s' % str (jobs_failedTypeLogic )
627+ send_mail ('luis89@fnal.gov' ,
628+ mailingList ,
629+ '[Monitoring] Failed task type logic problem' ,
630+ body_text )
631+ print 'ERROR: I find jobs that failed the type assignment logic, I will send an email to: %s' % str (mailingList )
530632 #print out, '\n', "Error: ", '\n', err
531633
532634 # Adding sites not in either of running/pending overviews
@@ -549,6 +651,11 @@ def main():
549651 # Creates json file (This is needed for plots per site)
550652 jsonDict ( json_name_sites , currTime , date , hour , 'site' )
551653
654+ # Creates json file for jobs per workflow
655+ jsonfile = open (json_name_workflows ,'w+' )
656+ jsonfile .write (json .dumps (overview_workflows , default = set_default , sort_keys = True , indent = 4 ))
657+ jsonfile .close ()
658+
552659 print 'INFO: The script has finished after: ' , datetime .now ()- starttime
553660
554661if __name__ == "__main__" :
0 commit comments