Merged
72 changes: 72 additions & 0 deletions README.md
@@ -789,6 +789,78 @@ let router_builder = NetflowParser::builder()
let mut scoped = RouterScopedParser::<String>::try_with_builder(router_builder).expect("valid config");
```

### Pluggable Template Storage (Horizontal Scale-Out)

When you scale flow ingestion across multiple parser instances behind a UDP
load balancer, every replica needs to observe the templates announced to
*any* replica — otherwise a data record routed to a fresh pod will be queued
or dropped because that pod has never seen the template. The
[`TemplateStore`] trait is the extension point that solves this.

With a store configured, the parser:

1. **Writes through** every successfully learned template to the store as
   opaque bytes (a small custom binary wire format; no `serde_json` or
   other runtime serializer is added to the dependency tree).
2. **Reads through** the store on every cache miss before declaring a
   template unknown, repopulating the in-process LRU on hit so subsequent
   records are served from the hot path.
3. **Propagates** LRU evictions, RFC 7011 §8.1 template withdrawals, and
   explicit `clear_*_templates` calls so the store stays in sync.
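
The three behaviors above can be sketched without the crate at all. What follows is a minimal, std-only illustration, not the parser's implementation: `SharedStore` and `Replica` are hypothetical names, the hot tier is a plain `HashMap` rather than a bounded LRU, and error handling is reduced to `unwrap`.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

/// Hypothetical stand-in for a shared backend such as Redis or NATS KV.
/// Entries are keyed by (scope, template ID) and hold opaque bytes.
#[derive(Default)]
pub struct SharedStore {
    entries: Mutex<HashMap<(String, u16), Vec<u8>>>,
}

/// Hypothetical replica: a hot in-process map in front of the shared store.
/// A real parser would use a bounded LRU for the hot tier.
pub struct Replica {
    scope: String,
    hot: HashMap<u16, Vec<u8>>,
    store: Arc<SharedStore>,
}

impl Replica {
    pub fn new(scope: &str, store: Arc<SharedStore>) -> Self {
        Self { scope: scope.to_string(), hot: HashMap::new(), store }
    }

    fn key(&self, id: u16) -> (String, u16) {
        (self.scope.clone(), id)
    }

    /// Write-through: a learned template goes to both tiers.
    pub fn learn(&mut self, id: u16, encoded: Vec<u8>) {
        let key = self.key(id);
        self.store.entries.lock().unwrap().insert(key, encoded.clone());
        self.hot.insert(id, encoded);
    }

    /// Read-through: on a local miss, consult the store and repopulate
    /// the hot tier on a hit.
    pub fn lookup(&mut self, id: u16) -> Option<&[u8]> {
        if !self.hot.contains_key(&id) {
            let key = self.key(id);
            let fetched = self.store.entries.lock().unwrap().get(&key).cloned();
            if let Some(bytes) = fetched {
                self.hot.insert(id, bytes);
            }
        }
        self.hot.get(&id).map(Vec::as_slice)
    }

    /// Propagation: a withdrawal removes the template from both tiers.
    pub fn withdraw(&mut self, id: u16) {
        let key = self.key(id);
        self.hot.remove(&id);
        self.store.entries.lock().unwrap().remove(&key);
    }
}
```

Two `Replica` values sharing one `SharedStore` behave like two pods behind the load balancer: a template learned by one is found by the other on its first lookup, and a replica started after a withdrawal no longer sees it.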

`AutoScopedParser` automatically derives a per-source scope (e.g.
`v9:1.2.3.4:2055/0`, `ipfix:1.2.3.4:2055/42`) so that two exporters using
the same template ID with different layouts do not collide.
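
To make the shape of those scope strings concrete, here is a small sketch that reproduces the two examples above. `scope_for` is a hypothetical helper, not part of the crate, and reading the trailing number as a V9 source ID or IPFIX observation domain ID is an assumption; the crate's actual derivation may differ.

```rust
use std::net::SocketAddr;

/// Hypothetical helper: builds a "<protocol>:<ip>:<port>/<domain>" scope.
/// `domain_id` is assumed to be the V9 source ID or the IPFIX observation
/// domain ID; this is an illustration, not the crate's code.
pub fn scope_for(protocol: &str, exporter: SocketAddr, domain_id: u32) -> String {
    // `SocketAddr` displays as "ip:port" for IPv4 and "[ip]:port" for IPv6.
    format!("{protocol}:{exporter}/{domain_id}")
}
```

Because the exporter address and the domain are both part of the scope, two exporters that each announce template 256 end up under different store keys.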

```rust,ignore
use netflow_parser::{AutoScopedParser, InMemoryTemplateStore, NetflowParser};
use std::sync::Arc;

// Plug in your own backend: implement TemplateStore for Redis, NATS KV,
// DynamoDB, etc. The reference InMemoryTemplateStore is shown here for
// illustration; in production it would be replaced.
let store = Arc::new(InMemoryTemplateStore::new());

let builder = NetflowParser::builder()
    .with_template_store(store.clone());

// Multi-source: per-exporter scoping is automatic.
let mut parser = AutoScopedParser::try_with_builder(builder).expect("valid");

// Replica B can be brought up cold and start serving data records for
// templates Replica A learned, as long as both share the same store.
```

Implementing the trait against your backend of choice:

```rust,ignore
use netflow_parser::{TemplateStore, TemplateStoreError, TemplateStoreKey};

#[derive(Debug)]
struct RedisTemplateStore { /* your client */ }

impl TemplateStore for RedisTemplateStore {
    fn get(&self, key: &TemplateStoreKey) -> Result<Option<Vec<u8>>, TemplateStoreError> {
        // GET "{scope}/{kind:?}/{template_id}" from Redis, returning the bytes.
        unimplemented!()
    }

    fn put(&self, key: &TemplateStoreKey, value: &[u8]) -> Result<(), TemplateStoreError> {
        // SET with an appropriate TTL matching your operational envelope.
        unimplemented!()
    }

    fn remove(&self, key: &TemplateStoreKey) -> Result<(), TemplateStoreError> {
        // DEL; must be idempotent for absent keys.
        unimplemented!()
    }
}
```
```
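
For a version of the same three methods with the bodies filled in, the sketch below uses local stand-ins so it compiles on its own. `Store`, `Key`, `Kind`, `StoreError`, `MapStore`, and `render_key` are all hypothetical: they mirror the method signatures shown above, but the key's field names and the error type are assumptions, and the backend is a mutex-guarded `HashMap` rather than a real Redis client.

```rust
use std::collections::HashMap;
use std::sync::{Mutex, MutexGuard};

// Stand-ins for the crate's types; names and fields are assumptions.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Kind { V9Data, IpfixData }

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Key { pub scope: String, pub kind: Kind, pub template_id: u16 }

#[derive(Debug)]
pub struct StoreError(pub String);

pub trait Store: Send + Sync {
    fn get(&self, key: &Key) -> Result<Option<Vec<u8>>, StoreError>;
    fn put(&self, key: &Key, value: &[u8]) -> Result<(), StoreError>;
    fn remove(&self, key: &Key) -> Result<(), StoreError>;
}

/// Renders the "{scope}/{kind:?}/{template_id}" key suggested in the
/// comment above, as it might be sent to a string-keyed backend.
pub fn render_key(key: &Key) -> String {
    format!("{}/{:?}/{}", key.scope, key.kind, key.template_id)
}

/// Hypothetical backend: a mutex-guarded map keyed by the rendered string.
#[derive(Debug, Default)]
pub struct MapStore { inner: Mutex<HashMap<String, Vec<u8>>> }

impl MapStore {
    // Surface a poisoned mutex as a store error instead of panicking.
    fn lock(&self) -> Result<MutexGuard<'_, HashMap<String, Vec<u8>>>, StoreError> {
        self.inner.lock().map_err(|_| StoreError("store mutex poisoned".into()))
    }
}

impl Store for MapStore {
    fn get(&self, key: &Key) -> Result<Option<Vec<u8>>, StoreError> {
        let guard = self.lock()?;
        let found = guard.get(&render_key(key)).cloned();
        Ok(found)
    }

    fn put(&self, key: &Key, value: &[u8]) -> Result<(), StoreError> {
        self.lock()?.insert(render_key(key), value.to_vec());
        Ok(())
    }

    fn remove(&self, key: &Key) -> Result<(), StoreError> {
        // Removing an absent key is a no-op, so the call is idempotent.
        self.lock()?.remove(&render_key(key));
        Ok(())
    }
}
```

The point to carry over to a real backend is the error path: every method returns `Err` rather than panicking, and `remove` succeeds whether or not the key exists.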

**Single-source deployments** can still benefit from a store for surviving
parser restarts without losing template state. Set the scope explicitly
(or leave it empty) via `with_template_store_scope`.

### Template Lifecycle Management

#### Template Introspection
33 changes: 33 additions & 0 deletions RELEASES.md
@@ -1,5 +1,38 @@
# 1.0.3

## Features

* **Pluggable secondary template storage (`TemplateStore`).** Templates can now
  be persisted to and re-read from an external backend such as Redis or NATS
  KV via a new `TemplateStore` trait. With a store configured the parser
  writes through every learned template, consults the store on every cache
  miss, and propagates LRU evictions, withdrawals, and explicit clears so the
  store stays in sync. This unblocks running multiple stateless parser
  instances behind a UDP load balancer without source-IP-affinity routing.

  Wire up via the new builder hooks:

  ```rust
  let store = Arc::new(my_redis_backed_store);
  let parser = NetflowParser::builder()
      .with_template_store(store)
      .with_template_store_scope("collector-eu-west-1")
      .build()?;
  ```

  `AutoScopedParser` automatically derives a per-source scope so two exporters
  using the same template ID with different layouts do not collide in the
  store. The trait sees opaque `Vec<u8>` payloads encoded with a small custom
  binary wire format; no `serde_json` or other runtime serializer is added
  to the dependency tree. An `InMemoryTemplateStore` reference impl is
  provided for tests. See the new `template_store` module for the protocol.

* New public API: `TemplateStore`, `TemplateStoreKey`, `TemplateKind`,
  `TemplateStoreError`, `InMemoryTemplateStore`,
  `NetflowParserBuilder::with_template_store`,
  `NetflowParserBuilder::with_template_store_scope`,
  `NetflowParser::set_template_store_scope`.
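
For readers wondering what a serializer-free payload can look like, here is a minimal sketch. It is not the crate's wire format, which is documented in the `template_store` module; the layout (big-endian template ID, field count, then type/length pairs) and the names `Field`, `encode`, and `decode` are assumptions chosen purely for illustration.

```rust
/// Hypothetical field descriptor: a field type and its length in bytes.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Field {
    pub field_type: u16,
    pub field_length: u16,
}

/// Encodes a template as: template ID, field count, then one
/// (type, length) pair per field, all as big-endian u16 values.
/// Returns `None` if the field count does not fit in a u16.
pub fn encode(template_id: u16, fields: &[Field]) -> Option<Vec<u8>> {
    let count = u16::try_from(fields.len()).ok()?;
    let mut out = Vec::with_capacity(4 + fields.len() * 4);
    out.extend_from_slice(&template_id.to_be_bytes());
    out.extend_from_slice(&count.to_be_bytes());
    for f in fields {
        out.extend_from_slice(&f.field_type.to_be_bytes());
        out.extend_from_slice(&f.field_length.to_be_bytes());
    }
    Some(out)
}

/// Decodes the layout written by `encode`. Returns `None` for truncated
/// input or when the body length disagrees with the declared count.
pub fn decode(bytes: &[u8]) -> Option<(u16, Vec<Field>)> {
    let header = bytes.get(..4)?;
    let template_id = u16::from_be_bytes([header[0], header[1]]);
    let count = u16::from_be_bytes([header[2], header[3]]) as usize;
    let body = bytes.get(4..)?;
    if body.len() != count * 4 {
        return None;
    }
    let fields = body
        .chunks_exact(4)
        .map(|c| Field {
            field_type: u16::from_be_bytes([c[0], c[1]]),
            field_length: u16::from_be_bytes([c[2], c[3]]),
        })
        .collect();
    Some((template_id, fields))
}
```

A store backend never needs to understand such bytes; it only has to return exactly what it was given, which is why the trait can stay at `Vec<u8>`.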

## Tests

* **Restored 34 snapshot-based parser tests** in a new `restored_legacy_tests` module in `src/tests.rs`. These tests had been deleted in PR #210 ("further template validation"), leaving 33 orphaned `.snap` files that were swept up later in PR #262. Coverage included real-world v9/IPFIX captures (`it_parses_v9_ipv6flowlabel`, `it_parses_v9_template_and_data_packet`, `it_parses_ipfix_scappy_example`, mixed-enterprise field templates, multi-template IPFIX, etc.), version-filter checks (`it_doesnt_allow_v5/v7/v9/ipfix`), and re-export round-trip tests (`it_parses_*_and_re_exports`).
160 changes: 158 additions & 2 deletions src/lib.rs
@@ -6,6 +6,7 @@ pub mod netflow_common;
pub mod protocol;
pub mod scoped_parser;
pub mod static_versions;
pub mod template_store;
mod tests;
pub mod variable_versions;

@@ -47,6 +48,9 @@ pub use variable_versions::template_events::{
};

// Re-export configuration and utility types for convenience
pub use template_store::{
    InMemoryTemplateStore, TemplateKind, TemplateStore, TemplateStoreError, TemplateStoreKey,
};
pub use variable_versions::enterprise_registry::{EnterpriseFieldDef, EnterpriseFieldRegistry};
pub use variable_versions::metrics::{CacheInfo, CacheMetrics, ParserCacheInfo};
pub use variable_versions::ttl::TtlConfig;
@@ -274,6 +278,8 @@ pub struct NetflowParserBuilder {
    requested_versions: Option<Vec<u16>>,
    max_error_sample_size: usize,
    template_hooks: TemplateHooks,
    template_store: Option<Arc<dyn TemplateStore>>,
    template_store_scope: Arc<str>,
}

/// Helper to create a `[bool; 11]` allowed_versions array from a set of version numbers.
@@ -304,6 +310,15 @@ impl std::fmt::Debug for NetflowParserBuilder {
                "template_hooks",
                &format!("{} hooks", self.template_hooks.len()),
            )
            .field(
                "template_store",
                &if self.template_store.is_some() {
                    "configured"
                } else {
                    "none"
                },
            )
            .field("template_store_scope", &self.template_store_scope)
            .finish()
    }
}
}
@@ -317,6 +332,8 @@ impl Default for NetflowParserBuilder {
            requested_versions: None,
            max_error_sample_size: 256,
            template_hooks: TemplateHooks::new(),
            template_store: None,
            template_store_scope: Arc::from(""),
        }
    }
}
@@ -776,6 +793,52 @@ impl NetflowParserBuilder {
        self
    }

    /// Attach a [`TemplateStore`] to act as a secondary tier behind the parser's
    /// in-process LRU caches.
    ///
    /// With a store configured, the parser writes through every successfully
    /// learned template to the store and consults the store on every cache
    /// miss before declaring a template unknown. This lets multiple parser
    /// instances behind a UDP load balancer share template state without
    /// requiring source-IP-affinity routing.
    ///
    /// The store sees opaque `Vec<u8>` payloads encoded with a small custom
    /// binary wire format documented in the [`template_store`] module. See
    /// [`TemplateStore`] for the trait contract and threading model.
    ///
    /// # Examples
    ///
    /// ```rust
    /// use netflow_parser::{NetflowParser, InMemoryTemplateStore};
    /// use std::sync::Arc;
    ///
    /// let store = Arc::new(InMemoryTemplateStore::new());
    /// let parser = NetflowParser::builder()
    ///     .with_template_store(store)
    ///     .build()
    ///     .expect("Failed to build parser");
    /// ```
    #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
    pub fn with_template_store(mut self, store: Arc<dyn TemplateStore>) -> Self {
        self.template_store = Some(store);
        self
    }

    /// Set the scope string written into every [`TemplateStoreKey`].
    ///
    /// Defaults to the empty string, which is appropriate for single-source
    /// deployments. For multi-source deployments
    /// [`AutoScopedParser`] automatically populates this with a scope derived
    /// from the source `SocketAddr` of each per-source parser, so callers
    /// usually do not need to set this directly.
    ///
    /// Accepts anything `Into<Arc<str>>`, typically `&str` or `String`.
    #[must_use = "builder methods consume self and return a new builder; the return value must be used"]
    pub fn with_template_store_scope(mut self, scope: impl Into<Arc<str>>) -> Self {
        self.template_store_scope = scope.into();
        self
    }

    /// Validates the builder configuration without constructing a parser.
    ///
    /// This is cheaper than [`build`](Self::build) since it only checks that
@@ -822,8 +885,14 @@ impl NetflowParserBuilder {
     /// ```
     pub fn build(self) -> Result<NetflowParser, ConfigError> {
         self.validate()?;
-        let v9_parser = V9Parser::try_new(self.v9_config)?;
-        let ipfix_parser = IPFixParser::try_new(self.ipfix_config)?;
+        let mut v9_config = self.v9_config;
+        let mut ipfix_config = self.ipfix_config;
+        v9_config.template_store = self.template_store.clone();
+        v9_config.template_store_scope = Arc::clone(&self.template_store_scope);
+        ipfix_config.template_store = self.template_store;
+        ipfix_config.template_store_scope = self.template_store_scope;
+        let v9_parser = V9Parser::try_new(v9_config)?;
+        let ipfix_parser = IPFixParser::try_new(ipfix_config)?;

         Ok(NetflowParser {
             v9_parser,
@@ -1092,6 +1161,21 @@ impl NetflowParser {
        NetflowParserBuilder::default()
    }

    /// Override the scope string used for [`TemplateStore`] reads/writes by
    /// the underlying V9 and IPFIX parsers.
    ///
    /// Used by [`AutoScopedParser`] to give each per-source parser a scope
    /// derived from the exporter's `SocketAddr`. Callers managing their own
    /// per-source parsers may also use this to retrofit a scope after the
    /// builder has produced a parser.
    ///
    /// Accepts anything `Into<Arc<str>>`, typically `&str` or `String`.
    pub fn set_template_store_scope(&mut self, scope: impl Into<Arc<str>>) {
        let scope: Arc<str> = scope.into();
        self.v9_parser.set_template_store_scope(Arc::clone(&scope));
        self.ipfix_parser.set_template_store_scope(scope);
    }

    /// Returns the allowed versions array.
    ///
    /// Indexed by version number: index 5 = V5, 7 = V7, 9 = V9, 10 = IPFIX.
@@ -1340,6 +1424,26 @@ impl NetflowParser {
    /// parser.clear_v9_templates();
    /// ```
    pub fn clear_v9_templates(&mut self) {
        // Mirror to the secondary store first (best-effort) so that
        // subsequent reads do not transparently repopulate the in-process
        // cache via read-through.
        if let Some(store) = self.v9_parser.template_store.clone() {
            let scope = self.v9_parser.template_store_scope.clone();
            for (id, _) in self.v9_parser.templates.iter() {
                let _ = store.remove(&template_store::TemplateStoreKey::new(
                    scope.clone(),
                    template_store::TemplateKind::V9Data,
                    *id,
                ));
            }
            for (id, _) in self.v9_parser.options_templates.iter() {
                let _ = store.remove(&template_store::TemplateStoreKey::new(
                    scope.clone(),
                    template_store::TemplateKind::V9Options,
                    *id,
                ));
            }
        }
        self.v9_parser.templates.clear();
        self.v9_parser.options_templates.clear();
        self.v9_parser.clear_pending_flows();
@@ -1358,6 +1462,37 @@ impl NetflowParser {
    /// parser.clear_ipfix_templates();
    /// ```
    pub fn clear_ipfix_templates(&mut self) {
        if let Some(store) = self.ipfix_parser.template_store.clone() {
            let scope = self.ipfix_parser.template_store_scope.clone();
            for (id, _) in self.ipfix_parser.templates.iter() {
                let _ = store.remove(&template_store::TemplateStoreKey::new(
                    scope.clone(),
                    template_store::TemplateKind::IpfixData,
                    *id,
                ));
            }
            for (id, _) in self.ipfix_parser.ipfix_options_templates.iter() {
                let _ = store.remove(&template_store::TemplateStoreKey::new(
                    scope.clone(),
                    template_store::TemplateKind::IpfixOptions,
                    *id,
                ));
            }
            for (id, _) in self.ipfix_parser.v9_templates.iter() {
                let _ = store.remove(&template_store::TemplateStoreKey::new(
                    scope.clone(),
                    template_store::TemplateKind::IpfixV9Data,
                    *id,
                ));
            }
            for (id, _) in self.ipfix_parser.v9_options_templates.iter() {
                let _ = store.remove(&template_store::TemplateStoreKey::new(
                    scope.clone(),
                    template_store::TemplateKind::IpfixV9Options,
                    *id,
                ));
            }
        }
        self.ipfix_parser.templates.clear();
        self.ipfix_parser.v9_templates.clear();
        self.ipfix_parser.ipfix_options_templates.clear();
@@ -1581,6 +1716,22 @@ impl NetflowParser {
            if let ParsedNetflow::Success { ref packet, .. } = result {
                self.fire_template_events(packet);
            }
            // Fire Restored events for templates pulled in from the secondary
            // store during this parse. Drained from each parser whether the
            // outer result was Success or Error; restoration that happened
            // before a later flowset failed should still be reported.
            for (protocol, template_id) in self.v9_parser.drain_restored_templates() {
                self.template_hooks.trigger(&TemplateEvent::Restored {
                    template_id: Some(template_id),
                    protocol,
                });
            }
            for (protocol, template_id) in self.ipfix_parser.drain_restored_templates() {
                self.template_hooks.trigger(&TemplateEvent::Restored {
                    template_id: Some(template_id),
                    protocol,
                });
            }
            // Fire metric-based events for collisions, evictions, and expirations.
            // Copy the after-metrics to avoid borrowing self immutably while
            // fire_metric_delta_events borrows self mutably (for hook_errors).
@@ -1592,6 +1743,11 @@ impl NetflowParser {
                let after = self.ipfix_parser.metrics;
                self.fire_metric_delta_events(&before, &after, TemplateProtocol::Ipfix);
            }
        } else {
            // Even when no hooks are registered, drain the buffers so they
            // do not accumulate across parse_bytes calls.
            let _ = self.v9_parser.drain_restored_templates();
            let _ = self.ipfix_parser.drain_restored_templates();
        }

        result