diff --git a/bottlecap/src/lifecycle/invocation/context.rs b/bottlecap/src/lifecycle/invocation/context.rs index 04894f9c6..3fe0999ab 100644 --- a/bottlecap/src/lifecycle/invocation/context.rs +++ b/bottlecap/src/lifecycle/invocation/context.rs @@ -43,6 +43,12 @@ pub struct Context { /// tracing. /// pub extracted_span_context: Option, + /// Whether the tracer signaled (via the `Datadog-Client-Computed-Stats` header) that it + /// already computed trace stats client-side, propagated from the tracer's placeholder span. + /// + /// Used when generating the extension-side `aws.lambda` span (Path B) so the backend + /// stats directive (`_dd.compute_stats`) is stamped consistently with Path A. + pub client_computed_stats: bool, } /// Struct containing the information needed to reparent a span. @@ -94,6 +100,7 @@ impl Default for Context { snapstart_restore_span: None, tracer_span: None, extracted_span_context: None, + client_computed_stats: false, } } } @@ -508,12 +515,20 @@ impl ContextBuffer { /// Adds the tracer span to a `Context` in the buffer. /// - pub fn add_tracer_span(&mut self, request_id: &String, tracer_span: &Span) { + /// `client_computed_stats` carries the tracer's `Datadog-Client-Computed-Stats` signal so + /// the extension-generated `aws.lambda` span can stamp `_dd.compute_stats` consistently. + pub fn add_tracer_span( + &mut self, + request_id: &String, + tracer_span: &Span, + client_computed_stats: bool, + ) { if let Some(context) = self .buffer .iter_mut() .find(|context| context.request_id == *request_id) { + context.client_computed_stats = client_computed_stats; context .invocation_span .meta @@ -639,6 +654,34 @@ mod tests { assert!(buffer.get(&unexistent_request_id).is_none()); } + /// APMSVLS-487 Tier 2: `add_tracer_span` records the tracer's `client_computed_stats` + /// signal onto the matching context (and is a no-op when no context matches). 
+ #[test] + fn test_add_tracer_span_sets_client_computed_stats() { + for client_computed_stats in [true, false] { + let mut buffer = ContextBuffer::with_capacity(2); + let request_id = String::from("req-1"); + buffer.insert(Context::from_request_id(&request_id)); + + let mut tracer_span = Span::default(); + tracer_span + .meta + .insert("request_id".to_string(), request_id.clone()); + + buffer.add_tracer_span(&request_id, &tracer_span, client_computed_stats); + + assert_eq!( + buffer.get(&request_id).unwrap().client_computed_stats, + client_computed_stats + ); + } + + // No matching context -> no panic, nothing recorded. + let mut buffer = ContextBuffer::with_capacity(2); + buffer.add_tracer_span(&String::from("missing"), &Span::default(), true); + assert!(buffer.get(&String::from("missing")).is_none()); + } + #[test] fn test_add_start_time() { let mut buffer = ContextBuffer::with_capacity(2); diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index ec1af99ab..4be42944a 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -665,9 +665,17 @@ impl Processor { trace_sender: &Arc, context: Context, ) { + // Capture before `get_ctx_spans` consumes `context`. 
+ let client_computed_stats = context.client_computed_stats; let (traces, body_size) = self.get_ctx_spans(context); - self.send_spans(traces, body_size, tags_provider, trace_sender) - .await; + self.send_spans( + traces, + body_size, + tags_provider, + trace_sender, + client_computed_stats, + ) + .await; } fn get_ctx_spans(&mut self, context: Context) -> (Vec, usize) { @@ -732,7 +740,9 @@ impl Processor { let traces = vec![cold_start_span.clone()]; let body_size = size_of_val(cold_start_span); - self.send_spans(traces, body_size, tags_provider, trace_sender) + // The cold start span is extension-generated and not tied to a tracer's stats + // signal, so the backend should compute its stats unless the extension does. + self.send_spans(traces, body_size, tags_provider, trace_sender, false) .await; } } @@ -746,8 +756,12 @@ impl Processor { body_size: usize, tags_provider: &Arc, trace_sender: &Arc, + client_computed_stats: bool, ) { // todo: figure out what to do here + // `client_computed_stats` is propagated from the tracer's placeholder span so the + // downstream `ChunkProcessor` (reused via `send_processed_traces` -> `process_traces`) + // stamps `_dd.compute_stats` on these extension-generated spans consistently with Path A. let header_tags = tracer_header_tags::TracerHeaderTags { lang: "", lang_version: "", @@ -756,7 +770,7 @@ impl Processor { tracer_version: "", container_id: "", client_computed_top_level: false, - client_computed_stats: false, + client_computed_stats, dropped_p0_traces: 0, dropped_p0_spans: 0, }; @@ -1403,9 +1417,10 @@ impl Processor { /// /// This is used to enrich the invocation span with additional metadata from the tracers /// top level span, since we discard the tracer span when we create the invocation span. 
- pub fn add_tracer_span(&mut self, span: &Span) { + pub fn add_tracer_span(&mut self, span: &Span, client_computed_stats: bool) { if let Some(request_id) = span.meta.get("request_id") { - self.context_buffer.add_tracer_span(request_id, span); + self.context_buffer + .add_tracer_span(request_id, span, client_computed_stats); } } @@ -2445,4 +2460,137 @@ mod tests { "pre-existing _dd.appsec.enabled value must not be overwritten" ); } + + /// Build a [`Processor`] with a caller-supplied config (for toggling + /// `compute_trace_stats_on_extension`). + fn setup_with_config(config: Arc) -> Processor { + let aws_config = Arc::new(AwsConfig { + region: "us-east-1".into(), + aws_lwa_proxy_lambda_runtime_api: Some("***".into()), + function_name: "test-function".into(), + sandbox_init_time: Instant::now(), + runtime_api: "***".into(), + exec_wrapper: None, + initialization_type: "on-demand".into(), + }); + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + let (service, handle) = + AggregatorService::new(EMPTY_TAGS, 1024).expect("failed to create aggregator service"); + tokio::spawn(service.run()); + let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(&config))); + let (durable_context_tx, _) = tokio::sync::mpsc::channel(1); + Processor::new( + tags_provider, + config, + aws_config, + handle, + propagator, + durable_context_tx, + ) + } + + /// Like [`make_trace_sender`], but returns the receiver so the test can inspect the + /// processed payload that Path B sends downstream. 
+ fn make_trace_sender_with_rx( + config: Arc, + ) -> ( + Arc, + tokio::sync::mpsc::Receiver, + ) { + use libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig; + let (stats_concentrator_service, stats_concentrator_handle) = + StatsConcentratorService::new(Arc::clone(&config)); + tokio::spawn(stats_concentrator_service.run()); + let (trace_tx, trace_rx) = tokio::sync::mpsc::channel(8); + let sender = Arc::new(SendingTraceProcessor { + appsec: None, + processor: Arc::new(trace_processor::ServerlessTraceProcessor { + obfuscation_config: Arc::new( + ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"), + ), + }), + trace_tx, + stats_generator: Arc::new(StatsGenerator::new(stats_concentrator_handle)), + }); + (sender, trace_rx) + } + + /// APMSVLS-487 Tier 2: the extension-generated `aws.lambda` span (Path B) stamps + /// `_dd.compute_stats="1"` only when neither the extension nor the tracer computes stats; + /// otherwise the key is absent. `client_computed_stats` is propagated from the context. 
+ #[tokio::test] + #[allow(clippy::unwrap_used)] + async fn test_send_ctx_spans_stamps_compute_stats() { + use crate::tags::lambda::tags::COMPUTE_STATS_KEY; + use libdd_trace_utils::tracer_payload::TracerPayloadCollection; + + #[cfg(feature = "fips")] + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + #[cfg(not(feature = "fips"))] + let _ = rustls::crypto::ring::default_provider().install_default(); + + // (compute_on_extension, client_computed_stats) -> expected meta value on aws.lambda span + let cases = [ + (false, false, Some("1")), + (false, true, None), + (true, false, None), + (true, true, None), + ]; + + for (compute_on_extension, client_computed_stats, expected) in cases { + let config = Arc::new(config::Config { + apm_dd_url: "https://trace.agent.datadoghq.com".to_string(), + service: Some("test-service".to_string()), + compute_trace_stats_on_extension: compute_on_extension, + ..config::Config::default() + }); + let mut processor = setup_with_config(Arc::clone(&config)); + let (trace_sender, mut trace_rx) = make_trace_sender_with_rx(Arc::clone(&config)); + + let mut context = Context::from_request_id("req-1"); + context.client_computed_stats = client_computed_stats; + context.invocation_span = Span { + name: "aws.lambda".to_string(), + resource: "test-resource".to_string(), + service: "test-service".to_string(), + span_id: 1, + trace_id: 100, + ..Default::default() + }; + + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + processor + .send_ctx_spans(&tags_provider, &trace_sender, context) + .await; + + let info = trace_rx.recv().await.expect("expected a sent payload"); + let send_data = info.builder.build(); + let TracerPayloadCollection::V07(payloads) = send_data.get_payloads() else { + panic!("expected V07 payload"); + }; + let aws_lambda_span = payloads + .iter() + .flat_map(|p| 
&p.chunks) + .flat_map(|c| &c.spans) + .find(|s| s.name == "aws.lambda") + .expect("aws.lambda span should be present"); + + assert_eq!( + aws_lambda_span + .meta + .get(COMPUTE_STATS_KEY) + .map(String::as_str), + expected, + "compute_on_extension={compute_on_extension}, client_computed_stats={client_computed_stats}" + ); + } + } } diff --git a/bottlecap/src/lifecycle/invocation/processor_service.rs b/bottlecap/src/lifecycle/invocation/processor_service.rs index a41a95b26..70c85d47c 100644 --- a/bottlecap/src/lifecycle/invocation/processor_service.rs +++ b/bottlecap/src/lifecycle/invocation/processor_service.rs @@ -109,6 +109,7 @@ pub enum ProcessorCommand { }, AddTracerSpan { span: Box, + client_computed_stats: bool, }, ForwardDurableContext { request_id: String, @@ -378,10 +379,12 @@ impl InvocationProcessorHandle { pub async fn add_tracer_span( &self, span: Span, + client_computed_stats: bool, ) -> Result<(), mpsc::error::SendError> { self.sender .send(ProcessorCommand::AddTracerSpan { span: Box::new(span), + client_computed_stats, }) .await } @@ -612,8 +615,11 @@ impl InvocationProcessorService { let result = Ok(self.processor.set_cold_start_span_trace_id(trace_id)); let _ = response.send(result); } - ProcessorCommand::AddTracerSpan { span } => { - self.processor.add_tracer_span(&span); + ProcessorCommand::AddTracerSpan { + span, + client_computed_stats, + } => { + self.processor.add_tracer_span(&span, client_computed_stats); } ProcessorCommand::ForwardDurableContext { request_id, diff --git a/bottlecap/src/otlp/agent.rs b/bottlecap/src/otlp/agent.rs index ee648c7f2..9cb3f9024 100644 --- a/bottlecap/src/otlp/agent.rs +++ b/bottlecap/src/otlp/agent.rs @@ -58,6 +58,8 @@ impl TracePipeline { } let compute_trace_stats_on_extension = self.config.compute_trace_stats_on_extension; + // Capture before `tracer_header_tags` is moved into process_traces below. 
+ let client_computed_stats = tracer_header_tags.client_computed_stats; let (send_data_builder, processed_traces) = self.trace_processor.process_traces( self.config.clone(), self.tags_provider.clone(), @@ -78,7 +80,10 @@ impl TracePipeline { // This needs to be after process_traces() because process_traces() // performs obfuscation, and we need to compute stats on the obfuscated traces. + // Skip extension-side stats generation when the tracer already computed stats + // client-side (Datadog-Client-Computed-Stats), to avoid double-counting. if compute_trace_stats_on_extension + && !client_computed_stats && let Err(err) = self.stats_generator.send(&processed_traces) { // Just log the error. We don't think trace stats are critical, so we don't want to diff --git a/bottlecap/src/tags/lambda/tags.rs b/bottlecap/src/tags/lambda/tags.rs index db053c44b..bd9663369 100644 --- a/bottlecap/src/tags/lambda/tags.rs +++ b/bottlecap/src/tags/lambda/tags.rs @@ -38,8 +38,10 @@ const VERSION_KEY: &str = "version"; // ServiceKey is the tag key for a function's service environment variable const SERVICE_KEY: &str = "service"; -// ComputeStatsKey is the tag key indicating whether trace stats should be computed -const COMPUTE_STATS_KEY: &str = "_dd.compute_stats"; +// ComputeStatsKey is the tag key indicating whether trace stats should be computed. +// This is a per-span backend directive that is stamped on each span's meta by the trace +// processor (see `trace_processor::ChunkProcessor`), NOT baked into the function tags here. +pub const COMPUTE_STATS_KEY: &str = "_dd.compute_stats"; // FunctionTagsKey is the tag key for a function's tags to be set on the top level tracepayload const FUNCTION_TAGS_KEY: &str = "_dd.tags.function"; // TODO(astuyve) decide what to do with the version @@ -135,11 +137,11 @@ fn tags_from_env( tags_map.extend(config.tags.clone()); } - // The value of _dd.compute_stats is the opposite of config.compute_trace_stats_on_extension. 
- // "config.compute_trace_stats_on_extension == true" means computing stats on the extension side, - // so we set _dd.compute_stats to 0 so stats won't be computed on the backend side. - let compute_stats = i32::from(!config.compute_trace_stats_on_extension); - tags_map.insert(COMPUTE_STATS_KEY.to_string(), compute_stats.to_string()); + // NOTE: `_dd.compute_stats` is intentionally NOT set here. It is a per-span backend + // directive (whether the backend should compute trace stats) that depends on both + // `compute_trace_stats_on_extension` AND the tracer's `Datadog-Client-Computed-Stats` + // header. The trace processor stamps it on each span's meta at processing time; baking + // it into the function tags would leak it into `_dd.tags.function` and ignore the header. tags_map } @@ -299,8 +301,14 @@ mod tests { fn test_new_from_config() { let metadata = HashMap::new(); let tags = Lambda::new_from_config(Arc::new(Config::default()), &metadata); - assert_eq!(tags.tags_map.len(), 3); - assert_eq!(tags.tags_map.get(COMPUTE_STATS_KEY).unwrap(), "1"); + assert_eq!(tags.tags_map.len(), 2); + // _dd.compute_stats is a per-span backend directive stamped by the trace processor, + // not a function tag. It must NOT be present in the tags map. + assert!(!tags.tags_map.contains_key(COMPUTE_STATS_KEY)); + assert!( + !tags.get_tags_map().contains_key(COMPUTE_STATS_KEY), + "_dd.compute_stats must not leak into the tags map" + ); let arch = arch_to_platform(); assert_eq!( tags.tags_map.get(ARCHITECTURE_KEY).unwrap(), @@ -436,7 +444,9 @@ mod tests { (parts[0].to_string(), parts[1].to_string()) }) .collect(); - assert_eq!(fn_tags_map.len(), 14); + assert_eq!(fn_tags_map.len(), 13); + // _dd.compute_stats is a per-span backend directive, not a function tag. 
+ assert!(!fn_tags_map.contains_key(COMPUTE_STATS_KEY)); assert_eq!(fn_tags_map.get("key1").unwrap(), "value1"); assert_eq!(fn_tags_map.get("key2").unwrap(), "value2"); assert_eq!(fn_tags_map.get(ACCOUNT_ID_KEY).unwrap(), "123456789012"); @@ -478,7 +488,9 @@ mod tests { (parts[0].to_string(), parts[1].to_string()) }) .collect(); - assert_eq!(fn_tags_map.len(), 14); + assert_eq!(fn_tags_map.len(), 13); + // _dd.compute_stats is a per-span backend directive, not a function tag. + assert!(!fn_tags_map.contains_key(COMPUTE_STATS_KEY)); assert_eq!(fn_tags_map.get("key1").unwrap(), "value1"); assert_eq!(fn_tags_map.get("key2").unwrap(), "value2"); assert_eq!(fn_tags_map.get(ACCOUNT_ID_KEY).unwrap(), "123456789012"); diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 9e0175efa..599fa50b9 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -503,7 +503,7 @@ impl TraceAgent { ); } - let tracer_header_tags = (&parts.headers).into(); + let tracer_header_tags: trace_utils::TracerHeaderTags<'_> = (&parts.headers).into(); let (body_size, mut traces): (usize, Vec>) = match version { ApiVersion::V04 => { @@ -590,7 +590,7 @@ impl TraceAgent { if span.resource == INVOCATION_SPAN_RESOURCE && let Err(e) = invocation_processor_handle - .add_tracer_span(span.clone()) + .add_tracer_span(span.clone(), tracer_header_tags.client_computed_stats) .await { error!("Failed to add tracer span to processor: {}", e); diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index e2694a13a..c83bbbb09 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -6,6 +6,7 @@ use crate::appsec::processor::context::HoldArguments; use crate::config; use crate::lifecycle::invocation::processor::S_TO_MS; use crate::lifecycle::invocation::triggers::get_default_service_name; +use crate::tags::lambda::tags::COMPUTE_STATS_KEY; use 
crate::tags::provider; use crate::traces::span_pointers::{SpanPointer, attach_span_pointers_to_meta}; use crate::traces::{ @@ -47,6 +48,9 @@ struct ChunkProcessor { obfuscation_config: Arc, tags_provider: Arc, span_pointers: Option>, + /// Whether the tracer signaled (via `Datadog-Client-Computed-Stats`) that it already + /// computed trace stats client-side. Used to decide whether to stamp `_dd.compute_stats`. + client_computed_stats: bool, } impl TraceChunkProcessor for ChunkProcessor { @@ -93,6 +97,17 @@ impl TraceChunkProcessor for ChunkProcessor { self.tags_provider.get_tags_map().iter().for_each(|(k, v)| { span.meta.insert(k.clone(), v.clone()); }); + + // Stamp `_dd.compute_stats="1"` to tell the backend to compute trace stats ONLY + // when nobody else did: neither the extension (compute_trace_stats_on_extension) + // nor the tracer (client_computed_stats). Otherwise leave the key absent, which the + // backend treats as "do not compute" (matching the Go agent's tag.go semantics, + // which only ever set "1" and never "0"). + if !self.config.compute_trace_stats_on_extension && !self.client_computed_stats { + span.meta + .insert(COMPUTE_STATS_KEY.to_string(), "1".to_string()); + } + // TODO(astuyve) generalize this and delegate to an enum span.meta.insert("origin".to_string(), "lambda".to_string()); span.meta @@ -355,6 +370,7 @@ impl TraceProcessor for ServerlessTraceProcessor { obfuscation_config: self.obfuscation_config.clone(), tags_provider: tags_provider.clone(), span_pointers, + client_computed_stats: header_tags.client_computed_stats, }, true, // send agentless since we are the agent ) @@ -520,6 +536,9 @@ impl SendingTraceProcessor { return Ok(()); } + // Capture before `header_tags` is moved into process_traces below. 
+ let client_computed_stats = header_tags.client_computed_stats; + let (payload, processed_traces) = self.processor.process_traces( config.clone(), tags_provider, @@ -531,7 +550,10 @@ impl SendingTraceProcessor { // This needs to be after process_traces() because process_traces() // performs obfuscation, and we need to compute stats on the obfuscated traces. + // Skip extension-side stats generation when the tracer already computed stats + // client-side (Datadog-Client-Computed-Stats), to avoid double-counting. if config.compute_trace_stats_on_extension + && !client_computed_stats && let Err(err) = self.stats_generator.send(&processed_traces) { // Just log the error. We don't think trace stats are critical, so we don't want to @@ -644,10 +666,16 @@ mod tests { let start = get_current_timestamp_nanos(); let tags_provider = create_tags_provider(create_test_config()); - let span = create_test_span(11, 222, 333, start, true, tags_provider); + let mut span = create_test_span(11, 222, 333, start, true, tags_provider); let traces: Vec> = vec![vec![span.clone()]]; + // The trace processor stamps `_dd.compute_stats="1"` on each span's meta when neither + // the extension nor the tracer computes stats (default test config: both false). + // Mirror that on the expected span (input `traces` was already cloned above). 
+ span.meta + .insert(COMPUTE_STATS_KEY.to_string(), "1".to_string()); + let header_tags = tracer_header_tags::TracerHeaderTags { lang: "nodejs", lang_version: "v19.7.0", @@ -933,6 +961,7 @@ mod tests { )]), )), span_pointers: None, + client_computed_stats: false, }; processor.process(&mut chunk, 0); @@ -1017,6 +1046,7 @@ mod tests { )]), )), span_pointers: None, + client_computed_stats: false, }; processor.process(&mut chunk, 0); @@ -1100,6 +1130,7 @@ mod tests { )]), )), span_pointers: None, + client_computed_stats: false, }; processor.process(&mut chunk, 0); @@ -1414,6 +1445,13 @@ mod tests { } fn create_chunk_processor(config: Arc) -> ChunkProcessor { + create_chunk_processor_with(config, false) + } + + fn create_chunk_processor_with( + config: Arc, + client_computed_stats: bool, + ) -> ChunkProcessor { let tags_provider = create_tags_provider(config.clone()); ChunkProcessor { config, @@ -1422,6 +1460,7 @@ mod tests { ), tags_provider, span_pointers: None, + client_computed_stats, } } @@ -1590,4 +1629,149 @@ mod tests { "base_service should not be overwritten when already set by the tracer" ); } + + /// APMSVLS-487 Tier 1: `ChunkProcessor::process` stamps `_dd.compute_stats="1"` on each + /// span's meta IFF neither the extension nor the tracer computes stats; otherwise the key + /// is absent. Assert directly on `Span.meta` (NOT `TracerPayload.tags`) — guards #1118. 
+ #[test] + fn test_compute_stats_truth_table() { + // (compute_trace_stats_on_extension, client_computed_stats) -> expected meta value + let cases = [ + (false, false, Some("1")), // nobody computed -> tell backend to compute + (false, true, None), // tracer computed -> absent + (true, false, None), // extension computes -> absent + (true, true, None), // both -> absent + ]; + + for (compute_on_extension, client_computed_stats, expected) in cases { + let config = Arc::new(Config { + compute_trace_stats_on_extension: compute_on_extension, + ..Config::default() + }); + let mut processor = create_chunk_processor_with(config, client_computed_stats); + + let span = pb::Span { + name: "http.request".to_string(), + service: "my-service".to_string(), + resource: "GET /users".to_string(), + trace_id: 1, + span_id: 2, + parent_id: 0, + start: 1000, + duration: 500, + error: 0, + meta: HashMap::new(), + metrics: HashMap::new(), + r#type: "web".to_string(), + meta_struct: HashMap::new(), + span_links: vec![], + span_events: vec![], + }; + let mut chunk = pb::TraceChunk { + priority: 1, + origin: "lambda".to_string(), + spans: vec![span], + tags: HashMap::new(), + dropped_trace: false, + }; + + processor.process(&mut chunk, 0); + + let actual = chunk.spans[0] + .meta + .get(COMPUTE_STATS_KEY) + .map(String::as_str); + assert_eq!( + actual, expected, + "compute_on_extension={compute_on_extension}, client_computed_stats={client_computed_stats}" + ); + } + } + + /// APMSVLS-487 Tier 1: `send_processed_traces` only generates extension-side stats when + /// `compute_trace_stats_on_extension == true AND client_computed_stats == false`. Drive the + /// real concentrator and assert a flushed payload is present/absent accordingly. 
+ #[tokio::test] + #[allow(clippy::unwrap_used)] + #[cfg_attr(miri, ignore)] + async fn test_stats_skip_guard_via_send_processed_traces() { + use crate::traces::stats_concentrator_service::StatsConcentratorService; + + #[cfg(feature = "fips")] + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + #[cfg(not(feature = "fips"))] + let _ = rustls::crypto::ring::default_provider().install_default(); + + // (compute_on_extension, client_computed_stats) -> stats payload expected? + let cases = [ + (true, false, true), // extension computes, tracer didn't -> stats generated + (true, true, false), // tracer computed -> skip + (false, false, false), // extension doesn't compute -> skip + (false, true, false), // both off / tracer computed -> skip + ]; + + for (compute_on_extension, client_computed_stats, expect_stats) in cases { + let config = Arc::new(Config { + apm_dd_url: "https://trace.agent.datadoghq.com".to_string(), + service: Some("test-service".to_string()), + compute_trace_stats_on_extension: compute_on_extension, + ..Config::default() + }); + + let (concentrator_service, concentrator_handle) = + StatsConcentratorService::new(config.clone()); + tokio::spawn(concentrator_service.run()); + + let (trace_tx, mut trace_rx) = tokio::sync::mpsc::channel(8); + // Drain the trace channel so send() never blocks. 
+ tokio::spawn(async move { while trace_rx.recv().await.is_some() {} }); + + let sender = SendingTraceProcessor { + appsec: None, + processor: Arc::new(ServerlessTraceProcessor { + obfuscation_config: Arc::new( + ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"), + ), + }), + trace_tx, + stats_generator: Arc::new(StatsGenerator::new(concentrator_handle.clone())), + }; + + let start = get_current_timestamp_nanos(); + let tags_provider = create_tags_provider(config.clone()); + let span = create_test_span(1, 2, 0, start, true, tags_provider.clone()); + let header_tags = tracer_header_tags::TracerHeaderTags { + lang: "nodejs", + lang_version: "v19.7.0", + lang_interpreter: "v8", + lang_vendor: "vendor", + tracer_version: "4.0.0", + container_id: "33", + client_computed_top_level: false, + client_computed_stats, + dropped_p0_traces: 0, + dropped_p0_spans: 0, + }; + + sender + .send_processed_traces( + config, + tags_provider, + header_tags, + vec![vec![span]], + 100, + None, + ) + .await + .expect("send_processed_traces failed"); + + let payload = concentrator_handle.flush(true).await.expect("flush failed"); + + assert_eq!( + payload.is_some(), + expect_stats, + "compute_on_extension={compute_on_extension}, client_computed_stats={client_computed_stats}" + ); + } + } } diff --git a/bottlecap/tests/apm_integration_test.rs b/bottlecap/tests/apm_integration_test.rs index fcdc9bfbc..b7213801f 100644 --- a/bottlecap/tests/apm_integration_test.rs +++ b/bottlecap/tests/apm_integration_test.rs @@ -17,16 +17,22 @@ use std::str::FromStr; use std::sync::Arc; +use bottlecap::LAMBDA_RUNTIME_SLUG; use bottlecap::config::Config; +use bottlecap::tags::lambda::tags::COMPUTE_STATS_KEY; +use bottlecap::tags::provider::Provider; use bottlecap::traces::http_client::create_client; use bottlecap::traces::stats_aggregator::StatsAggregator; use bottlecap::traces::stats_concentrator_service::StatsConcentratorService; use bottlecap::traces::stats_flusher::StatsFlusher; +use 
bottlecap::traces::stats_generator::StatsGenerator; use bottlecap::traces::trace_aggregator::SendDataBuilderInfo; use bottlecap::traces::trace_aggregator_service::AggregatorService; use bottlecap::traces::trace_flusher::TraceFlusher; +use bottlecap::traces::trace_processor::{SendingTraceProcessor, ServerlessTraceProcessor}; use dogstatsd::api_key::ApiKeyFactory; use libdd_common::Endpoint; +use libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig; use libdd_trace_protobuf::pb; use libdd_trace_utils::send_data::SendDataBuilder; use libdd_trace_utils::trace_utils::TracerHeaderTags; @@ -267,3 +273,225 @@ async fn trace_payload_roundtrip_through_fake_intake() { assert_eq!(span.trace_id, 0x1111_1111_1111_1111); assert_eq!(span.span_id, 0x2222_2222_2222_2222); } + +// --------------------------------------------------------------------------- +// APMSVLS-487 Tier 3: full fake-intake E2E through `SendingTraceProcessor`. +// +// Unlike `trace_payload_roundtrip_through_fake_intake` (which inserts a hand-built +// `pb::TracerPayload` directly), these tests route a trace through +// `SendingTraceProcessor::send_processed_traces` so they exercise the real +// `process_traces`/`ChunkProcessor` stamping of `_dd.compute_stats` and the +// extension-side stats-generation guard. +// --------------------------------------------------------------------------- + +fn header_tags_with(client_computed_stats: bool) -> TracerHeaderTags<'static> { + TracerHeaderTags { + client_computed_stats, + ..header_tags() + } +} + +/// Outcome of routing a single trace through the processor + flushers. +struct PipelineOutcome { + traces: Vec, + stats: Vec, +} + +/// Drives one trace through `SendingTraceProcessor::send_processed_traces` with the given +/// `compute_trace_stats_on_extension` / `client_computed_stats`, then flushes both the trace +/// and stats pipelines into a fresh fake-intake and returns what it captured. 
+async fn run_processor_pipeline(
+    compute_on_extension: bool,
+    client_computed_stats: bool,
+) -> PipelineOutcome {
+    // Sends one synthetic `web.request` span through the trace and stats pipelines against a
+    // `FakeIntake` started for this call, then returns the trace and stats payloads it captured.
+    //
+    // * `compute_on_extension` is copied into `Config::compute_trace_stats_on_extension`.
+    // * `client_computed_stats` is forwarded to `header_tags_with` to build the header tags.
+    //
+    // Panics (failing the calling test) if sending, payload insertion, or either flush fails.
+    let fake_intake = FakeIntake::start().await;
+
+    let config = Arc::new(Config {
+        api_key: DD_API_KEY.to_string(),
+        site: "datadoghq.com".to_string(),
+        // process_traces builds its trace endpoint directly from apm_dd_url.
+        apm_dd_url: fake_intake.traces_url(),
+        service: Some("fake-intake-trace-service".to_string()),
+        compute_trace_stats_on_extension: compute_on_extension,
+        ..Config::default()
+    });
+
+    // --- Trace pipeline: trace_tx -> (drained below) -> AggregatorService -> TraceFlusher ---
+    let (aggregator_service, aggregator_handle) = AggregatorService::default();
+    tokio::spawn(aggregator_service.run());
+    // The channel's element type is inferred from the `trace_tx` field of
+    // `SendingTraceProcessor` below, so no turbofish is needed here.
+    let (trace_tx, mut trace_rx) = tokio::sync::mpsc::channel(8);
+
+    // --- Stats pipeline: StatsConcentratorService -> StatsAggregator -> StatsFlusher ---
+    let (concentrator_service, concentrator_handle) =
+        StatsConcentratorService::new(Arc::clone(&config));
+    tokio::spawn(concentrator_service.run());
+
+    let sender = SendingTraceProcessor {
+        appsec: None,
+        processor: Arc::new(ServerlessTraceProcessor {
+            obfuscation_config: Arc::new(
+                ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"),
+            ),
+        }),
+        trace_tx,
+        stats_generator: Arc::new(StatsGenerator::new(concentrator_handle.clone())),
+    };
+
+    let tags_provider = Arc::new(Provider::new(
+        Arc::clone(&config),
+        LAMBDA_RUNTIME_SLUG.to_string(),
+        &std::collections::HashMap::from([(
+            "function_arn".to_string(),
+            "arn:aws:lambda:us-west-2:123456789012:function:my-function".to_string(),
+        )]),
+    ));
+
+    // A top-level root span so the concentrator produces stats.
+    let mut span = pb::Span {
+        service: "fake-intake-trace-service".to_string(),
+        name: "web.request".to_string(),
+        resource: "GET /fake".to_string(),
+        trace_id: 0x1111_1111_1111_1111,
+        span_id: 0x2222_2222_2222_2222,
+        parent_id: 0,
+        start: 1_700_000_000_000_000_000,
+        duration: 5_000_000,
+        error: 0,
+        r#type: "web".to_string(),
+        ..pb::Span::default()
+    };
+    span.metrics.insert("_top_level".to_string(), 1.0);
+
+    sender
+        .send_processed_traces(
+            Arc::clone(&config),
+            tags_provider,
+            header_tags_with(client_computed_stats),
+            vec![vec![span]],
+            100,
+            None,
+        )
+        .await
+        .expect("send_processed_traces failed");
+
+    // Drain whatever `send_processed_traces` produced into the aggregator before flushing.
+    // `send_processed_traces` has already returned, so the payload (if any) is buffered in the
+    // channel and `try_recv` will surface it without racing a background task.
+    drop(sender);
+    while let Ok(info) = trace_rx.try_recv() {
+        aggregator_handle
+            .insert_payload(info)
+            .expect("insert_payload must succeed");
+    }
+
+    // Flush traces.
+    let http_client = create_client(None, None, false).expect("failed to create http client");
+    let api_key_factory = Arc::new(ApiKeyFactory::new(DD_API_KEY));
+    let trace_flusher = TraceFlusher::new(
+        aggregator_handle,
+        Arc::clone(&config),
+        Arc::clone(&api_key_factory),
+        http_client.clone(),
+    );
+    let failed = trace_flusher.flush(None).await;
+    assert!(failed.is_none(), "trace flush failed: {failed:?}");
+
+    // Flush stats (pulls from the concentrator via the aggregator).
+    let stats_aggregator = Arc::new(Mutex::new(StatsAggregator::new_with_concentrator(
+        concentrator_handle,
+    )));
+    let stats_flusher = StatsFlusher::new(
+        api_key_factory,
+        stats_aggregator,
+        Arc::clone(&config),
+        http_client,
+        fake_intake.stats_url(),
+    );
+    let failed = stats_flusher.flush(true, None).await;
+    assert!(failed.is_none(), "stats flush failed: {failed:?}");
+
+    PipelineOutcome {
+        traces: fake_intake.trace_payloads(),
+        stats: fake_intake.stats_payloads(),
+    }
+}
+
+/// Returns the `_dd.compute_stats` meta value (looked up via `COMPUTE_STATS_KEY`) of the first
+/// `web.request` span found in the captured trace payloads, or `None` when the key is absent.
+///
+/// # Panics
+///
+/// Panics if no `web.request` span is present in `traces`.
+fn captured_compute_stats(traces: &[pb::AgentPayload]) -> Option<String> {
+    let span = traces
+        .iter()
+        .flat_map(|p| &p.tracer_payloads)
+        .flat_map(|tp| &tp.chunks)
+        .flat_map(|c| &c.spans)
+        .find(|s| s.name == "web.request")
+        .expect("web.request span should be present");
+    span.meta.get(COMPUTE_STATS_KEY).cloned()
+}
+
+/// T3.1: `client_computed_stats=true` → captured span meta has `_dd.compute_stats` absent.
+#[tokio::test]
+async fn e2e_client_computed_stats_leaves_compute_stats_absent() {
+    let outcome = run_processor_pipeline(false, true).await;
+    assert!(
+        captured_compute_stats(&outcome.traces).is_none(),
+        "_dd.compute_stats must be absent when the tracer computed stats",
+    );
+}
+
+/// T3.2: control matrix — `"1"` only for the (neither computes) row; absent otherwise.
+#[tokio::test]
+async fn e2e_compute_stats_truth_table_on_captured_span() {
+    // Each row: (extension computes stats, tracer computed stats, expected `_dd.compute_stats`).
+    const TRUTH_TABLE: [(bool, bool, Option<&str>); 4] = [
+        (false, false, Some("1")),
+        (false, true, None),
+        (true, false, None),
+        (true, true, None),
+    ];
+
+    for (compute_on_extension, client_computed_stats, expected) in TRUTH_TABLE {
+        let captured = run_processor_pipeline(compute_on_extension, client_computed_stats)
+            .await
+            .traces;
+        let observed = captured_compute_stats(&captured);
+        assert_eq!(
+            observed.as_deref(),
+            expected,
+            "compute_on_extension={compute_on_extension}, client_computed_stats={client_computed_stats}",
+        );
+    }
+}
+
+/// T3.3: stats suppression. The stats intake must receive a payload only when the extension
+/// computes stats and the tracer did not; in every other checked case it must stay empty.
+#[tokio::test]
+async fn e2e_stats_suppressed_unless_extension_computes() {
+    // Extension computes, tracer didn't -> exactly one stats payload.
+    let extension_only = run_processor_pipeline(true, false).await.stats;
+    assert_eq!(
+        extension_only.len(),
+        1,
+        "expected one stats payload when the extension computes stats",
+    );
+
+    // Tracer computed -> extension skips stats generation.
+    let tracer_computed = run_processor_pipeline(true, true).await.stats;
+    assert!(
+        tracer_computed.is_empty(),
+        "stats must be suppressed when the tracer computed them",
+    );
+
+    // Extension not computing -> no stats either way.
+    let nobody_computes = run_processor_pipeline(false, false).await.stats;
+    assert!(
+        nobody_computes.is_empty(),
+        "stats must be empty when the extension does not compute them",
+    );
+}
+
+/// T3.4: combined path — when the tracer computed stats, the captured trace has no
+/// `_dd.compute_stats` AND zero stats payloads reach the intake.
+#[tokio::test]
+async fn e2e_client_computed_stats_absent_meta_and_no_stats() {
+    // Both sides report computing stats: check the span meta and the stats intake together.
+    let PipelineOutcome { traces, stats } = run_processor_pipeline(true, true).await;
+    assert!(
+        captured_compute_stats(&traces).is_none(),
+        "_dd.compute_stats must be absent",
+    );
+    assert!(stats.is_empty(), "no stats payloads must be sent");
+}
diff --git a/bottlecap/tests/logs_integration_test.rs b/bottlecap/tests/logs_integration_test.rs index 551942a43..77f5eec15 100644 --- a/bottlecap/tests/logs_integration_test.rs +++ b/bottlecap/tests/logs_integration_test.rs @@ -20,7 +20,9 @@ async fn test_logs() { // protobuf is using hashmap, can't set a btreemap to have sorted keys. Using multiple regexp since // Can't do look around since -> error: look-around, including look-ahead and look-behind, is not supported let regexp_message = r#"[{"message":{"message":"START RequestId: 459921b5-681c-4a96-beb0-81e0aa586026 Version: $LATEST","lambda":{"arn":"test-arn","request_id":"459921b5-681c-4a96-beb0-81e0aa586026"},"timestamp":1666361103165,"status":"info"},"hostname":"test-arn","service":"","#; - let regexp_compute_state = r#"_dd.compute_stats:1"#; + // NOTE: `_dd.compute_stats` is intentionally NOT asserted here. It is a per-span backend + // directive stamped on trace spans by the trace processor, not a log/function tag, so it + // must not appear in `ddtags` (APMSVLS-487). 
let regexp_arch = format!(r#"architecture:{}"#, arch); let regexp_function_arn = r#"function_arn:test-arn"#; let regexp_extension_version = r#"dd_extension_version"#; @@ -32,7 +34,6 @@ async fn test_logs() { .header("DD-API-KEY", dd_api_key) .header("Content-Type", "application/json") .body_contains(regexp_message) - .body_contains(regexp_compute_state) .body_contains(regexp_arch) .body_contains(regexp_function_arn) .body_contains(regexp_extension_version); diff --git a/bottlecap/tests/metrics_integration_test.rs b/bottlecap/tests/metrics_integration_test.rs index 5456a5d73..f49da31eb 100644 --- a/bottlecap/tests/metrics_integration_test.rs +++ b/bottlecap/tests/metrics_integration_test.rs @@ -14,7 +14,7 @@ async fn test_enhanced_metrics() { let dd_api_key = "my_test_key"; // payload looks like - // aws.lambda.enhanced.invocations"_dd.compute_stats:1"architecture:x86_64"function_arn:test-arn:�൴ �?! �?) �?1 �?:�B + // aws.lambda.enhanced.invocations"architecture:x86_64"function_arn:test-arn:�൴ �?! �?) �?1 �?:�B // protobuf is using hashmap, can't set a btreemap to have sorted keys. Using multiple regexp since // Can't do look around since -> error: look-around, including look-ahead and look-behind, is not supported let regexp_metric_name = r#"aws.lambda.enhanced.invocations"#;