forked from AliceO2Group/AliceO2
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathWorkflowHelpers.h
More file actions
232 lines (200 loc) · 8.12 KB
/
WorkflowHelpers.h
File metadata and controls
232 lines (200 loc) · 8.12 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#ifndef O2_FRAMEWORK_WORKFLOWHELPERS_H_
#define O2_FRAMEWORK_WORKFLOWHELPERS_H_
#include "Framework/InputSpec.h"
#include "Framework/OutputSpec.h"
#include "Framework/WorkflowSpec.h"
#include "Framework/DataOutputDirector.h"
#include <cstddef>
#include <vector>
#include <iosfwd>
namespace o2::framework
{
inline static std::string debugWorkflow(std::vector<DataProcessorSpec> const& specs)
{
std::ostringstream out;
for (auto& spec : specs) {
out << spec.name << "\n";
out << " Inputs:\n";
for (auto& ii : spec.inputs) {
out << " - " << DataSpecUtils::describe(ii) << "\n";
}
// out << "\n Outputs:\n";
// for (auto& ii : spec.outputs) {
// out << " - " << DataSpecUtils::describe(ii) << "\n";
// }
}
return out.str();
}
struct ConfigContext;
// Structure to hold information which was derived
// for output channels.
struct LogicalOutputInfo {
size_t specIndex;
size_t outputGlobalIndex;
bool forward;
bool enabled = true;
};
// We use this to keep track of the forwards which should
// be added to each device.
// @a consumer is the data processor id the information refers to
// (so all the devices which are incarnation of that data processor should
// have the forward).
// @a inputGlobalIndex is pointer to a unique id for the input the forward
// refers to.
struct LogicalForwardInfo {
size_t consumer;
size_t inputLocalIndex;
size_t outputGlobalIndex;
};
enum struct ConnectionKind {
Out = 0,
Forward = 1,
In = 2,
Unknown = 3
};
struct DeviceConnectionEdge {
// the index in the workflow of the producer DataProcessorSpec
size_t producer;
// the index in the workflow of the consumer DataProcessorSpec
size_t consumer;
// The timeindex for the consumer
size_t timeIndex;
// The timeindex of the producer
size_t producerTimeIndex;
// An absolute id for the output
size_t outputGlobalIndex;
// A DataProcessor relative id for the input
size_t consumerInputIndex;
// Whether this is the result of a forwarding operation or not
bool isForward;
enum ConnectionKind kind;
};
// Unique identifier for a connection
struct DeviceConnectionId {
size_t producer;
size_t consumer;
size_t timeIndex;
size_t producerTimeIndex;
uint16_t port;
bool operator<(const DeviceConnectionId& rhs) const
{
return std::tie(producer, consumer, timeIndex, producerTimeIndex) <
std::tie(rhs.producer, rhs.consumer, rhs.timeIndex, rhs.producerTimeIndex);
}
};
// A device is uniquely identified by its DataProcessorSpec and
// the timeslice it consumes.
struct DeviceId {
size_t processorIndex;
size_t timeslice;
size_t deviceIndex;
bool operator<(const DeviceId& rhs) const
{
return std::tie(processorIndex, timeslice) <
std::tie(rhs.processorIndex, rhs.timeslice);
}
};
struct EdgeAction {
bool requiresNewDevice = false;
bool requiresNewChannel = false;
};
/// Helper struct to keep track of the results of the topological sort
struct TopoIndexInfo {
int index; //!< the index in the actual storage of the nodes to be sorted topologically
int layer; //!< the associated layer in the sorting procedure
bool operator<(TopoIndexInfo const& rhs) const
{
return index < rhs.index;
}
bool operator==(TopoIndexInfo const& rhs) const
{
return index == rhs.index;
}
friend std::ostream& operator<<(std::ostream& out, TopoIndexInfo const& info);
};
// Information about the policies which were derived for a given data processor.
struct DataProcessorPoliciesInfo {
std::string completionPolicyName;
};
struct OutputObj {
InputSpec spec;
bool isdangling;
};
enum struct WorkflowParsingState : int {
Valid,
Empty,
};
/// A set of internal helper classes to manipulate a Workflow
struct WorkflowHelpers {
/// Topological sort for a graph of @a nodeCount nodes.
///
/// @a edgeIn pointer to the index of the input node for the first edge
/// @a edgeOut pointer to the index of the out node for the first edge
/// @a stride distance (in bytes) between the first and the second element the array
/// holding the edges
/// @return an index vector for the @a nodeCount nodes, where the order is a topological
/// sort according to the information provided in edges. The first element of
/// the pair is the index in the nodes array, the second one is the layer in the topological
/// sort.
static std::vector<TopoIndexInfo> topologicalSort(size_t nodeCount,
int const* edgeIn,
int const* edgeOut,
size_t byteStride,
size_t edgesCount);
// Helper method to verify that a given workflow is actually valid e.g. that
// it contains no empty labels.
[[nodiscard]] static WorkflowParsingState verifyWorkflow(const WorkflowSpec& workflow);
// Depending on the workflow and the dangling inputs inside it, inject "fake"
// devices to mark the fact we might need some extra action to make sure
// dangling inputs are satisfied.
// @a workflow the workflow to decorate
// @a ctx the context for the configuration phase
static void injectServiceDevices(WorkflowSpec& workflow, ConfigContext& ctx);
// Function to correctly add AOD writer
static void injectAODWriter(WorkflowSpec& workflow, ConfigContext const& ctx);
// Final adjustments to @a workflow after service devices have been injected.
static void adjustTopology(WorkflowSpec& workflow, ConfigContext const& ctx);
static void constructGraph(const WorkflowSpec& workflow,
std::vector<DeviceConnectionEdge>& logicalEdges,
std::vector<OutputSpec>& outputs,
std::vector<LogicalForwardInfo>& availableForwardsInfo);
// FIXME: this is an implementation detail for compute edge action,
// actually. It should be moved to the cxx. Comes handy for testing things though..
static void sortEdges(std::vector<size_t>& inEdgeIndex,
std::vector<size_t>& outEdgeIndex,
const std::vector<DeviceConnectionEdge>& edges);
static std::vector<EdgeAction> computeOutEdgeActions(
const std::vector<DeviceConnectionEdge>& edges,
const std::vector<size_t>& index);
static std::vector<EdgeAction> computeInEdgeActions(
const std::vector<DeviceConnectionEdge>& edges,
const std::vector<size_t>& index);
/// Given @a workflow it gathers all the OutputSpec and in addition provides
/// the information whether and output is dangling and/or of type AOD
/// An Output is dangling if it does not have a corresponding InputSpec.
/// @return a vector of InputSpec of all outputs and a vector of bool
/// wether the output is dangling
static std::tuple<std::vector<InputSpec>, std::vector<bool>> analyzeOutputs(WorkflowSpec const& workflow);
/// returns only dangling outputs
static std::vector<InputSpec> computeDanglingOutputs(WorkflowSpec const& workflow);
/// Validate that the nodes at the ends of the edges of the graph
/// are actually compatible with each other.
/// For example we should make sure that Lifetime::Timeframe inputs of
/// one node is not connected to an Output of Lifetime::Sporadic of another node.
static void validateEdges(WorkflowSpec const& workflow,
std::vector<DataProcessorPoliciesInfo> const& policiesInfos,
std::vector<DeviceConnectionEdge> const& edges,
std::vector<OutputSpec> const& outputs);
};
} // namespace o2::framework
#endif // O2_FRAMEWORK_WORKFLOWHELPERS_H_