Skip to content
This repository was archived by the owner on Sep 12, 2024. It is now read-only.

Commit 22100fd

Browse files
authored
Merge pull request #519 from amaltaro/fix-wmcore-9527
Remove all input data placement; force transferor to set wflows to staged
2 parents a9aa106 + 169e65c commit 22100fd

3 files changed

Lines changed: 40 additions & 222 deletions

File tree

Unified/assignor.py

Lines changed: 14 additions & 209 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,16 @@
22
from assignSession import *
33
import reqMgrClient
44
from utils import workflowInfo, campaignInfo, siteInfo, userLock, unifiedConfiguration, reqmgr_url, monitor_pub_dir, monitor_dir, global_SI
5-
from utils import getWorkLoad, getDatasetPresence, getDatasets, findCustodialLocation, getDatasetBlocksFraction, getDatasetEventsPerLumi, getLFNbase, getDatasetBlocks, lockInfo, isHEPCloudReady, do_html_in_each_module
5+
from utils import getDatasetEventsPerLumi, getLFNbase, lockInfo, isHEPCloudReady, do_html_in_each_module
66
from utils import componentInfo, sendEmail, sendLog, getWorkflows, closeAllBlocks, eosRead
77
#from utils import lockInfo
8-
from utils import moduleLock, notRunningBefore
8+
from utils import moduleLock
99
import optparse
10-
import itertools
11-
import time
1210
from htmlor import htmlor
13-
import os
1411
import random
1512
import json
1613
import copy
1714
import os
18-
import sys
1915

2016
def assignor(url ,specific = None, talk=True, options=None):
2117
if userLock() and not options.manual: return
@@ -61,7 +57,6 @@ def assignor(url ,specific = None, talk=True, options=None):
6157
#if options.partial and not specific:
6258
# pass
6359

64-
dataset_endpoints = json.loads(eosRead('%s/dataset_endpoints.json'%monitor_dir))
6560
aaa_mapping = json.loads(eosRead('%s/equalizor.json'%monitor_pub_dir))['mapping']
6661
all_stuck = set()
6762
all_stuck.update( json.loads(eosRead('%s/stuck_transfers.json'%monitor_pub_dir) ))
@@ -110,13 +105,9 @@ def rank( wfn ):
110105

111106
options_text=""
112107
if options.early: options_text+=", early option is ON"
113-
if options.partial:
114-
options_text+=", partial option is ON"
115-
options_text+=", good fraction is %.2f"%options.good_enough
116-
117108

118109

119-
wfh.sendLog('assignor',"%s to be assigned%s"%(wfo.name, options_text))
110+
wfh.sendLog('assignor',"%s to be assigned %s"%(wfo.name, options_text))
120111

121112
## the site whitelist takes into account siteInfo, campaignInfo, memory and cores
122113
(lheinput,primary,parent,secondary, sites_allowed) = wfh.getSiteWhiteList()
@@ -200,19 +191,15 @@ def rank( wfn ):
200191
continue
201192

202193

203-
original_sites_allowed = copy.deepcopy( sites_allowed )
204194
wfh.sendLog('assignor',"Site white list %s"%sorted(sites_allowed))
205-
override_sec_location = CI.get(wfh.request['Campaign'], 'SecondaryLocation', [])
206195

207196
blocks = wfh.getBlocks()
208197
if blocks:
209198
wfh.sendLog('assignor',"Needs {} blocks in input {}".format(len(blocks), '\n'.join(blocks)))
210199
wfh.sendLog('assignor',"Allowed %s"%sorted(sites_allowed))
211-
secondary_locations=None
212200

213201
primary_aaa = options.primary_aaa
214202
secondary_aaa = options.secondary_aaa
215-
do_partial = False #options.good_enough if options.partial else 0
216203

