Skip to content

Commit 7a92f39

Browse files
author
Daniel Abercrombie
authored
Merge pull request #129 from phylsix/master
py3 for workflowmonit subdir
2 parents 7d95994 + 1816709 commit 7a92f39

7 files changed

Lines changed: 162 additions & 42 deletions

File tree

workflowmonit/alertingDefs.py

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
#!/usr/bin/env python
2+
from __future__ import print_function
3+
import time
4+
import json
5+
import smtplib
6+
from email.mime.text import MIMEText
7+
8+
9+
def onFailureRate(doc, thres=0.5, minRunningSecs=2 * 24 * 60 * 60):
    """
    Check a workflow (represented by `doc`) and decide whether to ALERT.

    The alert fires only when all of the following hold:

    - `doc['status']` is `running-open`,
    - `doc['failureRate']` is not below `thres`,
    - the first `running-open` entry of `doc['transitions']` (in list order)
      has an `UpdateTime` at least `minRunningSecs` seconds in the past.

    A document missing any of the keys above is treated as "no alert" rather
    than raising, so one incomplete document cannot abort the processing of
    the others.

    :param doc dict: information describing a workflow
    :param thres float: failure-rate threshold
    :param minRunningSecs float: minimum time in seconds since the workflow
        entered `running-open` (default: 2 days)
    :returns: (judge result, short msg if failed)
    """

    res = (False, '')

    if doc.get('status') != 'running-open':
        return res

    failureRate = doc.get('failureRate')
    if failureRate is None or failureRate < thres:
        return res

    runningOpen = [
        tr for tr in (doc.get('transitions') or [])
        if tr.get('Status') == 'running-open'
    ]
    if not runningOpen:
        return res

    # UpdateTime is compared against time.time(), i.e. used as epoch seconds.
    runningOpenTime = runningOpen[0].get('UpdateTime')
    if runningOpenTime is None:
        return res
    if time.time() - runningOpenTime < minRunningSecs:
        return res

    # '{:g}' renders the default window as "2", keeping the default message
    # identical to the previously hard-coded "over 2 days" wording.
    failMsg = ('FailureRate ({}) larger than threshold({}), while running '
               'time over {:g} days (started at {})').format(
                   failureRate, thres, minRunningSecs / 86400.0,
                   time.ctime(runningOpenTime))

    return (True, failMsg)
34+
35+
36+
# Registry of alert checks. Each entry is called with one workflow document
# and returns a (triggered, short message) tuple.
AlertDefs = [onFailureRate]
39+
40+
41+
def alertWithEmail(docs, recipients):
    """
    Run every check in `AlertDefs` on each document and send one alert email
    per document that triggers at least one check.

    Mail is handed to the SMTP server on `localhost`. Nothing is sent when
    `recipients` is empty.

    :param docs list: list of documents
    :param recipients list: list of recipients email addresses
    """

    # With no recipients smtplib raises SMTPRecipientsRefused; there is
    # nobody to notify, so skip the work entirely.
    if not recipients:
        return

    sender = 'toolsandint-workflowmonitalert@cern.ch'

    for doc in docs:
        alertResults = [ad(doc) for ad in AlertDefs]
        # Build a real list: on Python 3 `filter()` returns a lazy iterator
        # that is always truthy, which would mail every single document.
        shortAlertMsgs = [res[1] for res in alertResults if res[0]]
        if not shortAlertMsgs:
            continue

        _contentMsg = '\n\n'.join([
            '*** THIS IS A GENERATED MESSAGE, PLEASE DO NOT REPLY ***',
            'Workflow: {}'.format(doc['name']),
            'Short Summary:\n{}'.format('\n'.join([
                '- {}'.format(s) for s in shortAlertMsgs
            ])),
            '-' * 79,
            'Full document:\n{}'.format(
                json.dumps(
                    doc, sort_keys=True, indent=4, separators=(',', ': ')))
        ])

        contentMsg = MIMEText(_contentMsg)
        contentMsg['Subject'] = '[workflowmonit] Alert on * {} *'.format(
            doc['name'])
        contentMsg['From'] = sender
        contentMsg['To'] = ', '.join(recipients)

        s = smtplib.SMTP('localhost')
        try:
            s.sendmail(sender, recipients, contentMsg.as_string())
            s.quit()
        finally:
            # close() only releases the socket and is safe to call again
            # after quit(), so the connection never leaks on failure.
            s.close()
