11#!/usr/bin/env python
22from assignSession import *
3- from utils import workflowInfo , getWorkflows , global_SI , sendEmail , componentInfo , getDatasetPresence , monitor_dir , monitor_pub_dir , reqmgr_url , campaignInfo , unifiedConfiguration , sendLog , do_html_in_each_module , base_eos_dir , eosRead , eosFile , agent_speed_draining
3+ from utils import workflowInfo , getWorkflows , global_SI , sendEmail , componentInfo , getDatasetPresence , monitor_dir , monitor_pub_dir , reqmgr_url , campaignInfo , unifiedConfiguration , sendLog , do_html_in_each_module , base_eos_dir , eosRead , eosFile , agent_speed_draining , cacheInfo
44import reqMgrClient
55import json
66import os , sys
@@ -25,10 +25,10 @@ def equalizor(url , specific = None, options=None):
2525
2626 ## start from scratch
2727 modifications = defaultdict (dict )
28- ## define regionality site => fallback allowed. feed on an ssb metric ??
29- mapping = defaultdict ( list )
30- reversed_mapping = defaultdict ( list )
31- regions = defaultdict ( list )
28+
29+ cache = cacheInfo ( )
30+ mapping = cache . get ( 'overflow_mapping' )
31+ reversed_mapping = cache . get ( 'overflow_reverse_mapping' )
3232
3333 UC = unifiedConfiguration ()
3434 over_rides = []
@@ -40,160 +40,12 @@ def equalizor(url , specific = None, options=None):
4040 if options .hlt : use_HLT = True
4141 if use_HLT : over_rides .append ('T2_CH_CERN_HLT' )
4242
43- use_CSCS = ('T0_CH_CSCS_HPC' in UC .get ("site_for_overflow" ))
44- if options .cscs : use_CSCS = True
45- if use_CSCS : over_rides .append ('T0_CH_CSCS_HPC' )
46-
47-
4843 SI = global_SI ( over_rides )
49- print sorted (SI .all_sites )
50- print sorted (SI .sites_T0s )
51-
5244 CI = campaignInfo ()
5345
54- #sites_to_consider = SI.all_sites
55- sites_to_consider = SI .sites_ready
56- for site in sites_to_consider :
57- region = site .split ('_' )[1 ]
58- if not region in ['US'
59- ,'DE' ,'IT' ,'FR' ,
60- 'ES' ,
61- 'UK' ,
62- 'RU' ### latest addition
63- ]: continue
64- regions [region ] = [region ]
65-
66- def site_in_depletion (s ):
67- return True
68- if s in SI .sites_pressure :
69- (m , r , pressure ) = SI .sites_pressure [s ]
70- if float (m ) < float (r ):
71- print s ,m ,r ,"lacking pressure"
72- return True
73- else :
74- print s ,m ,r ,"pressure"
75- pass
76-
77- return False
78-
79- for site in sites_to_consider :
80- region = site .split ('_' )[1 ]
81- ## fallback to the region, to site with on-going low pressure
82- within_region = [fb for fb in sites_to_consider if any ([('_%s_' % (reg ) in fb and fb != site and site_in_depletion (fb ))for reg in regions [region ]]) ]
83- #print site,region, within_region
84- mapping [site ] = within_region
85-
86-
87- for site in sites_to_consider :
88- if site .split ('_' )[1 ] == 'US' : ## to all site in the US
89- ## add NERSC
90- mapping [site ].append ('T3_US_NERSC' )
91- mapping [site ].append ('T3_US_SDSC' )
92- mapping [site ].append ('T3_US_TACC' )
93- mapping [site ].append ('T3_US_PSC' )
94- ## add OSG
95- mapping [site ].append ('T3_US_OSG' )
96- #mapping[site].append('T3_US_Colorado')
97- pass
98-
99- if use_HLT :
100- mapping ['T2_CH_CERN' ].append ('T2_CH_CERN_HLT' )
101-
102- if use_T0 :
103- ## who can read from T0
104- mapping ['T2_CH_CERN' ].append ('T0_CH_CERN' )
105- mapping ['T1_IT_CNAF' ].append ('T0_CH_CERN' )
106- mapping ['T1_FR_CCIN2P3' ].append ('T0_CH_CERN' )
107- mapping ['T1_DE_KIT' ].append ('T0_CH_CERN' )
108-
109- if use_CSCS :
110- ## analog config to T0:
111- mapping ['T2_CH_CERN' ].append ('T0_CH_CSCS_HPC' )
112- mapping ['T1_IT_CNAF' ].append ('T0_CH_CSCS_HPC' )
113- mapping ['T1_FR_CCIN2P3' ].append ('T0_CH_CSCS_HPC' )
114- mapping ['T1_DE_KIT' ].append ('T0_CH_CSCS_HPC' )
115-
116- ## temptatively
117- mapping ['T0_CH_CERN' ].append ( 'T2_CH_CERN' )
118-
119- ## all europ can read from CERN
120- for reg in ['IT' ,'DE' ,'UK' ,'FR' ,'BE' ,'ES' ]:
121- mapping ['T2_CH_CERN' ].extend ([fb for fb in sites_to_consider if '_%s_' % reg in fb ])
122- pass
123-
124- ## all europ T1 among each others
125- europ_t1 = [site for site in sites_to_consider if site .startswith ('T1' ) and any ([reg in site for reg in ['IT' ,'DE' ,'UK' ,'FR' ,'ES' ,'RU' ]])]
126- #print europ_t1
127- for one in europ_t1 :
128- for two in europ_t1 :
129- if one == two : continue
130- mapping [one ].append (two )
131- pass
132- ## all EU T1 can read from T0
133- mapping ['T0_CH_CERN' ].append ( one )
134-
135- mapping ['T0_CH_CERN' ].append ( 'T1_US_FNAL' )
136- #mapping['T1_IT_CNAF'].append( 'T1_US_FNAL' )
137- #mapping['T1_IT_CNAF'].extend( [site for site in SI.sites_ready if '_US_' in site] ) ## all US can read from CNAF
138- mapping ['T1_IT_CNAF' ].append ( 'T2_CH_CERN' )
139- mapping ['T1_DE_KIT' ].append ( 'T2_CH_CERN' )
140- mapping ['T2_CH_CERN' ].append ( 'T1_IT_CNAF' )
141- mapping ['T2_CH_CERN' ].append ( 'T1_US_FNAL' )
142- mapping ['T2_CH_CERN' ].append ('T3_CH_CERN_HelixNebula' )
143- mapping ['T2_CH_CERN' ].append ('T3_CH_CERN_HelixNebula_REHA' )
144-
145-
146- for site in sites_to_consider :
147- if '_US_' in site :
148- mapping [site ].append ('T2_CH_CERN' )
14946 ## make them appear as OK to use
15047 force_sites = []
15148
152- ## overflow CERN to underutilized T1s
153- upcoming = json .loads ( eosRead ('%s/GQ.json' % monitor_dir ) )
154- for possible in SI .sites_T1s :
155- if not possible in upcoming :
156- mapping ['T2_CH_CERN' ].append (possible )
157- pass
158-
159- take_site_out = UC .get ('site_out_of_overflow' )
160-
161- for site ,fallbacks in mapping .items ():
162- mapping [site ] = list (set (fallbacks ))
163-
164-
165- ### mapping is a dictionnary where
166- # key can read from site in values.
167- ### reverserd mapping is a dictionnary where
168- # key can be read by site in values.
169- ## create the reverse mapping for the condor module
170- for site ,fallbacks in mapping .items ():
171- if site in take_site_out :
172- print "taking" ,site ,"out of overflow source by unified configuration"
173- mapping .pop (site )
174- continue
175- for fb in fallbacks :
176- if fb == site :
177- ## remove self
178- mapping [site ].remove (fb )
179- continue
180- if fb in take_site_out :
181- ## remove those to be removed
182- print "taking" ,fb ,"out of overflow destination by unified configuration"
183- mapping [site ].remove (fb )
184- continue
185- if not site in reversed_mapping [fb ]:
186- reversed_mapping [fb ].append (site )
187-
188- #for site in mapping.keys():
189- # mapping[site] = list(set(mapping[site]))
190-
191- ## this is the fallback mapping
192- #print "Direct mapping : site => overflow"
193- #print json.dumps( mapping, indent=2)
194- #print "Reverse mapping : dest <= from origin"
195- #print json.dumps( reversed_mapping, indent=2)
196-
19749 altered_tasks = set ()
19850
19951 def running_idle ( wfi , task_name ):
0 commit comments