This repository was archived by the owner on Sep 12, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 51
Expand file tree
/
Copy pathinvalidator.py
More file actions
executable file
·98 lines (89 loc) · 4.09 KB
/
invalidator.py
File metadata and controls
executable file
·98 lines (89 loc) · 4.09 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
#!/usr/bin/env python
from assignSession import *
from McMClient import McMClient
from utils import workflowInfo, invalidate
import reqMgrClient
from utils import componentInfo, setDatasetStatus, sendLog
from collections import defaultdict
import time
def invalidator(url, invalid_status='INVALID'):
    """Process the invalidations announced in McM and acknowledge them.

    Every invalidation returned by McM with ``status=announced`` is handled
    according to its ``type``:

    * ``request``: the matching local ``Workflow`` record (if any) is set to
      ``forget`` and ``invalidate`` is called for the workflow. The
      invalidation is acknowledged only if every flag returned by
      ``invalidate`` is truthy.
    * ``dataset``: the dataset status is set to ``invalid_status``. On
      failure a critical log is sent and the invalidation is not
      acknowledged, so it stays announced in McM.
    * anything else is reported on stdout and left alone.

    For each acknowledged invalidation a notification text is accumulated
    per batch and per request prepid; the accumulated texts are sent to McM
    once all invalidations have been processed.

    Args:
        url: host handed to ``workflowInfo`` and ``invalidate``.
        invalid_status: status passed to ``setDatasetStatus`` for dataset
            invalidations (also quoted in the notification text).

    Returns:
        None. Returns early when the component check fails or when McM has
        no announced invalidation.
    """
    # Bail out early if the component check does not pass.
    up = componentInfo(ignore=['wtc','jira'])
    if not up.check():
        return

    mcm = McMClient(dev=False)
    invalids = mcm.getA('invalidations', query='status=announced')
    if not invalids:
        return
    print("%s Object to be invalidated" % len(invalids))

    # Notification texts, keyed by batch prepid and by request prepid.
    text_to_batch = defaultdict(str)
    text_to_request = defaultdict(str)
    # Only batches in one of these statuses get notified.
    notifiable_statuses = ('announced', 'done', 'reset')

    for invalid in invalids:
        acknowledge = False
        pid = invalid['prepid']
        batch_lookup = pid
        text = ""

        if invalid['type'] == 'request':
            wfn = invalid['object']
            print("need to invalidate the workflow %s" % (wfn,))
            wfo = session.query(Workflow).filter(Workflow.name == wfn).first()
            if wfo:
                # Set the local record to forget (although checkor will
                # recover from it).
                print("setting the status of %s to forget" % (wfo.status,))
                wfo.status = 'forget'
                session.commit()
            else:
                # The workflow is unknown locally. Processing deliberately
                # continues: the rejection below is still attempted.
                print("%s is set to be rejected, but we do not know about it yet" % (wfn,))
            wfi = workflowInfo(url, wfn)
            # TODO: find a way to reject the workflow and any related ACDC.
            successes = invalidate(url, wfi, only_resub=True, with_output=False)
            wfi.sendLog('invalidator',"rejection is performed from McM invalidations request")
            acknowledge = all(successes)
            text = "The workflow %s (%s) was rejected due to invalidation in McM" % (wfn, pid)
            # Look the batch up by workflow name so that the batch found is
            # the one containing this workflow.
            batch_lookup = wfn
        elif invalid['type'] == 'dataset':
            dataset = invalid['object']
            # Skip names carrying one of these markers ('None' also covers
            # the former separate 'None-' test).
            if any(marker in dataset for marker in ('?', 'None', 'FAKE-')):
                continue
            print("setting %s to %s" % (dataset, invalid_status))
            if setDatasetStatus(dataset, invalid_status):
                acknowledge = True
                text = "The dataset %s (%s) was set %s due to invalidation in McM" % (dataset, pid, invalid_status)
            else:
                msg = "Could not invalidate {}. Please consider contacting data management team for manual intervention.".format(dataset)
                print(msg)
                sendLog('invalidator', msg, level='critical')
        else:
            print("\t\t%s  type not recognized" % (invalid['type'],))

        if not acknowledge:
            continue

        # Acknowledge the invalidation in McM.
        print("acknowledgment to mcm")
        ackno_url = '/restapi/invalidations/acknowledge/%s' % (invalid['_id'],)
        print("at %s" % (ackno_url,))
        mcm.get(ackno_url)

        # NOTE(review): the notification bookkeeping below is assumed to
        # belong to the acknowledged case only (the source indentation was
        # lost); confirm against the upstream file.
        # Prepare the text for batches: the last matching batch in a
        # notifiable status receives the message.
        found = mcm.getA('batches', query='contains=%s' % (batch_lookup,)) or []
        batches = [b for b in found if b['status'] in notifiable_statuses]
        if batches:
            bid = batches[-1]['prepid']
            print("batch nofication to %s" % (bid,))
            text_to_batch[bid] += text + "\n\n"
        # Prepare the text for requests.
        text_to_request[pid] += text + "\n\n"

    footer = '\n This is an automated message'
    for bid, text in text_to_batch.items():
        if not text:
            continue
        mcm.put('/restapi/batches/notify', {"notes": text + footer, "prepid": bid})
    for pid, text in text_to_request.items():
        if not text:
            continue
        mcm.put('/restapi/requests/notify', {"message": text + footer, "prepids": [pid]})
if __name__ == "__main__":
    # Script entry point: run the invalidator against this host with the
    # default invalid status.
    url = 'cmsweb.cern.ch'
    invalidator(url=url)