Skip to content

Commit a47d06a

Browse files
committed
FlowGrouper - Introduce new module
1 parent c3f5e5c commit a47d06a

11 files changed

Lines changed: 694 additions & 1 deletion

File tree

modules/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ add_subdirectory(sampler)
33
add_subdirectory(telemetry)
44
add_subdirectory(deduplicator)
55
add_subdirectory(clickhouse)
6+
add_subdirectory(flowGrouper)

modules/deduplicator/src/timeoutHashMap.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -302,7 +302,7 @@ class TimeoutHashMap {
302302
}
303303

304304
private:
305-
std::function<size_t(const FlowKey&)> m_hasher;
305+
std::function<size_t(const Key&)> m_hasher;
306306
typename HashMapTimeoutBucket::TimeoutBucketCallables m_timeoutBucketCallables;
307307
std::vector<HashMapTimeoutBucket> m_buckets;
308308
const uint64_t M_BUCKET_MASK;

modules/flowGrouper/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
add_subdirectory(src)

modules/flowGrouper/README.md

Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
# FlowGrouper module - README
2+
3+
## Description
4+
FlowGrouper groups Unirec flow records that share the same 5-tuple (source IP,
5+
destination IP, source port, destination port and protocol) within a configurable
6+
time window and assigns a stable `FLOW_GROUP_KEY` to all records that belong to
7+
the same group.
8+
9+
This module is useful when aggregating flow records that may be
10+
received multiple times (e.g., from multiple exporters).
11+
12+
13+
## Interfaces
14+
- Input: 1
15+
- Output: 1
16+
17+
## Required Unirec Fields
18+
The module expects the input Unirec template to contain the following fields:
19+
- `SRC_IP` (ipaddr)
20+
- `DST_IP` (ipaddr)
21+
- `SRC_PORT` (uint16)
22+
- `DST_PORT` (uint16)
23+
- `PROTOCOL` (uint8)
24+
25+
FlowGrouper will extend the template by adding `uint64 FLOW_GROUP_KEY` to the output records.
26+
27+
## Parameters
28+
Command-line parameters follow the TRAP / Unirec conventions. The main module
29+
parameters are:
30+
31+
- `-s, --size <int>` Exponent N for the hash map size (2^N entries). Default value is 15
32+
- `-t, --timeout <int>` Time to consider similar flows as duplicates in milliseconds. Default value is 5000 (5s)
33+
34+
- `-m, --appfs-mountpoint <path>` Path where the appFs directory will be mounted
35+
36+
### Common TRAP / Unirec parameters
37+
- `-h` : print help and module-specific parameters
38+
- `-v`, `-vv`, `-vvv` : verbosity levels
39+
40+
## How Flow Grouping Works
41+
- Records are grouped when they arrive within the configured `--timeout`
42+
interval and share the same `SRC_IP`, `DST_IP`, `SRC_PORT`, `DST_PORT` and
43+
`PROTOCOL` values.
44+
- When a record arrives and no existing group matches, a new `FLOW_GROUP_KEY`
45+
is created and stored in an internal timeout hash map keyed by the 5-tuple.
46+
- Subsequent records that match the tuple within the timeout receive the same`FLOW_GROUP_KEY`.
47+
Note: FLOW_GROUP_KEY is not unique identifier. It identifies records that belong to the same group only in the context of the 5-tuple (SRC_IP, DST_IP, SRC_PORT, DST_PORT, PROTOCOL).
48+
## Telemetry data format
49+
50+
```
51+
├─ input/
52+
│ └─ stats
53+
└─ flowGrouper/
54+
└─ statistics
55+
```
56+
57+
Telemetry counters include:
58+
- **Inserted groups:** number of newly created flow groups
59+
- **Replaced groups:** number of times an existing bucket entry was replaced with new group
60+
- **Found groups:** number of times a matching group was found for an input record
61+
62+
63+
## Usage Examples
64+
Process Unirec records from a TRAP input and forward them with an added
65+
`FLOW_GROUP_KEY`. The example sets the hash map exponent to `15` (2^15 entries)
66+
and timeout to `1000` ms:
67+
68+
```
69+
$ FlowGrouper -i "u:in,u:out" -s 15 -t 1000
70+
```
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
add_executable(flowGrouper
2+
main.cpp
3+
flowGrouper.cpp
4+
)
5+
6+
target_link_libraries(flowGrouper PRIVATE
7+
telemetry::telemetry
8+
telemetry::appFs
9+
common
10+
rapidcsv
11+
unirec::unirec++
12+
unirec::unirec
13+
trap::trap
14+
argparse
15+
xxhash
16+
)
17+
18+
install(TARGETS flowGrouper DESTINATION ${INSTALL_DIR_BIN})
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
2+
#include "flowGrouper.hpp"
3+
4+
#include <stdexcept>
5+
#include <type_traits>
6+
#include <xxhash.h>
7+
8+
using namespace Nemea;
9+
10+
namespace FlowGrouper {
11+
12+
template <typename Key>
13+
static uint64_t xxHasher(const Key& key)
14+
{
15+
return XXH3_64bits(reinterpret_cast<const void*>(&key), sizeof(key));
16+
}
17+
18+
static FlowGrouper::Timestamp timeSum(const FlowGrouper::Timestamp& value, uint64_t timeout)
19+
{
20+
return value + std::chrono::milliseconds(timeout);
21+
}
22+
23+
static ur_field_id_t getUnirecIdByName(const char* str)
24+
{
25+
auto unirecId = ur_get_id_by_name(str);
26+
if (unirecId == UR_E_INVALID_NAME) {
27+
throw std::runtime_error(std::string("Invalid Unirec name:") + str);
28+
}
29+
return static_cast<ur_field_id_t>(unirecId);
30+
}
31+
32+
FlowGrouper::FlowGrouper(const FlowGrouperHashMap::TimeoutHashMapParameters& parameters)
33+
: m_hashMap(parameters, xxHasher<FlowKey>, std::less<>(), timeSum)
34+
{
35+
constexpr const size_t timeoutBucketSize = 256;
36+
static_assert(
37+
sizeof(FlowGrouperHashMap::HashMapTimeoutBucket) == timeoutBucketSize,
38+
"TimeoutBucket size is not 256 bytes");
39+
}
40+
41+
void FlowGrouper::updateUnirecIds()
42+
{
43+
m_ids.srcIpId = getUnirecIdByName("SRC_IP");
44+
m_ids.dstIpId = getUnirecIdByName("DST_IP");
45+
m_ids.srcPortId = getUnirecIdByName("SRC_PORT");
46+
m_ids.dstPortId = getUnirecIdByName("DST_PORT");
47+
m_ids.protocolId = getUnirecIdByName("PROTOCOL");
48+
m_ids.flowGroupKeyId = getUnirecIdByName(getOutputFieldName().c_str());
49+
50+
}
51+
52+
FlowGrouper::FlowGroupKey FlowGrouper::getFlowKey(Nemea::UnirecRecordView& view)
53+
{
54+
FlowKey flowKey;
55+
flowKey.srcIp = view.getFieldAsType<IpAddress>(m_ids.srcIpId);
56+
flowKey.dstIp = view.getFieldAsType<IpAddress>(m_ids.dstIpId);
57+
flowKey.srcPort = view.getFieldAsType<uint16_t>(m_ids.srcPortId);
58+
flowKey.dstPort = view.getFieldAsType<uint16_t>(m_ids.dstPortId);
59+
flowKey.proto = view.getFieldAsType<uint8_t>(m_ids.protocolId);
60+
61+
FlowGrouper::FlowGroupKey newFlowKey = (FlowGrouper::FlowGroupKey) std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count();
62+
const auto [it, insertResult]
63+
= m_hashMap.insert({flowKey, newFlowKey}, std::chrono::steady_clock::now());
64+
65+
if (insertResult == FlowGrouperHashMap::HashMapTimeoutBucket::InsertResult::INSERTED) {
66+
m_newInserted++;
67+
return newFlowKey;
68+
}
69+
if (insertResult == FlowGrouperHashMap::HashMapTimeoutBucket::InsertResult::REPLACED) {
70+
m_replaced++;
71+
return newFlowKey;
72+
}
73+
m_found++;
74+
return *it;
75+
}
76+
77+
void FlowGrouper::addFlowKey(Nemea::UnirecRecordView& inputRecord, Nemea::UnirecRecord& outputRecord)
78+
{
79+
FlowGrouper::FlowGroupKey flowKey = getFlowKey(inputRecord);
80+
outputRecord.setFieldFromType<uint64_t>(flowKey,m_ids.flowGroupKeyId);
81+
82+
return;
83+
}
84+
85+
void FlowGrouper::setTelemetryDirectory(const std::shared_ptr<telemetry::Directory>& directory)
86+
{
87+
m_holder.add(directory);
88+
89+
const telemetry::FileOps fileOps
90+
= {[this]() {
91+
telemetry::Dict dict;
92+
dict["replacedCount"] = telemetry::Scalar((long unsigned int) m_replaced);
93+
dict["newInsertedCount"] = telemetry::Scalar((long unsigned int) m_newInserted);
94+
dict["foundCount"] = telemetry::Scalar((long unsigned int) m_found);
95+
return dict;
96+
},
97+
nullptr};
98+
99+
m_holder.add(directory->addFile("statistics", fileOps));
100+
}
101+
102+
} // namespace FlowGrouper
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
#pragma once
2+
3+
#include "../../deduplicator/src/timeoutHashMap.hpp"
4+
5+
#include <atomic>
6+
#include <memory>
7+
#include <telemetry.hpp>
8+
#include <thread>
9+
#include <unirec++/unirecRecordView.hpp>
10+
#include <unirec++/unirecRecord.hpp>
11+
#include <unirec++/urTime.hpp>
12+
#include <vector>
13+
14+
namespace FlowGrouper {
15+
16+
/**
17+
* @brief FlowGrouper class to add same flowID to duplicate records
18+
*/
19+
class FlowGrouper {
20+
public:
21+
using Timestamp = std::chrono::time_point<std::chrono::steady_clock>;
22+
23+
/**
24+
* @brief Field type representing Flow ID (FLOW_GROUP_KEY).
25+
*/
26+
using FlowGroupKey = uint64_t;
27+
28+
/**
29+
* @brief Represents key fields of flow that belong to the same group.
30+
*/
31+
struct FlowKey {
32+
Nemea::IpAddress srcIp; ///< Source IP address.
33+
Nemea::IpAddress dstIp; ///< Destination IP address.
34+
uint16_t srcPort; ///< Source port.
35+
uint16_t dstPort; ///< Destination port.
36+
uint8_t proto; ///< Protocol ID.
37+
};
38+
39+
40+
41+
/**
42+
* @brief Timeout hash map type used by FlowGrouper.
43+
*/
44+
using FlowGrouperHashMap = Deduplicator::TimeoutHashMap<
45+
FlowKey,
46+
FlowGroupKey,
47+
Timestamp,
48+
std::function<size_t(const FlowKey&)>,
49+
std::function<bool(const Timestamp&, const Timestamp&)>,
50+
std::function<Timestamp(const Timestamp&, uint64_t)>>;
51+
52+
static inline const uint64_t DEFAULT_HASHMAP_TIMEOUT = 5000; ///< Default timeout - 5s
53+
static inline const uint32_t DEFAULT_HASHMAP_EXPONENT = 20; ///< Default size exponent - 2^20 entries
54+
55+
/**
56+
* @brief FlowGrouper constructor
57+
*
58+
* @param parameters Parameters to build hash table of flowGrouper
59+
*/
60+
explicit FlowGrouper(const FlowGrouperHashMap::TimeoutHashMapParameters& parameters);
61+
62+
/**
63+
* @brief Checks if the given UnirecRecordView group already exists in the hash map if not adds it.
64+
* @param view The Unirec record to check.
65+
* @return FlowGroupKey of the flow.
66+
*/
67+
FlowGroupKey getFlowKey(Nemea::UnirecRecordView& view);
68+
69+
/**
70+
* @brief Adds FLOW_GROUP_KEY field to the output Unirec record.
71+
* @param inputRecord The input Unirec record view to get field values from.
72+
* @param outputRecord The output Unirec record where FLOW_GROUP_KEY will be added.
73+
*/
74+
void addFlowKey(Nemea::UnirecRecordView& inputRecord, Nemea::UnirecRecord& outputRecord);
75+
76+
/**
77+
* @brief Sets the telemetry directory for the flowGrouper.
78+
* @param directory directory for flowGrouper telemetry.
79+
*/
80+
void setTelemetryDirectory(const std::shared_ptr<telemetry::Directory>& directory);
81+
82+
/**
83+
* @brief Update Unirec Id of required fields after template format change.
84+
*/
85+
void updateUnirecIds();
86+
87+
const std::string getOutputFieldName() const {
88+
return "FLOW_GROUP_KEY";
89+
}
90+
91+
const std::string getOutputTemplate(std::string inputTemplate) const {
92+
//check if input template already contains output field
93+
if (inputTemplate.find(" "+getOutputFieldName()) != std::string::npos) {
94+
return inputTemplate;
95+
}
96+
return inputTemplate + ", uint64 " + getOutputFieldName();
97+
}
98+
99+
private:
100+
FlowGrouperHashMap m_hashMap; ///< Hash map to keep flows
101+
102+
uint32_t m_newInserted {0}; ///< Count of new groups
103+
uint32_t m_replaced {0}; ///< Count of replaced groups
104+
uint32_t m_found {0}; ///< Count of when groupkey was found
105+
106+
telemetry::Holder m_holder;
107+
108+
struct UnirecIdStorage {
109+
ur_field_id_t srcIpId; ///< Unirec ID of source ip.
110+
ur_field_id_t dstIpId; ///< Unirec ID of destination ip.
111+
ur_field_id_t srcPortId; ///< Unirec ID of source port.
112+
ur_field_id_t dstPortId; ///< Unirec ID of destination port.
113+
ur_field_id_t protocolId; ///< Unirec ID of protocol field.
114+
115+
ur_field_id_t flowGroupKeyId; ///< Unirec ID of FLOW_GROUP_KEY field.
116+
};
117+
118+
UnirecIdStorage m_ids; ///< Ids of Unirec fields used by flowGrouper module
119+
};
120+
121+
} // namespace FlowGrouper

0 commit comments

Comments
 (0)