forked from AliceO2Group/AliceO2
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathMessageSet.h
More file actions
184 lines (162 loc) · 5.55 KB
/
MessageSet.h
File metadata and controls
184 lines (162 loc) · 5.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
// Copyright 2019-2020 CERN and copyright holders of ALICE O2.
// See https://alice-o2.web.cern.ch/copyright for details of the copyright holders.
// All rights not expressly granted are reserved.
//
// This software is distributed under the terms of the GNU General Public
// License v3 (GPL Version 3), copied verbatim in the file "COPYING".
//
// In applying this license CERN does not waive the privileges and immunities
// granted to it by virtue of its status as an Intergovernmental Organization
// or submit itself to any jurisdiction.
#ifndef FRAMEWORK_MESSAGESET_H
#define FRAMEWORK_MESSAGESET_H
#include "Framework/PartRef.h"
#include <fairmq/Message.h>
#include "Framework/DataModelViews.h"
#include <memory>
#include <vector>
#include <cassert>
namespace o2::framework
{
/// A set of in-flight messages.
///
/// The messages are stored in a linear vector. Originally, an O2 message
/// consisted of a single header-payload pair, which makes indexing of pairs in
/// the storage simple. To support O2 messages with multiple payloads, a message
/// index (`messageMap`) records, for every O2 message, the position of its
/// header in the linear storage and the number of payloads following it.
///
/// The DPL InputRecord API provides refs of header-payload pairs, the original
/// O2 message model. For this purpose a pair index (`pairMap`) is filled as
/// well; it can be used to access the header and payload associated with a pair.
///
/// The type is move-only; a moved-from set is explicitly cleared.
struct MessageSet {
  /// Location of one O2 message inside the linear storage.
  struct Index {
    Index(size_t p, size_t s) : position(p), size(s) {}
    /// index of the header message in `messages`
    size_t position = 0;
    /// number of payload messages stored directly after the header
    size_t size = 0;
  };

  /// Location of one header-payload pair.
  struct PairMapping {
    PairMapping(size_t partId, size_t payloadId) : partIndex(partId), payloadIndex(payloadId) {}
    /// index into `messageMap` of the O2 message the pair is located in
    size_t partIndex = 0;
    /// payload index within that O2 message
    size_t payloadIndex = 0;
  };

  /// linear storage of messages: header followed by its payload(s)
  std::vector<fair::mq::MessagePtr> messages;
  /// message map describes O2 messages consisting of a header message and
  /// payload message(s); each index describes the position in the linear storage
  std::vector<Index> messageMap;
  /// pair map describes all messages as one sequence of header-payload pairs
  /// and where in the message map the associated header and payload are found
  std::vector<PairMapping> pairMap;

  /// Create an empty set.
  MessageSet() = default;

  /// Create a set holding a single O2 message.
  /// @param getter callable invoked as `getter(i)` for i in [0, size), see add()
  /// @param size   total number of messages (header + payloads) to fetch
  template <typename F>
  MessageSet(F getter, size_t size)
  {
    // `getter` is a by-value parameter, not a forwarding reference, so
    // std::move is the matching cast here.
    add(std::move(getter), size);
  }

  // Copying is not supported. These were already implicitly deleted by the
  // user-declared move operations; they are spelled out to make that explicit.
  MessageSet(MessageSet const&) = delete;
  MessageSet& operator=(MessageSet const&) = delete;

  /// Take over the content of @a other and leave it empty.
  MessageSet(MessageSet&& other) noexcept
    : messages(std::move(other.messages)), messageMap(std::move(other.messageMap)), pairMap(std::move(other.pairMap))
  {
    other.clear();
  }

  /// Replace the content with that of @a other and leave @a other empty.
  /// Self-assignment is a no-op.
  MessageSet& operator=(MessageSet&& other) noexcept
  {
    if (this != &other) {
      messages = std::move(other.messages);
      messageMap = std::move(other.messageMap);
      pairMap = std::move(other.pairMap);
      other.clear();
    }
    return *this;
  }

  /// get number of in-flight O2 messages
  /// (evaluated on the linear storage via the `count_parts` view)
  [[nodiscard]] size_t size() const
  {
    return messages | count_parts{};
  }

  /// get number of header-payload pairs
  /// (evaluated on the linear storage via the `count_payloads` view)
  [[nodiscard]] size_t getNumberOfPairs() const
  {
    return messages | count_payloads{};
  }

