From a7f599009cdffec58f0f92bdd7136e8fd2865337 Mon Sep 17 00:00:00 2001 From: mikemiles-dev Date: Sun, 3 May 2026 15:21:33 -0500 Subject: [PATCH] feat: pluggable TemplateStore for horizontal parser scale-out MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a TemplateStore trait so V9 and IPFIX templates can be persisted to and re-read from an external backend such as Redis or NATS KV. With a store configured the parser: - writes through every learned template to the store - consults the store on every primary-cache miss before declaring a template unknown (read-through), repopulating the in-process LRU on hit so subsequent records take the hot path - propagates LRU evictions, RFC 7011 §8.1 template withdrawals, and explicit clear_*_templates calls so the store stays in sync This unblocks running multiple stateless parser replicas behind a UDP load balancer without source-IP-affinity routing — replica B can boot cold and start serving data records for templates that replica A learned, as long as both share the same store. API additions ------------- - TemplateStore trait (get / put / remove with explicit error semantics: Ok(None) means absent; Err means backend failure) - TemplateStoreKey { scope: Arc, kind: TemplateKind, template_id } - TemplateKind enum: V9Data, V9Options, IpfixData, IpfixOptions, IpfixV9Data, IpfixV9Options - TemplateStoreError { Backend(Box<...>), Codec(String) } - InMemoryTemplateStore reference impl (Mutex) - NetflowParserBuilder::with_template_store(Arc) - NetflowParserBuilder::with_template_store_scope(impl Into>) - NetflowParser::set_template_store_scope(impl Into>) - TemplateEvent::Restored variant — fires when a template is pulled in via read-through; observability tools that count Learned can also count Restored after a parser restart - Three new CacheMetrics counters: template_store_restored, template_store_codec_errors, template_store_backend_errors Wire format ----------- A small versioned binary format (WIRE_VERSION = 1) is used to encode templates as opaque Vec payloads. The store sees only bytes — no serde_json or other runtime serializer is added to the dependency tree. Codec errors on read are counted in metrics AND the corrupted key is removed so a fresh template announce can repopulate cleanly. The module docs cover the upgrade story for future wire-version bumps (drain-before-upgrade or version-namespaced scope). Multi-source ------------ AutoScopedParser auto-derives a per-source scope of the form "v9:{addr}/{source_id}", "ipfix:{addr}/{obs_domain}", or "legacy:{addr}" so two exporters using the same template ID with different layouts do not collide in the store. Performance ----------- Hot path impact when no store is configured: a single Option::is_none branch. With a store configured: zero atomic refcount bumps on the read-through path. The store handle is held as a borrowed &Arc rather than cloned; every other field touched (metrics, templates, scope, etc.) is accessed via direct field access so the borrow checker can split disjoint borrows. Method calls that would re-borrow the whole struct (`self.store_key(...)`, `template.is_valid(self)`) are avoided by inlining or using limit-taking validation variants. The scope is held as Arc rather than String so the per-key clone is a refcount bump rather than a heap allocation. Validation ---------- V9 Template / OptionsTemplate gain `is_valid_with_limits` methods that take numeric limits instead of a parser reference. The IPFIX CommonTemplate trait gains the same. The live-parse and read-through paths both call these helpers so validation rules cannot drift between paths. Pending-flow replay ------------------- Templates restored via read-through are added to the per-parse "learned IDs" set passed to pending-flow replay, so queued data records for a previously-missing template resolve as soon as the template is recovered from the store, not only when the exporter re-announces it. Tests ----- 17 integration tests in tests/template_store.rs cover: - write-through on learn (V9 + IPFIX) - cross-replica read-through (V9 + IPFIX + IPFIX options) - baseline behavior unchanged when no store is configured - clear_*_templates propagating to the store - IPFIX template withdrawal propagating to the store - AutoScopedParser per-source scope isolation - backend Err propagation (FaultStore fault injection) - corrupted-payload codec rejection + cleanup - LRU eviction on a full cache propagating to the store - duplicate-template-ID write-through overwriting - TemplateEvent::Restored event firing - pending-flow replay after read-through - set_template_store_scope retrofit after build All 216 lib + 17 integration + 66 doctests pass. Docs ---- README gains a "Pluggable Template Storage (Horizontal Scale-Out)" section under the Template Management Guide with usage and a backend-impl sketch. RELEASES.md notes the feature under 1.0.3. --- README.md | 72 +++ RELEASES.md | 33 + src/lib.rs | 160 ++++- src/scoped_parser.rs | 23 +- src/template_store.rs | 627 +++++++++++++++++++ src/tests.rs | 14 + src/variable_versions/config.rs | 15 + src/variable_versions/ipfix/parser.rs | 449 +++++++++++++- src/variable_versions/ipfix/types.rs | 28 +- src/variable_versions/metrics.rs | 44 ++ src/variable_versions/template_events.rs | 14 + src/variable_versions/v9/parser.rs | 305 +++++++++- tests/template_store.rs | 740 +++++++++++++++++++++++ 13 files changed, 2499 insertions(+), 25 deletions(-) create mode 100644 src/template_store.rs create mode 100644 tests/template_store.rs diff --git a/README.md b/README.md index b5aa07b..d894329 100644 --- a/README.md +++ b/README.md @@ -789,6 +789,78 @@ let router_builder = NetflowParser::builder() let mut scoped = RouterScopedParser::::try_with_builder(router_builder).expect("valid config"); ``` +### Pluggable Template Storage (Horizontal Scale-Out) + +When you scale flow ingestion across multiple parser instances behind a UDP +load balancer, every replica needs to observe the templates announced to +*any* replica — otherwise a data record routed to a fresh pod will be queued +or dropped because that pod has never seen the template. The +[`TemplateStore`] trait is the extension point that solves this. + +With a store configured, the parser: + +1. **Writes through** every successfully learned template to the store as + opaque bytes (a small custom binary wire format — no `serde_json` or + other runtime serializer is added to the dependency tree). +2. **Reads through** the store on every cache miss before declaring a + template unknown, repopulating the in-process LRU on hit so subsequent + records are served from the hot path. +3. **Propagates** LRU evictions, RFC 7011 §8.1 template withdrawals, and + explicit `clear_*_templates` calls so the store stays in sync. + +`AutoScopedParser` automatically derives a per-source scope (e.g. +`v9:1.2.3.4:2055/0`, `ipfix:1.2.3.4:2055/42`) so that two exporters using +the same template ID with different layouts do not collide. + +```rust,ignore +use netflow_parser::{AutoScopedParser, InMemoryTemplateStore, NetflowParser}; +use std::sync::Arc; + +// Plug in your own backend: implement TemplateStore for Redis, NATS KV, +// DynamoDB, etc. The reference InMemoryTemplateStore is shown here for +// illustration; in production it would be replaced. +let store = Arc::new(InMemoryTemplateStore::new()); + +let builder = NetflowParser::builder() + .with_template_store(store.clone()); + +// Multi-source: per-exporter scoping is automatic. +let mut parser = AutoScopedParser::try_with_builder(builder).expect("valid"); + +// Replica B can be brought up cold and start serving data records for +// templates Replica A learned, as long as both share the same store. +``` + +Implementing the trait against your backend of choice: + +```rust,ignore +use netflow_parser::{TemplateStore, TemplateStoreError, TemplateStoreKey}; + +#[derive(Debug)] +struct RedisTemplateStore { /* your client */ } + +impl TemplateStore for RedisTemplateStore { + fn get(&self, key: &TemplateStoreKey) -> Result>, TemplateStoreError> { + // GET "{scope}/{kind:?}/{template_id}" from Redis, returning the bytes. + unimplemented!() + } + + fn put(&self, key: &TemplateStoreKey, value: &[u8]) -> Result<(), TemplateStoreError> { + // SET with an appropriate TTL matching your operational envelope. + unimplemented!() + } + + fn remove(&self, key: &TemplateStoreKey) -> Result<(), TemplateStoreError> { + // DEL — must be idempotent for absent keys. + unimplemented!() + } +} +``` + +**Single-source deployments** can still benefit from a store for surviving +parser restarts without losing template state. Set the scope explicitly +(or leave it empty) via `with_template_store_scope`. + ### Template Lifecycle Management #### Template Introspection diff --git a/RELEASES.md b/RELEASES.md index 49e07df..fb046e3 100644 --- a/RELEASES.md +++ b/RELEASES.md @@ -1,5 +1,38 @@ # 1.0.3 +## Features + +* **Pluggable secondary template storage (`TemplateStore`).** Templates can now + be persisted to and re-read from an external backend such as Redis or NATS + KV via a new `TemplateStore` trait. With a store configured the parser + writes through every learned template, consults the store on every cache + miss, and propagates LRU evictions, withdrawals, and explicit clears so the + store stays in sync. This unblocks running multiple stateless parser + instances behind a UDP load balancer without source-IP-affinity routing. + + Wire up via the new builder hooks: + + ```rust + let store = Arc::new(my_redis_backed_store); + let parser = NetflowParser::builder() + .with_template_store(store) + .with_template_store_scope("collector-eu-west-1") + .build()?; + ``` + + `AutoScopedParser` automatically derives a per-source scope so two exporters + using the same template ID with different layouts do not collide in the + store. The trait sees opaque `Vec` payloads encoded with a small custom + binary wire format — no `serde_json` or other runtime serializer is added + to the dependency tree. An `InMemoryTemplateStore` reference impl is + provided for tests. See the new `template_store` module for the protocol. + +* New public API: `TemplateStore`, `TemplateStoreKey`, `TemplateKind`, + `TemplateStoreError`, `InMemoryTemplateStore`, + `NetflowParserBuilder::with_template_store`, + `NetflowParserBuilder::with_template_store_scope`, + `NetflowParser::set_template_store_scope`. + ## Tests * **Restored 34 snapshot-based parser tests** in a new `restored_legacy_tests` module in `src/tests.rs`. These tests had been deleted in PR #210 ("further template validation"), leaving 33 orphaned `.snap` files that were swept up later in PR #262. Coverage included real-world v9/IPFIX captures (`it_parses_v9_ipv6flowlabel`, `it_parses_v9_template_and_data_packet`, `it_parses_ipfix_scappy_example`, mixed-enterprise field templates, multi-template IPFIX, etc.), version-filter checks (`it_doesnt_allow_v5/v7/v9/ipfix`), and re-export round-trip tests (`it_parses_*_and_re_exports`). diff --git a/src/lib.rs b/src/lib.rs index 8c28d60..8930e24 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -6,6 +6,7 @@ pub mod netflow_common; pub mod protocol; pub mod scoped_parser; pub mod static_versions; +pub mod template_store; mod tests; pub mod variable_versions; @@ -47,6 +48,9 @@ pub use variable_versions::template_events::{ }; // Re-export configuration and utility types for convenience +pub use template_store::{ + InMemoryTemplateStore, TemplateKind, TemplateStore, TemplateStoreError, TemplateStoreKey, +}; pub use variable_versions::enterprise_registry::{EnterpriseFieldDef, EnterpriseFieldRegistry}; pub use variable_versions::metrics::{CacheInfo, CacheMetrics, ParserCacheInfo}; pub use variable_versions::ttl::TtlConfig; @@ -274,6 +278,8 @@ pub struct NetflowParserBuilder { requested_versions: Option>, max_error_sample_size: usize, template_hooks: TemplateHooks, + template_store: Option>, + template_store_scope: Arc, } /// Helper to create a `[bool; 11]` allowed_versions array from a set of version numbers. @@ -304,6 +310,15 @@ impl std::fmt::Debug for NetflowParserBuilder { "template_hooks", &format!("{} hooks", self.template_hooks.len()), ) + .field( + "template_store", + &if self.template_store.is_some() { + "configured" + } else { + "none" + }, + ) + .field("template_store_scope", &self.template_store_scope) .finish() } } @@ -317,6 +332,8 @@ impl Default for NetflowParserBuilder { requested_versions: None, max_error_sample_size: 256, template_hooks: TemplateHooks::new(), + template_store: None, + template_store_scope: Arc::from(""), } } } @@ -776,6 +793,52 @@ impl NetflowParserBuilder { self } + /// Attach a [`TemplateStore`] to act as a secondary tier behind the parser's + /// in-process LRU caches. + /// + /// With a store configured, the parser writes through every successfully + /// learned template to the store and consults the store on every cache + /// miss before declaring a template unknown. This lets multiple parser + /// instances behind a UDP load balancer share template state without + /// requiring source-IP-affinity routing. + /// + /// The store sees opaque `Vec` payloads encoded with a small custom + /// binary wire format documented in the [`template_store`] module. See + /// [`TemplateStore`] for the trait contract and threading model. + /// + /// # Examples + /// + /// ```rust + /// use netflow_parser::{NetflowParser, InMemoryTemplateStore}; + /// use std::sync::Arc; + /// + /// let store = Arc::new(InMemoryTemplateStore::new()); + /// let parser = NetflowParser::builder() + /// .with_template_store(store) + /// .build() + /// .expect("Failed to build parser"); + /// ``` + #[must_use = "builder methods consume self and return a new builder; the return value must be used"] + pub fn with_template_store(mut self, store: Arc) -> Self { + self.template_store = Some(store); + self + } + + /// Set the scope string written into every [`TemplateStoreKey`]. + /// + /// Defaults to the empty string, which is appropriate for single-source + /// deployments. For multi-source deployments + /// [`AutoScopedParser`] automatically populates this with the source + /// `SocketAddr` of each per-source parser, so callers usually do not need + /// to set this directly. + /// + /// Accepts anything `Into>` — typically `&str` or `String`. + #[must_use = "builder methods consume self and return a new builder; the return value must be used"] + pub fn with_template_store_scope(mut self, scope: impl Into>) -> Self { + self.template_store_scope = scope.into(); + self + } + /// Validates the builder configuration without constructing a parser. /// /// This is cheaper than [`build`](Self::build) since it only checks that @@ -822,8 +885,14 @@ impl NetflowParserBuilder { /// ``` pub fn build(self) -> Result { self.validate()?; - let v9_parser = V9Parser::try_new(self.v9_config)?; - let ipfix_parser = IPFixParser::try_new(self.ipfix_config)?; + let mut v9_config = self.v9_config; + let mut ipfix_config = self.ipfix_config; + v9_config.template_store = self.template_store.clone(); + v9_config.template_store_scope = Arc::clone(&self.template_store_scope); + ipfix_config.template_store = self.template_store; + ipfix_config.template_store_scope = self.template_store_scope; + let v9_parser = V9Parser::try_new(v9_config)?; + let ipfix_parser = IPFixParser::try_new(ipfix_config)?; Ok(NetflowParser { v9_parser, @@ -1092,6 +1161,21 @@ impl NetflowParser { NetflowParserBuilder::default() } + /// Override the scope string used for [`TemplateStore`] reads/writes by + /// the underlying V9 and IPFIX parsers. + /// + /// Used by [`AutoScopedParser`] to give each per-source parser a scope + /// derived from the exporter's `SocketAddr`. Callers managing their own + /// per-source parsers may also use this to retrofit a scope after the + /// builder has produced a parser. + /// + /// Accepts anything `Into>` — typically `&str` or `String`. + pub fn set_template_store_scope(&mut self, scope: impl Into>) { + let scope: Arc = scope.into(); + self.v9_parser.set_template_store_scope(Arc::clone(&scope)); + self.ipfix_parser.set_template_store_scope(scope); + } + /// Returns the allowed versions array. /// /// Indexed by version number: index 5 = V5, 7 = V7, 9 = V9, 10 = IPFIX. @@ -1340,6 +1424,26 @@ impl NetflowParser { /// parser.clear_v9_templates(); /// ``` pub fn clear_v9_templates(&mut self) { + // Mirror to the secondary store first (best-effort) so that + // subsequent reads do not transparently repopulate the in-process + // cache via read-through. + if let Some(store) = self.v9_parser.template_store.clone() { + let scope = self.v9_parser.template_store_scope.clone(); + for (id, _) in self.v9_parser.templates.iter() { + let _ = store.remove(&template_store::TemplateStoreKey::new( + scope.clone(), + template_store::TemplateKind::V9Data, + *id, + )); + } + for (id, _) in self.v9_parser.options_templates.iter() { + let _ = store.remove(&template_store::TemplateStoreKey::new( + scope.clone(), + template_store::TemplateKind::V9Options, + *id, + )); + } + } self.v9_parser.templates.clear(); self.v9_parser.options_templates.clear(); self.v9_parser.clear_pending_flows(); @@ -1358,6 +1462,37 @@ impl NetflowParser { /// parser.clear_ipfix_templates(); /// ``` pub fn clear_ipfix_templates(&mut self) { + if let Some(store) = self.ipfix_parser.template_store.clone() { + let scope = self.ipfix_parser.template_store_scope.clone(); + for (id, _) in self.ipfix_parser.templates.iter() { + let _ = store.remove(&template_store::TemplateStoreKey::new( + scope.clone(), + template_store::TemplateKind::IpfixData, + *id, + )); + } + for (id, _) in self.ipfix_parser.ipfix_options_templates.iter() { + let _ = store.remove(&template_store::TemplateStoreKey::new( + scope.clone(), + template_store::TemplateKind::IpfixOptions, + *id, + )); + } + for (id, _) in self.ipfix_parser.v9_templates.iter() { + let _ = store.remove(&template_store::TemplateStoreKey::new( + scope.clone(), + template_store::TemplateKind::IpfixV9Data, + *id, + )); + } + for (id, _) in self.ipfix_parser.v9_options_templates.iter() { + let _ = store.remove(&template_store::TemplateStoreKey::new( + scope.clone(), + template_store::TemplateKind::IpfixV9Options, + *id, + )); + } + } self.ipfix_parser.templates.clear(); self.ipfix_parser.v9_templates.clear(); self.ipfix_parser.ipfix_options_templates.clear(); @@ -1581,6 +1716,22 @@ impl NetflowParser { if let ParsedNetflow::Success { ref packet, .. } = result { self.fire_template_events(packet); } + // Fire Restored events for templates pulled in from the secondary + // store during this parse. Drained from each parser whether the + // outer result was Success or Error — restoration that happened + // before a later flowset failed should still be reported. + for (protocol, template_id) in self.v9_parser.drain_restored_templates() { + self.template_hooks.trigger(&TemplateEvent::Restored { + template_id: Some(template_id), + protocol, + }); + } + for (protocol, template_id) in self.ipfix_parser.drain_restored_templates() { + self.template_hooks.trigger(&TemplateEvent::Restored { + template_id: Some(template_id), + protocol, + }); + } // Fire metric-based events for collisions, evictions, and expirations. // Copy the after-metrics to avoid borrowing self immutably while // fire_metric_delta_events borrows self mutably (for hook_errors). @@ -1592,6 +1743,11 @@ impl NetflowParser { let after = self.ipfix_parser.metrics; self.fire_metric_delta_events(&before, &after, TemplateProtocol::Ipfix); } + } else { + // Even when no hooks are registered, drain the buffers so they + // do not accumulate across parse_bytes calls. + let _ = self.v9_parser.drain_restored_templates(); + let _ = self.ipfix_parser.drain_restored_templates(); } result diff --git a/src/scoped_parser.rs b/src/scoped_parser.rs index 4dc45a8..e0320be 100644 --- a/src/scoped_parser.rs +++ b/src/scoped_parser.rs @@ -842,7 +842,8 @@ impl AutoScopedParser { observation_domain_id, }; if !self.ipfix_parsers.contains(&key) { - let parser = Self::build_parser(builder)?; + let scope = format!("ipfix:{}/{}", source, observation_domain_id); + let parser = Self::build_parser(builder, &scope)?; self.ipfix_parsers.push(key, (parser, now)); } let (parser, last_seen) = @@ -856,7 +857,8 @@ impl AutoScopedParser { source_id, }; if !self.v9_parsers.contains(&key) { - let parser = Self::build_parser(builder)?; + let scope = format!("v9:{}/{}", source, source_id); + let parser = Self::build_parser(builder, &scope)?; self.v9_parsers.push(key, (parser, now)); } let (parser, last_seen) = self.v9_parsers.get_mut(&key).expect("just ensured"); @@ -865,7 +867,8 @@ impl AutoScopedParser { } ScopingInfo::Legacy => { if !self.legacy_parsers.contains(&source) { - let parser = Self::build_parser(builder)?; + let scope = format!("legacy:{}", source); + let parser = Self::build_parser(builder, &scope)?; self.legacy_parsers.push(source, (parser, now)); } let (parser, last_seen) = @@ -933,17 +936,23 @@ impl AutoScopedParser { /// Create a new parser instance using the configured builder or default fn build_parser( builder: Option<&NetflowParserBuilder>, + scope: &str, ) -> Result { - if let Some(builder) = builder { + let mut parser = if let Some(builder) = builder { builder .clone() .build() .map_err(|err| NetflowError::Partial { message: format!("Failed to build parser for source: {err}"), - }) + })? } else { - Ok(NetflowParser::default()) - } + NetflowParser::default() + }; + // Override the template-store scope so per-source parsers do not + // collide on shared template IDs in the secondary store. Cheap when + // no store is configured (just an Arc swap). + parser.set_template_store_scope(scope); + Ok(parser) } } diff --git a/src/template_store.rs b/src/template_store.rs new file mode 100644 index 0000000..3fd2c58 --- /dev/null +++ b/src/template_store.rs @@ -0,0 +1,627 @@ +//! Pluggable secondary-tier template storage. +//! +//! NetFlow v9 and IPFIX exporters announce *templates* describing the layout +//! of subsequent data records. The parser caches these templates in an +//! in-process LRU; without a template, data records cannot be decoded. +//! +//! For multi-instance deployments (horizontally scaled flow collectors behind +//! a UDP load balancer, for example) every parser instance must observe the +//! same templates as the exporters that route to it. Otherwise a data record +//! can land on a replica that has never seen the corresponding template and +//! must be queued or dropped. +//! +//! [`TemplateStore`] is the extension point that solves this. When configured +//! via [`NetflowParserBuilder::with_template_store`](crate::NetflowParserBuilder::with_template_store): +//! +//! 1. On every successful template insert the parser **writes through** to the +//! store, persisting the template in a small custom binary wire format. +//! 2. On every cache **miss** the parser consults the store. If the store +//! returns a payload, the parser decodes it, repopulates its in-process +//! LRU, and continues parsing the data record. +//! 3. On LRU eviction or explicit clear/withdrawal the parser issues a best +//! effort `remove` to keep the store from accumulating stale entries. +//! +//! The store sees opaque `Vec` payloads and a structured key — it does +//! not need to understand the wire format. This keeps the trait surface tiny +//! and lets implementations target arbitrary backends (Redis, NATS KV, +//! DynamoDB, an in-memory map for tests, ...). +//! +//! # Scoping +//! +//! Templates are scoped per exporter. The parser carries a `scope: String` +//! that is included in every [`TemplateStoreKey`]. For single-source +//! deployments callers may leave this empty (the default). For multi-source +//! deployments [`AutoScopedParser`](crate::AutoScopedParser) automatically +//! sets the scope to the source `SocketAddr` of each per-source parser, so +//! template ID collisions between different exporters are isolated by key. +//! +//! # Threading +//! +//! Implementations are wrapped in `Arc` and must be +//! `Send + Sync`. Parsers acquire only shared references to the store, so +//! implementations should use interior mutability (mutex, atomic, channel, +//! etc.) for any internal state. +//! +//! # Reference implementation +//! +//! [`InMemoryTemplateStore`] is a `Mutex`-backed reference impl +//! suitable for tests and single-process experiments. Production deployments +//! should provide their own backend. +//! +//! # Wire format & upgrades +//! +//! Templates are encoded with a small versioned binary format whose first +//! byte is `WIRE_VERSION` (currently `1`). The parser rejects payloads with +//! an unrecognized version: a [`TemplateStoreError::Codec`] is recorded in +//! metrics ([`crate::CacheMetrics::template_store_codec_errors`]), the +//! offending key is removed from the store, and the parser falls back to +//! treating the slot as a cache miss until the exporter re-announces the +//! template. +//! +//! When this crate ships a wire-format change, existing entries written by +//! older parser versions become unreadable. Rolling-upgrade strategies: +//! +//! * Drain the secondary store before deploying the new parser version +//! (template caches will rebuild from the next exporter announce). +//! * Or namespace your scope with the parser version +//! ([`NetflowParserBuilder::with_template_store_scope`]( +//! crate::NetflowParserBuilder::with_template_store_scope)) +//! so old- and new-version entries do not collide. +//! +//! Either way, monitor `template_store_codec_errors` after the upgrade — a +//! sustained non-zero rate indicates stale entries that the parser is +//! discarding on read. + +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; + +/// Identifies which of the parser's template caches an entry belongs to. +/// +/// V9 and IPFIX maintain separate template and options-template caches. +/// IPFIX additionally accepts V9-style templates embedded in IPFIX messages, +/// which are stored in their own caches. Each variant maps to exactly one +/// internal cache so that store entries can be round-tripped to the right +/// place on read-through. +#[non_exhaustive] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] +pub enum TemplateKind { + /// NetFlow v9 data template (decoded with the v9 parser). + V9Data, + /// NetFlow v9 options template. + V9Options, + /// IPFIX data template. + IpfixData, + /// IPFIX options template. + IpfixOptions, + /// V9-style data template embedded in an IPFIX message. + IpfixV9Data, + /// V9-style options template embedded in an IPFIX message. + IpfixV9Options, +} + +/// Composite key identifying a template entry in a [`TemplateStore`]. +/// +/// Implementations may format the key however they like (e.g. as a single +/// string `"{scope}/{kind}/{template_id}"` for Redis, or a structured object +/// for NATS KV) — the only requirement is that distinct keys must round-trip +/// independently. +/// +/// `scope` is stored as `Arc` so the parser can share the same string +/// across every store key it constructs without per-call heap allocations. +/// Implementations that need a `&str` can dereference (`&*key.scope`); +/// callers constructing keys can pass `&str`, `String`, or an existing +/// `Arc`. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct TemplateStoreKey { + /// Per-exporter scope. Empty when the parser is unscoped (single source). + /// `AutoScopedParser` populates this with the source identity. + pub scope: Arc, + /// Which template cache the entry belongs to. + pub kind: TemplateKind, + /// The template ID announced by the exporter. + pub template_id: u16, +} + +impl TemplateStoreKey { + /// Convenience constructor. + pub fn new(scope: impl Into>, kind: TemplateKind, template_id: u16) -> Self { + Self { + scope: scope.into(), + kind, + template_id, + } + } +} + +/// Errors returned by [`TemplateStore`] implementations. +#[derive(Debug)] +pub enum TemplateStoreError { + /// Underlying backend failure (network, IO, serialization in the backend, ...). + Backend(Box), + /// The store returned a payload the parser could not decode. + /// Typically indicates a wire-format version mismatch or corruption. + Codec(String), +} + +impl std::fmt::Display for TemplateStoreError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + TemplateStoreError::Backend(e) => write!(f, "template store backend error: {}", e), + TemplateStoreError::Codec(msg) => { + write!(f, "template store codec error: {}", msg) + } + } + } +} + +impl std::error::Error for TemplateStoreError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + match self { + TemplateStoreError::Backend(e) => Some(e.as_ref()), + TemplateStoreError::Codec(_) => None, + } + } +} + +/// Pluggable backend for sharing parsed templates across parser instances. +/// +/// See the [module docs](self) for the full read-through / write-through +/// protocol the parser implements on top of this trait. +/// +/// # Contract +/// +/// * `get` returns `Ok(None)` when the key is absent. Errors are reserved for +/// backend-level failures. +/// * `put` is best-effort from the parser's perspective: a returned error is +/// logged via the parser's metrics but does not abort packet parsing. +/// * `remove` must be idempotent — the parser may call it for keys that are +/// already absent. +/// +/// # Threading +/// +/// Implementations are wrapped in `Arc` and must be both +/// `Send` and `Sync`. The trait takes `&self` everywhere; implementations +/// that need internal mutation should use `Mutex`, `RwLock`, or atomics. +pub trait TemplateStore: Send + Sync + std::fmt::Debug { + /// Fetch a serialized template payload by key. + /// Returns `Ok(None)` when the key is absent (not an error). + fn get(&self, key: &TemplateStoreKey) -> Result>, TemplateStoreError>; + + /// Persist a serialized template payload. Overwrites any existing value + /// for the same key. + fn put(&self, key: &TemplateStoreKey, value: &[u8]) -> Result<(), TemplateStoreError>; + + /// Remove the entry for `key`. Must be idempotent for absent keys. + fn remove(&self, key: &TemplateStoreKey) -> Result<(), TemplateStoreError>; +} + +/// Reference [`TemplateStore`] backed by a `Mutex`. +/// +/// Suitable for tests and single-process experiments. Sharing one of these +/// across multiple parser instances within the same process is functionally +/// equivalent to enlarging the in-process LRU caches; the value of the +/// extension point is realized when the store is backed by an out-of-process +/// system such as Redis or NATS KV. +/// +/// # Panics +/// +/// Methods panic if the inner `Mutex` is poisoned (which only happens after +/// a panic in another thread holding the lock). Production stores backed +/// by your own backend should make their own choice — returning an error +/// via [`TemplateStoreError::Backend`] is generally safer than panicking. +#[derive(Debug, Default)] +pub struct InMemoryTemplateStore { + inner: Mutex>>, +} + +impl InMemoryTemplateStore { + /// Create an empty store. + pub fn new() -> Self { + Self::default() + } + + /// Number of entries currently held. + pub fn len(&self) -> usize { + self.inner.lock().expect("poisoned").len() + } + + /// Whether the store is empty. + pub fn is_empty(&self) -> bool { + self.len() == 0 + } +} + +impl TemplateStore for InMemoryTemplateStore { + fn get(&self, key: &TemplateStoreKey) -> Result>, TemplateStoreError> { + Ok(self.inner.lock().expect("poisoned").get(key).cloned()) + } + + fn put(&self, key: &TemplateStoreKey, value: &[u8]) -> Result<(), TemplateStoreError> { + self.inner + .lock() + .expect("poisoned") + .insert(key.clone(), value.to_vec()); + Ok(()) + } + + fn remove(&self, key: &TemplateStoreKey) -> Result<(), TemplateStoreError> { + self.inner.lock().expect("poisoned").remove(key); + Ok(()) + } +} + +// --------------------------------------------------------------------------- +// Wire format +// --------------------------------------------------------------------------- +// +// Templates are encoded using a small versioned binary format. The first byte +// is a version tag (currently `WIRE_VERSION = 1`); subsequent layout depends +// on `TemplateKind` and is documented at each `encode_*` / `decode_*` site. +// +// The format is intentionally minimal: it carries only the fields the parser +// needs to reconstruct an in-memory `Template` / `OptionsTemplate`, not the +// derived lookup-table values which are recomputed from the field-type +// numbers. This keeps payloads small (typically <100 bytes) and avoids +// pulling in serde_json or another runtime serializer. + +pub(crate) const WIRE_VERSION: u8 = 1; + +use crate::variable_versions::ipfix::lookup::IPFixField; +use crate::variable_versions::ipfix::{ + OptionsTemplate as IpfixOptionsTemplate, Template as IpfixTemplate, + TemplateField as IpfixTemplateField, +}; +use crate::variable_versions::v9::lookup::{ScopeFieldType, V9Field}; +use crate::variable_versions::v9::{ + OptionsTemplate as V9OptionsTemplate, OptionsTemplateScopeField, Template as V9Template, + TemplateField as V9TemplateField, +}; + +/// Encode a V9 data template into the wire format. +/// +/// Layout: `[version: u8][template_id: u16][field_count: u16][fields]` +/// where each field is `[field_type_number: u16][field_length: u16]`. +pub(crate) fn encode_v9_template(t: &V9Template) -> Vec { + let mut out = Vec::with_capacity(5 + t.fields.len() * 4); + out.push(WIRE_VERSION); + out.extend_from_slice(&t.template_id.to_be_bytes()); + out.extend_from_slice(&t.field_count.to_be_bytes()); + for f in &t.fields { + out.extend_from_slice(&f.field_type_number.to_be_bytes()); + out.extend_from_slice(&f.field_length.to_be_bytes()); + } + out +} + +pub(crate) fn decode_v9_template(bytes: &[u8]) -> Result { + let mut r = WireReader::new(bytes); + r.expect_version()?; + let template_id = r.u16()?; + let field_count = r.u16()?; + let mut fields = Vec::with_capacity(usize::from(field_count)); + for _ in 0..field_count { + let field_type_number = r.u16()?; + let field_length = r.u16()?; + fields.push(V9TemplateField { + field_type_number, + field_type: V9Field::from(field_type_number), + field_length, + }); + } + Ok(V9Template { + template_id, + field_count, + fields, + }) +} + +/// Encode a V9 options template. +/// +/// Layout: `[version: u8][template_id: u16][options_scope_length: u16] +/// [options_length: u16][scope_fields][option_fields]` +/// where each `scope_fields` entry is `[field_type_number: u16][field_length: u16]` +/// and each `option_fields` entry is `[field_type_number: u16][field_length: u16]`. +pub(crate) fn encode_v9_options_template(t: &V9OptionsTemplate) -> Vec { + let mut out = Vec::with_capacity(7 + t.scope_fields.len() * 4 + t.option_fields.len() * 4); + out.push(WIRE_VERSION); + out.extend_from_slice(&t.template_id.to_be_bytes()); + out.extend_from_slice(&t.options_scope_length.to_be_bytes()); + out.extend_from_slice(&t.options_length.to_be_bytes()); + for f in &t.scope_fields { + out.extend_from_slice(&f.field_type_number.to_be_bytes()); + out.extend_from_slice(&f.field_length.to_be_bytes()); + } + for f in &t.option_fields { + out.extend_from_slice(&f.field_type_number.to_be_bytes()); + out.extend_from_slice(&f.field_length.to_be_bytes()); + } + out +} + +pub(crate) fn decode_v9_options_template( + bytes: &[u8], +) -> Result { + let mut r = WireReader::new(bytes); + r.expect_version()?; + let template_id = r.u16()?; + let options_scope_length = r.u16()?; + let options_length = r.u16()?; + // Each scope/option field occupies 4 bytes (u16 type + u16 length); a + // length not divisible by 4 means the payload is corrupted. The live + // parse path enforces the same rule via `OptionsTemplate::is_valid`. + if !options_scope_length.is_multiple_of(4) || !options_length.is_multiple_of(4) { + return Err(TemplateStoreError::Codec(format!( + "v9 options template length not aligned to 4: scope={} options={}", + options_scope_length, options_length + ))); + } + let scope_count = usize::from(options_scope_length / 4); + let option_count = usize::from(options_length / 4); + let mut scope_fields = Vec::with_capacity(scope_count); + for _ in 0..scope_count { + let field_type_number = r.u16()?; + let field_length = r.u16()?; + scope_fields.push(OptionsTemplateScopeField { + field_type_number, + field_type: ScopeFieldType::from(field_type_number), + field_length, + }); + } + let mut option_fields = Vec::with_capacity(option_count); + for _ in 0..option_count { + let field_type_number = r.u16()?; + let field_length = r.u16()?; + option_fields.push(V9TemplateField { + field_type_number, + field_type: V9Field::from(field_type_number), + field_length, + }); + } + Ok(V9OptionsTemplate { + template_id, + options_scope_length, + options_length, + scope_fields, + option_fields, + }) +} + +/// Encode an IPFIX template. +/// +/// Layout: `[version: u8][template_id: u16][field_count: u16][fields]` where +/// each field is `[field_type_number: u16][field_length: u16] +/// [enterprise_present: u8][enterprise_number: u32?]`. The trailing u32 is +/// only present when `enterprise_present == 1`. +pub(crate) fn encode_ipfix_template(t: &IpfixTemplate) -> Vec { + let mut out = Vec::with_capacity(5 + t.fields.len() * 9); + out.push(WIRE_VERSION); + out.extend_from_slice(&t.template_id.to_be_bytes()); + out.extend_from_slice(&t.field_count.to_be_bytes()); + for f in &t.fields { + encode_ipfix_field(&mut out, f); + } + out +} + +pub(crate) fn decode_ipfix_template(bytes: &[u8]) -> Result { + let mut r = WireReader::new(bytes); + r.expect_version()?; + let template_id = r.u16()?; + let field_count = r.u16()?; + let mut fields = Vec::with_capacity(usize::from(field_count)); + for _ in 0..field_count { + fields.push(decode_ipfix_field(&mut r)?); + } + Ok(IpfixTemplate { + template_id, + field_count, + fields, + }) +} + +/// Encode an IPFIX options template. +/// +/// Layout: `[version: u8][template_id: u16][field_count: u16] +/// [scope_field_count: u16][fields]` where fields are encoded as in +/// `encode_ipfix_template`. +pub(crate) fn encode_ipfix_options_template(t: &IpfixOptionsTemplate) -> Vec { + let mut out = Vec::with_capacity(7 + t.fields.len() * 9); + out.push(WIRE_VERSION); + out.extend_from_slice(&t.template_id.to_be_bytes()); + out.extend_from_slice(&t.field_count.to_be_bytes()); + out.extend_from_slice(&t.scope_field_count.to_be_bytes()); + for f in &t.fields { + encode_ipfix_field(&mut out, f); + } + out +} + +pub(crate) fn decode_ipfix_options_template( + bytes: &[u8], +) -> Result { + let mut r = WireReader::new(bytes); + r.expect_version()?; + let template_id = r.u16()?; + let field_count = r.u16()?; + let scope_field_count = r.u16()?; + let mut fields = Vec::with_capacity(usize::from(field_count)); + for _ in 0..field_count { + fields.push(decode_ipfix_field(&mut r)?); + } + Ok(IpfixOptionsTemplate { + template_id, + field_count, + scope_field_count, + fields, + }) +} + +fn encode_ipfix_field(out: &mut Vec, f: &IpfixTemplateField) { + out.extend_from_slice(&f.field_type_number.to_be_bytes()); + out.extend_from_slice(&f.field_length.to_be_bytes()); + match f.enterprise_number { + Some(en) => { + out.push(1); + out.extend_from_slice(&en.to_be_bytes()); + } + None => out.push(0), + } +} + +fn decode_ipfix_field( + r: &mut WireReader<'_>, +) -> Result { + let field_type_number = r.u16()?; + let field_length = r.u16()?; + let enterprise_present = r.u8()?; + let enterprise_number = match enterprise_present { + 0 => None, + 1 => Some(r.u32()?), + other => { + return Err(TemplateStoreError::Codec(format!( + "invalid enterprise flag: {}", + other + ))); + } + }; + Ok(IpfixTemplateField { + field_type_number, + field_length, + enterprise_number, + field_type: IPFixField::new(field_type_number, enterprise_number), + }) +} + +struct WireReader<'a> { + buf: &'a [u8], + pos: usize, +} + +impl<'a> WireReader<'a> { + fn new(buf: &'a [u8]) -> Self { + Self { buf, pos: 0 } + } + + fn expect_version(&mut self) -> Result<(), TemplateStoreError> { + let v = self.u8()?; + if v != WIRE_VERSION { + return Err(TemplateStoreError::Codec(format!( + "unsupported wire version: {} (expected {})", + v, WIRE_VERSION + ))); + } + Ok(()) + } + + fn u8(&mut self) -> Result { + let bytes = self.take(1)?; + Ok(bytes[0]) + } + + fn u16(&mut self) -> Result { + let bytes = self.take(2)?; + Ok(u16::from_be_bytes([bytes[0], bytes[1]])) + } + + fn u32(&mut self) -> Result { + let bytes = self.take(4)?; + Ok(u32::from_be_bytes([bytes[0], bytes[1], bytes[2], bytes[3]])) + } + + fn take(&mut self, n: usize) -> Result<&'a [u8], TemplateStoreError> { + if self.pos + n > self.buf.len() { + return Err(TemplateStoreError::Codec(format!( + "unexpected end of payload at offset {} (need {} more)", + self.pos, n + ))); + } + let s = &self.buf[self.pos..self.pos + n]; + self.pos += n; + Ok(s) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn in_memory_round_trip() { + let store = InMemoryTemplateStore::new(); + let key = TemplateStoreKey::new("1.2.3.4:2055", TemplateKind::V9Data, 256); + assert!(store.get(&key).unwrap().is_none()); + store.put(&key, b"hello").unwrap(); + assert_eq!(store.get(&key).unwrap().as_deref(), Some(&b"hello"[..])); + assert_eq!(store.len(), 1); + store.remove(&key).unwrap(); + assert!(store.get(&key).unwrap().is_none()); + assert!(store.is_empty()); + } + + #[test] + fn v9_template_wire_round_trip() { + let original = V9Template { + template_id: 256, + field_count: 3, + fields: vec![ + V9TemplateField { + field_type_number: 8, + field_type: V9Field::from(8), + field_length: 4, + }, + V9TemplateField { + field_type_number: 12, + field_type: V9Field::from(12), + field_length: 4, + }, + V9TemplateField { + field_type_number: 1, + field_type: V9Field::from(1), + field_length: 8, + }, + ], + }; + let bytes = encode_v9_template(&original); + let decoded = decode_v9_template(&bytes).unwrap(); + assert_eq!(decoded, original); + } + + #[test] + fn ipfix_template_wire_round_trip_with_enterprise() { + let original = IpfixTemplate { + template_id: 300, + field_count: 2, + fields: vec![ + IpfixTemplateField { + field_type_number: 8, + field_length: 4, + enterprise_number: None, + field_type: IPFixField::new(8, None), + }, + IpfixTemplateField { + field_type_number: 1, + field_length: 8, + enterprise_number: Some(9), + field_type: IPFixField::new(1, Some(9)), + }, + ], + }; + let bytes = encode_ipfix_template(&original); + let decoded = decode_ipfix_template(&bytes).unwrap(); + assert_eq!(decoded, original); + } + + #[test] + fn rejects_bad_version() { + let bad = vec![99u8, 0, 0, 0, 0]; + let err = decode_v9_template(&bad).unwrap_err(); + assert!(matches!(err, TemplateStoreError::Codec(_))); + } + + #[test] + fn rejects_truncated_payload() { + let bytes = vec![WIRE_VERSION, 0]; // declares version, missing rest + let err = decode_v9_template(&bytes).unwrap_err(); + assert!(matches!(err, TemplateStoreError::Codec(_))); + } +} diff --git a/src/tests.rs b/src/tests.rs index 56d7840..9423eab 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -453,6 +453,8 @@ mod base_tests { ttl_config: None, enterprise_registry: Arc::new(EnterpriseFieldRegistry::new()), pending_flows_config: None, + template_store: None, + template_store_scope: std::sync::Arc::from(""), }; let parser = V9Parser::try_new(config).unwrap(); @@ -476,6 +478,8 @@ mod base_tests { ttl_config: None, enterprise_registry: Arc::new(EnterpriseFieldRegistry::new()), pending_flows_config: None, + template_store: None, + template_store_scope: std::sync::Arc::from(""), }; let parser = V9Parser::try_new(config).unwrap(); @@ -522,6 +526,8 @@ mod base_tests { ttl_config: None, enterprise_registry: Arc::new(EnterpriseFieldRegistry::new()), pending_flows_config: None, + template_store: None, + template_store_scope: std::sync::Arc::from(""), }; let parser = V9Parser::try_new(config).unwrap(); @@ -568,6 +574,8 @@ mod base_tests { ttl_config: None, enterprise_registry: Arc::new(EnterpriseFieldRegistry::new()), pending_flows_config: None, + template_store: None, + template_store_scope: std::sync::Arc::from(""), }; let parser = V9Parser::try_new(config).unwrap(); @@ -614,6 +622,8 @@ mod base_tests { ttl_config: None, enterprise_registry: Arc::new(EnterpriseFieldRegistry::new()), pending_flows_config: None, + template_store: None, + template_store_scope: std::sync::Arc::from(""), }; let parser = V9Parser::try_new(config).unwrap(); @@ -659,6 +669,8 @@ mod base_tests { ttl_config: None, enterprise_registry: Arc::new(EnterpriseFieldRegistry::new()), pending_flows_config: None, + template_store: None, + template_store_scope: std::sync::Arc::from(""), }; let parser = IPFixParser::try_new(config).unwrap(); @@ -707,6 +719,8 @@ mod base_tests { ttl_config: None, enterprise_registry: Arc::new(EnterpriseFieldRegistry::new()), pending_flows_config: None, + template_store: None, + template_store_scope: std::sync::Arc::from(""), }; let parser = IPFixParser::try_new(config).unwrap(); diff --git a/src/variable_versions/config.rs b/src/variable_versions/config.rs index 0a1b6a3..634e394 100644 --- a/src/variable_versions/config.rs +++ b/src/variable_versions/config.rs @@ -2,6 +2,7 @@ use super::metrics::CacheMetricsInner; use super::pending_flows::{PendingFlowCache, PendingFlowsConfig}; +use crate::template_store::TemplateStore; use crate::variable_versions::enterprise_registry::EnterpriseFieldRegistry; use crate::variable_versions::ttl::TtlConfig; use std::num::NonZeroUsize; @@ -54,6 +55,18 @@ pub struct Config { pub enterprise_registry: Arc, /// Configuration for pending flow caching. `None` means disabled (default). pub pending_flows_config: Option, + /// Optional secondary-tier [`TemplateStore`] for sharing parsed templates + /// across parser instances. `None` means the parser uses only its + /// in-process LRU (default behavior). + /// + /// See [`crate::template_store`] for the read-through / write-through + /// protocol the parser implements on top of this trait. + pub template_store: Option>, + /// Scope string written into every [`crate::template_store::TemplateStoreKey`]. + /// Empty for single-source deployments. Multi-source parsers + /// (`AutoScopedParser`) override this per source. Stored as `Arc` + /// so the parser can clone it cheaply per store key. + pub template_store_scope: Arc, } #[non_exhaustive] @@ -195,6 +208,8 @@ impl Default for Config { ttl_config: None, enterprise_registry: Arc::new(EnterpriseFieldRegistry::new()), pending_flows_config: None, + template_store: None, + template_store_scope: Arc::from(""), } } } diff --git a/src/variable_versions/ipfix/parser.rs b/src/variable_versions/ipfix/parser.rs index a552e27..170d130 100644 --- a/src/variable_versions/ipfix/parser.rs +++ b/src/variable_versions/ipfix/parser.rs @@ -10,10 +10,17 @@ use super::{ NoTemplateInfo, OPTIONS_TEMPLATE_IPFIX_ID, OptionsData, OptionsTemplate, Template, TemplateField, }; +use crate::template_store::{ + TemplateKind, TemplateStore, TemplateStoreKey, decode_ipfix_options_template, + decode_ipfix_template, decode_v9_options_template, decode_v9_template, + encode_ipfix_options_template, encode_ipfix_template, encode_v9_options_template, + encode_v9_template, +}; use crate::variable_versions::config::DEFAULT_MAX_RECORDS_PER_FLOWSET; use crate::variable_versions::enterprise_registry::EnterpriseFieldRegistry; use crate::variable_versions::field_value::FieldValue; use crate::variable_versions::metrics::CacheMetricsInner; +use crate::variable_versions::template_events::TemplateProtocol; use crate::variable_versions::ttl::{TemplateWithTtl, TtlConfig}; use crate::variable_versions::v9::{ DATA_TEMPLATE_V9_ID, Data as V9Data, OPTIONS_TEMPLATE_V9_ID, OptionsData as V9OptionsData, @@ -46,6 +53,8 @@ impl Default for IPFixParser { ttl_config: None, enterprise_registry: Arc::new(EnterpriseFieldRegistry::new()), pending_flows_config: None, + template_store: None, + template_store_scope: Arc::from(""), }; match Self::try_new(config) { @@ -92,8 +101,311 @@ impl IPFixParser { enterprise_registry: config.enterprise_registry, metrics: CacheMetricsInner::new(), pending_flows, + template_store: config.template_store, + template_store_scope: config.template_store_scope, + restored_templates: Vec::new(), }) } + + /// Override the scope written into [`TemplateStoreKey`]s for store + /// reads/writes. Used by `AutoScopedParser` to give each per-source + /// parser an exporter-specific scope. + pub(crate) fn set_template_store_scope(&mut self, scope: Arc) { + self.template_store_scope = scope; + } + + /// Write-through helper: persist a freshly learned template to the + /// secondary store, if one is configured. Backend failures are recorded + /// in metrics but do not abort packet parsing — the in-process LRU has + /// already been updated by the caller. + /// + /// Holds `store` as a borrow (no `Arc::clone`); `&self.template_store` + /// and `&mut self.metrics` are disjoint fields so both borrows coexist. + fn put_to_store(&mut self, kind: TemplateKind, template_id: u16, bytes: Vec) { + let Some(store) = self.template_store.as_ref() else { + return; + }; + let key = + TemplateStoreKey::new(Arc::clone(&self.template_store_scope), kind, template_id); + if store.put(&key, &bytes).is_err() { + self.metrics.record_template_store_backend_error(); + } + } + + /// Best-effort removal of an LRU-evicted, withdrawn, or cleared entry + /// from the secondary store. No-op when no store is configured. + /// Backend failures are recorded in metrics. + fn evict_from_store(&mut self, kind: TemplateKind, template_id: u16) { + let Some(store) = self.template_store.as_ref() else { + return; + }; + let key = + TemplateStoreKey::new(Arc::clone(&self.template_store_scope), kind, template_id); + if store.remove(&key).is_err() { + self.metrics.record_template_store_backend_error(); + } + } + + /// Install a read-through-recovered template into the in-process LRU + /// and queue a Restored event for hook firing. If the LRU eviction + /// returns a *different* key (i.e. the cache was full), mirror the + /// removal back to the secondary store and bump the eviction metric so + /// the primary and secondary tiers stay consistent. + /// + /// Takes raw `&mut` borrows of the cache, metrics, and event buffer + /// rather than `&mut self` so that the caller can keep separate borrows + /// of `self.templates` (or whichever cache) and `self.metrics` / + /// `self.restored_templates` live simultaneously without the borrow + /// checker reaching for splits. + #[allow(clippy::too_many_arguments)] + fn install_restored( + cache: &mut LruCache>>, + template_id: u16, + arc: &Arc, + ttl_enabled: bool, + metrics: &mut CacheMetricsInner, + store: &Arc, + scope: &Arc, + kind: TemplateKind, + restored: &mut Vec<(TemplateProtocol, u16)>, + protocol: TemplateProtocol, + ) { + let wrapped = TemplateWithTtl::new(Arc::clone(arc), ttl_enabled); + if let Some((evicted_key, _)) = cache.push(template_id, wrapped) + && evicted_key != template_id + { + metrics.record_eviction(); + let key = TemplateStoreKey::new(Arc::clone(scope), kind, evicted_key); + if store.remove(&key).is_err() { + metrics.record_template_store_backend_error(); + } + } + metrics.record_template_store_restored(); + restored.push((protocol, template_id)); + } + + /// Read-through: on a primary-cache miss for an IPFIX data template, + /// consult the secondary store. On hit the decoded template is pushed + /// into the in-process LRU (so subsequent flowsets are served from the + /// hot path) and a `Restored` event is queued for hook firing. On + /// codec failure the corrupted entry is removed so that a fresh + /// template announce can repopulate it cleanly. Returns `None` for + /// "not in store", "backend error", or "decoded but rejected by parser + /// limits" — the data record will then take the existing miss path. + /// + /// `store` is borrowed (no `Arc::clone`); every other field used + /// (`template_store_scope`, `metrics`, `templates`, `restored_templates`, + /// `ttl_config`, `max_field_count`, `max_template_total_size`) is + /// accessed via direct field access so the borrow checker can split + /// disjoint fields. We avoid `&self`/`&mut self` method calls inside + /// the borrow so the whole-self re-borrow that would force a clone + /// never happens. + fn fetch_ipfix_template_from_store(&mut self, template_id: u16) -> Option> { + let store = self.template_store.as_ref()?; + let key = TemplateStoreKey::new( + Arc::clone(&self.template_store_scope), + TemplateKind::IpfixData, + template_id, + ); + let bytes = match store.get(&key) { + Ok(Some(b)) => b, + Ok(None) => return None, + Err(_) => { + self.metrics.record_template_store_backend_error(); + return None; + } + }; + let template = match decode_ipfix_template(&bytes) { + Ok(t) => t, + Err(_) => { + self.metrics.record_template_store_codec_error(); + if store.remove(&key).is_err() { + self.metrics.record_template_store_backend_error(); + } + return None; + } + }; + // Inline the validation rule by passing limits explicitly — calling + // `template.is_valid(self)` would re-borrow whole self and conflict + // with the `store` borrow. + if !