217204
if 'Campaign' in wfh.request and wfh.request['Campaign'] in CI.campaigns:
218205
assign_parameters.update( CI.campaigns[wfh.request['Campaign']] )
@@ -221,163 +208,42 @@ def rank( wfn ):
221208
primary_aaa = primary_aaa or assign_parameters['primary_AAA']
222209
if 'secondary_AAA' in assign_parameters:
223210
secondary_aaa = secondary_aaa or assign_parameters['secondary_AAA']
224-
if 'partial_copy' in assign_parameters:
225-
## can this only work if there is a stuck input ? maybe not
226-
## this is a number. 0 means no
227-
print "Could do partial disk copy assignment"
228-
if is_stuck or options.partial:
229-
do_partial = assign_parameters['partial_copy']
230-
wfh.sendLog('assignor',"Overiding partial copy assignment to %.2f fraction"% do_partial)
231-
#sendEmail('stuck input to assignment','%s is stuck for assigning %s and going fractional'%(','.join( is_stuck), wfo.name))
232-
233-
do_partial = options.good_enough if options.partial else do_partial
234-
235-
236-
for sec in list(secondary):
237-
if override_sec_location:
238-
print "We don't care where the secondary is"
239-
print "Cannot pass for now"
240-
#sendEmail("tempting to pass sec location check","but we cannot yet IMO")
241-
#pass
242-
243-
presence = getDatasetPresence( url, sec )
244-
print sec
245-
print json.dumps(presence, indent=2)
246-
one_secondary_locations = [site for (site,(there,frac)) in presence.items() if frac>98.]
247-
248-
if secondary_aaa:
249-
if not one_secondary_locations:
250-
sec_availability = getDatasetBlocksFraction( url, sec )
251-
if sec_availability >=1. and options.go:
252-
## there is at least one copy of each block on disk. We should go ahead and let it go.
253-
wfh.sendLog('assignor',"The secondary %s is available %s times on disk, and usable"%( sec, sec_availability))
254-
else:
255-
## not even a copy on disk anywhere !!!!
256-
sites_allowed = [] ## will block the assignment
257-
wfh.sendLog('assignor',"The secondary %s is nowhere on disk"% sec)
258-
#just continue without checking
259-
continue
260211

261-
#one_secondary_locations = [site for (site,(there,frac)) in presence.items() if there]
262-
if secondary_locations==None:
263-
secondary_locations = one_secondary_locations
264-
else:
265-
secondary_locations = list(set(secondary_locations) & set(one_secondary_locations))
266-
## reduce the site white list to site with secondary only
267-
#sites_allowed = [site for site in sites_allowed if any([osite.startswith(site) for osite in one_secondary_locations])]
268-
sites_allowed = [site for site in sites_allowed if SI.CE_to_SE(site) in one_secondary_locations]
269-
270-
wfh.sendLog('assignor',"Intersecting with secondary requirement, now allowed %s"%sorted(sites_allowed))
212+
wfh.sendLog('assignor',"Initial values for primary_AAA=%s and secondary_AAA=%s"%(primary_aaa, secondary_aaa))
271213

272-
initial_sites_allowed = copy.deepcopy( sites_allowed ) ## keep track of this, after secondary input location restriction : that's how you want to operate it
214+
## keep track of this, after secondary input location restriction : that's how you want to operate it
215+
initial_sites_allowed = copy.deepcopy( sites_allowed )
273216

274-
sites_all_data = copy.deepcopy( sites_allowed )
275-
sites_with_data = copy.deepcopy( sites_allowed )
276-
sites_with_any_data = copy.deepcopy( sites_allowed )
277-
primary_locations = None
278-
available_fractions = {}
279217
set_lfn = '/store/mc' ## by default
280218

