-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathnode.hpp
More file actions
382 lines (281 loc) · 10.5 KB
/
node.hpp
File metadata and controls
382 lines (281 loc) · 10.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
/* Nodes.
*
* Author: Steffen Vogel <post@steffenvogel.de>
* SPDX-FileCopyrightText: 2014-2023 Institute for Automation of Complex Power Systems, RWTH Aachen University
* SPDX-License-Identifier: Apache-2.0
*/
#pragma once
#include <functional>
#include <iostream>
#include <fmt/ostream.h>
#include <jansson.h>
#include <uuid/uuid.h>
#include <villas/colors.hpp>
#include <villas/common.hpp>
#include <villas/list.hpp>
#include <villas/log.hpp>
#include <villas/node/memory.hpp>
#include <villas/node_direction.hpp>
#include <villas/node_list.hpp>
#include <villas/path_destination.hpp>
#include <villas/path_source.hpp>
#include <villas/plugin.hpp>
#include <villas/queue.h>
#include <villas/sample.hpp>
#include <villas/stats.hpp>
// Enable the netem-related members of Node only when libnl3-route was found
// at configure time AND we are building for Linux.
#if defined(LIBNL3_ROUTE_FOUND) && defined(__linux__)
#define WITH_NETEM
#endif // LIBNL3_ROUTE_FOUND && __linux__
// Forward declarations of the libnl3 types which Node only stores as pointers
// (tc_qdisc, tc_classifier), so the libnl3 headers are not needed here.
#ifdef WITH_NETEM
struct rtnl_qdisc;
struct rtnl_cls;
#endif // WITH_NETEM
// Regular expression fragment: 2 to 32 characters, each a lowercase letter,
// a digit, '_' or '-'.
// NOTE(review): presumably the pattern Node::isValidName() checks against -
// confirm in the implementation file.
#define RE_NODE_NAME "[a-z0-9_-]{2,32}"
namespace villas {
namespace node {
// Forward declarations
// NodeFactory is befriended by Node and stored as Node::factory before its
// definition further down in this header. SuperNode is only used as a pointer
// parameter of NodeFactory::start().
class NodeFactory;
class SuperNode;
/* The class for a node.
 *
 * Every entity which exchanges messages is represented by a node.
 * Nodes can be remote machines and simulators or locally running processes.
 *
 * Node-types customize the behaviour by overriding the virtual member
 * functions declared below. The defaults which are defined inline in this
 * header are documented at the respective member. NodeFactory is a friend
 * because NodeFactory::init() fills in logger, factory and name_long.
 */
class Node {
friend NodeFactory;
public:
// Logger of this node. Assigned by NodeFactory::init() from the factory's getLogger().
Logger logger;
// NOTE(review): presumably the start value of `sequence` - confirm in the implementation file.
uint64_t sequence_init;
uint64_t
sequence; // This is a counter of received samples, in case the node-type does not generate sequence numbers itself.
// Per-direction settings of this node.
// NOTE(review): presumably `in` is the receiving and `out` the sending side - confirm against NodeDirection.
NodeDirection in, out;
PathSourceList sources; // A list of path sources which reference this node.
PathDestinationList
destinations; // A list of path destinations which reference this node.
// NOTE(review): presumably the path of the configuration file this node was loaded from - confirm.
std::string configPath;
#ifdef __linux__
int fwmark; // Socket mark for netem, routing and filtering
#ifdef WITH_NETEM
struct rtnl_qdisc *tc_qdisc; // libnl3: Network emulator queuing discipline
struct rtnl_cls *tc_classifier; // libnl3: Firewall mark classifier
#endif // WITH_NETEM
#endif // __linux__
protected:
// Current state of the node. Accessible via getState() / setState().
enum State state;
// Identifier of this node. Accessible via getUuid().
uuid_t uuid;
// Accessible via isEnabled() / setEnabled().
bool enabled;
Stats::Ptr
stats; // Statistic counters. This is a pointer to the statistic hooks private data.
json_t *config; // A JSON object containing the configuration of the node.
std::string
name_short; // A short identifier of the node, only used for configuration and logging
// Returned by getName(). NodeFactory::init() sets it to name_short followed by
// the factory name in parentheses, wrapped in the CLR_RED / CLR_YEL macros.
std::string name_long; // Singleton: A string used to print to screen.
// NOTE(review): presumably the cached result of getNameFull() - confirm.
std::string name_full; // Singleton: A string used to print to screen.
// NOTE(review): not referenced in this header; presumably a cache for getDetails() of derived node-types - confirm.
std::string details;
int affinity; // CPU Affinity of this node
NodeFactory *factory; // The factory which created this instance
/* Node-type specific receive hook.
 *
 * The base implementation does nothing and returns -1.
 * NOTE(review): presumably invoked by read() - confirm in the implementation file.
 */
virtual int _read(struct Sample *smps[], unsigned cnt) { return -1; }
/* Node-type specific send hook.
 *
 * The base implementation does nothing and returns -1.
 * NOTE(review): presumably invoked by write() - confirm in the implementation file.
 */
virtual int _write(struct Sample *smps[], unsigned cnt) { return -1; }
// Node-type specific status object. The base implementation returns nullptr.
virtual json_t *_readStatus() const { return nullptr; }
/* Parse the settings which are common to all node-types.
 *
 * @param json A JSON object containing the configuration of the node.
 * @param parse_signal Callback which turns a JSON signal definition into a Signal
 *                     for the given direction. The default ignores the direction
 *                     and forwards to Signal::fromJson().
 */
int parseCommon(
json_t *json,
std::function<Signal::Ptr(json_t *, NodeDirection::Direction)>
parse_signal = [](json_t *j, NodeDirection::Direction d) {
return Signal::fromJson(j);
});
public:
// Initialize node with default values
Node(const uuid_t &id = {}, const std::string &name = "");
// Destroy node by freeing dynamically allocated memory.
virtual ~Node();
// Do initialization after parsing the configuration
virtual int prepare();
/* Parse settings of a node.
 *
 * @param json A JSON object containing the configuration of the node.
 * @retval 0 Success. Everything went well.
 * @retval <0 Error. Something went wrong.
 */
virtual int parse(json_t *json);
// Validate node configuration.
virtual int check();
// Start operation of a node.
virtual int start();
// Stops operation of a node.
virtual int stop();
/* Pauses operation of a node.
 *
 * The base implementation only succeeds for a started node: it returns -1
 * unless the state is State::STARTED, otherwise it logs a message and
 * returns 0. It does not modify the state itself.
 */
virtual int pause() {
if (state != State::STARTED)
return -1;
logger->info("Pausing node");
return 0;
}
// Resumes operation of a node. The base implementation does nothing and returns 0.
virtual int resume() { return 0; }
// Restarts operation of a node.
virtual int restart();
/* Receive multiple messages at once.
 *
 * Depending on the node-type, messages may be received with a single
 * recvmsg() syscall by using gathering techniques (struct iovec).
 * The messages will be stored in the array @p smps.
 * Some node-types might only support to receive one message at a time.
 *
 * NOTE(review): presumably forwards to the virtual _read() hook - confirm in
 * the implementation file.
 *
 * @param smps An array of pointers to memory blocks where the function should store received samples.
 * @param cnt The number of samples that are allocated by the calling function.
 * @return The number of messages actually received.
 */
int read(struct Sample *smps[], unsigned cnt);
/* Send multiple messages in a single datagram / packet.
 *
 * Depending on the node-type, messages may be sent with a single
 * sendmsg() syscall by using gathering techniques (struct iovec).
 * The messages have to be stored in the array @p smps.
 *
 * NOTE(review): presumably forwards to the virtual _write() hook - confirm in
 * the implementation file.
 *
 * @param smps An array of pointers to memory blocks where samples read from.
 * @param cnt The number of samples that are allocated by the calling function.
 * @return The number of messages actually sent.
 */
int write(struct Sample *smps[], unsigned cnt);
// Reverse local and remote socket address.
// The base implementation does nothing and returns -1.
virtual int reverse() { return -1; }
/* Get a list of file descriptors on which the path should poll
 * to detect the availability of new samples which can be read.
 *
 * The base implementation returns an empty list.
 */
virtual std::vector<int> getPollFDs() { return {}; }
/* Get a list of socket file descriptors which are used by the node
 * To perform network IO. We use those to selectively apply network emulation
 *
 * The base implementation returns an empty list.
 */
virtual std::vector<int> getNetemFDs() { return {}; }
/* Get the memory type which this node-type expects.
 *
 * This is useful for special node-types like Infiniband, GPUs & FPGAs
 * which require DMA-backed memory.
 *
 * The base implementation returns villas::node::memory::default_type.
 */
virtual struct villas::node::memory::Type *getMemoryType() {
return villas::node::memory::default_type;
}
// Get the factory which was used to construct this node.
villas::node::NodeFactory *getFactory() const { return factory; }
// Get the short name of this node (name_short).
const std::string &getNameShort() const { return name_short; }
// Get the string which should be used to print this node (name_long).
const std::string &getName() const { return name_long; }
// Get the full name including type and details of the node.
const std::string &getNameFull();
/* Just get the config details of this node as a string
 *
 * The base implementation returns a reference to an empty string which is
 * shared by all callers.
 */
virtual const std::string &getDetails() {
static std::string empty;
return empty;
}
/* Return a string which should be used to print this node.
 *
 * NOTE(review): getName() already returns name_long; the definition of this
 * function is not part of this header - confirm how the two differ.
 */
const std::string &getNameLong();
/* Return a list of signals which are sent to this node.
 *
 * This list is derived from the path which uses the node as destination.
 *
 * NOTE(review): the meaning of after_hooks is not visible in this header;
 * presumably it selects the signal list as seen after hook processing - confirm.
 */
SignalList::Ptr getOutputSignals(bool after_hooks = true) const;
// Input-side counterpart of getOutputSignals().
SignalList::Ptr getInputSignals(bool after_hooks = true) const;
// Get the number of input signals (received by this node)
unsigned getInputSignalsMaxCount() const;
// Get the number of output signals (send out via this node)
unsigned getOutputSignalsMaxCount() const;
// NOTE(review): presumably exchanges the input and output signal definitions - confirm in the implementation file.
void swapSignals();
// Get the node configuration as JSON.
json_t *getConfig() { return config; }
// Get the state of this node.
enum State getState() const { return state; }
// Set the state of this node.
void setState(enum State s) { state = s; }
// Get the UUID of this node.
const uuid_t &getUuid() const { return uuid; }
// Get the statistic counters of this node.
std::shared_ptr<Stats> getStats() { return stats; }
// Replace the statistic counters of this node.
void setStats(std::shared_ptr<Stats> sts) { stats = sts; }
// Set the enabled flag of this node.
void setEnabled(bool en) { enabled = en; }
// Print a node to a stream by writing the result of getName().
friend std::ostream &operator<<(std::ostream &os, const Node &n) {
os << n.getName();
return os;
}
// Get a JSON representation of this node.
virtual json_t *toJson() const;
// Check whether a string is an acceptable node name.
// NOTE(review): presumably matched against RE_NODE_NAME - confirm in the implementation file.
static bool isValidName(const std::string &name);
// Get the enabled flag of this node.
bool isEnabled() const { return enabled; }
};
/* Base class of all node-type factories.
 *
 * A factory is a plugin which creates nodes of one node-type via make()
 * and remembers every node it has initialized in `instances`.
 */
class NodeFactory : public villas::plugin::Plugin {
  friend Node;

protected:
  /* Complete the setup of a freshly constructed node.
   *
   * Hands the factory's logger to the node, records this factory as its
   * creator, composes the printable long name from the node's short name
   * and the factory name, and finally registers the node in `instances`.
   *
   * @param n The node to initialize. Must not be null.
   */
  virtual void init(Node *n) {
    n->logger = getLogger();
    n->factory = this;

    const auto type_name = getName();
    n->name_long = fmt::format(CLR_RED("{}") "(" CLR_YEL("{}") ")",
                               n->name_short, type_name);

    instances.push_back(n);
  }

