forked from diffpy/diffpy.srreal
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathparallel.py
More file actions
218 lines (166 loc) · 7.1 KB
/
parallel.py
File metadata and controls
218 lines (166 loc) · 7.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
#!/usr/bin/env python
##############################################################################
#
# diffpy.srreal by DANSE Diffraction group
# Simon J. L. Billinge
# (c) 2010 The Trustees of Columbia University
# in the City of New York. All rights reserved.
#
# File coded by: Pavol Juhas
#
# See AUTHORS.txt for a list of people who contributed.
# See LICENSE_DANSE.txt for license information.
#
##############################################################################
"""ParallelPairQuantity -- proxy class for converting PairQuantity types
into parallel calculators.
"""
# exported items
__all__ = ["createParallelCalculator"]
import copy
import inspect
from diffpy.srreal.attributes import Attributes
# ----------------------------------------------------------------------------
def createParallelCalculator(pqobj, ncpu, pmap):
    """Create a proxy parallel calculator to a PairQuantity instance.

    pqobj    -- instance of PairQuantity calculator to be run in parallel
    ncpu     -- number of parallel jobs
    pmap     -- a parallel map function used to submit job to workers.
                It is called as ``pmap(function, list_of_job_dicts)`` and
                its result is iterated over.

    The ``pqobj.evaluatortype`` is reset to 'BASIC' because other
    evaluator types are not supported within parallel calculations.

    Return a proxy calculator instance that has the same interface,
    but executes the calculation in parallel split among ncpu jobs.
    """

    class ParallelPairQuantity(Attributes):
        """Class for running parallel calculations.  This is a proxy class
        to the wrapper PairQuantity type with the same interface.

        Instance data:

        pqobj    -- the master PairQuantity object to be evaluated in parallel
        ncpu     -- number of parallel jobs
        pmap     -- a parallel map function used to submit job to workers
        """

        def __init__(self, pqobj, ncpu, pmap):
            """Initialize a parallel proxy to the PairQuantity instance.

            pqobj    -- instance of PairQuantity calculator to be run
                        in parallel
            ncpu     -- number of parallel jobs
            pmap     -- a parallel map function used to submit job to workers
            """
            # use explicit assignment to avoid setattr forwarding to the pqobj
            object.__setattr__(self, "pqobj", pqobj)
            object.__setattr__(self, "ncpu", ncpu)
            object.__setattr__(self, "pmap", pmap)
            # parallel calculations support only the BASIC evaluation
            self.pqobj.evaluatortype = "BASIC"

        def eval(self, stru=None):
            """Perform parallel calculation and return internal value array.

            stru -- object that can be converted to StructureAdapter,
                    e.g., example diffpy Structure or pyobjcryst Crystal.
                    Use the last structure when None.

            Return numpy array.
            """
            # use StructureAdapter for faster pickles
            from diffpy.srreal.structureadapter import createStructureAdapter

            if stru is None:
                struadpt = self.pqobj.getStructure()
            else:
                struadpt = createStructureAdapter(stru)
            self.pqobj.setStructure(struadpt)
            # template of the job description; a single shallow copy of the
            # master calculator is referenced by every job dictionary
            kwd = {
                "cpuindex": None,
                "ncpu": self.ncpu,
                "pqobj": copy.copy(self.pqobj),
            }
            # one shallow copy of the template per job, each with its own
            # cpuindex in range(ncpu)
            arglist = [dict(kwd, cpuindex=i) for i in range(self.ncpu)]
            # fold the partial results from every job into the master object
            for pdata in self.pmap(_parallelData, arglist):
                self.pqobj._mergeParallelData(pdata, self.ncpu)
            return self.pqobj.value

        def __call__(self, *args, **kwargs):
            """Call the wrapped calculator using parallel evaluation.

            The arguments and return value are the same as for the
            wrapped PairQuantity calculator.
            """
            # Remember an instance-level "eval" override, if there is any,
            # so that it can be put back after the call.
            savedeval = self.pqobj.__dict__.get("eval")

            def restore_eval():
                # Undo the temporary instance-level "eval" installed below.
                if savedeval is not None:
                    self.pqobj.eval = savedeval
                else:
                    self.pqobj.__dict__.pop("eval", None)

            def parallel_eval(stru):
                # Stand-in for pqobj.eval that redirects to the parallel
                # evaluation.  The override is removed first, so any eval
                # call made on pqobj from here on uses its regular method.
                assert self.pqobj.eval is parallel_eval
                restore_eval()
                return self.eval(stru)

            # NOTE(review): presumably the wrapped calculator's __call__
            # looks up and invokes its own "eval"; the temporary override
            # makes that step run in parallel -- confirm against PairQuantity.
            self.pqobj.eval = parallel_eval
            try:
                rv = self.pqobj(*args, **kwargs)
            finally:
                # always restore, also when the call raised or never
                # reached parallel_eval
                restore_eval()
            return rv

        @property
        def evaluatortype(self):
            """str : Type of evaluation procedure.

            Parallel calculations allow only the 'BASIC' type.
            """
            return self.pqobj.evaluatortype

        @evaluatortype.setter
        def evaluatortype(self, value):
            if value != "BASIC":
                emsg = "Parallel calculations require 'BASIC' evaluatortype."
                raise ValueError(emsg)
            self.pqobj.evaluatortype = value

    # class ParallelPairQuantity

    # Create proxy method and properties to the wrapped PairQuantity

    pqtype = type(pqobj)

    # create proxy methods to all public methods and some protected methods
    # these protected names are proxied even though they start with "_"
    # and even if the proxy class already has an attribute of that name
    proxy_forced = set(
        """_getDoubleAttr _setDoubleAttr _hasDoubleAttr
        _namesOfDoubleAttributes _namesOfWritableDoubleAttributes
        """.split()
    )

    def _make_proxymethod(name, f):
        # Build a method that calls routine f on the wrapped pqobj.
        def proxymethod(self, *args, **kwargs):
            # direct lookup of pqobj that bypasses any attribute forwarding
            pqobj = object.__getattribute__(self, "pqobj")
            return f(pqobj, *args, **kwargs)

        proxymethod.__name__ = name
        proxymethod.__doc__ = f.__doc__
        return proxymethod

    for n, f in inspect.getmembers(pqtype, inspect.isroutine):
        # skip private routines and names already present on the proxy
        # class, unless the name is explicitly forced
        ignore = n not in proxy_forced and (
            n.startswith("_") or hasattr(ParallelPairQuantity, n)
        )
        if ignore:
            continue
        setattr(ParallelPairQuantity, n, _make_proxymethod(n, f))

    # create proxy properties to all properties that do not conflict with
    # existing class items
    def _make_proxyproperty(prop):
        # Build a property that applies the accessors of prop to the
        # wrapped pqobj; accessors missing on prop stay None.
        fget = fset = fdel = None
        if prop.fget:

            def _fget(self):
                return prop.fget(self.pqobj)

            fget = _fget
        if prop.fset:

            def _fset(self, value):
                return prop.fset(self.pqobj, value)

            fset = _fset
        if prop.fdel:

            def _fdel(self):
                return prop.fdel(self.pqobj)

            fdel = _fdel
        return property(fget, fset, fdel, prop.__doc__)

    # exact type match: only plain property objects are proxied here
    for n, p in inspect.getmembers(pqtype, lambda x: type(x) is property):
        if hasattr(ParallelPairQuantity, n):
            continue
        setattr(ParallelPairQuantity, n, _make_proxyproperty(p))

    # finally create an instance of this very custom class
    return ParallelPairQuantity(pqobj, ncpu, pmap)
def _parallelData(kwd):
    """Run one parallel job and return its raw results.

    kwd  -- job description dictionary with the items
            "pqobj"    -- calculator object to be evaluated,
            "cpuindex" -- index of this job,
            "ncpu"     -- total number of parallel jobs.

    Return the value of ``pqobj._getParallelData()`` obtained after
    the calculator was set up for this job and evaluated.
    """
    worker = kwd["pqobj"]
    # configure the calculator for its share of the work, then evaluate
    worker._setupParallelRun(kwd["cpuindex"], kwd["ncpu"])
    worker.eval()
    rawdata = worker._getParallelData()
    return rawdata
# End of file