281-
endpoints = set()
282219
for prim in list(primary):
283-
if prim in dataset_endpoints:
284-
print "endpoints from stagor",dataset_endpoints[prim]
285-
endpoints.update( dataset_endpoints[prim] )
286220
set_lfn = getLFNbase( prim )
287221
## if they are requested for processing, they should bbe all closed already
222+
# FIXME: remove this closeAllBlocks
288223
closeAllBlocks(url, prim, blocks)
289-
presence = getDatasetPresence( url, prim , only_blocks=blocks)
290-
if talk:
291-
print prim
292-
print json.dumps(presence, indent=2)
293-
available_fractions[prim] = getDatasetBlocksFraction(url, prim, sites = [SI.CE_to_SE(site) for site in sites_allowed] , only_blocks = blocks)
294-
if primary_aaa:
295-
available_fractions[prim] = getDatasetBlocksFraction(url, prim, only_blocks = blocks)
296-
297-
sites_all_data = [site for site in sites_with_data if SI.CE_to_SE(site) in [psite for (psite,(there,frac)) in presence.items() if there]]
298-
if primary_aaa:
299-
sites_all_data = set()
300-
for (psite,(there,frac)) in presence.items():
301-
if there:
302-
sites_all_data.update( SI.SE_to_CEs(psite) )
303-
sites_all_data = list(sites_all_data)
304-
#sites_all_data = list(set([SI.SE_to_CE(psite) for (psite,(there,frac)) in presence.items() if there]))
305-
sites_with_data = [site for site in sites_with_data if SI.CE_to_SE(site) in [psite for (psite,frac) in presence.items() if frac[1]>90.]]
306-
sites_with_any_data = [site for site in sites_with_any_data if SI.CE_to_SE(site) in presence.keys()]
307-
if primary_aaa:
308-
sites_with_any_data = set()
309-
for psite in presence.keys():
310-
sites_with_any_data.update( SI.SE_to_CEs(psite) )
311-
sites_with_any_data = list(sites_with_any_data)
312-
#sites_with_any_data = list(set([SI.SE_to_CE(psite) for psite in presence.keys()]))
313-
314-
holding_but_not_allowed = set()
315-
for se_site in presence.keys():
316-
if not (set(SI.SE_to_CEs(se_site)) & set(sites_allowed)):
317-
holding_but_not_allowed.add( se_site )
318-
#wfh.sendLog('assignor',"Holding the data but not allowed %s"%sorted(list(set([se_site for se_site in presence.keys() if not SI.SE_to_CE(se_site) in sites_allowed]))))
319-
wfh.sendLog('assignor',"Holding the data but not allowed %s"%sorted( holding_but_not_allowed ))
320-
if primary_locations==None:
321-
primary_locations = presence.keys()
322-
else:
323-
primary_locations = list(set(primary_locations) & set(presence.keys() ))
324-
325-
sites_with_data = list(set(sites_with_data))
326-
sites_with_any_data = list(set(sites_with_any_data))
327-
328-
opportunistic_sites=[]
329-
down_time = False
330-
## opportunistic running where any piece of data is available
331-
if secondary_locations or primary_locations:
332-
## intersection of both any pieces of the primary and good IO
333-
#opportunistic_sites = [SI.SE_to_CE(site) for site in list((set(secondary_locations) & set(primary_locations) & set(SI.sites_with_goodIO)) - set(sites_allowed))]
334-
if secondary_locations and primary_locations:
335-
opportunistic_sites = [SI.SE_to_CE(site) for site in list((set(secondary_locations) & set(primary_locations)) - set([SI.CE_to_SE(site) for site in sites_allowed]))]
336-
elif primary_locations:
337-
opportunistic_sites = [SI.SE_to_CE(site) for site in list(set(primary_locations) - set([SI.CE_to_SE(site) for site in sites_allowed]))]
338-
else:
339-
opportunistic_sites = []
340-
wfh.sendLog('assignor',"We could be running in addition at %s"% sorted(opportunistic_sites))
341-
if any([osite in SI.sites_not_ready for osite in opportunistic_sites]):
342-
wfh.sendLog('assignor',"One of the usable site is in downtime %s"%([osite for osite in opportunistic_sites if osite in SI.sites_not_ready]))
343-
down_time = True
344-
## should this be send back to considered ?
345-
346224

