Skip to content

Commit 3b24497

Browse files
Merge pull request #70 from CESNET/flowscatter-ng
Flowscatter ng - rss mode to staging
2 parents 16dfcf0 + 6c2a84f commit 3b24497

2 files changed

Lines changed: 43 additions & 0 deletions

File tree

modules/flowScatter/src/flowScatter.cpp

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -266,11 +266,19 @@ void FlowScatter::ruleParse(const std::string& rule)
266266
{
267267
m_rules.branches.clear();
268268
m_cachedBranches.clear();
269+
m_rssMode = false;
270+
m_rss_src_id = 0;
271+
m_rss_dst_id = 0;
269272

270273
if (rule.empty()) {
271274
return; // No rules defined, nothing to parse
272275
}
273276

277+
if (rule == "rss") {
278+
m_rssMode = true;
279+
return;
280+
}
281+
274282
std::istringstream ruleStream(rule);
275283
std::string branchStr;
276284

@@ -306,6 +314,18 @@ void FlowScatter::changeTemplate()
306314
{
307315
m_cachedBranches.clear();
308316

317+
if (m_rssMode) {
318+
int const srcId = ur_get_id_by_name("SRC_IP");
319+
int const dstId = ur_get_id_by_name("DST_IP");
320+
if (srcId < 0 || dstId < 0) {
321+
throw std::runtime_error("RSS mode requires SRC_IP and DST_IP fields in template");
322+
}
323+
m_rss_src_id = static_cast<ur_field_id_t>(srcId);
324+
m_rss_dst_id = static_cast<ur_field_id_t>(dstId);
325+
326+
return;
327+
}
328+
309329
for (const auto& branch : m_rules.branches) {
310330
CachedBranch cachedBranch;
311331

@@ -344,6 +364,25 @@ size_t FlowScatter::outputIndex(const UnirecRecordView& record)
344364
{
345365
m_totalRecords++;
346366

367+
if (m_rssMode) {
368+
auto srcIp = record.getFieldAsType<IpAddress>(m_rss_src_id);
369+
auto dstIp = record.getFieldAsType<IpAddress>(m_rss_dst_id);
370+
371+
IpAddress orResult;
372+
const uint8_t* srcBytes = reinterpret_cast<const uint8_t*>(&srcIp);
373+
const uint8_t* dstBytes = reinterpret_cast<const uint8_t*>(&dstIp);
374+
uint8_t* resultBytes = reinterpret_cast<uint8_t*>(&orResult);
375+
376+
for (size_t i = 0; i < sizeof(IpAddress); ++i) {
377+
resultBytes[i] = srcBytes[i] | dstBytes[i];
378+
}
379+
380+
auto hashValue = XXH64(&orResult, sizeof(IpAddress), 0xdeadd00de);
381+
auto index = static_cast<size_t>(hashValue % M_NUM_OUTPUTS);
382+
m_sentRecords[index]++;
383+
return index;
384+
}
385+
347386
// If no rules are defined, distribute records round-robin
348387
if (m_rules.branches.empty()) {
349388
size_t const index = (m_totalRecords - 1) % M_NUM_OUTPUTS;

modules/flowScatter/src/flowScatter.hpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,10 @@ class FlowScatter {
124124
};
125125

126126
std::vector<CachedBranch> m_cachedBranches;
127+
128+
bool m_rssMode = false;
129+
ur_field_id_t m_rss_src_id = 0;
130+
ur_field_id_t m_rss_dst_id = 0;
127131
};
128132

129133
} // namespace Fs

0 commit comments

Comments
 (0)