Skip to content

Commit 2b78df1

Browse files
GalaxPjaroslavpesek
authored andcommitted
Flow Scatter - caching of rule branches
1 parent 646b090 commit 2b78df1

3 files changed

Lines changed: 83 additions & 13 deletions

File tree

modules/flowScatter/src/flowScatter.cpp

Lines changed: 50 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -169,12 +169,21 @@ FlowScatter::FlowScatter(size_t numOutputs, std::string rule)
169169
m_logger->info("Initializing FlowScatter with {} outputs", numOutputs);
170170
m_logger->info("Rule string: '{}'", rule);
171171
ruleParse(rule);
172+
// Resolve fields for the current UniRec template at construction time.
173+
try {
174+
changeTemplate();
175+
} catch (const std::exception& ex) {
176+
m_logger->warn("Unable to fully resolve rule fields at construction: {}", ex.what());
177+
// We don't fail here. Fields will be resolved when template is available
178+
// (e.g., on first format change notification).
179+
}
172180
m_logger->info("FlowScatter initialization completed successfully");
173181
}
174182

175183
void FlowScatter::ruleParse(const std::string& rule)
176184
{
177185
m_rules.branches.clear();
186+
m_cachedBranches.clear();
178187

179188
std::istringstream ruleStream(rule);
180189
std::string branchStr;
@@ -248,6 +257,41 @@ void FlowScatter::ruleParse(const std::string& rule)
248257
}
249258
}
250259

