Skip to content

Commit ec647d7

Browse files
authored
Merge pull request #42 from CESNET/deduplicator-module
Deduplicator module
2 parents 8e87067 + 9d60f88 commit ec647d7

17 files changed

Lines changed: 1124 additions & 1 deletion

CMakeLists.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ option(NM_NG_BUILD_WITH_UBSAN "Build with Undefined Behavior Sanitizer (only f
2323

2424
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -pedantic -Wall -Wextra -Wunused -Wconversion -Wsign-conversion")
2525
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O3 -Werror")
26-
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -g -ggdb3")
26+
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -g -ggdb3 -fsanitize=address")
2727

2828
if (NM_NG_BUILD_WITH_ASAN)
2929
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -fno-omit-frame-pointer -fsanitize=address -fsanitize-recover=address")

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,3 +6,4 @@ functionality/purposes are:
66

77
* [Sampler](modules/sampler/): sample records at the given rate.
88
* [Telemetry](modules/telemetry/): provides unirec telemetry of the input interface.
9+
* [Deduplicator](modules/deduplicator/): omit duplicate records.

common/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ target_link_libraries(common PUBLIC
1616
unirec::unirec++
1717
)
1818

19+
1920
target_include_directories(common PUBLIC
2021
include
2122
spdlog::spdlog

common/external/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,3 +7,4 @@ include(telemetry.cmake)
77
include(spdlog.cmake)
88
include(rapidcsv.cmake)
99
include(argparse.cmake)
10+
include(xxhash.cmake)

common/external/xxhash.cmake

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# XXHash library (C library that provides hash functions)

set(GIT_REPO https://github.com/Cyan4973/xxHash)

# Fetch a pinned release of xxHash so builds are reproducible.
FetchContent_Declare(
    xxhash
    GIT_REPOSITORY ${GIT_REPO}
    GIT_TAG v0.8.2
)

FetchContent_MakeAvailable(xxhash)

# Build the single xxhash.c translation unit as our own static library target.
set(XXHASH_SRC
    ${xxhash_SOURCE_DIR}/xxhash.c
)

add_library(xxhash STATIC ${XXHASH_SRC})

# Consumers linking against `xxhash` get the <xxhash.h> include path.
target_include_directories(xxhash PUBLIC ${xxhash_SOURCE_DIR})

modules/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
add_subdirectory(sampler)
22
add_subdirectory(telemetry)
3+
add_subdirectory(deduplicator)
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
add_subdirectory(src)

modules/deduplicator/README.md

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# Deduplicator module - README
2+
3+
## Description
4+
The module is used to avoid forwarding duplicate Unirec records
that appear when the same flow is exported by two different exporters and sent to the same collector.
It identifies and forwards only unique records, ignoring records that have already been seen.
Recently seen flows are kept in a hash map.
8+
9+
## Interfaces
10+
- Input: 1
11+
- Output: 1
12+
13+
## Parameters
14+
### Common TRAP parameters
15+
- `-h [trap,1]` Print help message for this module / for libtrap specific parameters.
16+
- `-i IFC_SPEC` Specification of interface types and their parameters.
17+
- `-v` Be verbose.
18+
- `-vv` Be more verbose.
19+
- `-vvv` Be even more verbose.
20+
21+
### Module specific parameters
22+
- `-s, --size <int>` Exponent of the hash table capacity: the table keeps 2^size records simultaneously (see the usage example below). Default value is 20, i.e. 2^20 records.
23+
- `-t, --timeout <int>` Time window in milliseconds within which similar flows are considered duplicates. Default value is 5000 (5 s).
24+
- `-m, --appfs-mountpoint <path>` Path where the appFs directory will be mounted
25+
26+
## Identification of duplicate flows
27+
Flows are considered duplicates when they:
28+
- arrive to the collector with less than `--timeout` delay
29+
- have the same source and destination IP addresses, ports and protocol field value
30+
- have distinct `LINK_BIT_FIELD` values
31+
32+
## Usage Examples
33+
```
34+
# Data from the input unix socket interface "in" is processed, and entries that
# are duplicates of entries received during the last 1000 milliseconds are omitted;
# the others are forwarded to the output interface "out". Transient storage is a
# hash map with 2^15 records.
37+
38+
$ deduplicator -i "u:in,u:out" -s 15 -t 1000
39+
```
40+
41+
## Telemetry data format
42+
```
43+
├─ input/
44+
│ └─ stats
45+
└─ deduplicator/
46+
└─ statistics
47+
```
48+
49+
The statistics file contains the following flow counts:
50+
- Replaced flows - flows that were inserted to the bucket and the oldest flow from the bucket is removed.
51+
- Deduplicated flows - flows that were identified as duplicates and were omitted.
52+
- Inserted flows - flows that were normally inserted (not Replaced nor Deduplicated).
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
# Deduplicator module executable: drops Unirec records that duplicate
# recently seen flows (see modules/deduplicator/README.md).
add_executable(deduplicator
    main.cpp
    deduplicator.cpp
)

target_link_libraries(deduplicator PRIVATE
    telemetry::telemetry
    telemetry::appFs
    common
    rapidcsv
    unirec::unirec++
    unirec::unirec
    trap::trap
    argparse
    xxhash
)

install(TARGETS deduplicator DESTINATION ${INSTALL_DIR_BIN})
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/**
2+
* @file
3+
* @author Damir Zainullin <zaidamilda@gmail.com>
4+
* @brief Definition of the Deduplicator class
5+
*
6+
* SPDX-License-Identifier: BSD-3-Clause
7+
*/
8+
9+
#include "deduplicator.hpp"

#include <chrono>
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <type_traits>

#include <xxhash.h>
14+
15+
using namespace Nemea;
16+
17+
namespace Deduplicator {
18+
19+
/**
 * @brief Hash a key's object representation with 64-bit XXH3.
 *
 * @tparam Key Key type; must be trivially copyable because the hash is taken
 *         over the raw bytes of the object (including any padding bytes).
 * @param key Key to hash.
 * @return 64-bit XXH3 hash of the key's bytes.
 */
template <typename Key>
static uint64_t xxHasher(const Key& key)
{
    // Byte-wise hashing is only well-defined for trivially copyable types.
    static_assert(
        std::is_trivially_copyable_v<Key>,
        "xxHasher hashes raw bytes and requires a trivially copyable key");
    // `&key` converts to `const void*` implicitly; no reinterpret_cast needed.
    return XXH3_64bits(&key, sizeof(key));
}
24+
25+
/**
 * @brief Advance a timestamp by a timeout expressed in milliseconds.
 *
 * Used as the timeout-combining callback of the deduplicator's hash map.
 *
 * @param value Base timestamp.
 * @param timeout Timeout to add, in milliseconds.
 * @return The timestamp shifted forward by the timeout.
 */
static Deduplicator::Timestamp timeSum(const Deduplicator::Timestamp& value, uint64_t timeout)
{
    const std::chrono::milliseconds shift {timeout};
    return value + shift;
}
29+
30+
/**
 * @brief Translate a Unirec field name into its field id.
 *
 * @param str Null-terminated Unirec field name (e.g. "SRC_IP").
 * @return The resolved field id.
 * @throws std::runtime_error when the name is not a known Unirec field.
 */
static ur_field_id_t getUnirecIdByName(const char* str)
{
    const auto unirecId = ur_get_id_by_name(str);
    if (unirecId == UR_E_INVALID_NAME) {
        throw std::runtime_error(std::string("Invalid Unirec name: ") + str);
    }
    return static_cast<ur_field_id_t>(unirecId);
}
38+
39+
/**
 * @brief Create a deduplicator backed by a timeout hash map.
 *
 * The map hashes FlowKey with XXH3 and extends entry lifetimes via timeSum.
 *
 * @param parameters Capacity/timeout parameters for the underlying hash map.
 */
Deduplicator::Deduplicator(const DeduplicatorHashMap::TimeoutHashMapParameters& parameters)
    : m_hashMap(parameters, xxHasher<FlowKey>, std::less<>(), timeSum)
{
    // Compile-time guard: the bucket layout must stay exactly 256 bytes.
    static_assert(
        sizeof(DeduplicatorHashMap::HashMapTimeoutBucket) == 256,
        "TimeoutBucket size is not 256 bytes");
}
47+
48+
void Deduplicator::updateUnirecIds()
49+
{
50+
m_ids.srcIpId = getUnirecIdByName("SRC_IP");
51+
m_ids.dstIpId = getUnirecIdByName("DST_IP");
52+
m_ids.srcPortId = getUnirecIdByName("SRC_PORT");
53+
m_ids.dstPortId = getUnirecIdByName("DST_PORT");
54+
m_ids.protocolId = getUnirecIdByName("PROTOCOL");
55+
m_ids.linkBitFieldId = getUnirecIdByName("LINK_BIT_FIELD");
56+
m_ids.timeLastId = getUnirecIdByName("TIME_LAST");
57+
}
58+
59+
/**
 * @brief Decide whether a record is a duplicate of a recently seen flow.
 *
 * Builds a FlowKey (src/dst IP, src/dst port, protocol) from the record and
 * inserts it into the timeout hash map together with the record's
 * LINK_BIT_FIELD value. A record is reported as a duplicate only when the same
 * key is already present (not timed out) and was stored with a *different*
 * LINK_BIT_FIELD, i.e. the flow was presumably seen from another exporter.
 *
 * Increments exactly one of m_inserted / m_replaced / m_deduplicated.
 *
 * @param view Unirec record to examine; field ids must have been resolved by
 *        updateUnirecIds() beforehand.
 * @return true when the record is a duplicate and should be omitted.
 */
bool Deduplicator::isDuplicate(UnirecRecordView& view)
{
    FlowKey flowKey;
    // xxHasher hashes the raw bytes of FlowKey, so the padding bytes must have
    // a deterministic value. Zero the whole object before filling its members;
    // otherwise identical logical keys could hash differently and duplicates
    // would be missed.
    std::memset(&flowKey, 0, sizeof(flowKey));
    flowKey.srcIp = view.getFieldAsType<IpAddress>(m_ids.srcIpId);
    flowKey.dstIp = view.getFieldAsType<IpAddress>(m_ids.dstIpId);
    flowKey.srcPort = view.getFieldAsType<uint16_t>(m_ids.srcPortId);
    flowKey.dstPort = view.getFieldAsType<uint16_t>(m_ids.dstPortId);
    flowKey.proto = view.getFieldAsType<uint8_t>(m_ids.protocolId);
    const auto linkBitField = view.getFieldAsType<uint64_t>(m_ids.linkBitFieldId);

    const auto [it, insertResult]
        = m_hashMap.insert({flowKey, linkBitField}, std::chrono::steady_clock::now());

    if (insertResult == DeduplicatorHashMap::HashMapTimeoutBucket::InsertResult::INSERTED) {
        // Fresh key (or a timed-out slot): keep the record.
        m_inserted++;
        return false;
    }
    if (insertResult == DeduplicatorHashMap::HashMapTimeoutBucket::InsertResult::REPLACED) {
        // The new entry displaced another one (per README, the oldest flow in
        // the bucket). Keep the record.
        m_replaced++;
        return false;
    }
    // Key already present and still valid: a different LINK_BIT_FIELD means the
    // same flow arrived via another link -> duplicate.
    if (*it != linkBitField) {
        m_deduplicated++;
        return true;
    }
    // Same link re-sent the flow within the timeout; treat as a normal insert.
    m_inserted++;
    return false;
}
87+
88+
/**
 * @brief Register the deduplicator's telemetry "statistics" file.
 *
 * The file reports the replaced/inserted/deduplicated flow counters when read.
 *
 * @param directory Telemetry directory to add the file to; the directory and
 *        the created file are kept alive through m_holder.
 */
void Deduplicator::setTelemetryDirectory(const std::shared_ptr<telemetry::Directory>& directory)
{
    m_holder.add(directory);

    // NOTE(review): the read callback captures `this`, so the telemetry file
    // must not outlive this Deduplicator instance.
    const telemetry::FileOps fileOps
        = {[this]() {
               telemetry::Dict dict;
               dict["replacedCount"] = telemetry::Scalar(static_cast<uint64_t>(m_replaced));
               dict["insertedCount"] = telemetry::Scalar(static_cast<uint64_t>(m_inserted));
               dict["deduplicatedCount"] = telemetry::Scalar(static_cast<uint64_t>(m_deduplicated));
               return dict;
           },
           nullptr};

    m_holder.add(directory->addFile("statistics", fileOps));
}
104+
105+
} // namespace Deduplicator

0 commit comments

Comments
 (0)