  /// get number of payloads for an in-flight message
  /// (evaluated on the linear storage via the `get_num_payloads` view)
  [[nodiscard]] size_t getNumberOfPayloads(size_t mi) const
  {
    return messages | get_num_payloads{mi};
  }

  /// clear the set: drops all messages and both indices
  void clear()
  {
    messages.clear();
    messageMap.clear();
    pairMap.clear();
  }

  /// Reset the set and store the content of the part ref.
  /// This is more or less legacy: PartRef has been used earlier to store fixed
  /// header-payload pairs.
  void reset(PartRef&& ref)
  {
    clear();
    add(std::move(ref));
  }

  /// Add the content of the part ref as one O2 message with a single payload.
  /// This is more or less legacy: PartRef has been used earlier to store fixed
  /// header-payload pairs.
  void add(PartRef&& ref)
  {
    // Both indices must be filled before the storage grows, because they
    // record the sizes of the containers prior to insertion.
    pairMap.emplace_back(messageMap.size(), 0);
    messageMap.emplace_back(messages.size(), 1);
    messages.emplace_back(std::move(ref.header));
    messages.emplace_back(std::move(ref.payload));
  }

  /// Add an O2 message.
  /// @param getter callable invoked as `getter(i)` for i in [0, size); the
  ///               result for i == 0 is stored as header, the others as payloads
  /// @param size   total number of messages (header + payloads) to fetch
  ///
  /// A @a size of zero adds nothing: without a header there is no O2 message,
  /// and `size - 1` would wrap around and record a bogus payload count.
  template <typename F>
  void add(F getter, size_t size)
  {
    if (size == 0) {
      return;
    }
    auto const partid = messageMap.size();
    auto const numPayloads = size - 1;
    messageMap.emplace_back(messages.size(), numPayloads);
    // std::move is needed for getters returning an lvalue reference to a
    // message; the message is then moved into the storage.
    messages.emplace_back(std::move(getter(0)));
    for (size_t payloadId = 0; payloadId < numPayloads; ++payloadId) {
      pairMap.emplace_back(partid, payloadId);
      messages.emplace_back(std::move(getter(payloadId + 1)));
    }
  }

  /// Header message of the O2 message at @a partIndex in the message map.
  fair::mq::MessagePtr& header(size_t partIndex)
  {
    assert(partIndex < messageMap.size());
    assert(messageMap[partIndex].position < messages.size());
    return messages[messageMap[partIndex].position];
  }

  /// Payload @a payloadIndex of the O2 message at @a partIndex in the message map.
  fair::mq::MessagePtr& payload(size_t partIndex, size_t payloadIndex = 0)
  {
    assert(partIndex < messageMap.size());
    assert(messageMap[partIndex].position + payloadIndex + 1 < messages.size());
    // payloads are stored directly after the header, hence the offset of one
    return messages[messageMap[partIndex].position + payloadIndex + 1];
  }

  /// Header message of the O2 message at @a partIndex in the message map.
  fair::mq::MessagePtr const& header(size_t partIndex) const
  {
    assert(partIndex < messageMap.size());
    assert(messageMap[partIndex].position < messages.size());
    return messages[messageMap[partIndex].position];
  }

  /// Payload @a payloadIndex of the O2 message at @a partIndex in the message map.
  fair::mq::MessagePtr const& payload(size_t partIndex, size_t payloadIndex = 0) const
  {
    assert(partIndex < messageMap.size());
    assert(messageMap[partIndex].position + payloadIndex + 1 < messages.size());
    // payloads are stored directly after the header, hence the offset of one
    return messages[messageMap[partIndex].position + payloadIndex + 1];
  }

  /// Header belonging to the header-payload pair at @a pos in the pair map.
  fair::mq::MessagePtr const& associatedHeader(size_t pos) const
  {
    assert(pos < pairMap.size());
    return header(pairMap[pos].partIndex);
  }

  /// Payload belonging to the header-payload pair at @a pos in the pair map.
  fair::mq::MessagePtr const& associatedPayload(size_t pos) const
  {
    assert(pos < pairMap.size());
    auto const& mapping = pairMap[pos];
    return payload(mapping.partIndex, mapping.payloadIndex);
  }
};
} // namespace o2::framework
#endif // FRAMEWORK_MESSAGESET_H