diff --git a/changelog.d/reduce_trace_events.feature.md b/changelog.d/reduce_trace_events.feature.md new file mode 100644 index 0000000000000..47bd6b7618d13 --- /dev/null +++ b/changelog.d/reduce_trace_events.feature.md @@ -0,0 +1,6 @@ +The `reduce` transform now accepts a `data_type` option (`log` or +`trace`, default `log`) that selects whether the instance collapses log +events or trace events. The existing merge strategies and conditions +apply unchanged to trace events. + +authors: p120ph37 diff --git a/src/transforms/reduce/config.rs b/src/transforms/reduce/config.rs index 5d7233438a1f2..fdfe048d047b3 100644 --- a/src/transforms/reduce/config.rs +++ b/src/transforms/reduce/config.rs @@ -20,16 +20,43 @@ use crate::{ }, }; +/// The event data type a `reduce` transform instance accepts and emits. +/// +/// A single `reduce` instance handles exactly one data type. To reduce both +/// logs and traces, instantiate two `reduce` transforms with different +/// `data_type` values. +#[configurable_component] +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum ReduceDataType { + /// Accept and emit `log` events. + #[default] + Log, + + /// Accept and emit `trace` events. + Trace, +} + /// Configuration for the `reduce` transform. #[serde_as] #[configurable_component(transform( "reduce", - "Collapse multiple log events into a single event based on a set of conditions and merge strategies.", + "Collapse multiple log or trace events into a single event based on a set of conditions and merge strategies.", ))] #[derive(Clone, Debug, Derivative)] #[derivative(Default)] #[serde(deny_unknown_fields)] pub struct ReduceConfig { + /// The event data type this transform instance operates on. + /// + /// `reduce` accepts and emits a single data type per instance. Defaults + /// to `log` to preserve historical behavior; set to `trace` to reduce + /// trace events instead. The selected value drives both the topology- + /// level input type filter and the type of the emitted reduced events. + #[serde(default)] + #[configurable(metadata(docs::human_name = "Data Type"))] + pub data_type: ReduceDataType, + /// The maximum period of time to wait after the last event is received, in milliseconds, before /// a combined event should be considered complete. #[serde(default = "default_expire_after_ms")] @@ -124,7 +151,10 @@ impl TransformConfig for ReduceConfig { } fn input(&self) -> Input { - Input::log() + match self.data_type { + ReduceDataType::Log => Input::log(), + ReduceDataType::Trace => Input::trace(), + } } fn outputs( @@ -229,7 +259,13 @@ impl TransformConfig for ReduceConfig { output_definitions.insert(output.clone(), schema_definition.clone()); } - vec![TransformOutput::new(DataType::Log, output_definitions)] + vec![TransformOutput::new( + match self.data_type { + ReduceDataType::Log => DataType::Log, + ReduceDataType::Trace => DataType::Trace, + }, + output_definitions, + )] } } diff --git a/src/transforms/reduce/transform.rs b/src/transforms/reduce/transform.rs index 6db0ddc2742fd..5bbcab1ccae84 100644 --- a/src/transforms/reduce/transform.rs +++ b/src/transforms/reduce/transform.rs @@ -14,13 +14,13 @@ use vrl::{ }; use crate::{ - conditions::Condition, - event::{Event, EventMetadata, LogEvent, discriminant::Discriminant}, + conditions::{AnyCondition, Condition, ConditionConfig}, + event::{Event, EventMetadata, LogEvent, TraceEvent, discriminant::Discriminant}, internal_events::{ReduceAddEventError, ReduceStaleEventFlushed}, transforms::{ TaskTransform, reduce::{ - config::ReduceConfig, + config::{ReduceConfig, ReduceDataType}, merge_strategy::{MergeStrategy, ReduceValueMerger, get_value_merger}, }, }, @@ -150,6 +150,7 @@ pub struct Reduce { ends_when: Option, starts_when: Option, max_events: Option, + data_type: ReduceDataType, } fn validate_merge_strategies(strategies: IndexMap) -> crate::Result<()> { @@ -181,6 +182,22 @@ impl Reduce { return Err("only one of `ends_when` and `starts_when` can be provided".into()); } + if config.data_type == ReduceDataType::Trace { + for (field, condition) in [ + ("ends_when", &config.ends_when), + ("starts_when", &config.starts_when), + ] { + if let Some(AnyCondition::Map(ConditionConfig::DatadogSearch(_))) = condition { + return Err(format!( + "`{field}` does not support `datadog_search` conditions when \ + `data_type = \"trace\"`: the `datadog_search` matcher only \ + evaluates log events and would silently never match trace inputs." + ) + .into()); + } + } + } + let ends_when = config .ends_when .as_ref() @@ -219,6 +236,7 @@ impl Reduce { ends_when, starts_when, max_events, + data_type: config.data_type, }) } @@ -239,15 +257,16 @@ impl Reduce { for k in &flush_discriminants { if let Some(t) = self.reduce_merge_states.remove(k) { emit!(ReduceStaleEventFlushed); - emitter.emit(Event::from(t.flush())); + emitter.emit(wrap_flushed(t.flush(), self.data_type)); } } } fn flush_all_into(&mut self, emitter: &mut Emitter) { + let data_type = self.data_type; self.reduce_merge_states .drain() - .for_each(|(_, s)| emitter.emit(Event::from(s.flush()))); + .for_each(|(_, s)| emitter.emit(wrap_flushed(s.flush(), data_type))); } fn push_or_new_reduce_state(&mut self, event: LogEvent, discriminant: Discriminant) { @@ -274,7 +293,18 @@ impl Reduce { None => (false, event), }; - let event = event.into_log(); + // `input()` restricts the variants we can see here to `Log` or + // `Trace` based on `data_type`. `TraceEvent` is a newtype around + // `LogEvent` (`From for LogEvent` is lossless), so we + // operate on its inner `LogEvent` for the duration of the reduce + // and re-wrap on flush via `wrap_flushed`. + let event = match event { + Event::Log(log) => log, + Event::Trace(trace) => LogEvent::from(trace), + Event::Metric(_) => { + unreachable!("reduce input() rejects metric events") + } + }; let discriminant = Discriminant::from_log_event(&event, &self.group_by); if let Some(max_events) = self.max_events { @@ -290,28 +320,42 @@ impl Reduce { if starts_here { if let Some(state) = self.reduce_merge_states.remove(&discriminant) { - emitter.emit(state.flush().into()); + emitter.emit(wrap_flushed(state.flush(), self.data_type)); } self.push_or_new_reduce_state(event, discriminant) } else if ends_here { - emitter.emit(match self.reduce_merge_states.remove(&discriminant) { - Some(mut state) => { - state.add_event(event, &self.merge_strategies); - state.flush().into() - } - None => { - let mut state = ReduceState::new(); - state.add_event(event, &self.merge_strategies); - state.flush().into() - } - }); + emitter.emit(wrap_flushed( + match self.reduce_merge_states.remove(&discriminant) { + Some(mut state) => { + state.add_event(event, &self.merge_strategies); + state.flush() + } + None => { + let mut state = ReduceState::new(); + state.add_event(event, &self.merge_strategies); + state.flush() + } + }, + self.data_type, + )); } else { self.push_or_new_reduce_state(event, discriminant) } } } +/// Wrap a reduced `LogEvent` back into the `Event` variant declared by the +/// transform's `data_type`. Pairs with the input-variant extraction in +/// `transform_one`: a `Log` input round-trips as `Event::Log`, a `Trace` +/// input round-trips as `Event::Trace(TraceEvent::from(log))`. +fn wrap_flushed(log: LogEvent, data_type: ReduceDataType) -> Event { + match data_type { + ReduceDataType::Log => Event::Log(log), + ReduceDataType::Trace => Event::Trace(TraceEvent::from(log)), + } +} + impl TaskTransform for Reduce { fn transform( self: Box, @@ -367,7 +411,7 @@ mod test { use super::*; use crate::{ config::{OutputId, TransformConfig, schema, schema::Definition}, - event::{LogEvent, Value}, + event::{LogEvent, TraceEvent, Value}, test_util::components::assert_transform_compliance, transforms::test::create_topology, }; @@ -1047,4 +1091,159 @@ merge_strategies.bar = "concat" }) .await } + + #[tokio::test] + async fn reduce_trace_events() { + // Mirror of `reduce_from_condition`, but configured with + // `data_type = "trace"` so the reduce instance accepts and emits + // `Event::Trace` rather than `Event::Log`. Verifies that the + // existing field-merge machinery works unchanged across event + // variants and that the output round-trips through `TraceEvent`. + let reduce_config = toml::from_str::( + r#" +data_type = "trace" +group_by = [ "request_id" ] + +[ends_when] + type = "vrl" + source = "exists(.test_end)" +"#, + ) + .unwrap(); + + assert_transform_compliance(async move { + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await; + + let mut e_1 = LogEvent::from("trace message 1"); + e_1.insert("counter", 1); + e_1.insert("request_id", "1"); + + let mut e_2 = LogEvent::from("trace message 2"); + e_2.insert("counter", 2); + e_2.insert("request_id", "1"); + + let mut e_3 = LogEvent::from("trace message 3"); + e_3.insert("counter", 3); + e_3.insert("request_id", "1"); + e_3.insert("test_end", "yep"); + + for log in [e_1, e_2, e_3] { + tx.send(Event::Trace(TraceEvent::from(log))).await.unwrap(); + } + + let output = out.recv().await.unwrap(); + // Output variant must match the configured data_type. + let trace = match output { + Event::Trace(t) => t, + other => panic!("expected Event::Trace, got {other:?}"), + }; + assert_eq!(trace.get("message"), Some(&"trace message 1".into())); + assert_eq!(trace.get("counter"), Some(&Value::from(6))); + + drop(tx); + topology.stop().await; + assert_eq!(out.recv().await, None); + }) + .await; + } + + #[tokio::test] + async fn reduce_trace_merge_strategies() { + // Exercise per-field merge strategies on trace events to confirm + // the strategy machinery is fully event-type-agnostic. + let reduce_config = toml::from_str::( + r#" +data_type = "trace" +group_by = [ "request_id" ] + +merge_strategies.foo = "concat" +merge_strategies.bar = "array" +merge_strategies.baz = "max" + +[ends_when] + type = "vrl" + source = "exists(.test_end)" +"#, + ) + .unwrap(); + + assert_transform_compliance(async move { + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await; + + let mut e_1 = LogEvent::from("trace message 1"); + e_1.insert("foo", "first foo"); + e_1.insert("bar", "first bar"); + e_1.insert("baz", 2); + e_1.insert("request_id", "1"); + tx.send(Event::Trace(TraceEvent::from(e_1))).await.unwrap(); + + let mut e_2 = LogEvent::from("trace message 2"); + e_2.insert("foo", "second foo"); + e_2.insert("bar", 2); + e_2.insert("baz", "not number"); + e_2.insert("request_id", "1"); + tx.send(Event::Trace(TraceEvent::from(e_2))).await.unwrap(); + + let mut e_3 = LogEvent::from("trace message 3"); + e_3.insert("foo", 10); + e_3.insert("bar", "third bar"); + e_3.insert("baz", 3); + e_3.insert("request_id", "1"); + e_3.insert("test_end", "yep"); + tx.send(Event::Trace(TraceEvent::from(e_3))).await.unwrap(); + + let trace = match out.recv().await.unwrap() { + Event::Trace(t) => t, + other => panic!("expected Event::Trace, got {other:?}"), + }; + assert_eq!(trace.get("message"), Some(&"trace message 1".into())); + assert_eq!(trace.get("foo"), Some(&"first foo second foo".into())); + assert_eq!( + trace.get("bar"), + Some(&Value::Array(vec![ + "first bar".into(), + 2.into(), + "third bar".into(), + ])), + ); + assert_eq!(trace.get("baz"), Some(&3.into())); + + drop(tx); + topology.stop().await; + assert_eq!(out.recv().await, None); + }) + .await; + } + + #[test] + fn rejects_datadog_search_with_trace_data_type() { + // `DatadogSearchRunner::matches` is log-only, so combining it with + // `data_type = "trace"` would silently never fire the boundary + // condition. Reject the combination at config build time. + let config = toml::from_str::(indoc!( + r#" + data_type = "trace" + group_by = [ "request_id" ] + + [ends_when] + type = "datadog_search" + source = "@test_end:yep" + "#, + )) + .unwrap(); + let error = Reduce::new( + &config, + &TableRegistry::default(), + &MetricsStorage::default(), + ) + .unwrap_err(); + assert!( + error + .to_string() + .contains("`datadog_search` conditions when `data_type = \"trace\"`"), + "unexpected error: {error}" + ); + } } diff --git a/website/cue/reference/components/transforms/generated/reduce.cue b/website/cue/reference/components/transforms/generated/reduce.cue index 8f494c884e0bd..7c0650144d280 100644 --- a/website/cue/reference/components/transforms/generated/reduce.cue +++ b/website/cue/reference/components/transforms/generated/reduce.cue @@ -1,6 +1,24 @@ package metadata generated: components: transforms: reduce: configuration: { + data_type: { + description: """ + The event data type this transform instance operates on. + + `reduce` accepts and emits a single data type per instance. Defaults + to `log` to preserve historical behavior; set to `trace` to reduce + trace events instead. The selected value drives both the topology- + level input type filter and the type of the emitted reduced events. + """ + required: false + type: string: { + default: "log" + enum: { + log: "Accept and emit `log` events." + trace: "Accept and emit `trace` events." + } + } + } end_every_period_ms: { description: """ If supplied, every time this interval elapses for a given grouping, the reduced value diff --git a/website/cue/reference/components/transforms/reduce.cue b/website/cue/reference/components/transforms/reduce.cue index 49fa0184df561..40fd9a48d62ba 100644 --- a/website/cue/reference/components/transforms/reduce.cue +++ b/website/cue/reference/components/transforms/reduce.cue @@ -4,7 +4,7 @@ components: transforms: reduce: { title: "Reduce" description: """ - Reduces multiple log events into a single log event based on a set of + Reduces multiple log or trace events into a single event based on a set of conditions and merge strategies. """ @@ -30,13 +30,16 @@ components: transforms: reduce: { input: { logs: true metrics: null - traces: false + traces: true } output: { logs: "": { description: "The modified input `log` event." } + traces: "": { + description: "The modified input `trace` event." + } } examples: [