77+
78+
79+
def errorEmailShooter(msg, recipients):
    """
    Forward an error message to recipients by email, through the SMTP server
    on `localhost`.

    When `recipients` is empty the message is written to stderr instead.
    smtplib would otherwise raise SMTPRecipientsRefused and mask the error
    being reported.

    :param msg str: error message
    :param recipients list: list of recipients email addresses
    """

    if not recipients:
        import sys
        sys.stderr.write(
            '[workflowmonit] no alert recipients configured; '
            'error was: {}\n'.format(msg))
        return

    sender = 'toolsandint-workflowmonitalert@cern.ch'

    contentMsg = MIMEText(msg)
    contentMsg['Subject'] = 'Exception caught for workflowmonit'
    contentMsg['From'] = sender
    contentMsg['To'] = ', '.join(recipients)

    s = smtplib.SMTP('localhost')
    try:
        s.sendmail(sender, recipients, contentMsg.as_string())
        s.quit()
    finally:
        # close() only releases the socket and is safe to call again after
        # quit(), so the connection never leaks on failure.
        s.close()
96+
97+
98+
def main():
    """
    Manual smoke test: load a saved document dump from the `Logs` directory
    next to this file, print each workflow's name and failure rate, then run
    the email alerting on the loaded documents.
    """

    import os

    testdoc = os.path.join(
        os.path.dirname(os.path.abspath(__file__)),
        'Logs/toSendDoc_190317-033802.json')
    # Use a context manager so the file handle is closed deterministically.
    with open(testdoc) as f:
        docs = json.load(f)
    print([(d['name'], d['failureRate']) for d in docs])

    alertWithEmail(docs, ['weinan.si@cern.ch', ])


if __name__ == "__main__":
    main()

workflowmonit/dumpWfStatusDb.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#!/user/bin/env python
2+
from __future__ import print_function
23
import os
34
import sys
45
import sqlite3

workflowmonit/requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
schedule
22
git+https://github.com/dmwm/WMCore.git
33
stomp.py
4+
pyyaml>=5.1

workflowmonit/sendToMonit.py

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import yaml
1414
from workflowmonit.stompAMQ import stompAMQ
1515
import workflowmonit.workflowCollector as wc
16+
import workflowmonit.alertingDefs as ad
1617

1718
CRED_FILE_PATH = os.path.join(os.path.dirname(
1819
os.path.abspath(__file__)), 'credential.yml')
@@ -268,31 +269,40 @@ def sendDoc(cred, docs):
268269

269270
def main():
270271

271-
with open(LOGGING_CONFIG, 'r') as f:
272-
config = yaml.safe_load(f.read())
273-
logging.config.dictConfig(config)
272+
recipients = wc.get_yamlconfig(CONFIG_FILE_PATH).get('alert_recipients', [])
274273

275-
global logger
276-
logger = logging.getLogger('workflowmonitLogger')
274+
try:
275+
with open(LOGGING_CONFIG, 'r') as f:
276+
config = yaml.safe_load(f.read())
277+
logging.config.dictConfig(config)
278+
279+
global logger
280+
logger = logging.getLogger('workflowmonitLogger')
277281

278-
cred = wc.get_yamlconfig(CRED_FILE_PATH)
279-
docs = buildDoc(CONFIG_FILE_PATH)
282+
cred = wc.get_yamlconfig(CRED_FILE_PATH)
283+
docs = buildDoc(CONFIG_FILE_PATH)
280284

281-
if not os.path.isdir(LOGDIR):
282-
os.makedirs(LOGDIR)
285+
# handling alerts
286+
ad.alertWithEmail(docs, recipients)
283287

