22"""
This script creates the overall job reports for monitoring in SSB
Should be set as a cronjob @15 min
5- Creates the following files: SSB_siteInfo.json, SSB_voBoxInfo.json, Running*.txt and Pending*.txt ( * in types )
5+ Creates the following files: SSB_siteInfo.json, SSB_voBoxInfo.json, CondorJobs_Workflows.json, Running*.txt and Pending*.txt ( * in types )
66"""
77
8- import sys,os,re,urllib,urllib2,subprocess,time
8+ import sys,os,re,urllib,urllib2,subprocess,time,smtplib,os
99from datetime import datetime
10+ from email.MIMEMultipart import MIMEMultipart
11+ from email.MIMEBase import MIMEBase
12+ from email.MIMEText import MIMEText
13+ from email.Utils import COMMASPACE, formatdate
14+ from email import Encoders
1015try:
1116 import json
1217except ImportError:
# Pending job counts keyed by vobox (scheduler host)
overview_pending_vobox = {}
# Output json file name for the vobox overview
json_name_vobox = "SSBCERN_voBoxInfo.json"
## Workflow-level job counting
overview_workflows = {}  # per-workflow condor job statistics
json_name_workflows = "CondorJobs_Workflows.json"  # Output json file name
## SSB plot links
# Per-site pending/running plot (production dashboard)
site_link = "http://dashb-ssb.cern.ch/dashboard/templates/sitePendingRunningJobs.html?site="
# Overall pending/running plot (dev dashboard)
overalls_link = "http://dashb-ssb-dev.cern.ch/dashboard/templates/sitePendingRunningJobs.html?site=All%20"
# Recognized job categories, plus the T0-specific subtypes
jobTypes = [
    'Processing',
    'Production',
    'Skim',
    'Harvest',
    'Merge',
    'LogCollect',
    'Cleanup',
    'RelVal',
    'T0',
]
t0Types = ['Repack', 'Express', 'Reco']
4453
# Recipients of the error-notification emails
mailingList = ['cms-comp-ops-workflow-team@cern.ch']
4557def createSiteList():
4658 """
4759 Creates a initial site list with the data from site status in Dashboard
@@ -171,6 +183,51 @@ def increasePendingVoBox(sched,type):
171183 """
172184 overview_pending_vobox[sched][type] += 1
173185
def increaseRunningWorkflow(workflow, siteToExtract):
    """
    Increases the number of running jobs per workflow.

    workflow      -- workflow name (key into overview_workflows)
    siteToExtract -- the single site the job is running at

    Creates the workflow entry on first sight, then bumps the per-site
    running counter and the overall condor job counter.
    """
    # The original if/else duplicated identical update logic in both
    # branches; only the addWorkflow() call depends on the membership test.
    if workflow not in overview_workflows:
        addWorkflow(workflow)
    running = overview_workflows[workflow]['runningJobs']
    running[siteToExtract] = running.get(siteToExtract, 0) + 1
    overview_workflows[workflow]['condorJobs'] += 1
def increasePendingWorkflow(workflow, siteToExtract):
    """
    Increases the number of pending jobs per workflow.

    workflow      -- workflow name (key into overview_workflows)
    siteToExtract -- iterable of candidate sites the pending job may run at

    Creates the workflow entry on first sight, then bumps the total and
    pending counters and merges the candidate sites into desiredSites.
    """
    # The original if/else repeated the same three statements in both
    # branches; only the addWorkflow() call depends on the membership test.
    if workflow not in overview_workflows:
        addWorkflow(workflow)
    entry = overview_workflows[workflow]
    entry['condorJobs'] += 1
    entry['pendingJobs'] += 1
    entry['desiredSites'] = entry['desiredSites'].union(set(siteToExtract))
def addWorkflow(workflow):
    """
    Register a new workflow in overview_workflows with zeroed counters.
    """
    fresh_entry = {}
    fresh_entry['condorJobs'] = 0       # total jobs seen for this workflow
    fresh_entry['runningJobs'] = {}     # running-job counts keyed by site
    fresh_entry['pendingJobs'] = 0      # total pending jobs
    fresh_entry['desiredSites'] = set() # union of candidate sites
    overview_workflows[workflow] = fresh_entry
