From db4f74f5c9955730d346847525647c347bc3cb71 Mon Sep 17 00:00:00 2001 From: Aaron Meriwether Date: Fri, 22 May 2026 15:19:07 -0400 Subject: [PATCH 1/4] enhancement(reduce transform): support trace events via data_type config option Adds a data_type option (log|trace, default log) so a reduce transform instance can collapse trace events the same way it collapses logs. --- changelog.d/reduce_trace_events.feature.md | 6 + src/transforms/reduce/config.rs | 42 +++- src/transforms/reduce/transform.rs | 189 ++++++++++++++++-- .../transforms/generated/reduce.cue | 18 ++ .../components/transforms/reduce.cue | 7 +- 5 files changed, 239 insertions(+), 23 deletions(-) create mode 100644 changelog.d/reduce_trace_events.feature.md diff --git a/changelog.d/reduce_trace_events.feature.md b/changelog.d/reduce_trace_events.feature.md new file mode 100644 index 0000000000000..47bd6b7618d13 --- /dev/null +++ b/changelog.d/reduce_trace_events.feature.md @@ -0,0 +1,6 @@ +The `reduce` transform now accepts a `data_type` option (`log` or +`trace`, default `log`) that selects whether the instance collapses log +events or trace events. The existing merge strategies and conditions +apply unchanged to trace events. + +authors: p120ph37 diff --git a/src/transforms/reduce/config.rs b/src/transforms/reduce/config.rs index 5d7233438a1f2..fdfe048d047b3 100644 --- a/src/transforms/reduce/config.rs +++ b/src/transforms/reduce/config.rs @@ -20,16 +20,43 @@ use crate::{ }, }; +/// The event data type a `reduce` transform instance accepts and emits. +/// +/// A single `reduce` instance handles exactly one data type. To reduce both +/// logs and traces, instantiate two `reduce` transforms with different +/// `data_type` values. +#[configurable_component] +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +#[serde(rename_all = "snake_case")] +pub enum ReduceDataType { + /// Accept and emit `log` events. + #[default] + Log, + + /// Accept and emit `trace` events. + Trace, +} + /// Configuration for the `reduce` transform. #[serde_as] #[configurable_component(transform( "reduce", - "Collapse multiple log events into a single event based on a set of conditions and merge strategies.", + "Collapse multiple log or trace events into a single event based on a set of conditions and merge strategies.", ))] #[derive(Clone, Debug, Derivative)] #[derivative(Default)] #[serde(deny_unknown_fields)] pub struct ReduceConfig { + /// The event data type this transform instance operates on. + /// + /// `reduce` accepts and emits a single data type per instance. Defaults + /// to `log` to preserve historical behavior; set to `trace` to reduce + /// trace events instead. The selected value drives both the topology- + /// level input type filter and the type of the emitted reduced events. + #[serde(default)] + #[configurable(metadata(docs::human_name = "Data Type"))] + pub data_type: ReduceDataType, + /// The maximum period of time to wait after the last event is received, in milliseconds, before /// a combined event should be considered complete. #[serde(default = "default_expire_after_ms")] @@ -124,7 +151,10 @@ impl TransformConfig for ReduceConfig { } fn input(&self) -> Input { - Input::log() + match self.data_type { + ReduceDataType::Log => Input::log(), + ReduceDataType::Trace => Input::trace(), + } } fn outputs( @@ -229,7 +259,13 @@ impl TransformConfig for ReduceConfig { output_definitions.insert(output.clone(), schema_definition.clone()); } - vec![TransformOutput::new(DataType::Log, output_definitions)] + vec![TransformOutput::new( + match self.data_type { + ReduceDataType::Log => DataType::Log, + ReduceDataType::Trace => DataType::Trace, + }, + output_definitions, + )] } } diff --git a/src/transforms/reduce/transform.rs b/src/transforms/reduce/transform.rs index 6db0ddc2742fd..946876453d1e3 100644 --- a/src/transforms/reduce/transform.rs +++ b/src/transforms/reduce/transform.rs @@ -15,12 +15,12 @@ use vrl::{ use crate::{ conditions::Condition, - event::{Event, EventMetadata, LogEvent, discriminant::Discriminant}, + event::{Event, EventMetadata, LogEvent, TraceEvent, discriminant::Discriminant}, internal_events::{ReduceAddEventError, ReduceStaleEventFlushed}, transforms::{ TaskTransform, reduce::{ - config::ReduceConfig, + config::{ReduceConfig, ReduceDataType}, merge_strategy::{MergeStrategy, ReduceValueMerger, get_value_merger}, }, }, @@ -150,6 +150,7 @@ pub struct Reduce { ends_when: Option, starts_when: Option, max_events: Option, + data_type: ReduceDataType, } fn validate_merge_strategies(strategies: IndexMap) -> crate::Result<()> { @@ -219,6 +220,7 @@ impl Reduce { ends_when, starts_when, max_events, + data_type: config.data_type, }) } @@ -239,15 +241,16 @@ impl Reduce { for k in &flush_discriminants { if let Some(t) = self.reduce_merge_states.remove(k) { emit!(ReduceStaleEventFlushed); - emitter.emit(Event::from(t.flush())); + emitter.emit(wrap_flushed(t.flush(), self.data_type)); } } } fn flush_all_into(&mut self, emitter: &mut Emitter) { + let data_type = self.data_type; self.reduce_merge_states .drain() - .for_each(|(_, s)| emitter.emit(Event::from(s.flush()))); + .for_each(|(_, s)| emitter.emit(wrap_flushed(s.flush(), data_type))); } fn push_or_new_reduce_state(&mut self, event: LogEvent, discriminant: Discriminant) { @@ -274,7 +277,18 @@ impl Reduce { None => (false, event), }; - let event = event.into_log(); + // `input()` restricts the variants we can see here to `Log` or + // `Trace` based on `data_type`. `TraceEvent` is a newtype around + // `LogEvent` (`From for LogEvent` is lossless), so we + // operate on its inner `LogEvent` for the duration of the reduce + // and re-wrap on flush via `wrap_flushed`. + let event = match event { + Event::Log(log) => log, + Event::Trace(trace) => LogEvent::from(trace), + Event::Metric(_) => { + unreachable!("reduce input() rejects metric events") + } + }; let discriminant = Discriminant::from_log_event(&event, &self.group_by); if let Some(max_events) = self.max_events { @@ -290,28 +304,42 @@ impl Reduce { if starts_here { if let Some(state) = self.reduce_merge_states.remove(&discriminant) { - emitter.emit(state.flush().into()); + emitter.emit(wrap_flushed(state.flush(), self.data_type)); } self.push_or_new_reduce_state(event, discriminant) } else if ends_here { - emitter.emit(match self.reduce_merge_states.remove(&discriminant) { - Some(mut state) => { - state.add_event(event, &self.merge_strategies); - state.flush().into() - } - None => { - let mut state = ReduceState::new(); - state.add_event(event, &self.merge_strategies); - state.flush().into() - } - }); + emitter.emit(wrap_flushed( + match self.reduce_merge_states.remove(&discriminant) { + Some(mut state) => { + state.add_event(event, &self.merge_strategies); + state.flush() + } + None => { + let mut state = ReduceState::new(); + state.add_event(event, &self.merge_strategies); + state.flush() + } + }, + self.data_type, + )); } else { self.push_or_new_reduce_state(event, discriminant) } } } +/// Wrap a reduced `LogEvent` back into the `Event` variant declared by the +/// transform's `data_type`. Pairs with the input-variant extraction in +/// `transform_one`: a `Log` input round-trips as `Event::Log`, a `Trace` +/// input round-trips as `Event::Trace(TraceEvent::from(log))`. +fn wrap_flushed(log: LogEvent, data_type: ReduceDataType) -> Event { + match data_type { + ReduceDataType::Log => Event::Log(log), + ReduceDataType::Trace => Event::Trace(TraceEvent::from(log)), + } +} + impl TaskTransform for Reduce { fn transform( self: Box, @@ -367,7 +395,7 @@ mod test { use super::*; use crate::{ config::{OutputId, TransformConfig, schema, schema::Definition}, - event::{LogEvent, Value}, + event::{LogEvent, TraceEvent, Value}, test_util::components::assert_transform_compliance, transforms::test::create_topology, }; @@ -1047,4 +1075,129 @@ merge_strategies.bar = "concat" }) .await } + + #[tokio::test] + async fn reduce_trace_events() { + // Mirror of `reduce_from_condition`, but configured with + // `data_type = "trace"` so the reduce instance accepts and emits + // `Event::Trace` rather than `Event::Log`. Verifies that the + // existing field-merge machinery works unchanged across event + // variants and that the output round-trips through `TraceEvent`. + let reduce_config = toml::from_str::( + r#" +data_type = "trace" +group_by = [ "request_id" ] + +[ends_when] + type = "vrl" + source = "exists(.test_end)" +"#, + ) + .unwrap(); + + assert_transform_compliance(async move { + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await; + + let mut e_1 = LogEvent::from("trace message 1"); + e_1.insert("counter", 1); + e_1.insert("request_id", "1"); + + let mut e_2 = LogEvent::from("trace message 2"); + e_2.insert("counter", 2); + e_2.insert("request_id", "1"); + + let mut e_3 = LogEvent::from("trace message 3"); + e_3.insert("counter", 3); + e_3.insert("request_id", "1"); + e_3.insert("test_end", "yep"); + + for log in [e_1, e_2, e_3] { + tx.send(Event::Trace(TraceEvent::from(log))).await.unwrap(); + } + + let output = out.recv().await.unwrap(); + // Output variant must match the configured data_type. + let trace = match output { + Event::Trace(t) => t, + other => panic!("expected Event::Trace, got {other:?}"), + }; + assert_eq!(trace.get("message"), Some(&"trace message 1".into())); + assert_eq!(trace.get("counter"), Some(&Value::from(6))); + + drop(tx); + topology.stop().await; + assert_eq!(out.recv().await, None); + }) + .await; + } + + #[tokio::test] + async fn reduce_trace_merge_strategies() { + // Exercise per-field merge strategies on trace events to confirm + // the strategy machinery is fully event-type-agnostic. + let reduce_config = toml::from_str::( + r#" +data_type = "trace" +group_by = [ "request_id" ] + +merge_strategies.foo = "concat" +merge_strategies.bar = "array" +merge_strategies.baz = "max" + +[ends_when] + type = "vrl" + source = "exists(.test_end)" +"#, + ) + .unwrap(); + + assert_transform_compliance(async move { + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await; + + let mut e_1 = LogEvent::from("trace message 1"); + e_1.insert("foo", "first foo"); + e_1.insert("bar", "first bar"); + e_1.insert("baz", 2); + e_1.insert("request_id", "1"); + tx.send(Event::Trace(TraceEvent::from(e_1))).await.unwrap(); + + let mut e_2 = LogEvent::from("trace message 2"); + e_2.insert("foo", "second foo"); + e_2.insert("bar", 2); + e_2.insert("baz", "not number"); + e_2.insert("request_id", "1"); + tx.send(Event::Trace(TraceEvent::from(e_2))).await.unwrap(); + + let mut e_3 = LogEvent::from("trace message 3"); + e_3.insert("foo", 10); + e_3.insert("bar", "third bar"); + e_3.insert("baz", 3); + e_3.insert("request_id", "1"); + e_3.insert("test_end", "yep"); + tx.send(Event::Trace(TraceEvent::from(e_3))).await.unwrap(); + + let trace = match out.recv().await.unwrap() { + Event::Trace(t) => t, + other => panic!("expected Event::Trace, got {other:?}"), + }; + assert_eq!(trace.get("message"), Some(&"trace message 1".into())); + assert_eq!(trace.get("foo"), Some(&"first foo second foo".into())); + assert_eq!( + trace.get("bar"), + Some(&Value::Array(vec![ + "first bar".into(), + 2.into(), + "third bar".into(), + ])), + ); + assert_eq!(trace.get("baz"), Some(&3.into())); + + drop(tx); + topology.stop().await; + assert_eq!(out.recv().await, None); + }) + .await; + } } diff --git a/website/cue/reference/components/transforms/generated/reduce.cue b/website/cue/reference/components/transforms/generated/reduce.cue index 8f494c884e0bd..7c0650144d280 100644 --- a/website/cue/reference/components/transforms/generated/reduce.cue +++ b/website/cue/reference/components/transforms/generated/reduce.cue @@ -1,6 +1,24 @@ package metadata generated: components: transforms: reduce: configuration: { + data_type: { + description: """ + The event data type this transform instance operates on. + + `reduce` accepts and emits a single data type per instance. Defaults + to `log` to preserve historical behavior; set to `trace` to reduce + trace events instead. The selected value drives both the topology- + level input type filter and the type of the emitted reduced events. + """ + required: false + type: string: { + default: "log" + enum: { + log: "Accept and emit `log` events." + trace: "Accept and emit `trace` events." + } + } + } end_every_period_ms: { description: """ If supplied, every time this interval elapses for a given grouping, the reduced value diff --git a/website/cue/reference/components/transforms/reduce.cue b/website/cue/reference/components/transforms/reduce.cue index 49fa0184df561..40fd9a48d62ba 100644 --- a/website/cue/reference/components/transforms/reduce.cue +++ b/website/cue/reference/components/transforms/reduce.cue @@ -4,7 +4,7 @@ components: transforms: reduce: { title: "Reduce" description: """ - Reduces multiple log events into a single log event based on a set of + Reduces multiple log or trace events into a single event based on a set of conditions and merge strategies. """ @@ -30,13 +30,16 @@ components: transforms: reduce: { input: { logs: true metrics: null - traces: false + traces: true } output: { logs: "": { description: "The modified input `log` event." } + traces: "": { + description: "The modified input `trace` event." + } } examples: [ From 351602817a4129844b54118f75dea5d0500cf84b Mon Sep 17 00:00:00 2001 From: Aaron Meriwether Date: Fri, 22 May 2026 16:40:54 -0400 Subject: [PATCH 2/4] fix(reduce transform): evaluate conditions on log-wrapped trace events Move the trace-to-log unwrap to before the `starts_when`/`ends_when` checks (re-wrapping as `Event::Log`) so log-only matchers like `datadog_search` fire on trace inputs. Without this, trace-mode reduce configured with a `datadog_search` boundary would silently never match, leaving groups to flush only via timeout/`max_events`. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/transforms/reduce/transform.rs | 79 +++++++++++++++++++++++++----- 1 file changed, 67 insertions(+), 12 deletions(-) diff --git a/src/transforms/reduce/transform.rs b/src/transforms/reduce/transform.rs index 946876453d1e3..f7d1ee9da08ca 100644 --- a/src/transforms/reduce/transform.rs +++ b/src/transforms/reduce/transform.rs @@ -267,6 +267,22 @@ impl Reduce { } pub fn transform_one(&mut self, emitter: &mut Emitter, event: Event) { + // `input()` restricts the variants we can see here to `Log` or + // `Trace` based on `data_type`. `TraceEvent` is a newtype around + // `LogEvent` (`From for LogEvent` is lossless), so we + // unwrap traces to `Event::Log` before evaluating conditions. This + // lets log-only matchers (e.g. `datadog_search`, whose runner only + // matches `EventRef::Log`) fire for trace inputs, and avoids a + // second unwrap after each condition. We re-wrap on flush via + // `wrap_flushed`. + let event = match event { + Event::Log(_) => event, + Event::Trace(trace) => Event::Log(LogEvent::from(trace)), + Event::Metric(_) => { + unreachable!("reduce input() rejects metric events") + } + }; + let (starts_here, event) = match &self.starts_when { Some(condition) => condition.check(event), None => (false, event), @@ -277,18 +293,7 @@ impl Reduce { None => (false, event), }; - // `input()` restricts the variants we can see here to `Log` or - // `Trace` based on `data_type`. `TraceEvent` is a newtype around - // `LogEvent` (`From for LogEvent` is lossless), so we - // operate on its inner `LogEvent` for the duration of the reduce - // and re-wrap on flush via `wrap_flushed`. - let event = match event { - Event::Log(log) => log, - Event::Trace(trace) => LogEvent::from(trace), - Event::Metric(_) => { - unreachable!("reduce input() rejects metric events") - } - }; + let event = event.into_log(); let discriminant = Discriminant::from_log_event(&event, &self.group_by); if let Some(max_events) = self.max_events { @@ -1200,4 +1205,54 @@ merge_strategies.baz = "max" }) .await; } + + #[tokio::test] + async fn reduce_trace_with_datadog_search_condition() { + // `DatadogSearchRunner::matches` only matches `EventRef::Log`, so + // boundary conditions would silently never fire on trace inputs if + // the trace were not unwrapped to `Event::Log` before condition + // evaluation. This test pins that contract. + let reduce_config = toml::from_str::( + r#" +data_type = "trace" +group_by = [ "request_id" ] + +[ends_when] + type = "datadog_search" + source = "@test_end:yep" +"#, + ) + .unwrap(); + + assert_transform_compliance(async move { + let (tx, rx) = mpsc::channel(1); + let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await; + + let mut e_1 = LogEvent::from("trace message 1"); + e_1.insert("counter", 1); + e_1.insert("request_id", "1"); + + let mut e_2 = LogEvent::from("trace message 2"); + e_2.insert("counter", 2); + e_2.insert("request_id", "1"); + e_2.insert("test_end", "yep"); + + for log in [e_1, e_2] { + tx.send(Event::Trace(TraceEvent::from(log))).await.unwrap(); + } + + let trace = match out.recv().await.unwrap() { + Event::Trace(t) => t, + other => panic!("expected Event::Trace, got {other:?}"), + }; + assert_eq!(trace.get("message"), Some(&"trace message 1".into())); + assert_eq!(trace.get("counter"), Some(&Value::from(3))); + assert_eq!(trace.get("test_end"), Some(&"yep".into())); + + drop(tx); + topology.stop().await; + assert_eq!(out.recv().await, None); + }) + .await; + } } From 512bb12f2da2774e9b68fc225bd7c81f2414be80 Mon Sep 17 00:00:00 2001 From: Aaron Meriwether Date: Fri, 22 May 2026 16:59:48 -0400 Subject: [PATCH 3/4] Revert "fix(reduce transform): evaluate conditions on log-wrapped trace events" This reverts commit 351602817a4129844b54118f75dea5d0500cf84b. Pre-converting `Event::Trace` to `Event::Log` before condition evaluation fixed the `datadog_search` mismatch but broke type-discriminating conditions: `Condition::IsTrace` would silently never match in trace mode and `Condition::IsLog` would always match. That's a worse failure mode than the one being addressed. Reverting; the original concern is handled in the follow-up commit by rejecting the unsupported combination at config build time. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/transforms/reduce/transform.rs | 79 +++++------------------------- 1 file changed, 12 insertions(+), 67 deletions(-) diff --git a/src/transforms/reduce/transform.rs b/src/transforms/reduce/transform.rs index f7d1ee9da08ca..946876453d1e3 100644 --- a/src/transforms/reduce/transform.rs +++ b/src/transforms/reduce/transform.rs @@ -267,22 +267,6 @@ impl Reduce { } pub fn transform_one(&mut self, emitter: &mut Emitter, event: Event) { - // `input()` restricts the variants we can see here to `Log` or - // `Trace` based on `data_type`. `TraceEvent` is a newtype around - // `LogEvent` (`From for LogEvent` is lossless), so we - // unwrap traces to `Event::Log` before evaluating conditions. This - // lets log-only matchers (e.g. `datadog_search`, whose runner only - // matches `EventRef::Log`) fire for trace inputs, and avoids a - // second unwrap after each condition. We re-wrap on flush via - // `wrap_flushed`. - let event = match event { - Event::Log(_) => event, - Event::Trace(trace) => Event::Log(LogEvent::from(trace)), - Event::Metric(_) => { - unreachable!("reduce input() rejects metric events") - } - }; - let (starts_here, event) = match &self.starts_when { Some(condition) => condition.check(event), None => (false, event), @@ -293,7 +277,18 @@ impl Reduce { None => (false, event), }; - let event = event.into_log(); + // `input()` restricts the variants we can see here to `Log` or + // `Trace` based on `data_type`. `TraceEvent` is a newtype around + // `LogEvent` (`From for LogEvent` is lossless), so we + // operate on its inner `LogEvent` for the duration of the reduce + // and re-wrap on flush via `wrap_flushed`. + let event = match event { + Event::Log(log) => log, + Event::Trace(trace) => LogEvent::from(trace), + Event::Metric(_) => { + unreachable!("reduce input() rejects metric events") + } + }; let discriminant = Discriminant::from_log_event(&event, &self.group_by); if let Some(max_events) = self.max_events { @@ -1205,54 +1200,4 @@ merge_strategies.baz = "max" }) .await; } - - #[tokio::test] - async fn reduce_trace_with_datadog_search_condition() { - // `DatadogSearchRunner::matches` only matches `EventRef::Log`, so - // boundary conditions would silently never fire on trace inputs if - // the trace were not unwrapped to `Event::Log` before condition - // evaluation. This test pins that contract. - let reduce_config = toml::from_str::( - r#" -data_type = "trace" -group_by = [ "request_id" ] - -[ends_when] - type = "datadog_search" - source = "@test_end:yep" -"#, - ) - .unwrap(); - - assert_transform_compliance(async move { - let (tx, rx) = mpsc::channel(1); - let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await; - - let mut e_1 = LogEvent::from("trace message 1"); - e_1.insert("counter", 1); - e_1.insert("request_id", "1"); - - let mut e_2 = LogEvent::from("trace message 2"); - e_2.insert("counter", 2); - e_2.insert("request_id", "1"); - e_2.insert("test_end", "yep"); - - for log in [e_1, e_2] { - tx.send(Event::Trace(TraceEvent::from(log))).await.unwrap(); - } - - let trace = match out.recv().await.unwrap() { - Event::Trace(t) => t, - other => panic!("expected Event::Trace, got {other:?}"), - }; - assert_eq!(trace.get("message"), Some(&"trace message 1".into())); - assert_eq!(trace.get("counter"), Some(&Value::from(3))); - assert_eq!(trace.get("test_end"), Some(&"yep".into())); - - drop(tx); - topology.stop().await; - assert_eq!(out.recv().await, None); - }) - .await; - } } From 66376f5dc87804b2ce8a192fd7735474d10d926f Mon Sep 17 00:00:00 2001 From: Aaron Meriwether Date: Fri, 22 May 2026 17:01:17 -0400 Subject: [PATCH 4/4] fix(reduce transform): reject `datadog_search` conditions in trace mode `DatadogSearchRunner::matches` only matches `EventRef::Log` and silently returns `false` for traces. Combined with `data_type = "trace"`, that means `starts_when`/`ends_when` would never fire, leaving groups to flush only via timeout/`max_events`. Reject the combination at config build time so the failure is loud rather than silent. Co-Authored-By: Claude Opus 4.7 (1M context) --- src/transforms/reduce/transform.rs | 48 +++++++++++++++++++++++++++++- 1 file changed, 47 insertions(+), 1 deletion(-) diff --git a/src/transforms/reduce/transform.rs b/src/transforms/reduce/transform.rs index 946876453d1e3..5bbcab1ccae84 100644 --- a/src/transforms/reduce/transform.rs +++ b/src/transforms/reduce/transform.rs @@ -14,7 +14,7 @@ use vrl::{ }; use crate::{ - conditions::Condition, + conditions::{AnyCondition, Condition, ConditionConfig}, event::{Event, EventMetadata, LogEvent, TraceEvent, discriminant::Discriminant}, internal_events::{ReduceAddEventError, ReduceStaleEventFlushed}, transforms::{ @@ -182,6 +182,22 @@ impl Reduce { return Err("only one of `ends_when` and `starts_when` can be provided".into()); } + if config.data_type == ReduceDataType::Trace { + for (field, condition) in [ + ("ends_when", &config.ends_when), + ("starts_when", &config.starts_when), + ] { + if let Some(AnyCondition::Map(ConditionConfig::DatadogSearch(_))) = condition { + return Err(format!( + "`{field}` does not support `datadog_search` conditions when \ + `data_type = \"trace\"`: the `datadog_search` matcher only \ + evaluates log events and would silently never match trace inputs." + ) + .into()); + } + } + } + let ends_when = config .ends_when .as_ref() @@ -1200,4 +1216,34 @@ merge_strategies.baz = "max" }) .await; } + + #[test] + fn rejects_datadog_search_with_trace_data_type() { + // `DatadogSearchRunner::matches` is log-only, so combining it with + // `data_type = "trace"` would silently never fire the boundary + // condition. Reject the combination at config build time. + let config = toml::from_str::(indoc!( + r#" + data_type = "trace" + group_by = [ "request_id" ] + + [ends_when] + type = "datadog_search" + source = "@test_end:yep" + "#, + )) + .unwrap(); + let error = Reduce::new( + &config, + &TableRegistry::default(), + &MetricsStorage::default(), + ) + .unwrap_err(); + assert!( + error + .to_string() + .contains("`datadog_search` conditions when `data_type = \"trace\"`"), + "unexpected error: {error}" + ); + } }