284-
doc_bkp = os.path.join(LOGDIR, 'toSendDoc_{}'.format(
285-
time.strftime('%y%m%d-%H%M%S')))
286-
wc.save_json(docs, doc_bkp)
287-
logger.info('Document saved at: {}.json'.format(doc_bkp))
288+
# backup documents
289+
if not os.path.isdir(LOGDIR):
290+
os.makedirs(LOGDIR)
288291

289-
failures = sendDoc(cred=cred, docs=docs)
292+
doc_bkp = os.path.join(LOGDIR, 'toSendDoc_{}'.format(
293+
time.strftime('%y%m%d-%H%M%S')))
294+
wc.save_json(docs, doc_bkp)
295+
logger.info('Document saved at: {}.json'.format(doc_bkp))
290296

291-
failedDocs_bkp = os.path.join(
292-
LOGDIR, 'amqFailedMsg_{}'.format(time.strftime('%y%m%d-%H%M%S')))
293-
if len(failures):
294-
wc.save_json(failures, failedDocs_bkp)
295-
logger.info('Failed message saved at: {}.json'.format(failedDocs_bkp))
297+
failures = sendDoc(cred=cred, docs=docs)
298+
299+
failedDocs_bkp = os.path.join(
300+
LOGDIR, 'amqFailedMsg_{}'.format(time.strftime('%y%m%d-%H%M%S')))
301+
if len(failures):
302+
wc.save_json(failures, failedDocs_bkp)
303+
logger.info('Failed message saved at: {}.json'.format(failedDocs_bkp))
304+
except Exception as e:
305+
ad.errorEmailShooter(str(e), recipients)
296306

297307

298308
if __name__ == "__main__":

workflowmonit/stompAMQ.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
#!/usr/bin/env python
2-
from __future__ import print_function
3-
42
import json
53

64
from WMCore.Services.StompAMQ.StompAMQ import StompAMQ

workflowmonit/workflowCollector.py

Lines changed: 19 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ def get_yamlconfig(configPath):
4444
return {}
4545

4646
try:
47-
return yaml.load(open(configPath).read())
47+
return yaml.load(open(configPath).read(), Loader=yaml.FullLoader)
4848
except:
4949
return {}
5050

@@ -290,7 +290,7 @@ def error_logs(workflow):
290290
if not wf_stepinfo:
291291
return error_logs
292292