347225
## should be 2 but for the time-being let's lower it to get things going
348-
copies_wanted,cpuh = wfh.getNCopies()
226+
_copies_wanted,cpuh = wfh.getNCopies()
349227
wfh.sendLog('assignor',"we need %s CPUh"%cpuh)
350228
if cpuh>max_cpuh_block and not options.go:
351229
#sendEmail('large workflow','that wf %s has a large number of CPUh %s, not assigning, please check the logs'%(wfo.name, cpuh))#,destination=['Dmytro.Kovalskyi@cern.ch'])
352230
sendLog('assignor','%s requires a large numbr of CPUh %s , not assigning, please check with requester'%( wfo.name, cpuh), level='critical')
353231
wfh.sendLog('assignor',"Requiring a large number of CPUh %s, not assigning"%cpuh)
354232
continue
355233

356-
if 'Campaign' in wfh.request and wfh.request['Campaign'] in CI.campaigns and 'maxcopies' in CI.campaigns[wfh.request['Campaign']]:
357-
copies_needed_from_campaign = CI.campaigns[wfh.request['Campaign']]['maxcopies']
358-
copies_wanted = min(copies_needed_from_campaign, copies_wanted)
359-
360-
if not options.early:
361-
less_copies_than_requested = UC.get("less_copies_than_requested")
362-
copies_wanted = max(1,copies_wanted-less_copies_than_requested) # take one out for the efficiency
363-
else:
364-
## find out whether there is a site in the whitelist, that is lacking jobs and reduce to 1 copy needed to get things going
365-
pass
366-
367-
wfh.sendLog('assignor',"needed availability fraction %s"% copies_wanted)
368-
369234
## should also check on number of sources, if large enough, we should be able to overflow most, efficiently
370235

371236
## default back to white list to original white list with any data
372237
wfh.sendLog('assignor',"Allowed sites :%s"% sorted(sites_allowed))
373238

239+
# TODO Alan on 1/april/2020: keep the AAA functionality
374240
if primary_aaa:
375241
## remove the sites not reachable localy if not in having the data
376-
if not sites_all_data:
242+
if not sites_allowed:
377243
wfh.sendLog('assignor',"Overiding the primary on AAA setting to Off")
378244
primary_aaa=False
379245
else:
380-
aaa_grid = set(sites_all_data)
246+
aaa_grid = set(sites_allowed)
381247
for site in list(aaa_grid):
382248
aaa_grid.update( aaa_mapping.get(site,[]) )
383249
sites_allowed = list(set(initial_sites_allowed) & aaa_grid)
@@ -395,10 +261,8 @@ def rank( wfn ):
395261
continue
396262

397263
if not primary_aaa:
398-
if not isStoreResults:
399-
sites_allowed = sites_with_any_data
400-
else:
401-
## if we are dealing with a StoreResults request, we don't need to check dataset availability and
264+
if isStoreResults:
265+
## if we are dealing with a StoreResults request, we don't need to check dataset availability and
402266
## should use the SiteWhiteList set in the original request
403267
if 'SiteWhitelist' in wfh.request:
404268
sites_allowed = wfh.request['SiteWhitelist']
@@ -407,24 +271,8 @@ def rank( wfn ):
407271
sendLog('assignor','Cannot assign StoreResults request because SiteWhitelist is missing', level='critical')
408272
n_stalled += 1
409273
continue
410-
available_fractions = {}
411274
wfh.sendLog('assignor',"Selected for any data %s"%sorted(sites_allowed))
412275