  // State of the factory itself. Starts out as State::INITIALIZED.
  State state;

public:
  // Capability bits of a node-type as reported by getFlags().
  enum class Flags {
    SUPPORTS_POLL = 1 << 0,
    SUPPORTS_READ = 1 << 1,
    SUPPORTS_WRITE = 1 << 2,
    REQUIRES_WEB = 1 << 3,
    PROVIDES_SIGNALS = 1 << 4,
    INTERNAL = 1 << 5,
    HIDDEN = 1 << 6
  };

  // All nodes which have been passed through init() of this factory.
  NodeList instances;

  NodeFactory() : Plugin(), state(State::INITIALIZED) {}

  // Create a new node of the type served by this factory.
  virtual Node *make(const uuid_t &id = {}, const std::string &nme = "") = 0;

  // Create a new node from its JSON configuration.
  static Node *make(json_t *json, const uuid_t &id,
                    const std::string &name = "");

  // Create a new node of the node-type called `type`.
  static Node *make(const std::string &type, const uuid_t &id = {},
                    const std::string &name = "");

  // The plugin type of every node factory is "node".
  virtual std::string getType() const { return "node"; }

  // Print a factory to a stream by writing the result of getName().
  friend std::ostream &operator<<(std::ostream &os, const NodeFactory &f) {
    return os << f.getName();
  }