293-
for stepname, stepdata in wf_stepinfo.iteritems():
293+
for stepname, stepdata in wf_stepinfo.items():
294294
_taskName = stepname.split('/')[-1]
295295
# Get the errors from both 'jobfailed' and 'submitfailed' details
296296
for error, sitedata in [
@@ -301,7 +301,7 @@ def error_logs(workflow):
301301
continue
302302
_errorcode = int(error)
303303

304-
for _sitename, siteinfo in sitedata.iteritems():
304+
for _sitename, siteinfo in sitedata.items():
305305
_errorsamples = list()
306306

307307
for sample in siteinfo['samples']:
@@ -383,18 +383,18 @@ def error_summary(workflow):
383383
if not errorInfo:
384384
return error_summary
385385

386-
for fullTaskName, taskErrors in errorInfo.iteritems():
386+
for fullTaskName, taskErrors in errorInfo.items():
387387
taskName = fullTaskName.split('/')[-1]
388388
if not taskName:
389389
continue
390390

391391
errorList = list()
392392
noReportSite = list(taskErrors.get('NotReported', {}).keys())
393-
for errorCode, siteCnt in taskErrors.iteritems():
393+
for errorCode, siteCnt in taskErrors.items():
394394
if errorCode == 'NotReported':
395395
continue
396396

397-
for siteName, counts in siteCnt.iteritems():
397+
for siteName, counts in siteCnt.items():
398398
errorList.append({
399399
'errorCode': int(errorCode),
400400
'siteName': siteName,
@@ -457,16 +457,16 @@ def populate_error_for_workflow(workflow):
457457
workflow_summary['transitions'] = requestTransition
458458

459459
nfailure = 0
460-
for agent, agentdata in agentJobInfo.iteritems():
460+
for agent, agentdata in agentJobInfo.items():
461461
status = agentdata.get('status', {})
462462
tasks = agentdata.get('tasks', {})
463463
if not all([status, tasks]):
464464
continue
465465

466-
for ftype, num in status.get('failure', {}).iteritems():
466+
for ftype, num in status.get('failure', {}).items():
467467
nfailure += num
468468

469-
for taskFullName, taskData in tasks.iteritems():
469+
for taskFullName, taskData in tasks.items():
470470
taskName = taskFullName.split('/')[-1]
471471

472472
inputTask = None
@@ -479,13 +479,13 @@ def populate_error_for_workflow(workflow):
479479
taskSiteError = dict()
480480

481481
if taskStatus and taskStatus.get('failure', {}):
482-
for site, siteData in taskData.get('sites', {}).iteritems():
482+
for site, siteData in taskData.get('sites', {}).items():
483483
errCnt = 0
484484
errCnts = siteData.get('failure', {})
485485
if not errCnts:
486486
continue
487487

488-
for ftype, cnt in errCnts.iteritems():
488+
for ftype, cnt in errCnts.items():
489489
errCnt += cnt
490490

491491
taskSiteError[site] = errCnt
@@ -499,7 +499,7 @@ def populate_error_for_workflow(workflow):
499499
if 'siteErrors' not in _task.keys():
500500
_task["siteErrors"] = taskSiteError
501501
else:
502-
for site, errors in taskSiteError.iteritems():
502+
for site, errors in taskSiteError.items():
503503
if site in _task["siteErrors"].keys():
504504
_task["siteErrors"][site] += errors
505505
else:
@@ -517,7 +517,7 @@ def populate_error_for_workflow(workflow):
517517

518518
# remove tasks that does not have any error
519519
taskToDel = list()
520-
for taskname, taskinfo in workflow_summary['tasks'].iteritems():
520+
for taskname, taskinfo in workflow_summary['tasks'].items():
521521
if 'siteErrors' in taskinfo and (not taskinfo['siteErrors']):
522522
taskToDel.append(taskname)
523523
for taskname in taskToDel:
@@ -531,17 +531,17 @@ def populate_error_for_workflow(workflow):
531531
wf_errorLog = error_logs(workflow)
532532

533533
# add information from errorSummary
534-
for taskName, taskErrors in wf_errorSummary.iteritems():
534+
for taskName, taskErrors in wf_errorSummary.items():
535535
if taskName in workflow_summary['tasks'].keys():
536536
workflow_summary['tasks'][taskName].update(taskErrors)
537537

538538
# add information from errorLog
539-
for taskName, taskErrorLogInfo in wf_errorLog.iteritems():
539+
for taskName, taskErrorLogInfo in wf_errorLog.items():
540540

541541
if taskName not in workflow_summary['tasks'].keys():
542542
continue
543-
for errorCode, siteInfo in taskErrorLogInfo.iteritems():
544-
for site, info in siteInfo.iteritems():
543+
for errorCode, siteInfo in taskErrorLogInfo.items():
544+
for site, info in siteInfo.items():
545545

546546
for e in workflow_summary['tasks'][taskName].get('errors', []):
547547
if e.get('siteName', None) != site:
@@ -562,13 +562,13 @@ def populate_error_for_workflow(workflow):
562562

563563
# last step, nest in task key(TaskName) as a key-value pair
564564
tasksAsList = []
565-
for taskname, taskinfo in workflow_summary['tasks'].iteritems():
565+
for taskname, taskinfo in workflow_summary['tasks'].items():
566566
taskinfo.update({"name": taskname})
567567
taskinfo['siteErrors'] = [
568568
{
569569
"site": site,
570570
"counts": counts
571-
} for site, counts in taskinfo['siteErrors'].iteritems()
571+
} for site, counts in taskinfo['siteErrors'].items()
572572
] # convert 'siteErrors' from a dict to a list of dict
573573
tasksAsList.append(taskinfo)
574574
workflow_summary['tasks'] = tasksAsList

workflowmonit/workflowMonitScheduler.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,4 @@
11
#!/usr/bin/env python
2-
from __future__ import print_function
3-
42
import os
53
import time
64

0 commit comments

Comments
 (0)