Skip to content

Commit c821240

Browse files
author
Pavel Siska
committed
Clickhouse - introduce main.cpp
Integrate module into main file
1 parent b89bb92 commit c821240

1 file changed

Lines changed: 171 additions & 3 deletions

File tree

modules/clickhouse/src/main.cpp

Lines changed: 171 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,175 @@
1+
/**
2+
* @file main.cpp
3+
* @author Daniel Pelanek <xpeland00@vutbr.cz>
4+
* @brief Clickhouse Module: resend flowdata to clickhouse
5+
*
6+
* This file contains the main function and supporting functions for the Unirec Clickhouse Module.
7+
* This module takes Unirec records from a unidirectional interface, converts them to
8+
* Clickhouse format buffers them and then sends them to the specified Clickhouse server in config.
9+
* It utilizes the Unirec++ library for record handling, argparse for command-line argument parsing,
10+
* rapidxml for config parsing and clickhouse cpp library.
11+
* Ported from ipfixcol2 clickhouse plugin.
12+
*
13+
* SPDX-License-Identifier: BSD-3-Clause
14+
*/
15+
16+
#include "clickhouse.hpp"
17+
#include "config.hpp"
18+
#include "logger/logger.hpp"
19+
#include "manager.hpp"
20+
21+
#include <argparse/argparse.hpp>
22+
#include <csignal>
23+
#include <iostream>
24+
#include <stdexcept>
25+
#include <unirec++/unirec.hpp>
26+
27+
using namespace Nemea;
28+
29+
static std::atomic<bool> g_stopFlag(false);
30+
31+
static void signalHandler(int signum)
32+
{
33+
auto logger = Nm::loggerGet("main");
34+
logger->info("Interrupt signal {} received", signum);
35+
g_stopFlag.store(true);
36+
}
37+
38+
/**
39+
* @brief Handle format change exception by adjusting the template and check template
40+
* against the one defined in config.
41+
*
42+
* This function is called when a `FormatChangeException` is caught in the main loop.
43+
* It adjusts the template in the input interface to handle the format change but in this
44+
* case the program only continues if the template is the same as defined in config. Meaning
45+
* it only continues if the template changes to the same one.
46+
*
47+
* @param interface input interface for Unirec communication.
48+
* @param manager Manager instance which buffers and sends data to clickhouse.
49+
*/
50+
static void handleFormatChange(UnirecInputInterface& interface, Manager& manager)
51+
{
52+
interface.changeTemplate();
53+
54+
ur_template_t* changedTemplate = interface.getTemplate();
55+
auto res = std::unique_ptr<char, decltype(&free)>(
56+
ur_template_string_delimiter(changedTemplate, ','),
57+
&free);
58+
59+
const Config cfg = manager.getConfig();
60+
61+
if (cfg.templateColumnCsv != res.get()) {
62+
throw std::runtime_error(
63+
"Template in input interface doesn't match template in configuration.");
64+
}
65+
66+
manager.updateFieldIDs();
67+
}
68+
69+
/**
70+
* @brief Process unirec record in manager and forward to
71+
*
72+
* @param interface input interface for Unirec communication.
73+
* @param manager Manager instance which buffers and sends data to clickhouse.
74+
*/
75+
static void processNextRecord(UnirecInputInterface& interface, Manager& manager)
76+
{
77+
std::optional<UnirecRecordView> unirecRecord = interface.receive();
78+
if (!unirecRecord) {
79+
return;
80+
}
81+
82+
manager.processRecord(*unirecRecord);
83+
}
84+
85+
/**
86+
* @brief Process Unirec records.
87+
*
88+
* The `processUnirecRecords` function continuously receives Unirec records through the provided
89+
* input interface (`interface`). Each received record is processed, buffered and
90+
* then sent to a clickhouse database.
91+
*
92+
* @param interface input interface for Unirec communication.
93+
* @param manager Manager instance which buffers and sends data to clickhouse.
94+
*/
95+
static void processUnirecRecords(UnirecInputInterface& interface, Manager& manager)
96+
{
97+
while (!g_stopFlag.load()) {
98+
try {
99+
processNextRecord(interface, manager);
100+
} catch (FormatChangeException& ex) {
101+
handleFormatChange(interface, manager);
102+
} catch (EoFException& ex) {
103+
break;
104+
} catch (std::exception& ex) {
105+
throw;
106+
}
107+
}
108+
}
109+
1110
int main(int argc, char** argv)
2111
{
3-
(void) argc;
4-
(void) argv;
112+
argparse::ArgumentParser program("Unirec Clickhouse");
113+
114+
program.add_argument("-c", "--config")
115+
.required()
116+
.help("specify the xml config file. Format is in readme.")
117+
.metavar("xml_file");
118+
119+
Unirec unirec({1, 0, "clickhouse", "Unirec clickhouse module"});
120+
121+
Nm::loggerInit();
122+
auto logger = Nm::loggerGet("main");
123+
124+
signal(SIGINT, signalHandler);
125+
126+
try {
127+
unirec.init(argc, argv);
128+
} catch (HelpException& ex) {
129+
std::cerr << program;
130+
return EXIT_SUCCESS;
131+
} catch (std::exception& ex) {
132+
logger->error(ex.what());
133+
return EXIT_FAILURE;
134+
}
135+
136+
try {
137+
program.parse_args(argc, argv);
138+
} catch (const std::exception& ex) {
139+
logger->error(ex.what());
140+
return EXIT_FAILURE;
141+
}
142+
143+
Config config;
144+
try {
145+
config = parseConfig(program.get<std::string>("--config"));
146+
147+
} catch (const std::exception& ex) {
148+
logger->error(ex.what());
149+
return EXIT_FAILURE;
150+
}
151+
152+
std::unique_ptr<Manager> manager;
153+
try {
154+
manager = std::make_unique<Manager>(config);
155+
} catch (const std::exception& ex) {
156+
logger->error(ex.what());
157+
return EXIT_FAILURE;
158+
}
159+
160+
try {
161+
UnirecInputInterface interface = unirec.buildInputInterface();
162+
163+
processUnirecRecords(interface, *manager);
164+
165+
logger->info("here");
166+
167+
manager->stop();
168+
169+
} catch (std::exception& ex) {
170+
logger->error(ex.what());
171+
return EXIT_FAILURE;
172+
}
5173

6-
return 0;
174+
return EXIT_SUCCESS;
7175
}

0 commit comments

Comments
 (0)