Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions changelog.d/reduce_trace_events.feature.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
The `reduce` transform now accepts a `data_type` option (`log` or
`trace`, default `log`) that selects whether the instance collapses log
events or trace events. The existing merge strategies and conditions
apply unchanged to trace events.

authors: p120ph37
42 changes: 39 additions & 3 deletions src/transforms/reduce/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,43 @@ use crate::{
},
};

/// The event data type a `reduce` transform instance accepts and emits.
///
/// A single `reduce` instance handles exactly one data type. To reduce both
/// logs and traces, instantiate two `reduce` transforms with different
/// `data_type` values.
#[configurable_component]
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
#[serde(rename_all = "snake_case")]
pub enum ReduceDataType {
/// Accept and emit `log` events.
#[default]
Log,

/// Accept and emit `trace` events.
Trace,
}

/// Configuration for the `reduce` transform.
#[serde_as]
#[configurable_component(transform(
"reduce",
"Collapse multiple log events into a single event based on a set of conditions and merge strategies.",
"Collapse multiple log or trace events into a single event based on a set of conditions and merge strategies.",
))]
#[derive(Clone, Debug, Derivative)]
#[derivative(Default)]
#[serde(deny_unknown_fields)]
pub struct ReduceConfig {
/// The event data type this transform instance operates on.
///
/// `reduce` accepts and emits a single data type per instance. Defaults
/// to `log` to preserve historical behavior; set to `trace` to reduce
/// trace events instead. The selected value drives both the topology-
/// level input type filter and the type of the emitted reduced events.
#[serde(default)]
#[configurable(metadata(docs::human_name = "Data Type"))]
pub data_type: ReduceDataType,

/// The maximum period of time to wait after the last event is received, in milliseconds, before
/// a combined event should be considered complete.
#[serde(default = "default_expire_after_ms")]
Expand Down Expand Up @@ -124,7 +151,10 @@ impl TransformConfig for ReduceConfig {
}

fn input(&self) -> Input {
Input::log()
match self.data_type {
ReduceDataType::Log => Input::log(),
ReduceDataType::Trace => Input::trace(),
}
}

fn outputs(
Expand Down Expand Up @@ -229,7 +259,13 @@ impl TransformConfig for ReduceConfig {
output_definitions.insert(output.clone(), schema_definition.clone());
}

vec![TransformOutput::new(DataType::Log, output_definitions)]
vec![TransformOutput::new(
match self.data_type {
ReduceDataType::Log => DataType::Log,
ReduceDataType::Trace => DataType::Trace,
},
output_definitions,
)]
}
}

Expand Down
237 changes: 218 additions & 19 deletions src/transforms/reduce/transform.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ use vrl::{
};

use crate::{
conditions::Condition,
event::{Event, EventMetadata, LogEvent, discriminant::Discriminant},
conditions::{AnyCondition, Condition, ConditionConfig},
event::{Event, EventMetadata, LogEvent, TraceEvent, discriminant::Discriminant},
internal_events::{ReduceAddEventError, ReduceStaleEventFlushed},
transforms::{
TaskTransform,
reduce::{
config::ReduceConfig,
config::{ReduceConfig, ReduceDataType},
merge_strategy::{MergeStrategy, ReduceValueMerger, get_value_merger},
},
},
Expand Down Expand Up @@ -150,6 +150,7 @@ pub struct Reduce {
ends_when: Option<Condition>,
starts_when: Option<Condition>,
max_events: Option<usize>,
data_type: ReduceDataType,
}

fn validate_merge_strategies(strategies: IndexMap<KeyString, MergeStrategy>) -> crate::Result<()> {
Expand Down Expand Up @@ -181,6 +182,22 @@ impl Reduce {
return Err("only one of `ends_when` and `starts_when` can be provided".into());
}

if config.data_type == ReduceDataType::Trace {
for (field, condition) in [
("ends_when", &config.ends_when),
("starts_when", &config.starts_when),
] {
if let Some(AnyCondition::Map(ConditionConfig::DatadogSearch(_))) = condition {
return Err(format!(
"`{field}` does not support `datadog_search` conditions when \
`data_type = \"trace\"`: the `datadog_search` matcher only \
evaluates log events and would silently never match trace inputs."
)
.into());
}
}
}

let ends_when = config
.ends_when
.as_ref()
Expand Down Expand Up @@ -219,6 +236,7 @@ impl Reduce {
ends_when,
starts_when,
max_events,
data_type: config.data_type,
})
}

