-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathweblock.py
More file actions
247 lines (204 loc) · 9.8 KB
/
weblock.py
File metadata and controls
247 lines (204 loc) · 9.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
import logging
import collections
import urllib2
import fnmatch
import re
import time
import dynamo.utils.interface.webservice as webservice
from dynamo.dataformat import Configuration, Block, ObjectError
LOG = logging.getLogger(__name__)
class WebReplicaLock(object):
    """
    Dataset lock read from www or remote (Oracle) database sources.

    Sets one dataset attr:
      locked_blocks: {site: set of locked blocks, or None for a dataset-level lock}
    """

    produces = ['locked_blocks']

    # Recognized content layouts of a lock source.
    LIST_OF_DATASETS, CMSWEB_LIST_OF_DATASETS, SITE_TO_DATASETS = range(3)

    def __init__(self, config):
        # {name: (service, content type, site pattern, lock url or oracle query pair)}
        self._sources = {}

        for name, source_config in config.sources.items():
            self.add_source(name, source_config, config.auth)

    def add_source(self, name, config, auth_config):
        """
        Register one lock source.

        @param name         Source name (key into self._sources).
        @param config       Source configuration (web 'url' and/or 'oracledb' based).
        @param auth_config  Mapping of auth name -> auth handler configuration.
        """
        LOG.info(config)

        rest_config = Configuration()
        rest_config.url_base = config.get('url', None)
        rest_config.accept = config.get('data_type', 'application/json')
        if config.auth == 'noauth':
            rest_config.auth_handler = 'None'
        else:
            auth = auth_config[config.auth]
            rest_config.auth_handler = auth.auth_handler
            rest_config.auth_handler_conf = Configuration(auth.get('auth_handler_conf', {}))

        content_type = getattr(WebReplicaLock, config.content_type)
        site_pattern = config.get('sites', None)
        lock_url = config.get('lock_url', None)

        if rest_config.url_base is not None:
            self._sources[name] = (webservice.RESTService(rest_config), content_type, site_pattern, lock_url)

        if config.get('oracledb', None) is not None:
            oracle_config = Configuration()
            oracle_config.db = config.oracledb.db
            oracle_config.pw = config.oracledb.password
            oracle_config.host = config.oracledb.host
            # For Oracle sources the fourth tuple element is a pair of queries:
            # (lock-of-locks check query, locks query).
            self._sources[name] = (webservice.OracleService(oracle_config), content_type, site_pattern, (config.oracledb.lockoflock, config.oracledb.locks))

    def load(self, inventory):
        """
        Refresh the 'locked_blocks' attribute of all datasets in the inventory
        from the configured lock sources.

        @param inventory  Inventory with .datasets (name -> Dataset) mapping.
        """
        # Drop stale lock information first.
        for dataset in inventory.datasets.itervalues():
            try:
                dataset.attr.pop('locked_blocks')
            except KeyError:
                pass

        for item, site in self.get_list(inventory):
            # get_list yields either whole datasets or individual blocks.
            # BUGFIX: was `type(item) is Dataset`, but Dataset is not imported
            # in this module -> NameError. Block IS imported, so test for it.
            if isinstance(item, Block):
                dataset = item.dataset
                block = item
            else:
                dataset = item
                block = None

            try:
                locked_blocks = dataset.attr['locked_blocks']
            except KeyError:
                locked_blocks = dataset.attr['locked_blocks'] = {}

            if site is None:
                # No specific site -> the lock applies at every replica.
                sites = [r.site for r in dataset.replicas]
            else:
                sites = [site]

            for st in sites:
                if block is None:
                    # Dataset-level lock; overrides any block-level entries.
                    locked_blocks[st] = None
                elif st in locked_blocks:
                    if locked_blocks[st] is None:
                        # Already locked at dataset level; nothing to add.
                        pass
                    else:
                        locked_blocks[st].add(block)
                else:
                    # BUGFIX: was `set(block)`, which iterates over the block
                    # object instead of creating a one-element set.
                    locked_blocks[st] = set([block])

        # BUGFIX: was `inventory.dataests` (typo) -> AttributeError.
        for dataset in inventory.datasets.itervalues():
            try:
                locked_blocks = dataset.attr['locked_blocks']
            except KeyError:
                continue

            for site, blocks in locked_blocks.items():
                if blocks is None:
                    continue
                # If all blocks are locked, promote to a dataset-level lock.
                if blocks == dataset.blocks:
                    locked_blocks[site] = None

    def get_list(self, inventory):
        """
        Collect (item, site) lock pairs from all configured sources.

        @param inventory  Inventory with .datasets and .sites mappings.
        @return [(Dataset or Block, Site or None)]; site None means all replicas.
        """
        all_locks = []

        for source, content_type, site_pattern, lock_url in self._sources.itervalues():
            if lock_url is not None and isinstance(lock_url, basestring):
                # Web source: check that the lock files themselves are not locked.
                while True:
                    # Hacky but this is temporary any way
                    opener = urllib2.build_opener(webservice.HTTPSCertKeyHandler(Configuration()))
                    opener.addheaders.append(('Accept', 'application/json'))
                    request = urllib2.Request(lock_url)
                    try:
                        opener.open(request)
                    except urllib2.HTTPError as err:
                        if err.code == 404:
                            # file not found -> no lock
                            break
                        else:
                            raise

                    LOG.info('Lock files are being produced. Waiting 60 seconds.')
                    time.sleep(60)

            elif lock_url is not None:
                # BUGFIX: was `elif not isinstance(lock_url, basestring)`, which
                # also matched lock_url is None and relied on a silent bare
                # except to swallow the resulting TypeError.
                # lock_url is a tuple of Oracle db queries (a, b):
                # a - checking for lock of locks, b - locks themselves.
                # source is automatically an OracleService.
                try:
                    while True:
                        rows = source.make_request(lock_url[0].replace('`', '"'))
                        if not any(rows):
                            break
                        LOG.info('Locks are being produced. Waiting 60 seconds.')
                        time.sleep(60)
                except Exception:
                    # Best-effort check only; never abort lock collection.
                    LOG.warning('Failed to check lock of locks for %s', source)

            if site_pattern is None:
                site_re = None
            else:
                site_re = re.compile(fnmatch.translate(site_pattern))

            LOG.info('Retrieving lock information from %s', source)

            try:
                data = source.make_request()
            except TypeError:
                # OracleService expects a query text
                data = source.make_request(lock_url[1].replace('`', '"'))

            if content_type == WebReplicaLock.LIST_OF_DATASETS:
                # simple list of datasets
                self._collect_dataset_list(inventory, data, site_re, source, all_locks)
            elif content_type == WebReplicaLock.CMSWEB_LIST_OF_DATASETS:
                # data['result'] -> simple list of datasets
                self._collect_dataset_list(inventory, data['result'], site_re, source, all_locks)
            elif content_type == WebReplicaLock.SITE_TO_DATASETS:
                self._collect_site_to_datasets(inventory, data, source, all_locks)

        return all_locks

    def _collect_dataset_list(self, inventory, dataset_names, site_re, source, all_locks):
        """Append dataset-level locks for a flat list of dataset names."""
        for dataset_name in dataset_names:
            if dataset_name is None:
                LOG.debug('Dataset name None found in %s', source)
                continue

            try:
                dataset = inventory.datasets[dataset_name]
            except KeyError:
                LOG.debug('Unknown dataset %s in %s', dataset_name, source)
                continue

            if site_re is not None:
                # Restrict the lock to replicas at matching sites.
                for replica in dataset.replicas:
                    if not site_re.match(replica.site.name):
                        continue
                    all_locks.append((dataset, replica.site))
            else:
                all_locks.append((dataset, None))

    def _collect_site_to_datasets(self, inventory, data, source, all_locks):
        """Append locks from a {site: {object_name: info}} mapping."""
        for site_name, objects in data.items():
            try:
                site = inventory.sites[site_name]
            except KeyError:
                LOG.debug('Unknown site %s in %s', site_name, source.url_base)
                continue

            for object_name, info in objects.items():
                if not info['lock']:
                    LOG.debug('Object %s is not locked at %s', object_name, site_name)
                    continue

                # Object names may be full block names (dataset#block) or
                # plain dataset names.
                try:
                    dataset_name, block_name = Block.from_full_name(object_name)
                except ObjectError:
                    dataset_name, block_name = object_name, None

                try:
                    dataset = inventory.datasets[dataset_name]
                except KeyError:
                    LOG.debug('Unknown dataset %s in %s', dataset_name, source.url_base)
                    continue

                replica = site.find_dataset_replica(dataset)
                if replica is None:
                    LOG.debug('Replica of %s is not at %s in %s', dataset_name, site_name, source.url_base)
                    continue

                if block_name is None:
                    all_locks.append((dataset, site))
                else:
                    block_replica = replica.find_block_replica(block_name)
                    if block_replica is None:
                        LOG.debug('Unknown block %s in %s', object_name, source.url_base)
                        continue
                    all_locks.append((block_replica.block, site))