Skip to content

Commit 20df68a

Browse files
committed
Deduplicator - Add main.cpp
1 parent 9729680 commit 20df68a

1 file changed

Lines changed: 196 additions & 0 deletions

File tree

modules/deduplicator/src/main.cpp

Lines changed: 196 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,196 @@
1+
/**
2+
* @file
3+
* @author Damir Zainullin <zaidamilda@gmail.com>
4+
* @brief Deduplication Module: Deduplicate flowdata
5+
*
6+
* This file contains the main function and supporting functions for the Unirec Deduplication
7+
* Module. This module process Unirec records thourgh a bidirectional interface and ignores flows
8+
* that seem to be duplicates of already processed flows.
9+
*
10+
* SPDX-License-Identifier: BSD-3-Clause
11+
*/
12+
13+
#include "deduplicator.hpp"
14+
#include "logger/logger.hpp"
15+
#include "unirec/unirec-telemetry.hpp"
16+
17+
#include <appFs.hpp>
18+
#include <argparse/argparse.hpp>
19+
#include <iostream>
20+
#include <stdexcept>
21+
#include <telemetry.hpp>
22+
#include <unirec++/unirec.hpp>
23+
24+
using namespace Nemea;
25+
26+
/**
27+
* @brief Handle a format change exception by adjusting the template.
28+
*
29+
* This function is called when a `FormatChangeException` is caught in the main loop.
30+
* It adjusts the template in the bidirectional interface to handle the format change.
31+
*
32+
* @param biInterface Bidirectional interface for Unirec communication.
33+
* @param deduplicator Deduplicator instance.
34+
*/
35+
void handleFormatChange(
36+
UnirecBidirectionalInterface& biInterface,
37+
Deduplicator::Deduplicator& deduplicator)
38+
{
39+
biInterface.changeTemplate();
40+
deduplicator.updateUnirecIds();
41+
}
42+
43+
/**
44+
* @brief Process the next Unirec record and sample them.
45+
*
46+
* This function receives the next Unirec record through the bidirectional interface
47+
* saves it the hash map and send non-duplicate flows back to the bidirectional interface.
48+
*
49+
* @param biInterface Bidirectional interface for Unirec communication.
50+
* @param deduplicator Deduplicator instance to process flows.
51+
*/
52+
void processNextRecord(
53+
UnirecBidirectionalInterface& biInterface,
54+
Deduplicator::Deduplicator& deduplicator)
55+
{
56+
std::optional<UnirecRecordView> unirecRecord = biInterface.receive();
57+
if (!unirecRecord) {
58+
return;
59+
}
60+
61+
if (!deduplicator.isDuplicate(*unirecRecord)) {
62+
biInterface.send(*unirecRecord);
63+
}
64+
}
65+
66+
/**
67+
* @brief Process Unirec records.
68+
*
69+
* The `processUnirecRecords` function continuously receives Unirec records through the provided
70+
* bidirectional interface (`biInterface`) and process them. The loop runs indefinitely until
71+
* an end-of-file condition is encountered.
72+
*
73+
* @param biInterface Bidirectional interface for Unirec communication.
74+
* @param deduplicator Deduplicator instance to process flows.
75+
*/
76+
void processUnirecRecords(
77+
UnirecBidirectionalInterface& biInterface,
78+
Deduplicator::Deduplicator& deduplicator)
79+
{
80+
while (true) {
81+
try {
82+
processNextRecord(biInterface, deduplicator);
83+
} catch (FormatChangeException& ex) {
84+
handleFormatChange(biInterface, deduplicator);
85+
} catch (const EoFException& ex) {
86+
break;
87+
} catch (const std::exception& ex) {
88+
throw;
89+
}
90+
}
91+
}
92+
93+
int main(int argc, char** argv)
94+
{
95+
argparse::ArgumentParser program("Unirec Deduplicator");
96+
97+
Unirec unirec({1, 1, "deduplicator", "Unirec deduplicator module"});
98+
99+
Nm::loggerInit();
100+
auto logger = Nm::loggerGet("main");
101+
102+
try {
103+
unirec.init(argc, argv);
104+
} catch (const HelpException& ex) {
105+
std::cerr << program;
106+
return EXIT_SUCCESS;
107+
} catch (const std::exception& ex) {
108+
logger->error(ex.what());
109+
return EXIT_FAILURE;
110+
}
111+
112+
try {
113+
program.add_argument("-s", "--size")
114+
.required()
115+
.help("Size of the hash map. Default value is 2^20 for 1'048'576 records.")
116+
.default_value(Deduplicator::Deduplicator::DeduplicatorHashMap::
117+
TimeoutHashMapParameters::DEFAULT_HASHMAP_EXPONENT)
118+
.scan<'i', int>();
119+
program.add_argument("-t", "--timeout")
120+
.required()
121+
.help(
122+
"Count of millisecond to consider flows as duplicates. Default value is 5000(5s).")
123+
.default_value(Deduplicator::Deduplicator::DEFAULT_HASHMAP_TIMEOUT)
124+
.scan<'i', int>();
125+
program.add_argument("-m", "--appfs-mountpoint")
126+
.required()
127+
.help("path where the appFs directory will be mounted")
128+
.default_value(std::string(""));
129+
program.parse_args(argc, argv);
130+
} catch (const std::exception& ex) {
131+
logger->error(ex.what());
132+
std::cerr << program;
133+
return EXIT_FAILURE;
134+
}
135+
136+
std::shared_ptr<telemetry::Directory> telemetryRootDirectory;
137+
telemetryRootDirectory = telemetry::Directory::create();
138+
139+
std::unique_ptr<telemetry::appFs::AppFsFuse> appFs;
140+
141+
try {
142+
auto mountPoint = program.get<std::string>("--appfs-mountpoint");
143+
if (!mountPoint.empty()) {
144+
const bool tryToUnmountOnStart = true;
145+
const bool createMountPoint = true;
146+
appFs = std::make_unique<telemetry::appFs::AppFsFuse>(
147+
telemetryRootDirectory,
148+
mountPoint,
149+
tryToUnmountOnStart,
150+
createMountPoint);
151+
appFs->start();
152+
}
153+
} catch (std::exception& ex) {
154+
logger->error(ex.what());
155+
return EXIT_FAILURE;
156+
}
157+
158+
try {
159+
const auto tableSize = program.get<uint32_t>("--size");
160+
if (tableSize <= 0) {
161+
std::cerr << "Table size must be at least 8.\n";
162+
return EXIT_FAILURE;
163+
}
164+
const auto timeout = program.get<uint32_t>("--timeout");
165+
if (timeout <= 0) {
166+
std::cerr << "Timeout must be higher than zero.\n";
167+
return EXIT_FAILURE;
168+
}
169+
170+
UnirecBidirectionalInterface biInterface = unirec.buildBidirectionalInterface();
171+
172+
auto telemetryInputDirectory = telemetryRootDirectory->addDir("input");
173+
const telemetry::FileOps inputFileOps
174+
= {[&biInterface]() { return Nm::getInterfaceTelemetry(biInterface); }, nullptr};
175+
const auto inputFile = telemetryInputDirectory->addFile("stats", inputFileOps);
176+
177+
auto telemetryDeduplicatorDirectory = telemetryRootDirectory->addDir("deduplicator");
178+
179+
Deduplicator::Deduplicator::DeduplicatorHashMap::TimeoutHashMapParameters parameters;
180+
parameters.bucketCountExponent = tableSize;
181+
parameters.timeout = timeout;
182+
183+
Deduplicator::Deduplicator deduplicator(parameters);
184+
deduplicator.setTelemetryDirectory(telemetryDeduplicatorDirectory);
185+
biInterface.setRequieredFormat(
186+
"uint16 SRC_PORT, uint16 DST_PORT, ipaddr DST_IP,ipaddr SRC_IP, uint64 LINK_BIT_FIELD, "
187+
"uint8 PROTOCOL, time TIME_LAST");
188+
processUnirecRecords(biInterface, deduplicator);
189+
190+
} catch (std::exception& ex) {
191+
logger->error(ex.what());
192+
return EXIT_FAILURE;
193+
}
194+
195+
return EXIT_SUCCESS;
196+
}

0 commit comments

Comments
 (0)