  // Bitwise OR of Flags values. No flags are set by default.
  virtual int getFlags() const { return 0; }

  // Defaults to 0.
  virtual int getVectorize() const { return 0; }

  // True if the Flags::INTERNAL bit is set.
  bool isInternal() const {
    return (getFlags() & static_cast<int>(Flags::INTERNAL)) != 0;
  }

  // True for internal node-types and for those with the Flags::HIDDEN bit set.
  bool isHidden() const {
    if (isInternal())
      return true;

    return (getFlags() & static_cast<int>(Flags::HIDDEN)) != 0;
  }

  virtual int start(SuperNode *sn);

  virtual int stop();

  // Get the state of this factory.
  State getState() const { return state; }
};
/* Factory for the concrete node-type T.
 *
 * @tparam T         Node class to create. It is constructed from
 *                   (const uuid_t &, const std::string &) and its pointer
 *                   must convert to Node *.
 * @tparam name      String returned by getName().
 * @tparam desc      String returned by getDescription().
 * @tparam flags     Value returned by getFlags().
 * @tparam vectorize Value returned by getVectorize().
 */
template <typename T, const char *name, const char *desc, int flags = 0,
          int vectorize = 0>
class NodePlugin : public NodeFactory {

public:
  /* Allocate a new T with `new`, run it through init() and return it.
   *
   * The node is returned as a raw pointer.
   */
  virtual Node *make(const uuid_t &id = {}, const std::string &nme = "") {
    auto *node = new T(id, nme);

    init(node);

    return node;
  }

  // Report the compile-time `flags` template argument.
  virtual int getFlags() const { return flags; }

  // Report the compile-time `vectorize` template argument.
  virtual int getVectorize() const { return vectorize; }

  // Report the compile-time `name` template argument.
  virtual std::string getName() const { return std::string(name); }

  // Report the compile-time `desc` template argument.
  virtual std::string getDescription() const { return std::string(desc); }
};
} // namespace node
} // namespace villas
#ifndef FMT_LEGACY_OSTREAM_FORMATTER
// Let {fmt} format nodes and node factories through their operator<< overloads.
template <>
struct fmt::formatter<villas::node::Node> : fmt::ostream_formatter {};

template <>
struct fmt::formatter<villas::node::NodeFactory> : fmt::ostream_formatter {};
#endif // !FMT_LEGACY_OSTREAM_FORMATTER