260+
void FlowScatter::changeTemplate()
261+
{
262+
m_cachedBranches.clear();
263+
264+
for (const auto& branch : m_rules.branches) {
265+
CachedBranch cb;
266+
267+
if (!branch.conditionalFieldId.empty()) {
268+
int fieldId = ur_get_id_by_name(branch.conditionalFieldId.c_str());
269+
if (fieldId < 0) {
270+
throw std::runtime_error("Conditional field not found: " + branch.conditionalFieldId);
271+
}
272+
cb.conditionalId = static_cast<ur_field_id_t>(fieldId);
273+
cb.conditionalType = ur_get_type(cb.conditionalId);
274+
}
275+
276+
cb.fieldIds.reserve(branch.fieldNames.size());
277+
cb.fieldTypes.reserve(branch.fieldNames.size());
278+
279+
for (const auto& fname : branch.fieldNames) {
280+
int fieldId = ur_get_id_by_name(fname.c_str());
281+
if (fieldId < 0) {
282+
throw std::runtime_error("Field for hashing not found in template: " + fname);
283+
}
284+
auto fid = static_cast<ur_field_id_t>(fieldId);
285+
cb.fieldIds.push_back(fid);
286+
cb.fieldTypes.push_back(ur_get_type(fid));
287+
}
288+
289+
m_cachedBranches.push_back(std::move(cb));
290+
}
291+
292+
m_logger->info("Resolved {} cached rule branches for current UniRec template", m_cachedBranches.size());
293+
}
294+
251295
size_t FlowScatter::outputIndex(UnirecRecordView& record)
252296
{
253297
m_totalRecords++;
@@ -261,25 +305,20 @@ size_t FlowScatter::outputIndex(UnirecRecordView& record)
261305

262306
std::vector<uint8_t> hashInput;
263307

264-
// Iterate through conditional rules
265-
for (const auto& branch : m_rules.branches) {
308+
// Iterate through cached conditional rules
309+
for (const auto& cb : m_cachedBranches) {
266310
bool useThisRule = false;
267311

268-
if (branch.conditionalFieldId.empty()) {
312+
if (cb.conditionalId == 0) {
269313
useThisRule = true;
270314
} else {
271-
auto fieldId = static_cast<ur_field_id_t>(ur_get_id_by_name(branch.conditionalFieldId.c_str()));
272-
auto fieldType = ur_get_type(fieldId);
273-
useThisRule = checkNonZeroValue(fieldId, fieldType, record);
315+
useThisRule = checkNonZeroValue(cb.conditionalId, cb.conditionalType, record);
274316
}
275317

276318
if (useThisRule) {
277-
// Concatenate all fields for this rule
278319
hashInput.clear();
279-
for (const auto& field : branch.fieldNames) {
280-
auto hashFieldId = static_cast<ur_field_id_t>(ur_get_id_by_name(field.c_str()));
281-
auto hashFieldType = ur_get_type(hashFieldId);
282-
appendFieldToHash(hashInput, hashFieldId, hashFieldType, record);
320+
for (size_t i = 0; i < cb.fieldIds.size(); ++i) {
321+
appendFieldToHash(hashInput, cb.fieldIds[i], cb.fieldTypes[i], record);
283322
}
284323
break;
285324
}

modules/flowScatter/src/flowScatter.hpp

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,18 @@ class FlowScatter {
8787
*/
8888
size_t outputIndex(UnirecRecordView& record);
8989

90+
/**
91+
* @brief Re-resolve field ids/types after a UniRec template/format change.
92+
*
93+
* Call this when the UniRec template changes (format change). This will
94+
* resolve all field names from the parsed rules to UniRec field ids and
95+
* cache their types so `outputIndex` does not need to look them up per-record.
96+
*
97+
* This method may throw if any required field is not present in the
98+
* current UniRec template.
99+
*/
100+
void changeTemplate();
101+
90102
/**
91103
* @brief Returns the current flow scatter statistics.
92104
* @return The current flow scatter statistics.
@@ -102,6 +114,16 @@ class FlowScatter {
102114
Rules m_rules;
103115
std::shared_ptr<spdlog::logger> m_logger = Nm::loggerGet("FlowScatter");
104116

117+
/** Cached mapping of rule branches to UniRec field ids and types. */
118+
struct CachedBranch {
119+
ur_field_id_t conditionalId = 0;
120+
ur_field_type_t conditionalType = static_cast<ur_field_type_t>(0);
121+
std::vector<ur_field_id_t> fieldIds;
122+
std::vector<ur_field_type_t> fieldTypes;
123+
};
124+
125+
std::vector<CachedBranch> m_cachedBranches;
126+
105127
};
106128

107129
} // namespace FlowScatter

modules/flowScatter/src/main.cpp

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ void signalHandler(int signum)
4545
* @param outputInterfaces Output interfaces for Unirec communication.
4646
*/
4747
void handleFormatChange(UnirecInputInterface& inputInterface,
48-
std::vector<UnirecOutputInterface>& outputInterfaces)
48+
std::vector<UnirecOutputInterface>& outputInterfaces,
49+
Fs::FlowScatter& scatter)
4950
{
5051
inputInterface.changeTemplate();
5152
uint8_t dataType; const char* spec = nullptr;
@@ -55,6 +56,14 @@ void handleFormatChange(UnirecInputInterface& inputInterface,
5556
for (auto& outIfc : outputInterfaces) {
5657
outIfc.changeTemplate(spec);
5758
}
59+
// Notify scatter so it can resolve UniRec field ids/types once per format change.
60+
try {
61+
scatter.changeTemplate();
62+
} catch (const std::exception& ex) {
63+
Nm::loggerGet("main")->warn("FlowScatter: unable to resolve fields after format change: {}", ex.what());
64+
// Don't rethrow — module can continue and will fallback to round-robin until
65+
// fields are available.
66+
}
5867
}
5968

6069
/**
@@ -99,7 +108,7 @@ void processUnirecRecords(UnirecInputInterface& inputInterface,
99108
try {
100109
processNextRecord(inputInterface, outputInterfaces, scatter);
101110
} catch (FormatChangeException& ex) {
102-
handleFormatChange(inputInterface, outputInterfaces);
111+
handleFormatChange(inputInterface, outputInterfaces, scatter);
103112
} catch (EoFException& ex) {
104113
break;
105114
} catch (std::exception& ex) {

0 commit comments

Comments
 (0)