forked from AliceO2Group/AliceO2
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathTopologyPolicy.cxx
More file actions
280 lines (257 loc) · 12.3 KB
/
TopologyPolicy.cxx
File metadata and controls
280 lines (257 loc) · 12.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#include "Framework/DataProcessorSpec.h"
#include "Framework/TopologyPolicy.h"
#include "Framework/Signpost.h"
#include <string>
#include <regex>
O2_DECLARE_DYNAMIC_LOG(topology);
namespace o2::framework
{
struct TopologyPolicyHelpers {
static TopologyPolicy::DataProcessorMatcher matchAll();
static TopologyPolicy::DataProcessorMatcher matchByName(std::string const& name);
static TopologyPolicy::DataProcessorMatcher matchByRegex(std::string const& re);
static TopologyPolicy::DependencyChecker dataDependency();
static TopologyPolicy::DependencyChecker alwaysDependent();
};
TopologyPolicy::DataProcessorMatcher TopologyPolicyHelpers::matchAll()
{
return [](DataProcessorSpec const& spec) {
return true;
};
}
TopologyPolicy::DataProcessorMatcher TopologyPolicyHelpers::matchByName(std::string const& name)
{
return [name](DataProcessorSpec const& spec) {
return spec.name == name;
};
}
TopologyPolicy::DataProcessorMatcher TopologyPolicyHelpers::matchByRegex(std::string const& re)
{
return [re](DataProcessorSpec const& spec) -> bool {
const std::regex matcher(re);
// Check if regex applies
std::cmatch m;
return std::regex_match(spec.name.data(), m, matcher);
};
}
/// Return true if a depends on b, i.e. if any of the inputs of a
/// is satisfied by any of the outputs of b.
bool dataDeps(DataProcessorSpec const& a, DataProcessorSpec const& b)
{
for (size_t ii = 0; ii < a.inputs.size(); ++ii) {
for (size_t oi = 0; oi < b.outputs.size(); ++oi) {
try {
if (DataSpecUtils::match(a.inputs[ii], b.outputs[oi])) {
return true;
}
} catch (...) {
continue;
}
}
}
return false;
}
// This is to make sure that if a device has sporadic / timer inputs
// it gets sorted after one which does not, in case there is no other
// dependencies between the two.
bool sporadicDataDeps(DataProcessorSpec const& a, DataProcessorSpec const& b)
{
auto checkSporadic = [](InputSpec const& input) {
return input.lifetime == Lifetime::Sporadic;
};
bool isBWithSporadicInput = std::find_if(b.inputs.begin(), b.inputs.end(), checkSporadic) != b.inputs.end();
bool isAWithSporadicInput = std::find_if(a.inputs.begin(), a.inputs.end(), checkSporadic) != a.inputs.end();
// If neither has sporadic inputs, we return false and sort as usual
if (!isAWithSporadicInput && !isBWithSporadicInput) {
return false;
}
// If both have sporadic inputs, we return false and sort as usual.
if (isAWithSporadicInput && isBWithSporadicInput) {
return false;
}
// We have a with sporadic inputs. We sort it later, unless there was already some actual
// dependency between A and B.
if (isAWithSporadicInput) {
bool hasDependency = dataDeps(b, a);
return !hasDependency;
}
// b is has sporadic inputs and a does not. We are fine as it is.
return false;
}
bool expendableDataDeps(DataProcessorSpec const& a, DataProcessorSpec const& b)
{
O2_SIGNPOST_ID_GENERATE(sid, topology);
O2_SIGNPOST_START(topology, sid, "expendableDataDeps", "Checking if %s depends on %s", a.name.c_str(), b.name.c_str());
if (a.name.find("internal-dpl-injected-dummy-sink") != std::string::npos &&
b.name.find("internal-dpl-injected-dummy-sink") != std::string::npos) {
O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "false. Dummy sink never depends on itself.");
return false;
}
// We never put anything behind the dummy sink.
if (b.name.find("internal-dpl-injected-dummy-sink") != std::string::npos) {
O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "false. %s is dummy sink and it nothing can depend on it.", b.name.c_str());
return false;
}
if (a.name.find("internal-dpl-injected-dummy-sink") != std::string::npos) {
O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "true. %s is dummy sink and it nothing can depend on it.", a.name.c_str());
return true;
}
/// If there is an actual dependency between a and b, we return true.
if (dataDeps(a, b)) {
O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "true. %s has a data dependency on %s", a.name.c_str(), b.name.c_str());
return true;
}
// If we are here we do not have any data dependency,
// however we still consider a dependent on b if
// a has the "expendable" label and b does not.
auto checkExpendable = [](DataProcessorLabel const& label) {
if (label.value == "expendable") {
return true;
}
return false;
};
// A task marked as expendable or resilient can be put after an expendable task
auto checkResilient = [](DataProcessorLabel const& label) {
if (label.value == "resilient") {
return true;
}
return false;
};
bool isBExpendable = std::find_if(b.labels.begin(), b.labels.end(), checkExpendable) != b.labels.end();
bool isAExpendable = std::find_if(a.labels.begin(), a.labels.end(), checkExpendable) != a.labels.end();
bool bResilient = std::find_if(b.labels.begin(), b.labels.end(), checkResilient) != b.labels.end();
const std::regex matcher(".*output-proxy.*");
std::cmatch m;
bool isBOutputProxy = std::regex_match(b.name.data(), m, matcher);
// If none is expendable. We simply return false and sort as usual.
if (!isAExpendable && !isBExpendable) {
bool sporadic = sporadicDataDeps(a, b);
if (sporadic && !isBOutputProxy) {
O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "true. Neither %s nor %s are expendable. However the former has sporadic inputs so we sort it after.",
a.name.c_str(), b.name.c_str());
return true;
}
O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "false. Neither %s nor %s are expendable. No dependency beyond data deps.",
a.name.c_str(), b.name.c_str());
return false;
}
// If both are expendable. We return false and sort as usual.
if (isAExpendable && isBExpendable) {
bool sporadic = sporadicDataDeps(a, b);
if (sporadic && !isBOutputProxy) {
O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "true. Both %s and %s are expendable. However the former has sporadic inputs, so we sort it after.",
a.name.c_str(), b.name.c_str());
return true;
}
O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "false. Both %s and %s are expendable. No dependency.",
a.name.c_str(), b.name.c_str());
return false;
}
// If a is expendable but b is resilient, we can keep the same order.
if (isAExpendable && bResilient) {
bool sporadic = sporadicDataDeps(a, b);
if (sporadic && !isBOutputProxy) {
O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "true. %s is expendable but %s is resilient, however the former also has sporadic inputs, so we sort it after.",
a.name.c_str(), b.name.c_str());
return true;
}
O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "false. %s is expendable but %s is resilient. No need to do do anything.",
a.name.c_str(), b.name.c_str());
return false;
}
// If a is expendable we consider it as if there was a dependency from a to b,
// however we still need to check if there is not one already from b to a.
if (isAExpendable) {
bool hasDependency = dataDeps(b, a);
O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "%s is expendable. %s from %s to %s => %s.",
a.name.c_str(), hasDependency ? "There is however an inverse dependency" : "No inverse dependency", b.name.c_str(), a.name.c_str(),
!hasDependency ? "true" : "false");
if (!hasDependency) {
return true;
}
bool sporadic = sporadicDataDeps(a, b);
if (sporadic && !isBOutputProxy) {
O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "%s is expendable. No inverse dependency from %s to %s. However the former has an occasioanl input => true.",
a.name.c_str(), b.name.c_str(), a.name.c_str());
return true;
}
O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "%s is expendable. No inverse dependency from %s to %s => false.",
a.name.c_str(), b.name.c_str(), a.name.c_str());
return false;
}
// b is expendable and a is not. We are fine with no dependency.
O2_SIGNPOST_END(topology, sid, "expendableDataDeps", "false. %s is expendable but %s is not. No need to add an unneeded dependency.",
b.name.c_str(), a.name.c_str());
return false;
};
TopologyPolicy::DependencyChecker TopologyPolicyHelpers::dataDependency()
{
return [](DataProcessorSpec const& a, DataProcessorSpec const& b) {
return expendableDataDeps(a, b);
};
}
TopologyPolicy::DependencyChecker TopologyPolicyHelpers::alwaysDependent()
{
return [](DataProcessorSpec const& dependent, DataProcessorSpec const& ancestor) {
O2_SIGNPOST_ID_GENERATE(sid, topology);
O2_SIGNPOST_START(topology, sid, "alwaysDependent", "Checking if %s depends on %s", dependent.name.c_str(), ancestor.name.c_str());
if (dependent.name == ancestor.name) {
O2_SIGNPOST_END(topology, sid, "alwaysDependent", "false. %s and %s are the same.", dependent.name.c_str(), ancestor.name.c_str());
return false;
}
if (ancestor.name.find("internal-dpl-injected-dummy-sink") != std::string::npos) {
O2_SIGNPOST_END(topology, sid, "alwaysDependent", "false. Nothing can depend on %s by policy.", ancestor.name.c_str());
return false;
}
// We never put anything behind the dummy sink.
if (dependent.name.find("internal-dpl-injected-dummy-sink") != std::string::npos) {
O2_SIGNPOST_END(topology, sid, "alwaysDependent", "true. %s is always last.", ancestor.name.c_str());
return true;
}
const std::regex matcher(".*output-proxy.*");
// Check if regex applies
std::cmatch m;
bool isAncestorOutputProxy = std::regex_match(ancestor.name.data(), m, matcher);
// For now dependent is always an output proxy.
assert(std::regex_match(dependent.name.data(), m, matcher));
bool isAncestorExpendable = std::find_if(ancestor.labels.begin(), ancestor.labels.end(), [](DataProcessorLabel const& label) {
return label.value == "expendable";
}) != ancestor.labels.end();
bool isDependentResilient = std::find_if(dependent.labels.begin(), dependent.labels.end(), [](DataProcessorLabel const& label) {
return label.value == "resilient";
}) != dependent.labels.end();
bool isAncestorResilient = std::find_if(ancestor.labels.begin(), ancestor.labels.end(), [](DataProcessorLabel const& label) {
return label.value == "resilient";
}) != ancestor.labels.end();
if (!isDependentResilient && isAncestorExpendable) {
O2_SIGNPOST_END(topology, sid, "alwaysDependent", "false. Ancestor %s is expendable while %s is non-resilient output proxy (dependent).",
ancestor.name.c_str(), dependent.name.c_str());
return false;
}
if (isAncestorOutputProxy || (!isDependentResilient && isAncestorResilient)) {
bool hasDependency = dataDeps(dependent, ancestor);
O2_SIGNPOST_END(topology, sid, "alwaysDependent", "%s. Dependent %s %s a dependency on ancestor %s.",
hasDependency ? "true" : "false", dependent.name.c_str(), hasDependency ? "has" : "has not", ancestor.name.c_str());
return hasDependency;
}
O2_SIGNPOST_END(topology, sid, "alwaysDependent", "true by default. Ancestor %s is not an output proxy.", ancestor.name.c_str());
return true;
};
}
std::vector<TopologyPolicy> TopologyPolicy::createDefaultPolicies()
{
return {
{TopologyPolicyHelpers::matchByRegex(".*output-proxy.*"), TopologyPolicyHelpers::alwaysDependent()},
{TopologyPolicyHelpers::matchAll(), TopologyPolicyHelpers::dataDependency()}};
}
} // namespace o2::framework