230+
174231def findTask(id,sched,typeToExtract):
175232 """
176233 This deduces job type from given info about scheduler and taskName
@@ -404,6 +461,40 @@ def jsonDict(json_name,currTime,date,hour,key):
404461 jsonfile.write(json.dumps(update,sort_keys=True, indent=3))
405462 jsonfile.close()
406463
def send_mail(send_from, send_to, subject, text, files=None, server="localhost"):
    """
    Send an email, optionally with attachments, through an SMTP server.

    send_from -- sender address
    send_to   -- list of recipient addresses
    subject   -- subject line
    text      -- plain-text message body
    files     -- optional list of file paths to attach (default: none)
    server    -- SMTP server host (default: localhost)
    """
    # Avoid the mutable-default-argument pitfall: build a fresh list per call.
    if files is None:
        files = []
    assert isinstance(send_to, list)
    assert isinstance(files, list)

    msg = MIMEMultipart()
    msg['From'] = send_from
    msg['To'] = COMMASPACE.join(send_to)
    msg['Date'] = formatdate(localtime=True)
    msg['Subject'] = subject

    msg.attach(MIMEText(text))

    for f in files:
        part = MIMEBase('application', "octet-stream")
        # Close each attachment file promptly instead of leaking the handle.
        with open(f, "rb") as attachment:
            part.set_payload(attachment.read())
        Encoders.encode_base64(part)
        part.add_header('Content-Disposition', 'attachment; filename="%s"' % os.path.basename(f))
        msg.attach(part)

    smtp = smtplib.SMTP(server)
    try:
        smtp.sendmail(send_from, send_to, msg.as_string())
    finally:
        # Always release the SMTP connection, even if sendmail raises.
        smtp.close()
489+
def set_default(obj):
    """
    json.dumps 'default' hook: the JSON encoder does not support sets,
    so serialize them as lists; reject any other unsupported type.
    """
    if isinstance(obj, set):
        return list(obj)
    # Mirror the encoder's own failure mode, but with a useful message.
    raise TypeError("%r is not JSON serializable" % obj)
497+
407498def main():
408499 """
409500 Main algorithm
@@ -430,10 +521,14 @@ def main():
430521 out, err = proc.communicate()
431522 for line in err.split('\n') :
432523 if 'Error' in line:
433- listcommand="bash send_email.sh %s ; " % col
434- proc = subprocess.Popen(listcommand, stderr = subprocess.PIPE,stdout = subprocess.PIPE, shell = True)
435- out2, err2 = proc.communicate()
436- print 'ERROR: I find a problem while getting schedulers for collector %s, I will send an email' % col
524+ body_text = 'There is a problem with one of the collectors! The monitoring scripts will give false information:\n\n'
525+ body_text += ' /afs/cern.ch/user/c/cmst1/scratch0/WFM_Input_DashBoard/WFMonDBShort.py\n\n'
526+ body_text += 'See the log file in the same directory for the error output\n'
527+ send_mail('luis89@fnal.gov',
528+ mailingList,
529+ '[Monitoring] Condor Collector %s Error' % col,
530+ body_text)
531+ print 'ERROR: I find a problem while getting schedulers for collector %s, I will send an email to: %s' % (col, str(mailingList))
437532 #print out2, '\n', "Error: ", '\n', err2
438533 break
439534 for line in out.split('\n') :
@@ -472,6 +567,7 @@ def main():
472567 # --> new software len(array) ={6,7} depending if the job is already running in a site
473568 id = array[0]
474569 status = array[1]
570+ workflow = array[2].split('/')[1]
475571 task = array[2].split('/')[-1]
476572 siteToExtract = array[3].replace(' ', '').split(",")
477573
@@ -493,9 +589,11 @@ def main():
493589 if status == "2":
494590 increaseRunning(siteToExtract[0],type) # I assume one job can only run at one site
495591 increaseRunningVoBox(sched.replace(".","_").strip(),type)
592+ increaseRunningWorkflow(workflow,siteToExtract[0])
496593 elif status == "1":
497594 temp_pending.append([type,siteToExtract])
498595 increasePendingVoBox(sched.replace(".","_").strip(),type)
596+ increasePendingWorkflow(workflow,siteToExtract)
499597 else: # We do not care about jobs in another status (condor job status: https://htcondor-wiki.cs.wisc.edu/index.cgi/wiki?p=MagicNumbers)
500598 continue
501599 print "INFO: Full condor status pooling is done"
@@ -522,11 +620,15 @@ def main():
522620 print "INFO: Smart pending site counting done \n"
523621
524622 # Handling jobs that failed task extraction logic
525- if jobs_failedTypeLogic != {}:
526- command="bash failedLogic_email.sh \"%s\"" % str(jobs_failedTypeLogic)
527- proc = subprocess.Popen(command, stderr = subprocess.PIPE,stdout = subprocess.PIPE, shell = True)
528- out, err = proc.communicate()
529- print 'ERROR: I find jobs that failed the type assignment logic, I will send an email'
623+ if jobs_failedTypeLogic != {}:
624+ body_text = 'There is a problem with the logic to deduce job type from the condor data.n'
625+ body_text += 'Please have a look to the following jobs:\n\n'
626+ body_text += '%s' % str(jobs_failedTypeLogic)
627+ send_mail('luis89@fnal.gov',
628+ mailingList,
629+ '[Monitoring] Failed task type logic problem',
630+ body_text)
631+ print 'ERROR: I find jobs that failed the type assignment logic, I will send an email to: %s' % str(mailingList)
530632 #print out, '\n', "Error: ", '\n', err
531633
532634 # Adding sites not in either of running/pending overviews
@@ -549,6 +651,11 @@ def main():
549651 # Creates json file (This is needed for plots per site)
550652 jsonDict( json_name_sites, currTime, date, hour, 'site')
551653
654+ # Creates json file for jobs per workflow
655+ jsonfile = open(json_name_workflows,'w+')
656+ jsonfile.write(json.dumps(overview_workflows, default=set_default, sort_keys=True, indent=4))
657+ jsonfile.close()
658+
552659 print 'INFO: The script has finished after: ', datetime.now()-starttime
553660
554661if __name__ == "__main__":
0 commit comments