Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion bottlecap/src/lifecycle/invocation/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ pub struct Context {
/// tracing.
///
pub extracted_span_context: Option<SpanContext>,
/// Whether the tracer signaled (via the `Datadog-Client-Computed-Stats` header) that it
/// already computed trace stats client-side, propagated from the tracer's placeholder span.
///
/// Used when generating the extension-side `aws.lambda` span (Path B) so the backend
/// stats directive (`_dd.compute_stats`) is stamped consistently with Path A.
pub client_computed_stats: bool,
}

/// Struct containing the information needed to reparent a span.
Expand Down Expand Up @@ -94,6 +100,7 @@ impl Default for Context {
snapstart_restore_span: None,
tracer_span: None,
extracted_span_context: None,
client_computed_stats: false,
}
}
}
Expand Down Expand Up @@ -508,12 +515,20 @@ impl ContextBuffer {

/// Adds the tracer span to a `Context` in the buffer.
///
pub fn add_tracer_span(&mut self, request_id: &String, tracer_span: &Span) {
/// `client_computed_stats` carries the tracer's `Datadog-Client-Computed-Stats` signal so
/// the extension-generated `aws.lambda` span can stamp `_dd.compute_stats` consistently.
pub fn add_tracer_span(
&mut self,
request_id: &String,
tracer_span: &Span,
client_computed_stats: bool,
) {
if let Some(context) = self
.buffer
.iter_mut()
.find(|context| context.request_id == *request_id)
{
context.client_computed_stats = client_computed_stats;
context
.invocation_span
.meta
Expand Down Expand Up @@ -639,6 +654,34 @@ mod tests {
assert!(buffer.get(&unexistent_request_id).is_none());
}

/// APMSVLS-487 Tier 2: `add_tracer_span` records the tracer's `client_computed_stats`
/// signal onto the matching context (and is a no-op when no context matches).
#[test]
fn test_add_tracer_span_sets_client_computed_stats() {
for client_computed_stats in [true, false] {
let mut buffer = ContextBuffer::with_capacity(2);
let request_id = String::from("req-1");
buffer.insert(Context::from_request_id(&request_id));

let mut tracer_span = Span::default();
tracer_span
.meta
.insert("request_id".to_string(), request_id.clone());

buffer.add_tracer_span(&request_id, &tracer_span, client_computed_stats);

assert_eq!(
buffer.get(&request_id).unwrap().client_computed_stats,
client_computed_stats
);
}

// No matching context -> no panic, nothing recorded.
let mut buffer = ContextBuffer::with_capacity(2);
buffer.add_tracer_span(&String::from("missing"), &Span::default(), true);
assert!(buffer.get(&String::from("missing")).is_none());
}

#[test]
fn test_add_start_time() {
let mut buffer = ContextBuffer::with_capacity(2);
Expand Down
160 changes: 154 additions & 6 deletions bottlecap/src/lifecycle/invocation/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -665,9 +665,17 @@ impl Processor {
trace_sender: &Arc<SendingTraceProcessor>,
context: Context,
) {
// Capture before `get_ctx_spans` consumes `context`.
let client_computed_stats = context.client_computed_stats;
let (traces, body_size) = self.get_ctx_spans(context);
self.send_spans(traces, body_size, tags_provider, trace_sender)
.await;
self.send_spans(
traces,
body_size,
tags_provider,
trace_sender,
client_computed_stats,
)
.await;
}

fn get_ctx_spans(&mut self, context: Context) -> (Vec<Span>, usize) {
Expand Down Expand Up @@ -732,7 +740,9 @@ impl Processor {
let traces = vec![cold_start_span.clone()];
let body_size = size_of_val(cold_start_span);

self.send_spans(traces, body_size, tags_provider, trace_sender)
// The cold start span is extension-generated and not tied to a tracer's stats
// signal, so the backend should compute its stats unless the extension does.
self.send_spans(traces, body_size, tags_provider, trace_sender, false)
.await;
}
}
Expand All @@ -746,8 +756,12 @@ impl Processor {
body_size: usize,
tags_provider: &Arc<provider::Provider>,
trace_sender: &Arc<SendingTraceProcessor>,
client_computed_stats: bool,
) {
// todo: figure out what to do here
// `client_computed_stats` is propagated from the tracer's placeholder span so the
// downstream `ChunkProcessor` (reused via `send_processed_traces` -> `process_traces`)
// stamps `_dd.compute_stats` on these extension-generated spans consistently with Path A.
let header_tags = tracer_header_tags::TracerHeaderTags {
lang: "",
lang_version: "",
Expand All @@ -756,7 +770,7 @@ impl Processor {
tracer_version: "",
container_id: "",
client_computed_top_level: false,
client_computed_stats: false,
client_computed_stats,
dropped_p0_traces: 0,
dropped_p0_spans: 0,
};
Expand Down Expand Up @@ -1403,9 +1417,10 @@ impl Processor {
///
/// This is used to enrich the invocation span with additional metadata from the tracers
/// top level span, since we discard the tracer span when we create the invocation span.
pub fn add_tracer_span(&mut self, span: &Span) {
pub fn add_tracer_span(&mut self, span: &Span, client_computed_stats: bool) {
if let Some(request_id) = span.meta.get("request_id") {
self.context_buffer.add_tracer_span(request_id, span);
self.context_buffer
.add_tracer_span(request_id, span, client_computed_stats);
}
}

Expand Down Expand Up @@ -2445,4 +2460,137 @@ mod tests {
"pre-existing _dd.appsec.enabled value must not be overwritten"
);
}

/// Build a [`Processor`] with a caller-supplied config (for toggling
/// `compute_trace_stats_on_extension`).
fn setup_with_config(config: Arc<config::Config>) -> Processor {
let aws_config = Arc::new(AwsConfig {
region: "us-east-1".into(),
aws_lwa_proxy_lambda_runtime_api: Some("***".into()),
function_name: "test-function".into(),
sandbox_init_time: Instant::now(),
runtime_api: "***".into(),
exec_wrapper: None,
initialization_type: "on-demand".into(),
});
let tags_provider = Arc::new(provider::Provider::new(
Arc::clone(&config),
LAMBDA_RUNTIME_SLUG.to_string(),
&HashMap::from([("function_arn".to_string(), "test-arn".to_string())]),
));
let (service, handle) =
AggregatorService::new(EMPTY_TAGS, 1024).expect("failed to create aggregator service");
tokio::spawn(service.run());
let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(&config)));
let (durable_context_tx, _) = tokio::sync::mpsc::channel(1);
Processor::new(
tags_provider,
config,
aws_config,
handle,
propagator,
durable_context_tx,
)
}

/// Like [`make_trace_sender`], but returns the receiver so the test can inspect the
/// processed payload that Path B sends downstream.
fn make_trace_sender_with_rx(
config: Arc<config::Config>,
) -> (
Arc<SendingTraceProcessor>,
tokio::sync::mpsc::Receiver<crate::traces::trace_aggregator::SendDataBuilderInfo>,
) {
use libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig;
let (stats_concentrator_service, stats_concentrator_handle) =
StatsConcentratorService::new(Arc::clone(&config));
tokio::spawn(stats_concentrator_service.run());
let (trace_tx, trace_rx) = tokio::sync::mpsc::channel(8);
let sender = Arc::new(SendingTraceProcessor {
appsec: None,
processor: Arc::new(trace_processor::ServerlessTraceProcessor {
obfuscation_config: Arc::new(
ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"),
),
}),
trace_tx,
stats_generator: Arc::new(StatsGenerator::new(stats_concentrator_handle)),
});
(sender, trace_rx)
}

/// APMSVLS-487 Tier 2: the extension-generated `aws.lambda` span (Path B) stamps
/// `_dd.compute_stats="1"` only when neither the extension nor the tracer computes stats;
/// otherwise the key is absent. `client_computed_stats` is propagated from the context.
#[tokio::test]
#[allow(clippy::unwrap_used)]
async fn test_send_ctx_spans_stamps_compute_stats() {
use crate::tags::lambda::tags::COMPUTE_STATS_KEY;
use libdd_trace_utils::tracer_payload::TracerPayloadCollection;

#[cfg(feature = "fips")]
let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
#[cfg(not(feature = "fips"))]
let _ = rustls::crypto::ring::default_provider().install_default();

// (compute_on_extension, client_computed_stats) -> expected meta value on aws.lambda span
let cases = [
(false, false, Some("1")),
(false, true, None),
(true, false, None),
(true, true, None),
];

for (compute_on_extension, client_computed_stats, expected) in cases {
let config = Arc::new(config::Config {
apm_dd_url: "https://trace.agent.datadoghq.com".to_string(),
service: Some("test-service".to_string()),
compute_trace_stats_on_extension: compute_on_extension,
..config::Config::default()
});
let mut processor = setup_with_config(Arc::clone(&config));
let (trace_sender, mut trace_rx) = make_trace_sender_with_rx(Arc::clone(&config));

let mut context = Context::from_request_id("req-1");
context.client_computed_stats = client_computed_stats;
context.invocation_span = Span {
name: "aws.lambda".to_string(),
resource: "test-resource".to_string(),
service: "test-service".to_string(),
span_id: 1,
trace_id: 100,
..Default::default()
};

let tags_provider = Arc::new(provider::Provider::new(
Arc::clone(&config),
LAMBDA_RUNTIME_SLUG.to_string(),
&HashMap::from([("function_arn".to_string(), "test-arn".to_string())]),
));
processor
.send_ctx_spans(&tags_provider, &trace_sender, context)
.await;

let info = trace_rx.recv().await.expect("expected a sent payload");
let send_data = info.builder.build();
let TracerPayloadCollection::V07(payloads) = send_data.get_payloads() else {
panic!("expected V07 payload");
};
let aws_lambda_span = payloads
.iter()
.flat_map(|p| &p.chunks)
.flat_map(|c| &c.spans)
.find(|s| s.name == "aws.lambda")
.expect("aws.lambda span should be present");

assert_eq!(
aws_lambda_span
.meta
.get(COMPUTE_STATS_KEY)
.map(String::as_str),
expected,
"compute_on_extension={compute_on_extension}, client_computed_stats={client_computed_stats}"
);
}
}
}
10 changes: 8 additions & 2 deletions bottlecap/src/lifecycle/invocation/processor_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ pub enum ProcessorCommand {
},
AddTracerSpan {
span: Box<Span>,
client_computed_stats: bool,
},
ForwardDurableContext {
request_id: String,
Expand Down Expand Up @@ -378,10 +379,12 @@ impl InvocationProcessorHandle {
pub async fn add_tracer_span(
&self,
span: Span,
client_computed_stats: bool,
) -> Result<(), mpsc::error::SendError<ProcessorCommand>> {
self.sender
.send(ProcessorCommand::AddTracerSpan {
span: Box::new(span),
client_computed_stats,
})
.await
}
Expand Down Expand Up @@ -612,8 +615,11 @@ impl InvocationProcessorService {
let result = Ok(self.processor.set_cold_start_span_trace_id(trace_id));
let _ = response.send(result);
}
ProcessorCommand::AddTracerSpan { span } => {
self.processor.add_tracer_span(&span);
ProcessorCommand::AddTracerSpan {
span,
client_computed_stats,
} => {
self.processor.add_tracer_span(&span, client_computed_stats);
}
ProcessorCommand::ForwardDurableContext {
request_id,
Expand Down
5 changes: 5 additions & 0 deletions bottlecap/src/otlp/agent.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ impl TracePipeline {
}

let compute_trace_stats_on_extension = self.config.compute_trace_stats_on_extension;
// Capture before `tracer_header_tags` is moved into process_traces below.
let client_computed_stats = tracer_header_tags.client_computed_stats;
let (send_data_builder, processed_traces) = self.trace_processor.process_traces(
self.config.clone(),
self.tags_provider.clone(),
Expand All @@ -78,7 +80,10 @@ impl TracePipeline {

// This needs to be after process_traces() because process_traces()
// performs obfuscation, and we need to compute stats on the obfuscated traces.
// Skip extension-side stats generation when the tracer already computed stats
// client-side (Datadog-Client-Computed-Stats), to avoid double-counting.
if compute_trace_stats_on_extension
&& !client_computed_stats
&& let Err(err) = self.stats_generator.send(&processed_traces)
{
// Just log the error. We don't think trace stats are critical, so we don't want to
Expand Down
Loading
Loading