Expand All @@ -239,15 +257,16 @@ impl Reduce {
for k in &flush_discriminants {
if let Some(t) = self.reduce_merge_states.remove(k) {
emit!(ReduceStaleEventFlushed);
emitter.emit(Event::from(t.flush()));
emitter.emit(wrap_flushed(t.flush(), self.data_type));
}
}
}

fn flush_all_into(&mut self, emitter: &mut Emitter<Event>) {
let data_type = self.data_type;
self.reduce_merge_states
.drain()
.for_each(|(_, s)| emitter.emit(Event::from(s.flush())));
.for_each(|(_, s)| emitter.emit(wrap_flushed(s.flush(), data_type)));
}

fn push_or_new_reduce_state(&mut self, event: LogEvent, discriminant: Discriminant) {
Expand All @@ -274,7 +293,18 @@ impl Reduce {
None => (false, event),
};

let event = event.into_log();
// `input()` restricts the variants we can see here to `Log` or
// `Trace` based on `data_type`. `TraceEvent` is a newtype around
// `LogEvent` (`From<TraceEvent> for LogEvent` is lossless), so we
// operate on its inner `LogEvent` for the duration of the reduce
// and re-wrap on flush via `wrap_flushed`.
let event = match event {
Event::Log(log) => log,
Event::Trace(trace) => LogEvent::from(trace),
Event::Metric(_) => {
unreachable!("reduce input() rejects metric events")
}
};
let discriminant = Discriminant::from_log_event(&event, &self.group_by);

if let Some(max_events) = self.max_events {
Expand All @@ -290,28 +320,42 @@ impl Reduce {

if starts_here {
if let Some(state) = self.reduce_merge_states.remove(&discriminant) {
emitter.emit(state.flush().into());
emitter.emit(wrap_flushed(state.flush(), self.data_type));
}

self.push_or_new_reduce_state(event, discriminant)
} else if ends_here {
emitter.emit(match self.reduce_merge_states.remove(&discriminant) {
Some(mut state) => {
state.add_event(event, &self.merge_strategies);
state.flush().into()
}
None => {
let mut state = ReduceState::new();
state.add_event(event, &self.merge_strategies);
state.flush().into()
}
});
emitter.emit(wrap_flushed(
match self.reduce_merge_states.remove(&discriminant) {
Some(mut state) => {
state.add_event(event, &self.merge_strategies);
state.flush()
}
None => {
let mut state = ReduceState::new();
state.add_event(event, &self.merge_strategies);
state.flush()
}
},
self.data_type,
));
} else {
self.push_or_new_reduce_state(event, discriminant)
}
}
}

/// Wrap a reduced `LogEvent` back into the `Event` variant declared by the
/// transform's `data_type`. Pairs with the input-variant extraction in
/// `transform_one`: a `Log` input round-trips as `Event::Log`, a `Trace`
/// input round-trips as `Event::Trace(TraceEvent::from(log))`.
fn wrap_flushed(log: LogEvent, data_type: ReduceDataType) -> Event {
match data_type {
ReduceDataType::Log => Event::Log(log),
ReduceDataType::Trace => Event::Trace(TraceEvent::from(log)),
}
}

impl TaskTransform<Event> for Reduce {
fn transform(
self: Box<Self>,
Expand Down Expand Up @@ -367,7 +411,7 @@ mod test {
use super::*;
use crate::{
config::{OutputId, TransformConfig, schema, schema::Definition},
event::{LogEvent, Value},
event::{LogEvent, TraceEvent, Value},
test_util::components::assert_transform_compliance,
transforms::test::create_topology,
};
Expand Down Expand Up @@ -1047,4 +1091,159 @@ merge_strategies.bar = "concat"
})
.await
}

#[tokio::test]
async fn reduce_trace_events() {
// Mirror of `reduce_from_condition`, but configured with
// `data_type = "trace"` so the reduce instance accepts and emits
// `Event::Trace` rather than `Event::Log`. Verifies that the
// existing field-merge machinery works unchanged across event
// variants and that the output round-trips through `TraceEvent`.
let reduce_config = toml::from_str::<ReduceConfig>(
r#"
data_type = "trace"
group_by = [ "request_id" ]

[ends_when]
type = "vrl"
source = "exists(.test_end)"
"#,
)
.unwrap();

assert_transform_compliance(async move {
let (tx, rx) = mpsc::channel(1);
let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

let mut e_1 = LogEvent::from("trace message 1");
e_1.insert("counter", 1);
e_1.insert("request_id", "1");

let mut e_2 = LogEvent::from("trace message 2");
e_2.insert("counter", 2);
e_2.insert("request_id", "1");

let mut e_3 = LogEvent::from("trace message 3");
e_3.insert("counter", 3);
e_3.insert("request_id", "1");
e_3.insert("test_end", "yep");

for log in [e_1, e_2, e_3] {
tx.send(Event::Trace(TraceEvent::from(log))).await.unwrap();
}

let output = out.recv().await.unwrap();
// Output variant must match the configured data_type.
let trace = match output {
Event::Trace(t) => t,
other => panic!("expected Event::Trace, got {other:?}"),
};
assert_eq!(trace.get("message"), Some(&"trace message 1".into()));
assert_eq!(trace.get("counter"), Some(&Value::from(6)));

drop(tx);
topology.stop().await;
assert_eq!(out.recv().await, None);
})
.await;
}

#[tokio::test]
async fn reduce_trace_merge_strategies() {
// Exercise per-field merge strategies on trace events to confirm
// the strategy machinery is fully event-type-agnostic.
let reduce_config = toml::from_str::<ReduceConfig>(
r#"
data_type = "trace"
group_by = [ "request_id" ]

merge_strategies.foo = "concat"
merge_strategies.bar = "array"
merge_strategies.baz = "max"

[ends_when]
type = "vrl"
source = "exists(.test_end)"
"#,
)
.unwrap();

assert_transform_compliance(async move {
let (tx, rx) = mpsc::channel(1);
let (topology, mut out) = create_topology(ReceiverStream::new(rx), reduce_config).await;

let mut e_1 = LogEvent::from("trace message 1");
e_1.insert("foo", "first foo");
e_1.insert("bar", "first bar");
e_1.insert("baz", 2);
e_1.insert("request_id", "1");
tx.send(Event::Trace(TraceEvent::from(e_1))).await.unwrap();

let mut e_2 = LogEvent::from("trace message 2");
e_2.insert("foo", "second foo");
e_2.insert("bar", 2);
e_2.insert("baz", "not number");
e_2.insert("request_id", "1");
tx.send(Event::Trace(TraceEvent::from(e_2))).await.unwrap();

let mut e_3 = LogEvent::from("trace message 3");
e_3.insert("foo", 10);
e_3.insert("bar", "third bar");
e_3.insert("baz", 3);
e_3.insert("request_id", "1");
e_3.insert("test_end", "yep");
tx.send(Event::Trace(TraceEvent::from(e_3))).await.unwrap();

let trace = match out.recv().await.unwrap() {
Event::Trace(t) => t,
other => panic!("expected Event::Trace, got {other:?}"),
};
assert_eq!(trace.get("message"), Some(&"trace message 1".into()));
assert_eq!(trace.get("foo"), Some(&"first foo second foo".into()));
assert_eq!(
trace.get("bar"),
Some(&Value::Array(vec![
"first bar".into(),
2.into(),
"third bar".into(),
])),
);
assert_eq!(trace.get("baz"), Some(&3.into()));

drop(tx);
topology.stop().await;
assert_eq!(out.recv().await, None);
})
.await;
}

#[test]
fn rejects_datadog_search_with_trace_data_type() {
// `DatadogSearchRunner::matches` is log-only, so combining it with
// `data_type = "trace"` would silently never fire the boundary
// condition. Reject the combination at config build time.
let config = toml::from_str::<ReduceConfig>(indoc!(
r#"
data_type = "trace"
group_by = [ "request_id" ]

[ends_when]
type = "datadog_search"
source = "@test_end:yep"
"#,
))
.unwrap();
let error = Reduce::new(
&config,
&TableRegistry::default(),
&MetricsStorage::default(),
)
.unwrap_err();
assert!(
error
.to_string()
.contains("`datadog_search` conditions when `data_type = \"trace\"`"),
"unexpected error: {error}"
);
}
}
Loading
Loading