413-
### check on endpoints for on-going transfers
414-
if do_partial:
415-
if endpoints:
416-
end_sites = [SI.SE_to_CE(s) for s in endpoints]
417-
sites_allowed = list(set(sites_allowed + end_sites))
418-
if down_time and not any(osite in SI.sites_not_ready for osite in end_sites):
419-
print "Flip the status of downtime, since our destinations are good"
420-
down_time = False
421-
print "with added endpoints",sorted(end_sites)
422-
else:
423-
print "Cannot do partial assignment without knowin the endpoints"
424-
n_stalled+=1
425-
continue
426-
427-
428276
#if not len(sites_allowed):
429277
# if not options.early:
430278
# wfh.sendLog('assignor',"cannot be assign with no matched sites")
@@ -438,48 +286,6 @@ def rank( wfn ):
438286
allowed_and_low = sorted(set(low_pressure) & set(sites_allowed))
439287
if allowed_and_low:
440288
wfh.sendLog('assignor',"The workflow can run at %s under low pressure currently"%( ','.join( allowed_and_low )))
441-
copies_wanted = max(1., copies_wanted-1.)
442-
443-
444-
if available_fractions and not all([available>=copies_wanted for available in available_fractions.values()]):
445-
not_even_once = not all([available>=1. for available in available_fractions.values()])
446-
above_good = all([available >= do_partial for available in available_fractions.values()])
447-
wfh.sendLog('assignor',"The input dataset is not available %s times, only %s"%( copies_wanted, available_fractions.values()))
448-
if down_time and not options.go and not options.early:
449-
wfo.status = 'considered'
450-
session.commit()
451-
wfh.sendLog('assignor',"sending back to considered because of site downtime, instead of waiting")
452-
#sendEmail( "cannot be assigned due to downtime","%s is not sufficiently available, due to down time of a site in the whitelist. check the assignor logs. sending back to considered."% wfo.name)
453-
sendLog('assignor','%s is not sufficiently available, due to down time of a site in the whitelist. sending back to considered.'%( wfo.name ), level='delay')
454-
n_stalled+=1
455-
continue
456-
#pass
457-
458-
print json.dumps(available_fractions)
459-
if (options.go and not_even_once) or not options.go:
460-
known = []
461-
try:
462-
known = json.loads(open('cannot_assign.json').read())
463-
except:
464-
pass
465-
if not wfo.name in known and not options.limit and not options.go and not options.early and not (do_partial and above_good):
466-
wfh.sendLog('assignor',"cannot be assigned, %s is not sufficiently available.\n %s"%(wfo.name,json.dumps(available_fractions)))
467-
#sendEmail( "cannot be assigned","%s is not sufficiently available.\n %s"%(wfo.name,json.dumps(available_fractions)))
468-
known.append( wfo.name )
469-
open('cannot_assign.json','w').write(json.dumps( known, indent=2))
470-
471-
if options.early:
472-
if wfo.status == 'considered':
473-
wfh.sendLog('assignor',"setting considered-tried")
474-
wfo.status = 'considered-tried'
475-
session.commit()
476-
else:
477-
print "tried but status is",wfo.status
478-
if do_partial and above_good:
479-
print "Will move on with partial locations"
480-
else:
481-
n_stalled+=1
482-
continue
483289

484290
if not len(sites_allowed) and not options.SiteWhitelist:
485291
if not options.early:
@@ -595,6 +401,7 @@ def pick_campaign( assign_parameters, parameters):
595401
parameters['EventsPerJob'] = eventsPerJob
596402
else:
597403
spl = wfh.getSplittings()[0]
404+
# FIXME: decide which of the lines below needs to remain...
598405
eventsPerJobEstimated = spl['events_per_job'] if 'events_per_job' in spl else None
599406
eventsPerJobEstimated = spl['avg_events_per_job'] if 'avg_events_per_job' in spl else None
600407
if eventsPerJobEstimated and eventsPerJobEstimated > eventsPerJob:
@@ -666,8 +473,6 @@ def pick_campaign( assign_parameters, parameters):
666473
parser.add_option('-t','--test', help='Only test the assignment',action='store_true',dest='test',default=False)
667474
parser.add_option('-m','--manual', help='Manual assignment, bypassing lock check',action='store_true',dest='manual',default=False)
668475
parser.add_option('-e', '--early', help='Fectch from early statuses',default=False, action="store_true")
669-
parser.add_option('-p', '--partial', help='Let the workflow assign to place with any part of the data, existent of being made',default=False, action="store_true")
670-
parser.add_option('--good_enough', help='Only useful with --partial option, determines whether to get the workflow started', default=0.5, type=float)
671476
parser.add_option('--go',help="Overrides the campaign go",default=False,action='store_true')
672477
parser.add_option('--team',help="Specify the agent to use",default=None)
673478
parser.add_option('--primary_aaa',help="Force to use the secondary location restriction, if any, and use the full site whitelist initially provided to run that type of wf",default=False, action='store_true')

0 commit comments

Comments
 (0)