From e73f10ca1f0930775810e1b37cca7f82f3343190 Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Tue, 2 Jun 2026 13:39:30 -0400 Subject: [PATCH 01/10] docs: add APMSVLS-487 implementation plan (TODO.md) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tests-first plan for respecting the Datadog-Client-Computed-Stats header: canonical _dd.compute_stats rule, Path A/B production changes, and tiered tests (header contract, trace_processor unit, Path B unit, full fake-intake E2E). ๐Ÿค– Co-Authored-By: Claude Code --- TODO.md | 147 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 TODO.md diff --git a/TODO.md b/TODO.md new file mode 100644 index 000000000..e13809bfb --- /dev/null +++ b/TODO.md @@ -0,0 +1,147 @@ +# APMSVLS-487 โ€” respect `Datadog-Client-Computed-Stats` (branch `lpimentel/respect-client-computed-stats`) + +## Goal + +When a tracer sends `Datadog-Client-Computed-Stats: `, the extension must: +1. Skip its own (agent-side) stats generation for those traces. +2. Signal the backend not to compute stats by **leaving `_dd.compute_stats` absent** on each span's meta. +3. Do the same for the extension-generated `aws.lambda` span (Path B), which requires + propagating the flag from the tracer's placeholder span. + +## Canonical semantics (validated against the Go agent) + +`datadog-agent/cmd/serverless-init/tag/tag.go` + `pkg/serverless/tags/tags.go` only ever set +`_dd.compute_stats = "1"`, **never `"0"`**, and leave the key absent otherwise: +- `"1"` โ†’ backend computes stats (only when nobody else did: not tracer, not extension). +- absent โ†’ backend does not compute (someone already did, or accepted-incorrect). + +This **supersedes** the earlier "override to `0`" plan. `"0"` and absent are equivalent +"off" states to the backend, so emitting absent instead of today's `"0"` is **not a regression**; +it just aligns us with the canonical agent. + +Rule (one line): **set `_dd.compute_stats="1"` iff `!compute_trace_stats_on_extension && !client_computed_stats`; otherwise leave absent.** + +### Truth table + +| `compute_trace_stats_on_extension` | `client_computed_stats` | `_dd.compute_stats` on span meta | Extension computes stats? | +|---|---|---|---| +| false | false | `"1"` | No | +| false | true | absent | No | +| true | false | absent | Yes | +| true | true | absent | No | + +## Header-value finding (cross-runtime) + +`Datadog-Client-Computed-Stats` is not standardized across tracers: +- `"true"` โ€” .NET, Java, PHP, Python +- `"yes"` โ€” JS, Ruby, C++ +- `"t"` โ€” Go + +bottlecap consumes the **already-parsed** `client_computed_stats` bool from +`libdd_trace_utils` (`tracer_header_tags.rs:137`): `client_computed_stats = !value.is_empty()`. +All three values are non-empty โ†’ all map to `true` โ†’ **the fix triggers on every runtime.** +No production change needed here; we just consume the bool. + +### Latent divergence (NOT in scope โ€” follow-up) + +- bottlecap: any non-empty value โ†’ `true` (incl. `"0"`/`"false"`). +- agent `isHeaderTrue` (`pkg/trace/api/api.go:1065`): `ParseBool`, so `"0"`/`"false"` โ†’ `false`. +- No tracer sends falsey values today (they omit the header), so the divergence is latent. +- [ ] **Follow-up libdatadog PR**: make `From<&HeaderMap>` honor `"0"`/`"false"` as falsey to + match the agent. Lives in shared libdatadog, separate from this ticket. + +## Implementation (production code) + +- [ ] `bottlecap/src/tags/lambda/tags.rs` + - Keep `COMPUTE_STATS_KEY` `pub(crate)`. + - **Stop baking** `_dd.compute_stats` in `tags_from_env` (remove the unconditional insert). + This also drops it from `_dd.tags.function` (correct โ€” it's a per-span backend directive). + - Update unit tests: `test_new_from_config` (`tags_map.len()` 3โ†’2, delete `COMPUTE_STATS_KEY=="1"`), + `test_get_function_tags_map*` (`14โ†’13`), add a test asserting the key is absent from `get_tags_map()`. +- [ ] `bottlecap/src/traces/trace_processor.rs` + - `use crate::tags::lambda::tags::COMPUTE_STATS_KEY;` + - Add `client_computed_stats: bool` to `struct ChunkProcessor`. + - In `ChunkProcessor::process`, after the `tags_map` copy: insert `_dd.compute_stats="1"` + **only when** `!self.config.compute_trace_stats_on_extension && !self.client_computed_stats`. + - In `process_traces`, set `client_computed_stats: header_tags.client_computed_stats`. + - In `send_processed_traces`, capture `client_computed_stats` before `header_tags` is moved and + add `&& !client_computed_stats` to the `compute_trace_stats_on_extension` stats-gen guard. + - Update the 4 in-test `ChunkProcessor { โ€ฆ }` literals with `client_computed_stats: false`. +- [ ] `bottlecap/src/lifecycle/invocation/context.rs` (Path B) + - Add `pub client_computed_stats: bool` to `struct Context` (+ doc) and init `false` in `Default`. + - `ContextBuffer::add_tracer_span`: add `client_computed_stats: bool` param; set on context. +- [ ] `bottlecap/src/lifecycle/invocation/processor.rs` (Path B) + - `add_tracer_span`: add param, pass through. + - `send_ctx_spans`: read `context.client_computed_stats` before `get_ctx_spans` consumes it. + - `send_spans` (`processor.rs:743`): add param. It already builds a `TracerHeaderTags` and hardcodes + `client_computed_stats: false` (`:759`) before calling `send_processed_traces` โ†’ `process_traces` โ†’ + `ChunkProcessor`. Set that field from the param so Path B **reuses the Tier-1 `ChunkProcessor` logic** + to stamp `_dd.compute_stats` (single source of truth โ€” no separate manual meta insert). + - `send_cold_start_span`: pass `false`. +- [ ] `bottlecap/src/lifecycle/invocation/processor_service.rs` (Path B) + - Add `client_computed_stats: bool` to `ProcessorCommand::AddTracerSpan`, the handle method, + and the command handler. +- [ ] `bottlecap/src/traces/trace_agent.rs` (Path B source) + - Annotate `let tracer_header_tags: TracerHeaderTags<'_> = (&parts.headers).into();` + - At the placeholder-span branch (`span.resource == INVOCATION_SPAN_RESOURCE`), pass + `tracer_header_tags.client_computed_stats` to `add_tracer_span`. + +## Tests + +**Tests-first**: each tier is red โ†’ green. Write the failing test(s), then make the +minimal production change above that turns them green. Run `cargo nextest` for the +touched module after each tier. + +- [ ] **Tier 0 โ€” header-parsing contract** (bottlecap, at `(&headers).into()` boundary, `trace_agent.rs:506`) + - `"true"`, `"yes"`, `"t"` โ†’ `client_computed_stats == true` (all runtimes trigger the fix). + - `""`, absent โ†’ `false`. + - **Included (documenting test):** `"false"`/`"0"` โ†’ currently `true`; assert-and-comment to + surface the known libdatadog divergence intentionally and point at the follow-up. + - Green on existing code (no production change) โ€” locks the contract the rest relies on. +- [ ] **Tier 1 โ€” `trace_processor.rs` unit** + - Parameterized truth-table helper: assert `!span.meta.contains_key("_dd.compute_stats")` for the + three absent rows, `== "1"` for `(false,false)`. **Assert on `Span.meta`, not `TracerPayload.tags`** (#1118 guard). + - Stats-skip guard test via `send_processed_traces` + `concentrator.flush(true)`: + `Some` only for `(compute_on_extension=true, client_computed_stats=false)`, else `None`. + - `tags_from_env` no longer emits the key โ€” assert absence on `get_tags_map()`. +- [ ] **Tier 2 โ€” `processor.rs` Path B** + - `Context.client_computed_stats=true` โ†’ `aws.lambda` span meta absent. + - `"1"` only when `!compute_on_extension && !client_computed_stats`. + - `ContextBuffer::add_tracer_span` sets the flag (true/false). +- [ ] **Tier 3 โ€” full fake-intake E2E** (`bottlecap/tests/apm_integration_test.rs`, harness from PR #1194) + - โš ๏ธ The existing `trace_payload_roundtrip_through_fake_intake` flushes a hand-built + `pb::TracerPayload` and **never runs `process_traces`**, so it can't observe `_dd.compute_stats`. + New cases MUST route a trace **through `SendingTraceProcessor::send_processed_traces`** before flushing. + - Wiring per test: + - `AggregatorService::default()` + spawn `.run()`. `SendingTraceProcessor.trace_tx` is an + `mpsc::Sender` (distinct from `AggregatorHandle`): make a channel, pass `tx` + as `trace_tx`, spawn a forwarder `while let Some(info)=rx.recv().await { agg_handle.insert_payload(info) }`. + Flush via `TraceFlusher::new(agg_handle, โ€ฆ).flush(None)` โ†’ `fake_intake.trace_payloads()`. + - `StatsConcentratorService::new(config)` + spawn; `StatsGenerator::new(conc_handle)` as `stats_generator`. + After `send_processed_traces`, flush concentrator โ†’ `StatsAggregator` โ†’ `StatsFlusher` (mirror the + existing `stats_payload_roundtrip_*` test) โ†’ `fake_intake.stats_payloads()`. + - `SendingTraceProcessor { appsec: None, processor: Arc::new(ServerlessTraceProcessor { obfuscation_config }), trace_tx, stats_generator }`. + - Parameterize `header_tags()` (currently hardcodes `client_computed_stats: true`, `apm_integration_test.rs:43`) + and `Config.compute_trace_stats_on_extension`. + - T3.1: `client_computed_stats=true` โ†’ captured `AgentPayload` span meta has `_dd.compute_stats` **absent**. + - T3.2 control: absent when `compute_on_extension=true` OR `client_computed_stats=true`; `"1"` for the neither row. + - T3.3: stats suppression โ€” empty `stats_payloads()` when `client_computed_stats=true`; one payload for + `(compute_on_extension=true, client_computed_stats=false)`. + - T3.4: combined path โ€” trace meta absent AND zero stats payloads. + +## Validation + +```bash +cd bottlecap +cargo fmt --all +RUSTFLAGS="-D warnings" cargo clippy --workspace --all-targets --features default +cargo nextest run --workspace +``` + +## โš ๏ธ Risk carried from the ticket (not solved by this code) + +Go/Java tracers flush client-side stats async, which often doesn't complete in Lambda before the +extension forwards data. With this fix those runtimes could see zero stats (extension skips + +backend told to skip + tracer never flushed). Full fix chosen anyway. +- [ ] **Gate manual E2E** on the exact golang-default/java cases that failed before + (`test_trace_aws_lambda_hits_metric`) prior to merge. From d33cc383ad5cebba0be6cdba46cf2dc426f4cda0 Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Tue, 2 Jun 2026 16:49:47 -0400 Subject: [PATCH 02/10] docs: expand APMSVLS-487 plan with header-parsing divergence vs Go agent MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Document how libdatadog parses client_computed_stats (non-empty) and client_computed_top_level (presence-only) differently from each other and from the Go agent's uniform isHeaderTrue/ParseBool rule. Note both are latent and fold both into the follow-up libdatadog PR. ๐Ÿค– Co-Authored-By: Claude Code --- TODO.md | 33 +++++++++++++++++++++++++++------ 1 file changed, 27 insertions(+), 6 deletions(-) diff --git a/TODO.md b/TODO.md index e13809bfb..cd1eb8b8d 100644 --- a/TODO.md +++ b/TODO.md @@ -42,13 +42,34 @@ bottlecap consumes the **already-parsed** `client_computed_stats` bool from All three values are non-empty โ†’ all map to `true` โ†’ **the fix triggers on every runtime.** No production change needed here; we just consume the bool. -### Latent divergence (NOT in scope โ€” follow-up) +### Latent divergence vs the Go agent (NOT in scope โ€” follow-up) -- bottlecap: any non-empty value โ†’ `true` (incl. `"0"`/`"false"`). -- agent `isHeaderTrue` (`pkg/trace/api/api.go:1065`): `ParseBool`, so `"0"`/`"false"` โ†’ `false`. -- No tracer sends falsey values today (they omit the header), so the divergence is latent. -- [ ] **Follow-up libdatadog PR**: make `From<&HeaderMap>` honor `"0"`/`"false"` as falsey to - match the agent. Lives in shared libdatadog, separate from this ticket. +The Go agent applies one uniform rule to both `client_computed_stats` and +`client_computed_top_level` via `isHeaderTrue` (`pkg/trace/api/api.go:1065`, used at +`:947`/`:1055`): empty โ†’ `false`; `strconv.ParseBool` success โ†’ that bool (so +`"0"`/`"false"`/`"f"` โ†’ `false`); ParseBool failure (e.g. `"yes"`) โ†’ `true`. + +libdatadog (`tracer_header_tags.rs:133-138`) parses the two headers **differently from +the agent and from each other**: +- `client_computed_stats` (`:136-137`): `!value.is_empty()` โ€” non-empty โ†’ `true`. +- `client_computed_top_level` (`:133-135`): `headers.get(...).is_some()` โ€” **presence-only**; + value ignored, so even an empty value โ†’ `true`. + +| Header value | libdd stats | libdd top_level | Go agent (both) | +|---|---|---|---| +| absent | false | false | false | +| `""` (present, empty) | false | **true** | false | +| `"true"`/`"yes"`/`"t"`/`"1"` | true | true | true | +| `"0"`/`"false"`/`"f"` | **true** | **true** | false | + +So `top_level` is the looser of the two (diverges on empty-present **and** falsey strings); +`stats` diverges only on falsey strings. Both are latent: tracers signal by sending a +truthy/present header and omit it otherwise, so the divergent rows don't occur in real traffic. + +- [ ] **Follow-up libdatadog PR**: make `From<&HeaderMap>` use `isHeaderTrue`/`ParseBool` + semantics for **both** `client_computed_stats` and `client_computed_top_level` to match the + agent (also collapses libdatadog's internal stats-vs-top_level inconsistency). Shared + libdatadog, separate from this ticket. (This ticket only consumes `client_computed_stats`.) ## Implementation (production code) From 0915c50d89b95399f6261f4ec4d984eaf6ffd3d7 Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Wed, 3 Jun 2026 14:49:11 -0400 Subject: [PATCH 03/10] test(traces): APMSVLS-487 Tier 0 client_computed_stats header-parsing contract Add a test module at the (&headers).into() boundary in trace_agent.rs that locks the client_computed_stats parsing contract the rest of the fix relies on: truthy values (true/yes/t/1) -> true across all runtimes, absent/empty -> false. A documenting test asserts the current libdatadog (db05e1f) divergence where falsey strings (false/0) parse as true, pointing at the follow-up libdatadog#2071. Green on current libdatadog with no production change. --- TODO.md | 3 +- bottlecap/src/traces/trace_agent.rs | 65 +++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/TODO.md b/TODO.md index cd1eb8b8d..c23b75012 100644 --- a/TODO.md +++ b/TODO.md @@ -113,7 +113,8 @@ truthy/present header and omit it otherwise, so the divergent rows don't occur i minimal production change above that turns them green. Run `cargo nextest` for the touched module after each tier. -- [ ] **Tier 0 โ€” header-parsing contract** (bottlecap, at `(&headers).into()` boundary, `trace_agent.rs:506`) +- [x] **Tier 0 โ€” header-parsing contract** (bottlecap, at `(&headers).into()` boundary, `trace_agent.rs:506`) + - โœ… Done: added `#[cfg(test)] mod tests` at end of `trace_agent.rs` (3 tests, green on current libdatadog db05e1f, no production change). Falsey-string divergence documented + points at libdatadog#2071. - `"true"`, `"yes"`, `"t"` โ†’ `client_computed_stats == true` (all runtimes trigger the fix). - `""`, absent โ†’ `false`. - **Included (documenting test):** `"false"`/`"0"` โ†’ currently `true`; assert-and-comment to diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 9e0175efa..e73d0ebec 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -774,3 +774,68 @@ fn success_response(message: &str) -> Response { debug!("{}", message); (StatusCode::OK, json!({"rate_by_service": {}}).to_string()).into_response() } + +#[cfg(test)] +mod tests { + //! Tier 0 (APMSVLS-487): lock the `client_computed_stats` parsing contract at the + //! `(&headers).into()` boundary that `handle_traces` uses (see `trace_agent.rs:506`). + //! + //! This is the bool the rest of the fix consumes. These tests pass on the current + //! libdatadog (rev `db05e1f`) with no production change โ€” they document the behavior + //! the higher tiers rely on, and guard against silent regressions when libdatadog is + //! bumped (e.g. once `DataDog/libdatadog#2071` lands; see the divergence rows below). + + use axum::http::{HeaderMap, HeaderValue}; + use libdd_trace_utils::trace_utils::TracerHeaderTags; + + /// Build a `HeaderMap` and convert it the same way `handle_traces` does. + fn parse_stats(value: Option<&str>) -> bool { + let mut headers = HeaderMap::new(); + if let Some(v) = value { + headers.insert( + "datadog-client-computed-stats", + HeaderValue::from_str(v).expect("valid header value"), + ); + } + let tags: TracerHeaderTags<'_> = (&headers).into(); + tags.client_computed_stats + } + + #[test] + fn truthy_values_trigger_the_fix_on_every_runtime() { + // Cross-runtime header values: ".NET/Java/PHP/Python" -> "true", "JS/Ruby/C++" -> "yes", + // "Go" -> "t", plus a canonical "1". All non-empty -> the fix must trigger. + for value in ["true", "yes", "t", "1"] { + assert!( + parse_stats(Some(value)), + "expected client_computed_stats == true for {value:?}" + ); + } + } + + #[test] + fn absent_or_empty_does_not_trigger_the_fix() { + assert!(!parse_stats(None), "absent header must be false"); + assert!(!parse_stats(Some("")), "empty-present header must be false"); + } + + #[test] + fn falsey_strings_currently_parse_as_true_known_libdatadog_divergence() { + // KNOWN DIVERGENCE (current libdatadog rev db05e1f): `client_computed_stats` is parsed as + // `!value.is_empty()`, so falsey literals like "false"/"0" resolve to `true` instead of + // `false` (the Go trace-agent's `isHeaderTrue`/`ParseBool` semantics). + // + // This is latent in real traffic: tracers signal by sending a truthy header and omit it + // otherwise, so these rows don't occur in practice. Tracked by the libdatadog follow-up + // PR `DataDog/libdatadog#2071`. When that lands and we bump libdatadog, flip these to + // `assert!(!...)`. + assert!( + parse_stats(Some("false")), + "documenting: \"false\" currently parses as true (libdatadog#2071)" + ); + assert!( + parse_stats(Some("0")), + "documenting: \"0\" currently parses as true (libdatadog#2071)" + ); + } +} From bf46a9aeb2ad3aad145a74eb4204a61423c397cd Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Wed, 3 Jun 2026 16:35:55 -0400 Subject: [PATCH 04/10] feat(traces): APMSVLS-487 Tier 1 stamp _dd.compute_stats per-span instead of baking it Move _dd.compute_stats from a baked-in function tag to a per-span backend directive stamped by the trace processor, and respect the tracer's Datadog-Client-Computed-Stats signal. - tags.rs: stop inserting _dd.compute_stats in tags_from_env (it no longer leaks into _dd.tags.function); make COMPUTE_STATS_KEY pub(crate). - trace_processor.rs: add client_computed_stats to ChunkProcessor; stamp _dd.compute_stats="1" on each span's meta only when neither the extension (compute_trace_stats_on_extension) nor the tracer (client_computed_stats) computes stats, matching the Go agent (set "1" or leave absent). Add && !client_computed_stats to the extension-side stats-generation guard. - Tests: truth-table on Span.meta (#1118 guard), stats-skip guard via the real StatsConcentratorService, updated tags.rs unit tests, and removed the stale _dd.compute_stats assertions from the logs/metrics integration tests. All 548 workspace tests pass; fmt + clippy clean. --- TODO.md | 22 ++- bottlecap/src/tags/lambda/tags.rs | 34 ++-- bottlecap/src/traces/trace_processor.rs | 186 +++++++++++++++++++- bottlecap/tests/logs_integration_test.rs | 5 +- bottlecap/tests/metrics_integration_test.rs | 2 +- 5 files changed, 228 insertions(+), 21 deletions(-) diff --git a/TODO.md b/TODO.md index c23b75012..a4d0af0b6 100644 --- a/TODO.md +++ b/TODO.md @@ -120,12 +120,22 @@ touched module after each tier. - **Included (documenting test):** `"false"`/`"0"` โ†’ currently `true`; assert-and-comment to surface the known libdatadog divergence intentionally and point at the follow-up. - Green on existing code (no production change) โ€” locks the contract the rest relies on. -- [ ] **Tier 1 โ€” `trace_processor.rs` unit** - - Parameterized truth-table helper: assert `!span.meta.contains_key("_dd.compute_stats")` for the - three absent rows, `== "1"` for `(false,false)`. **Assert on `Span.meta`, not `TracerPayload.tags`** (#1118 guard). - - Stats-skip guard test via `send_processed_traces` + `concentrator.flush(true)`: - `Some` only for `(compute_on_extension=true, client_computed_stats=false)`, else `None`. - - `tags_from_env` no longer emits the key โ€” assert absence on `get_tags_map()`. +- [x] **Tier 1 โ€” `trace_processor.rs` unit** โœ… + - `tags.rs`: `COMPUTE_STATS_KEY` now `pub(crate)`; removed unconditional insert in `tags_from_env` + (no longer leaks into `_dd.tags.function`). Updated `test_new_from_config` (3โ†’2 + absence assert) + and both `test_get_function_tags_map*` (14โ†’13 + absence assert). + - `trace_processor.rs`: imported `COMPUTE_STATS_KEY`; added `client_computed_stats: bool` to + `ChunkProcessor`; stamp `_dd.compute_stats="1"` in `process` only when + `!compute_trace_stats_on_extension && !client_computed_stats`; wired field from + `header_tags.client_computed_stats` in `process_traces`; added `&& !client_computed_stats` to the + stats-gen guard in `send_processed_traces` (captured before `header_tags` is moved). Updated the + 4 in-test `ChunkProcessor` literals + `test_process_trace` expected span. + - New tests: `test_compute_stats_truth_table` (asserts on `Span.meta`, #1118 guard) and + `test_stats_skip_guard_via_send_processed_traces` (drives real `StatsConcentratorService`, + asserts flushed payload Some only for `(compute_on_extension=true, client_computed_stats=false)`). + - Fixed integration tests that asserted the old leak: removed `_dd.compute_stats:1` from + `logs_integration_test` body assertions; corrected the `metrics_integration_test` comment. + - All 548 workspace tests pass; fmt + clippy clean. - [ ] **Tier 2 โ€” `processor.rs` Path B** - `Context.client_computed_stats=true` โ†’ `aws.lambda` span meta absent. - `"1"` only when `!compute_on_extension && !client_computed_stats`. diff --git a/bottlecap/src/tags/lambda/tags.rs b/bottlecap/src/tags/lambda/tags.rs index db053c44b..cd6c4d69b 100644 --- a/bottlecap/src/tags/lambda/tags.rs +++ b/bottlecap/src/tags/lambda/tags.rs @@ -38,8 +38,10 @@ const VERSION_KEY: &str = "version"; // ServiceKey is the tag key for a function's service environment variable const SERVICE_KEY: &str = "service"; -// ComputeStatsKey is the tag key indicating whether trace stats should be computed -const COMPUTE_STATS_KEY: &str = "_dd.compute_stats"; +// ComputeStatsKey is the tag key indicating whether trace stats should be computed. +// This is a per-span backend directive that is stamped on each span's meta by the trace +// processor (see `trace_processor::ChunkProcessor`), NOT baked into the function tags here. +pub(crate) const COMPUTE_STATS_KEY: &str = "_dd.compute_stats"; // FunctionTagsKey is the tag key for a function's tags to be set on the top level tracepayload const FUNCTION_TAGS_KEY: &str = "_dd.tags.function"; // TODO(astuyve) decide what to do with the version @@ -135,11 +137,11 @@ fn tags_from_env( tags_map.extend(config.tags.clone()); } - // The value of _dd.compute_stats is the opposite of config.compute_trace_stats_on_extension. - // "config.compute_trace_stats_on_extension == true" means computing stats on the extension side, - // so we set _dd.compute_stats to 0 so stats won't be computed on the backend side. - let compute_stats = i32::from(!config.compute_trace_stats_on_extension); - tags_map.insert(COMPUTE_STATS_KEY.to_string(), compute_stats.to_string()); + // NOTE: `_dd.compute_stats` is intentionally NOT set here. It is a per-span backend + // directive (whether the backend should compute trace stats) that depends on both + // `compute_trace_stats_on_extension` AND the tracer's `Datadog-Client-Computed-Stats` + // header. The trace processor stamps it on each span's meta at processing time; baking + // it into the function tags would leak it into `_dd.tags.function` and ignore the header. tags_map } @@ -299,8 +301,14 @@ mod tests { fn test_new_from_config() { let metadata = HashMap::new(); let tags = Lambda::new_from_config(Arc::new(Config::default()), &metadata); - assert_eq!(tags.tags_map.len(), 3); - assert_eq!(tags.tags_map.get(COMPUTE_STATS_KEY).unwrap(), "1"); + assert_eq!(tags.tags_map.len(), 2); + // _dd.compute_stats is a per-span backend directive stamped by the trace processor, + // not a function tag. It must NOT be present in the tags map. + assert!(!tags.tags_map.contains_key(COMPUTE_STATS_KEY)); + assert!( + !tags.get_tags_map().contains_key(COMPUTE_STATS_KEY), + "_dd.compute_stats must not leak into the tags map" + ); let arch = arch_to_platform(); assert_eq!( tags.tags_map.get(ARCHITECTURE_KEY).unwrap(), @@ -436,7 +444,9 @@ mod tests { (parts[0].to_string(), parts[1].to_string()) }) .collect(); - assert_eq!(fn_tags_map.len(), 14); + assert_eq!(fn_tags_map.len(), 13); + // _dd.compute_stats is a per-span backend directive, not a function tag. + assert!(!fn_tags_map.contains_key(COMPUTE_STATS_KEY)); assert_eq!(fn_tags_map.get("key1").unwrap(), "value1"); assert_eq!(fn_tags_map.get("key2").unwrap(), "value2"); assert_eq!(fn_tags_map.get(ACCOUNT_ID_KEY).unwrap(), "123456789012"); @@ -478,7 +488,9 @@ mod tests { (parts[0].to_string(), parts[1].to_string()) }) .collect(); - assert_eq!(fn_tags_map.len(), 14); + assert_eq!(fn_tags_map.len(), 13); + // _dd.compute_stats is a per-span backend directive, not a function tag. + assert!(!fn_tags_map.contains_key(COMPUTE_STATS_KEY)); assert_eq!(fn_tags_map.get("key1").unwrap(), "value1"); assert_eq!(fn_tags_map.get("key2").unwrap(), "value2"); assert_eq!(fn_tags_map.get(ACCOUNT_ID_KEY).unwrap(), "123456789012"); diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index e2694a13a..c83bbbb09 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -6,6 +6,7 @@ use crate::appsec::processor::context::HoldArguments; use crate::config; use crate::lifecycle::invocation::processor::S_TO_MS; use crate::lifecycle::invocation::triggers::get_default_service_name; +use crate::tags::lambda::tags::COMPUTE_STATS_KEY; use crate::tags::provider; use crate::traces::span_pointers::{SpanPointer, attach_span_pointers_to_meta}; use crate::traces::{ @@ -47,6 +48,9 @@ struct ChunkProcessor { obfuscation_config: Arc, tags_provider: Arc, span_pointers: Option>, + /// Whether the tracer signaled (via `Datadog-Client-Computed-Stats`) that it already + /// computed trace stats client-side. Used to decide whether to stamp `_dd.compute_stats`. + client_computed_stats: bool, } impl TraceChunkProcessor for ChunkProcessor { @@ -93,6 +97,17 @@ impl TraceChunkProcessor for ChunkProcessor { self.tags_provider.get_tags_map().iter().for_each(|(k, v)| { span.meta.insert(k.clone(), v.clone()); }); + + // Stamp `_dd.compute_stats="1"` to tell the backend to compute trace stats ONLY + // when nobody else did: neither the extension (compute_trace_stats_on_extension) + // nor the tracer (client_computed_stats). Otherwise leave the key absent, which the + // backend treats as "do not compute" (matching the Go agent's tag.go semantics, + // which only ever set "1" and never "0"). + if !self.config.compute_trace_stats_on_extension && !self.client_computed_stats { + span.meta + .insert(COMPUTE_STATS_KEY.to_string(), "1".to_string()); + } + // TODO(astuyve) generalize this and delegate to an enum span.meta.insert("origin".to_string(), "lambda".to_string()); span.meta @@ -355,6 +370,7 @@ impl TraceProcessor for ServerlessTraceProcessor { obfuscation_config: self.obfuscation_config.clone(), tags_provider: tags_provider.clone(), span_pointers, + client_computed_stats: header_tags.client_computed_stats, }, true, // send agentless since we are the agent ) @@ -520,6 +536,9 @@ impl SendingTraceProcessor { return Ok(()); } + // Capture before `header_tags` is moved into process_traces below. + let client_computed_stats = header_tags.client_computed_stats; + let (payload, processed_traces) = self.processor.process_traces( config.clone(), tags_provider, @@ -531,7 +550,10 @@ impl SendingTraceProcessor { // This needs to be after process_traces() because process_traces() // performs obfuscation, and we need to compute stats on the obfuscated traces. + // Skip extension-side stats generation when the tracer already computed stats + // client-side (Datadog-Client-Computed-Stats), to avoid double-counting. if config.compute_trace_stats_on_extension + && !client_computed_stats && let Err(err) = self.stats_generator.send(&processed_traces) { // Just log the error. We don't think trace stats are critical, so we don't want to @@ -644,10 +666,16 @@ mod tests { let start = get_current_timestamp_nanos(); let tags_provider = create_tags_provider(create_test_config()); - let span = create_test_span(11, 222, 333, start, true, tags_provider); + let mut span = create_test_span(11, 222, 333, start, true, tags_provider); let traces: Vec> = vec![vec![span.clone()]]; + // The trace processor stamps `_dd.compute_stats="1"` on each span's meta when neither + // the extension nor the tracer computes stats (default test config: both false). + // Mirror that on the expected span (input `traces` was already cloned above). + span.meta + .insert(COMPUTE_STATS_KEY.to_string(), "1".to_string()); + let header_tags = tracer_header_tags::TracerHeaderTags { lang: "nodejs", lang_version: "v19.7.0", @@ -933,6 +961,7 @@ mod tests { )]), )), span_pointers: None, + client_computed_stats: false, }; processor.process(&mut chunk, 0); @@ -1017,6 +1046,7 @@ mod tests { )]), )), span_pointers: None, + client_computed_stats: false, }; processor.process(&mut chunk, 0); @@ -1100,6 +1130,7 @@ mod tests { )]), )), span_pointers: None, + client_computed_stats: false, }; processor.process(&mut chunk, 0); @@ -1414,6 +1445,13 @@ mod tests { } fn create_chunk_processor(config: Arc) -> ChunkProcessor { + create_chunk_processor_with(config, false) + } + + fn create_chunk_processor_with( + config: Arc, + client_computed_stats: bool, + ) -> ChunkProcessor { let tags_provider = create_tags_provider(config.clone()); ChunkProcessor { config, @@ -1422,6 +1460,7 @@ mod tests { ), tags_provider, span_pointers: None, + client_computed_stats, } } @@ -1590,4 +1629,149 @@ mod tests { "base_service should not be overwritten when already set by the tracer" ); } + + /// APMSVLS-487 Tier 1: `ChunkProcessor::process` stamps `_dd.compute_stats="1"` on each + /// span's meta IFF neither the extension nor the tracer computes stats; otherwise the key + /// is absent. Assert directly on `Span.meta` (NOT `TracerPayload.tags`) โ€” guards #1118. + #[test] + fn test_compute_stats_truth_table() { + // (compute_trace_stats_on_extension, client_computed_stats) -> expected meta value + let cases = [ + (false, false, Some("1")), // nobody computed -> tell backend to compute + (false, true, None), // tracer computed -> absent + (true, false, None), // extension computes -> absent + (true, true, None), // both -> absent + ]; + + for (compute_on_extension, client_computed_stats, expected) in cases { + let config = Arc::new(Config { + compute_trace_stats_on_extension: compute_on_extension, + ..Config::default() + }); + let mut processor = create_chunk_processor_with(config, client_computed_stats); + + let span = pb::Span { + name: "http.request".to_string(), + service: "my-service".to_string(), + resource: "GET /users".to_string(), + trace_id: 1, + span_id: 2, + parent_id: 0, + start: 1000, + duration: 500, + error: 0, + meta: HashMap::new(), + metrics: HashMap::new(), + r#type: "web".to_string(), + meta_struct: HashMap::new(), + span_links: vec![], + span_events: vec![], + }; + let mut chunk = pb::TraceChunk { + priority: 1, + origin: "lambda".to_string(), + spans: vec![span], + tags: HashMap::new(), + dropped_trace: false, + }; + + processor.process(&mut chunk, 0); + + let actual = chunk.spans[0] + .meta + .get(COMPUTE_STATS_KEY) + .map(String::as_str); + assert_eq!( + actual, expected, + "compute_on_extension={compute_on_extension}, client_computed_stats={client_computed_stats}" + ); + } + } + + /// APMSVLS-487 Tier 1: `send_processed_traces` only generates extension-side stats when + /// `compute_trace_stats_on_extension == true AND client_computed_stats == false`. Drive the + /// real concentrator and assert a flushed payload is present/absent accordingly. + #[tokio::test] + #[allow(clippy::unwrap_used)] + #[cfg_attr(miri, ignore)] + async fn test_stats_skip_guard_via_send_processed_traces() { + use crate::traces::stats_concentrator_service::StatsConcentratorService; + + #[cfg(feature = "fips")] + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + #[cfg(not(feature = "fips"))] + let _ = rustls::crypto::ring::default_provider().install_default(); + + // (compute_on_extension, client_computed_stats) -> stats payload expected? + let cases = [ + (true, false, true), // extension computes, tracer didn't -> stats generated + (true, true, false), // tracer computed -> skip + (false, false, false), // extension doesn't compute -> skip + (false, true, false), // both off / tracer computed -> skip + ]; + + for (compute_on_extension, client_computed_stats, expect_stats) in cases { + let config = Arc::new(Config { + apm_dd_url: "https://trace.agent.datadoghq.com".to_string(), + service: Some("test-service".to_string()), + compute_trace_stats_on_extension: compute_on_extension, + ..Config::default() + }); + + let (concentrator_service, concentrator_handle) = + StatsConcentratorService::new(config.clone()); + tokio::spawn(concentrator_service.run()); + + let (trace_tx, mut trace_rx) = tokio::sync::mpsc::channel(8); + // Drain the trace channel so send() never blocks. + tokio::spawn(async move { while trace_rx.recv().await.is_some() {} }); + + let sender = SendingTraceProcessor { + appsec: None, + processor: Arc::new(ServerlessTraceProcessor { + obfuscation_config: Arc::new( + ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"), + ), + }), + trace_tx, + stats_generator: Arc::new(StatsGenerator::new(concentrator_handle.clone())), + }; + + let start = get_current_timestamp_nanos(); + let tags_provider = create_tags_provider(config.clone()); + let span = create_test_span(1, 2, 0, start, true, tags_provider.clone()); + let header_tags = tracer_header_tags::TracerHeaderTags { + lang: "nodejs", + lang_version: "v19.7.0", + lang_interpreter: "v8", + lang_vendor: "vendor", + tracer_version: "4.0.0", + container_id: "33", + client_computed_top_level: false, + client_computed_stats, + dropped_p0_traces: 0, + dropped_p0_spans: 0, + }; + + sender + .send_processed_traces( + config, + tags_provider, + header_tags, + vec![vec![span]], + 100, + None, + ) + .await + .expect("send_processed_traces failed"); + + let payload = concentrator_handle.flush(true).await.expect("flush failed"); + + assert_eq!( + payload.is_some(), + expect_stats, + "compute_on_extension={compute_on_extension}, client_computed_stats={client_computed_stats}" + ); + } + } } diff --git a/bottlecap/tests/logs_integration_test.rs b/bottlecap/tests/logs_integration_test.rs index 551942a43..77f5eec15 100644 --- a/bottlecap/tests/logs_integration_test.rs +++ b/bottlecap/tests/logs_integration_test.rs @@ -20,7 +20,9 @@ async fn test_logs() { // protobuf is using hashmap, can't set a btreemap to have sorted keys. Using multiple regexp since // Can't do look around since -> error: look-around, including look-ahead and look-behind, is not supported let regexp_message = r#"[{"message":{"message":"START RequestId: 459921b5-681c-4a96-beb0-81e0aa586026 Version: $LATEST","lambda":{"arn":"test-arn","request_id":"459921b5-681c-4a96-beb0-81e0aa586026"},"timestamp":1666361103165,"status":"info"},"hostname":"test-arn","service":"","#; - let regexp_compute_state = r#"_dd.compute_stats:1"#; + // NOTE: `_dd.compute_stats` is intentionally NOT asserted here. It is a per-span backend + // directive stamped on trace spans by the trace processor, not a log/function tag, so it + // must not appear in `ddtags` (APMSVLS-487). let regexp_arch = format!(r#"architecture:{}"#, arch); let regexp_function_arn = r#"function_arn:test-arn"#; let regexp_extension_version = r#"dd_extension_version"#; @@ -32,7 +34,6 @@ async fn test_logs() { .header("DD-API-KEY", dd_api_key) .header("Content-Type", "application/json") .body_contains(regexp_message) - .body_contains(regexp_compute_state) .body_contains(regexp_arch) .body_contains(regexp_function_arn) .body_contains(regexp_extension_version); diff --git a/bottlecap/tests/metrics_integration_test.rs b/bottlecap/tests/metrics_integration_test.rs index 5456a5d73..f49da31eb 100644 --- a/bottlecap/tests/metrics_integration_test.rs +++ b/bottlecap/tests/metrics_integration_test.rs @@ -14,7 +14,7 @@ async fn test_enhanced_metrics() { let dd_api_key = "my_test_key"; // payload looks like - // aws.lambda.enhanced.invocations"_dd.compute_stats:1"architecture:x86_64"function_arn:test-arn:๏ฟฝเตด ๏ฟฝ?! ๏ฟฝ?) ๏ฟฝ?1 ๏ฟฝ?:๏ฟฝB + // aws.lambda.enhanced.invocations"architecture:x86_64"function_arn:test-arn:๏ฟฝเตด ๏ฟฝ?! ๏ฟฝ?) ๏ฟฝ?1 ๏ฟฝ?:๏ฟฝB // protobuf is using hashmap, can't set a btreemap to have sorted keys. Using multiple regexp since // Can't do look around since -> error: look-around, including look-ahead and look-behind, is not supported let regexp_metric_name = r#"aws.lambda.enhanced.invocations"#; From 4dc51f78e47633d20f7d480576616b55215601b8 Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Wed, 3 Jun 2026 17:06:32 -0400 Subject: [PATCH 05/10] feat(traces): APMSVLS-487 Tier 2 propagate client_computed_stats to Path B aws.lambda span Thread the tracer's Datadog-Client-Computed-Stats signal through the extension-generated aws.lambda span (Path B) so _dd.compute_stats is stamped consistently with Path A (the tracer-trace path handled in Tier 1). - context.rs: add Context.client_computed_stats (+ Default false); ContextBuffer::add_tracer_span takes the flag and records it on the matching context. - processor.rs: add_tracer_span threads the param; send_ctx_spans captures context.client_computed_stats before get_ctx_spans consumes the context; send_spans takes the param and sets it on its TracerHeaderTags so Path B reuses the Tier-1 ChunkProcessor stamping (single source of truth); send_cold_start_span passes false. - processor_service.rs: ProcessorCommand::AddTracerSpan + handle + handler carry the flag. - trace_agent.rs: annotate tracer_header_tags type and pass client_computed_stats to add_tracer_span at the placeholder-span branch. - Tests: context-level flag recording, and an end-to-end Path B test that drives send_ctx_spans through the trace_tx channel and asserts _dd.compute_stats on the aws.lambda span across the full truth table. All 550 workspace tests pass; fmt + clippy clean. --- TODO.md | 19 ++- bottlecap/src/lifecycle/invocation/context.rs | 45 ++++- .../src/lifecycle/invocation/processor.rs | 160 +++++++++++++++++- .../lifecycle/invocation/processor_service.rs | 10 +- bottlecap/src/traces/trace_agent.rs | 4 +- 5 files changed, 223 insertions(+), 15 deletions(-) diff --git a/TODO.md b/TODO.md index a4d0af0b6..4910a9f5e 100644 --- a/TODO.md +++ b/TODO.md @@ -136,10 +136,21 @@ touched module after each tier. - Fixed integration tests that asserted the old leak: removed `_dd.compute_stats:1` from `logs_integration_test` body assertions; corrected the `metrics_integration_test` comment. - All 548 workspace tests pass; fmt + clippy clean. -- [ ] **Tier 2 โ€” `processor.rs` Path B** - - `Context.client_computed_stats=true` โ†’ `aws.lambda` span meta absent. - - `"1"` only when `!compute_on_extension && !client_computed_stats`. - - `ContextBuffer::add_tracer_span` sets the flag (true/false). +- [x] **Tier 2 โ€” `processor.rs` Path B** โœ… + - `context.rs`: added `pub client_computed_stats: bool` to `Context` (+ doc, `Default` init false); + `ContextBuffer::add_tracer_span` takes a `client_computed_stats` param and records it on the context. + - `processor.rs`: `add_tracer_span` threads the param through; `send_ctx_spans` reads + `context.client_computed_stats` before `get_ctx_spans` consumes it; `send_spans` takes the param and + sets it on its `TracerHeaderTags` so Path B reuses the Tier-1 `ChunkProcessor` logic (single source of + truth); `send_cold_start_span` passes `false`. + - `processor_service.rs`: `ProcessorCommand::AddTracerSpan` + handle method + command handler carry + `client_computed_stats`. + - `trace_agent.rs`: annotated `tracer_header_tags: trace_utils::TracerHeaderTags<'_>` and passes + `tracer_header_tags.client_computed_stats` to `add_tracer_span` at the placeholder-span branch. + - Tests: `test_add_tracer_span_sets_client_computed_stats` (context.rs) and + `test_send_ctx_spans_stamps_compute_stats` (processor.rs โ€” drives Path B end-to-end through the + `trace_tx` channel and asserts on the `aws.lambda` span's `_dd.compute_stats` for all 4 truth-table rows). + - All 550 workspace tests pass; fmt + clippy clean. - [ ] **Tier 3 โ€” full fake-intake E2E** (`bottlecap/tests/apm_integration_test.rs`, harness from PR #1194) - โš ๏ธ The existing `trace_payload_roundtrip_through_fake_intake` flushes a hand-built `pb::TracerPayload` and **never runs `process_traces`**, so it can't observe `_dd.compute_stats`. diff --git a/bottlecap/src/lifecycle/invocation/context.rs b/bottlecap/src/lifecycle/invocation/context.rs index 04894f9c6..3fe0999ab 100644 --- a/bottlecap/src/lifecycle/invocation/context.rs +++ b/bottlecap/src/lifecycle/invocation/context.rs @@ -43,6 +43,12 @@ pub struct Context { /// tracing. /// pub extracted_span_context: Option, + /// Whether the tracer signaled (via the `Datadog-Client-Computed-Stats` header) that it + /// already computed trace stats client-side, propagated from the tracer's placeholder span. + /// + /// Used when generating the extension-side `aws.lambda` span (Path B) so the backend + /// stats directive (`_dd.compute_stats`) is stamped consistently with Path A. + pub client_computed_stats: bool, } /// Struct containing the information needed to reparent a span. @@ -94,6 +100,7 @@ impl Default for Context { snapstart_restore_span: None, tracer_span: None, extracted_span_context: None, + client_computed_stats: false, } } } @@ -508,12 +515,20 @@ impl ContextBuffer { /// Adds the tracer span to a `Context` in the buffer. /// - pub fn add_tracer_span(&mut self, request_id: &String, tracer_span: &Span) { + /// `client_computed_stats` carries the tracer's `Datadog-Client-Computed-Stats` signal so + /// the extension-generated `aws.lambda` span can stamp `_dd.compute_stats` consistently. + pub fn add_tracer_span( + &mut self, + request_id: &String, + tracer_span: &Span, + client_computed_stats: bool, + ) { if let Some(context) = self .buffer .iter_mut() .find(|context| context.request_id == *request_id) { + context.client_computed_stats = client_computed_stats; context .invocation_span .meta @@ -639,6 +654,34 @@ mod tests { assert!(buffer.get(&unexistent_request_id).is_none()); } + /// APMSVLS-487 Tier 2: `add_tracer_span` records the tracer's `client_computed_stats` + /// signal onto the matching context (and is a no-op when no context matches). + #[test] + fn test_add_tracer_span_sets_client_computed_stats() { + for client_computed_stats in [true, false] { + let mut buffer = ContextBuffer::with_capacity(2); + let request_id = String::from("req-1"); + buffer.insert(Context::from_request_id(&request_id)); + + let mut tracer_span = Span::default(); + tracer_span + .meta + .insert("request_id".to_string(), request_id.clone()); + + buffer.add_tracer_span(&request_id, &tracer_span, client_computed_stats); + + assert_eq!( + buffer.get(&request_id).unwrap().client_computed_stats, + client_computed_stats + ); + } + + // No matching context -> no panic, nothing recorded. + let mut buffer = ContextBuffer::with_capacity(2); + buffer.add_tracer_span(&String::from("missing"), &Span::default(), true); + assert!(buffer.get(&String::from("missing")).is_none()); + } + #[test] fn test_add_start_time() { let mut buffer = ContextBuffer::with_capacity(2); diff --git a/bottlecap/src/lifecycle/invocation/processor.rs b/bottlecap/src/lifecycle/invocation/processor.rs index ec1af99ab..4be42944a 100644 --- a/bottlecap/src/lifecycle/invocation/processor.rs +++ b/bottlecap/src/lifecycle/invocation/processor.rs @@ -665,9 +665,17 @@ impl Processor { trace_sender: &Arc, context: Context, ) { + // Capture before `get_ctx_spans` consumes `context`. + let client_computed_stats = context.client_computed_stats; let (traces, body_size) = self.get_ctx_spans(context); - self.send_spans(traces, body_size, tags_provider, trace_sender) - .await; + self.send_spans( + traces, + body_size, + tags_provider, + trace_sender, + client_computed_stats, + ) + .await; } fn get_ctx_spans(&mut self, context: Context) -> (Vec, usize) { @@ -732,7 +740,9 @@ impl Processor { let traces = vec![cold_start_span.clone()]; let body_size = size_of_val(cold_start_span); - self.send_spans(traces, body_size, tags_provider, trace_sender) + // The cold start span is extension-generated and not tied to a tracer's stats + // signal, so the backend should compute its stats unless the extension does. + self.send_spans(traces, body_size, tags_provider, trace_sender, false) .await; } } @@ -746,8 +756,12 @@ impl Processor { body_size: usize, tags_provider: &Arc, trace_sender: &Arc, + client_computed_stats: bool, ) { // todo: figure out what to do here + // `client_computed_stats` is propagated from the tracer's placeholder span so the + // downstream `ChunkProcessor` (reused via `send_processed_traces` -> `process_traces`) + // stamps `_dd.compute_stats` on these extension-generated spans consistently with Path A. let header_tags = tracer_header_tags::TracerHeaderTags { lang: "", lang_version: "", @@ -756,7 +770,7 @@ impl Processor { tracer_version: "", container_id: "", client_computed_top_level: false, - client_computed_stats: false, + client_computed_stats, dropped_p0_traces: 0, dropped_p0_spans: 0, }; @@ -1403,9 +1417,10 @@ impl Processor { /// /// This is used to enrich the invocation span with additional metadata from the tracers /// top level span, since we discard the tracer span when we create the invocation span. - pub fn add_tracer_span(&mut self, span: &Span) { + pub fn add_tracer_span(&mut self, span: &Span, client_computed_stats: bool) { if let Some(request_id) = span.meta.get("request_id") { - self.context_buffer.add_tracer_span(request_id, span); + self.context_buffer + .add_tracer_span(request_id, span, client_computed_stats); } } @@ -2445,4 +2460,137 @@ mod tests { "pre-existing _dd.appsec.enabled value must not be overwritten" ); } + + /// Build a [`Processor`] with a caller-supplied config (for toggling + /// `compute_trace_stats_on_extension`). + fn setup_with_config(config: Arc) -> Processor { + let aws_config = Arc::new(AwsConfig { + region: "us-east-1".into(), + aws_lwa_proxy_lambda_runtime_api: Some("***".into()), + function_name: "test-function".into(), + sandbox_init_time: Instant::now(), + runtime_api: "***".into(), + exec_wrapper: None, + initialization_type: "on-demand".into(), + }); + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + let (service, handle) = + AggregatorService::new(EMPTY_TAGS, 1024).expect("failed to create aggregator service"); + tokio::spawn(service.run()); + let propagator = Arc::new(DatadogCompositePropagator::new(Arc::clone(&config))); + let (durable_context_tx, _) = tokio::sync::mpsc::channel(1); + Processor::new( + tags_provider, + config, + aws_config, + handle, + propagator, + durable_context_tx, + ) + } + + /// Like [`make_trace_sender`], but returns the receiver so the test can inspect the + /// processed payload that Path B sends downstream. + fn make_trace_sender_with_rx( + config: Arc, + ) -> ( + Arc, + tokio::sync::mpsc::Receiver, + ) { + use libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig; + let (stats_concentrator_service, stats_concentrator_handle) = + StatsConcentratorService::new(Arc::clone(&config)); + tokio::spawn(stats_concentrator_service.run()); + let (trace_tx, trace_rx) = tokio::sync::mpsc::channel(8); + let sender = Arc::new(SendingTraceProcessor { + appsec: None, + processor: Arc::new(trace_processor::ServerlessTraceProcessor { + obfuscation_config: Arc::new( + ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"), + ), + }), + trace_tx, + stats_generator: Arc::new(StatsGenerator::new(stats_concentrator_handle)), + }); + (sender, trace_rx) + } + + /// APMSVLS-487 Tier 2: the extension-generated `aws.lambda` span (Path B) stamps + /// `_dd.compute_stats="1"` only when neither the extension nor the tracer computes stats; + /// otherwise the key is absent. `client_computed_stats` is propagated from the context. + #[tokio::test] + #[allow(clippy::unwrap_used)] + async fn test_send_ctx_spans_stamps_compute_stats() { + use crate::tags::lambda::tags::COMPUTE_STATS_KEY; + use libdd_trace_utils::tracer_payload::TracerPayloadCollection; + + #[cfg(feature = "fips")] + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + #[cfg(not(feature = "fips"))] + let _ = rustls::crypto::ring::default_provider().install_default(); + + // (compute_on_extension, client_computed_stats) -> expected meta value on aws.lambda span + let cases = [ + (false, false, Some("1")), + (false, true, None), + (true, false, None), + (true, true, None), + ]; + + for (compute_on_extension, client_computed_stats, expected) in cases { + let config = Arc::new(config::Config { + apm_dd_url: "https://trace.agent.datadoghq.com".to_string(), + service: Some("test-service".to_string()), + compute_trace_stats_on_extension: compute_on_extension, + ..config::Config::default() + }); + let mut processor = setup_with_config(Arc::clone(&config)); + let (trace_sender, mut trace_rx) = make_trace_sender_with_rx(Arc::clone(&config)); + + let mut context = Context::from_request_id("req-1"); + context.client_computed_stats = client_computed_stats; + context.invocation_span = Span { + name: "aws.lambda".to_string(), + resource: "test-resource".to_string(), + service: "test-service".to_string(), + span_id: 1, + trace_id: 100, + ..Default::default() + }; + + let tags_provider = Arc::new(provider::Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &HashMap::from([("function_arn".to_string(), "test-arn".to_string())]), + )); + processor + .send_ctx_spans(&tags_provider, &trace_sender, context) + .await; + + let info = trace_rx.recv().await.expect("expected a sent payload"); + let send_data = info.builder.build(); + let TracerPayloadCollection::V07(payloads) = send_data.get_payloads() else { + panic!("expected V07 payload"); + }; + let aws_lambda_span = payloads + .iter() + .flat_map(|p| &p.chunks) + .flat_map(|c| &c.spans) + .find(|s| s.name == "aws.lambda") + .expect("aws.lambda span should be present"); + + assert_eq!( + aws_lambda_span + .meta + .get(COMPUTE_STATS_KEY) + .map(String::as_str), + expected, + "compute_on_extension={compute_on_extension}, client_computed_stats={client_computed_stats}" + ); + } + } } diff --git a/bottlecap/src/lifecycle/invocation/processor_service.rs b/bottlecap/src/lifecycle/invocation/processor_service.rs index a41a95b26..70c85d47c 100644 --- a/bottlecap/src/lifecycle/invocation/processor_service.rs +++ b/bottlecap/src/lifecycle/invocation/processor_service.rs @@ -109,6 +109,7 @@ pub enum ProcessorCommand { }, AddTracerSpan { span: Box, + client_computed_stats: bool, }, ForwardDurableContext { request_id: String, @@ -378,10 +379,12 @@ impl InvocationProcessorHandle { pub async fn add_tracer_span( &self, span: Span, + client_computed_stats: bool, ) -> Result<(), mpsc::error::SendError> { self.sender .send(ProcessorCommand::AddTracerSpan { span: Box::new(span), + client_computed_stats, }) .await } @@ -612,8 +615,11 @@ impl InvocationProcessorService { let result = Ok(self.processor.set_cold_start_span_trace_id(trace_id)); let _ = response.send(result); } - ProcessorCommand::AddTracerSpan { span } => { - self.processor.add_tracer_span(&span); + ProcessorCommand::AddTracerSpan { + span, + client_computed_stats, + } => { + self.processor.add_tracer_span(&span, client_computed_stats); } ProcessorCommand::ForwardDurableContext { request_id, diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index e73d0ebec..efe10852d 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -503,7 +503,7 @@ impl TraceAgent { ); } - let tracer_header_tags = (&parts.headers).into(); + let tracer_header_tags: trace_utils::TracerHeaderTags<'_> = (&parts.headers).into(); let (body_size, mut traces): (usize, Vec>) = match version { ApiVersion::V04 => { @@ -590,7 +590,7 @@ impl TraceAgent { if span.resource == INVOCATION_SPAN_RESOURCE && let Err(e) = invocation_processor_handle - .add_tracer_span(span.clone()) + .add_tracer_span(span.clone(), tracer_header_tags.client_computed_stats) .await { error!("Failed to add tracer span to processor: {}", e); From d16031480bce921416150c4d07315fab304c9f83 Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Wed, 3 Jun 2026 17:22:16 -0400 Subject: [PATCH 06/10] test(traces): APMSVLS-487 Tier 3 fake-intake E2E for _dd.compute_stats + stats suppression Add end-to-end coverage that routes a trace through SendingTraceProcessor::send_processed_traces (so process_traces/ChunkProcessor actually run) and asserts on the wire payloads captured by the in-process fake-intake. - run_processor_pipeline(compute_on_extension, client_computed_stats): drains the trace_tx channel into AggregatorService synchronously after send (avoids a flush race), flushes via TraceFlusher, and flushes stats via StatsConcentratorService -> StatsAggregator -> StatsFlusher. Parameterizes header_tags_with(client_computed_stats). - T3.1 e2e_client_computed_stats_leaves_compute_stats_absent: tracer computed -> span meta absent. - T3.2 e2e_compute_stats_truth_table_on_captured_span: "1" only for the neither-computes row. - T3.3 e2e_stats_suppressed_unless_extension_computes: one stats payload only for (compute_on_extension, !client_computed_stats); empty otherwise. - T3.4 e2e_client_computed_stats_absent_meta_and_no_stats: combined meta-absent + zero stats. 554/554 workspace tests pass; fmt + clippy clean. --- TODO.md | 10 +- bottlecap/tests/apm_integration_test.rs | 229 ++++++++++++++++++++++++ 2 files changed, 238 insertions(+), 1 deletion(-) diff --git a/TODO.md b/TODO.md index 4910a9f5e..1daf9433d 100644 --- a/TODO.md +++ b/TODO.md @@ -151,7 +151,15 @@ touched module after each tier. `test_send_ctx_spans_stamps_compute_stats` (processor.rs โ€” drives Path B end-to-end through the `trace_tx` channel and asserts on the `aws.lambda` span's `_dd.compute_stats` for all 4 truth-table rows). - All 550 workspace tests pass; fmt + clippy clean. -- [ ] **Tier 3 โ€” full fake-intake E2E** (`bottlecap/tests/apm_integration_test.rs`, harness from PR #1194) +- [x] **Tier 3 โ€” full fake-intake E2E** (`bottlecap/tests/apm_integration_test.rs`, harness from PR #1194) โœ… + - Added `run_processor_pipeline(compute_on_extension, client_computed_stats)` which routes a trace + through `SendingTraceProcessor::send_processed_traces`, drains the `trace_tx` channel into + `AggregatorService` (synchronously, post-send, to avoid a flush race), flushes via `TraceFlusher`, + and flushes stats via `StatsConcentratorService` โ†’ `StatsAggregator` โ†’ `StatsFlusher`. Parameterized + `header_tags_with(client_computed_stats)`. Tests: T3.1 (`e2e_client_computed_stats_leaves_compute_stats_absent`), + T3.2 (`e2e_compute_stats_truth_table_on_captured_span`), T3.3 (`e2e_stats_suppressed_unless_extension_computes`), + T3.4 (`e2e_client_computed_stats_absent_meta_and_no_stats`). All assert on the captured `AgentPayload` + span meta and `stats_payloads()`. 554/554 workspace tests pass; fmt + clippy clean. - โš ๏ธ The existing `trace_payload_roundtrip_through_fake_intake` flushes a hand-built `pb::TracerPayload` and **never runs `process_traces`**, so it can't observe `_dd.compute_stats`. New cases MUST route a trace **through `SendingTraceProcessor::send_processed_traces`** before flushing. diff --git a/bottlecap/tests/apm_integration_test.rs b/bottlecap/tests/apm_integration_test.rs index fcdc9bfbc..aea951ccd 100644 --- a/bottlecap/tests/apm_integration_test.rs +++ b/bottlecap/tests/apm_integration_test.rs @@ -17,16 +17,21 @@ use std::str::FromStr; use std::sync::Arc; +use bottlecap::LAMBDA_RUNTIME_SLUG; use bottlecap::config::Config; +use bottlecap::tags::provider::Provider; use bottlecap::traces::http_client::create_client; use bottlecap::traces::stats_aggregator::StatsAggregator; use bottlecap::traces::stats_concentrator_service::StatsConcentratorService; use bottlecap::traces::stats_flusher::StatsFlusher; +use bottlecap::traces::stats_generator::StatsGenerator; use bottlecap::traces::trace_aggregator::SendDataBuilderInfo; use bottlecap::traces::trace_aggregator_service::AggregatorService; use bottlecap::traces::trace_flusher::TraceFlusher; +use bottlecap::traces::trace_processor::{SendingTraceProcessor, ServerlessTraceProcessor}; use dogstatsd::api_key::ApiKeyFactory; use libdd_common::Endpoint; +use libdd_trace_obfuscation::obfuscation_config::ObfuscationConfig; use libdd_trace_protobuf::pb; use libdd_trace_utils::send_data::SendDataBuilder; use libdd_trace_utils::trace_utils::TracerHeaderTags; @@ -267,3 +272,227 @@ async fn trace_payload_roundtrip_through_fake_intake() { assert_eq!(span.trace_id, 0x1111_1111_1111_1111); assert_eq!(span.span_id, 0x2222_2222_2222_2222); } + +// --------------------------------------------------------------------------- +// APMSVLS-487 Tier 3: full fake-intake E2E through `SendingTraceProcessor`. +// +// Unlike `trace_payload_roundtrip_through_fake_intake` (which inserts a hand-built +// `pb::TracerPayload` directly), these tests route a trace through +// `SendingTraceProcessor::send_processed_traces` so they exercise the real +// `process_traces`/`ChunkProcessor` stamping of `_dd.compute_stats` and the +// extension-side stats-generation guard. +// --------------------------------------------------------------------------- + +const COMPUTE_STATS_KEY: &str = "_dd.compute_stats"; + +fn header_tags_with(client_computed_stats: bool) -> TracerHeaderTags<'static> { + TracerHeaderTags { + client_computed_stats, + ..header_tags() + } +} + +/// Outcome of routing a single trace through the processor + flushers. +struct PipelineOutcome { + traces: Vec, + stats: Vec, +} + +/// Drives one trace through `SendingTraceProcessor::send_processed_traces` with the given +/// `compute_trace_stats_on_extension` / `client_computed_stats`, then flushes both the trace +/// and stats pipelines into a fresh fake-intake and returns what it captured. +async fn run_processor_pipeline( + compute_on_extension: bool, + client_computed_stats: bool, +) -> PipelineOutcome { + let fake_intake = FakeIntake::start().await; + + let config = Arc::new(Config { + api_key: DD_API_KEY.to_string(), + site: "datadoghq.com".to_string(), + // process_traces builds its trace endpoint directly from apm_dd_url. + apm_dd_url: fake_intake.traces_url(), + service: Some("fake-intake-trace-service".to_string()), + compute_trace_stats_on_extension: compute_on_extension, + ..Config::default() + }); + + // --- Trace pipeline: trace_tx -> (drained below) -> AggregatorService -> TraceFlusher --- + let (aggregator_service, aggregator_handle) = AggregatorService::default(); + tokio::spawn(aggregator_service.run()); + let (trace_tx, mut trace_rx) = tokio::sync::mpsc::channel::(8); + + // --- Stats pipeline: StatsConcentratorService -> StatsAggregator -> StatsFlusher --- + let (concentrator_service, concentrator_handle) = + StatsConcentratorService::new(Arc::clone(&config)); + tokio::spawn(concentrator_service.run()); + + let sender = SendingTraceProcessor { + appsec: None, + processor: Arc::new(ServerlessTraceProcessor { + obfuscation_config: Arc::new( + ObfuscationConfig::new().expect("Failed to create ObfuscationConfig"), + ), + }), + trace_tx, + stats_generator: Arc::new(StatsGenerator::new(concentrator_handle.clone())), + }; + + let tags_provider = Arc::new(Provider::new( + Arc::clone(&config), + LAMBDA_RUNTIME_SLUG.to_string(), + &std::collections::HashMap::from([( + "function_arn".to_string(), + "arn:aws:lambda:us-west-2:123456789012:function:my-function".to_string(), + )]), + )); + + // A top-level root span so the concentrator produces stats. + let mut span = pb::Span { + service: "fake-intake-trace-service".to_string(), + name: "web.request".to_string(), + resource: "GET /fake".to_string(), + trace_id: 0x1111_1111_1111_1111, + span_id: 0x2222_2222_2222_2222, + parent_id: 0, + start: 1_700_000_000_000_000_000, + duration: 5_000_000, + error: 0, + r#type: "web".to_string(), + ..pb::Span::default() + }; + span.metrics.insert("_top_level".to_string(), 1.0); + + sender + .send_processed_traces( + Arc::clone(&config), + tags_provider, + header_tags_with(client_computed_stats), + vec![vec![span]], + 100, + None, + ) + .await + .expect("send_processed_traces failed"); + + // Drain whatever `send_processed_traces` produced into the aggregator before flushing. + // `send_processed_traces` has already returned, so the payload (if any) is buffered in the + // channel and `try_recv` will surface it without racing a background task. + drop(sender); + while let Ok(info) = trace_rx.try_recv() { + aggregator_handle + .insert_payload(info) + .expect("insert_payload must succeed"); + } + + // Flush traces. + let http_client = create_client(None, None, false).expect("failed to create http client"); + let api_key_factory = Arc::new(ApiKeyFactory::new(DD_API_KEY)); + let trace_flusher = TraceFlusher::new( + aggregator_handle, + Arc::clone(&config), + Arc::clone(&api_key_factory), + http_client.clone(), + ); + let failed = trace_flusher.flush(None).await; + assert!(failed.is_none(), "trace flush failed: {failed:?}"); + + // Flush stats (pulls from the concentrator via the aggregator). + let stats_aggregator = Arc::new(Mutex::new(StatsAggregator::new_with_concentrator( + concentrator_handle, + ))); + let stats_flusher = StatsFlusher::new( + api_key_factory, + stats_aggregator, + Arc::clone(&config), + http_client, + fake_intake.stats_url(), + ); + let failed = stats_flusher.flush(true, None).await; + assert!(failed.is_none(), "stats flush failed: {failed:?}"); + + PipelineOutcome { + traces: fake_intake.trace_payloads(), + stats: fake_intake.stats_payloads(), + } +} + +/// Finds the single span in the captured trace payloads and returns its `_dd.compute_stats`. +fn captured_compute_stats(traces: &[pb::AgentPayload]) -> Option { + let span = traces + .iter() + .flat_map(|p| &p.tracer_payloads) + .flat_map(|tp| &tp.chunks) + .flat_map(|c| &c.spans) + .find(|s| s.name == "web.request") + .expect("web.request span should be present"); + span.meta.get(COMPUTE_STATS_KEY).cloned() +} + +/// T3.1: `client_computed_stats=true` โ†’ captured span meta has `_dd.compute_stats` absent. +#[tokio::test] +async fn e2e_client_computed_stats_leaves_compute_stats_absent() { + let outcome = run_processor_pipeline(false, true).await; + assert!( + captured_compute_stats(&outcome.traces).is_none(), + "_dd.compute_stats must be absent when the tracer computed stats", + ); +} + +/// T3.2: control matrix โ€” `"1"` only for the (neither computes) row; absent otherwise. +#[tokio::test] +async fn e2e_compute_stats_truth_table_on_captured_span() { + let cases = [ + (false, false, Some("1")), + (false, true, None), + (true, false, None), + (true, true, None), + ]; + for (compute_on_extension, client_computed_stats, expected) in cases { + let outcome = run_processor_pipeline(compute_on_extension, client_computed_stats).await; + assert_eq!( + captured_compute_stats(&outcome.traces).as_deref(), + expected, + "compute_on_extension={compute_on_extension}, client_computed_stats={client_computed_stats}", + ); + } +} + +/// T3.3: stats suppression โ€” a stats payload is produced only when the extension computes +/// stats and the tracer did not; otherwise the stats intake stays empty. +#[tokio::test] +async fn e2e_stats_suppressed_unless_extension_computes() { + // Extension computes, tracer didn't -> exactly one stats payload. + let outcome = run_processor_pipeline(true, false).await; + assert_eq!( + outcome.stats.len(), + 1, + "expected one stats payload when the extension computes stats", + ); + + // Tracer computed -> extension skips stats generation. + let outcome = run_processor_pipeline(true, true).await; + assert!( + outcome.stats.is_empty(), + "stats must be suppressed when the tracer computed them", + ); + + // Extension not computing -> no stats either way. + let outcome = run_processor_pipeline(false, false).await; + assert!( + outcome.stats.is_empty(), + "stats must be empty when the extension does not compute them", + ); +} + +/// T3.4: combined path โ€” when the tracer computed stats, the captured trace has no +/// `_dd.compute_stats` AND zero stats payloads reach the intake. +#[tokio::test] +async fn e2e_client_computed_stats_absent_meta_and_no_stats() { + let outcome = run_processor_pipeline(true, true).await; + assert!( + captured_compute_stats(&outcome.traces).is_none(), + "_dd.compute_stats must be absent", + ); + assert!(outcome.stats.is_empty(), "no stats payloads must be sent",); +} From 7153eefe16dcd87a8d8a81683b1f6dd16c4a6e42 Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Wed, 3 Jun 2026 17:44:35 -0400 Subject: [PATCH 07/10] remove TODO planning file --- TODO.md | 198 -------------------------------------------------------- 1 file changed, 198 deletions(-) delete mode 100644 TODO.md diff --git a/TODO.md b/TODO.md deleted file mode 100644 index 1daf9433d..000000000 --- a/TODO.md +++ /dev/null @@ -1,198 +0,0 @@ -# APMSVLS-487 โ€” respect `Datadog-Client-Computed-Stats` (branch `lpimentel/respect-client-computed-stats`) - -## Goal - -When a tracer sends `Datadog-Client-Computed-Stats: `, the extension must: -1. Skip its own (agent-side) stats generation for those traces. -2. Signal the backend not to compute stats by **leaving `_dd.compute_stats` absent** on each span's meta. -3. Do the same for the extension-generated `aws.lambda` span (Path B), which requires - propagating the flag from the tracer's placeholder span. - -## Canonical semantics (validated against the Go agent) - -`datadog-agent/cmd/serverless-init/tag/tag.go` + `pkg/serverless/tags/tags.go` only ever set -`_dd.compute_stats = "1"`, **never `"0"`**, and leave the key absent otherwise: -- `"1"` โ†’ backend computes stats (only when nobody else did: not tracer, not extension). -- absent โ†’ backend does not compute (someone already did, or accepted-incorrect). - -This **supersedes** the earlier "override to `0`" plan. `"0"` and absent are equivalent -"off" states to the backend, so emitting absent instead of today's `"0"` is **not a regression**; -it just aligns us with the canonical agent. - -Rule (one line): **set `_dd.compute_stats="1"` iff `!compute_trace_stats_on_extension && !client_computed_stats`; otherwise leave absent.** - -### Truth table - -| `compute_trace_stats_on_extension` | `client_computed_stats` | `_dd.compute_stats` on span meta | Extension computes stats? | -|---|---|---|---| -| false | false | `"1"` | No | -| false | true | absent | No | -| true | false | absent | Yes | -| true | true | absent | No | - -## Header-value finding (cross-runtime) - -`Datadog-Client-Computed-Stats` is not standardized across tracers: -- `"true"` โ€” .NET, Java, PHP, Python -- `"yes"` โ€” JS, Ruby, C++ -- `"t"` โ€” Go - -bottlecap consumes the **already-parsed** `client_computed_stats` bool from -`libdd_trace_utils` (`tracer_header_tags.rs:137`): `client_computed_stats = !value.is_empty()`. -All three values are non-empty โ†’ all map to `true` โ†’ **the fix triggers on every runtime.** -No production change needed here; we just consume the bool. - -### Latent divergence vs the Go agent (NOT in scope โ€” follow-up) - -The Go agent applies one uniform rule to both `client_computed_stats` and -`client_computed_top_level` via `isHeaderTrue` (`pkg/trace/api/api.go:1065`, used at -`:947`/`:1055`): empty โ†’ `false`; `strconv.ParseBool` success โ†’ that bool (so -`"0"`/`"false"`/`"f"` โ†’ `false`); ParseBool failure (e.g. `"yes"`) โ†’ `true`. - -libdatadog (`tracer_header_tags.rs:133-138`) parses the two headers **differently from -the agent and from each other**: -- `client_computed_stats` (`:136-137`): `!value.is_empty()` โ€” non-empty โ†’ `true`. -- `client_computed_top_level` (`:133-135`): `headers.get(...).is_some()` โ€” **presence-only**; - value ignored, so even an empty value โ†’ `true`. - -| Header value | libdd stats | libdd top_level | Go agent (both) | -|---|---|---|---| -| absent | false | false | false | -| `""` (present, empty) | false | **true** | false | -| `"true"`/`"yes"`/`"t"`/`"1"` | true | true | true | -| `"0"`/`"false"`/`"f"` | **true** | **true** | false | - -So `top_level` is the looser of the two (diverges on empty-present **and** falsey strings); -`stats` diverges only on falsey strings. Both are latent: tracers signal by sending a -truthy/present header and omit it otherwise, so the divergent rows don't occur in real traffic. - -- [ ] **Follow-up libdatadog PR**: make `From<&HeaderMap>` use `isHeaderTrue`/`ParseBool` - semantics for **both** `client_computed_stats` and `client_computed_top_level` to match the - agent (also collapses libdatadog's internal stats-vs-top_level inconsistency). Shared - libdatadog, separate from this ticket. (This ticket only consumes `client_computed_stats`.) - -## Implementation (production code) - -- [ ] `bottlecap/src/tags/lambda/tags.rs` - - Keep `COMPUTE_STATS_KEY` `pub(crate)`. - - **Stop baking** `_dd.compute_stats` in `tags_from_env` (remove the unconditional insert). - This also drops it from `_dd.tags.function` (correct โ€” it's a per-span backend directive). - - Update unit tests: `test_new_from_config` (`tags_map.len()` 3โ†’2, delete `COMPUTE_STATS_KEY=="1"`), - `test_get_function_tags_map*` (`14โ†’13`), add a test asserting the key is absent from `get_tags_map()`. -- [ ] `bottlecap/src/traces/trace_processor.rs` - - `use crate::tags::lambda::tags::COMPUTE_STATS_KEY;` - - Add `client_computed_stats: bool` to `struct ChunkProcessor`. - - In `ChunkProcessor::process`, after the `tags_map` copy: insert `_dd.compute_stats="1"` - **only when** `!self.config.compute_trace_stats_on_extension && !self.client_computed_stats`. - - In `process_traces`, set `client_computed_stats: header_tags.client_computed_stats`. - - In `send_processed_traces`, capture `client_computed_stats` before `header_tags` is moved and - add `&& !client_computed_stats` to the `compute_trace_stats_on_extension` stats-gen guard. - - Update the 4 in-test `ChunkProcessor { โ€ฆ }` literals with `client_computed_stats: false`. -- [ ] `bottlecap/src/lifecycle/invocation/context.rs` (Path B) - - Add `pub client_computed_stats: bool` to `struct Context` (+ doc) and init `false` in `Default`. - - `ContextBuffer::add_tracer_span`: add `client_computed_stats: bool` param; set on context. -- [ ] `bottlecap/src/lifecycle/invocation/processor.rs` (Path B) - - `add_tracer_span`: add param, pass through. - - `send_ctx_spans`: read `context.client_computed_stats` before `get_ctx_spans` consumes it. - - `send_spans` (`processor.rs:743`): add param. It already builds a `TracerHeaderTags` and hardcodes - `client_computed_stats: false` (`:759`) before calling `send_processed_traces` โ†’ `process_traces` โ†’ - `ChunkProcessor`. Set that field from the param so Path B **reuses the Tier-1 `ChunkProcessor` logic** - to stamp `_dd.compute_stats` (single source of truth โ€” no separate manual meta insert). - - `send_cold_start_span`: pass `false`. -- [ ] `bottlecap/src/lifecycle/invocation/processor_service.rs` (Path B) - - Add `client_computed_stats: bool` to `ProcessorCommand::AddTracerSpan`, the handle method, - and the command handler. -- [ ] `bottlecap/src/traces/trace_agent.rs` (Path B source) - - Annotate `let tracer_header_tags: TracerHeaderTags<'_> = (&parts.headers).into();` - - At the placeholder-span branch (`span.resource == INVOCATION_SPAN_RESOURCE`), pass - `tracer_header_tags.client_computed_stats` to `add_tracer_span`. - -## Tests - -**Tests-first**: each tier is red โ†’ green. Write the failing test(s), then make the -minimal production change above that turns them green. Run `cargo nextest` for the -touched module after each tier. - -- [x] **Tier 0 โ€” header-parsing contract** (bottlecap, at `(&headers).into()` boundary, `trace_agent.rs:506`) - - โœ… Done: added `#[cfg(test)] mod tests` at end of `trace_agent.rs` (3 tests, green on current libdatadog db05e1f, no production change). Falsey-string divergence documented + points at libdatadog#2071. - - `"true"`, `"yes"`, `"t"` โ†’ `client_computed_stats == true` (all runtimes trigger the fix). - - `""`, absent โ†’ `false`. - - **Included (documenting test):** `"false"`/`"0"` โ†’ currently `true`; assert-and-comment to - surface the known libdatadog divergence intentionally and point at the follow-up. - - Green on existing code (no production change) โ€” locks the contract the rest relies on. -- [x] **Tier 1 โ€” `trace_processor.rs` unit** โœ… - - `tags.rs`: `COMPUTE_STATS_KEY` now `pub(crate)`; removed unconditional insert in `tags_from_env` - (no longer leaks into `_dd.tags.function`). Updated `test_new_from_config` (3โ†’2 + absence assert) - and both `test_get_function_tags_map*` (14โ†’13 + absence assert). - - `trace_processor.rs`: imported `COMPUTE_STATS_KEY`; added `client_computed_stats: bool` to - `ChunkProcessor`; stamp `_dd.compute_stats="1"` in `process` only when - `!compute_trace_stats_on_extension && !client_computed_stats`; wired field from - `header_tags.client_computed_stats` in `process_traces`; added `&& !client_computed_stats` to the - stats-gen guard in `send_processed_traces` (captured before `header_tags` is moved). Updated the - 4 in-test `ChunkProcessor` literals + `test_process_trace` expected span. - - New tests: `test_compute_stats_truth_table` (asserts on `Span.meta`, #1118 guard) and - `test_stats_skip_guard_via_send_processed_traces` (drives real `StatsConcentratorService`, - asserts flushed payload Some only for `(compute_on_extension=true, client_computed_stats=false)`). - - Fixed integration tests that asserted the old leak: removed `_dd.compute_stats:1` from - `logs_integration_test` body assertions; corrected the `metrics_integration_test` comment. - - All 548 workspace tests pass; fmt + clippy clean. -- [x] **Tier 2 โ€” `processor.rs` Path B** โœ… - - `context.rs`: added `pub client_computed_stats: bool` to `Context` (+ doc, `Default` init false); - `ContextBuffer::add_tracer_span` takes a `client_computed_stats` param and records it on the context. - - `processor.rs`: `add_tracer_span` threads the param through; `send_ctx_spans` reads - `context.client_computed_stats` before `get_ctx_spans` consumes it; `send_spans` takes the param and - sets it on its `TracerHeaderTags` so Path B reuses the Tier-1 `ChunkProcessor` logic (single source of - truth); `send_cold_start_span` passes `false`. - - `processor_service.rs`: `ProcessorCommand::AddTracerSpan` + handle method + command handler carry - `client_computed_stats`. - - `trace_agent.rs`: annotated `tracer_header_tags: trace_utils::TracerHeaderTags<'_>` and passes - `tracer_header_tags.client_computed_stats` to `add_tracer_span` at the placeholder-span branch. - - Tests: `test_add_tracer_span_sets_client_computed_stats` (context.rs) and - `test_send_ctx_spans_stamps_compute_stats` (processor.rs โ€” drives Path B end-to-end through the - `trace_tx` channel and asserts on the `aws.lambda` span's `_dd.compute_stats` for all 4 truth-table rows). - - All 550 workspace tests pass; fmt + clippy clean. -- [x] **Tier 3 โ€” full fake-intake E2E** (`bottlecap/tests/apm_integration_test.rs`, harness from PR #1194) โœ… - - Added `run_processor_pipeline(compute_on_extension, client_computed_stats)` which routes a trace - through `SendingTraceProcessor::send_processed_traces`, drains the `trace_tx` channel into - `AggregatorService` (synchronously, post-send, to avoid a flush race), flushes via `TraceFlusher`, - and flushes stats via `StatsConcentratorService` โ†’ `StatsAggregator` โ†’ `StatsFlusher`. Parameterized - `header_tags_with(client_computed_stats)`. Tests: T3.1 (`e2e_client_computed_stats_leaves_compute_stats_absent`), - T3.2 (`e2e_compute_stats_truth_table_on_captured_span`), T3.3 (`e2e_stats_suppressed_unless_extension_computes`), - T3.4 (`e2e_client_computed_stats_absent_meta_and_no_stats`). All assert on the captured `AgentPayload` - span meta and `stats_payloads()`. 554/554 workspace tests pass; fmt + clippy clean. - - โš ๏ธ The existing `trace_payload_roundtrip_through_fake_intake` flushes a hand-built - `pb::TracerPayload` and **never runs `process_traces`**, so it can't observe `_dd.compute_stats`. - New cases MUST route a trace **through `SendingTraceProcessor::send_processed_traces`** before flushing. - - Wiring per test: - - `AggregatorService::default()` + spawn `.run()`. `SendingTraceProcessor.trace_tx` is an - `mpsc::Sender` (distinct from `AggregatorHandle`): make a channel, pass `tx` - as `trace_tx`, spawn a forwarder `while let Some(info)=rx.recv().await { agg_handle.insert_payload(info) }`. - Flush via `TraceFlusher::new(agg_handle, โ€ฆ).flush(None)` โ†’ `fake_intake.trace_payloads()`. - - `StatsConcentratorService::new(config)` + spawn; `StatsGenerator::new(conc_handle)` as `stats_generator`. - After `send_processed_traces`, flush concentrator โ†’ `StatsAggregator` โ†’ `StatsFlusher` (mirror the - existing `stats_payload_roundtrip_*` test) โ†’ `fake_intake.stats_payloads()`. - - `SendingTraceProcessor { appsec: None, processor: Arc::new(ServerlessTraceProcessor { obfuscation_config }), trace_tx, stats_generator }`. - - Parameterize `header_tags()` (currently hardcodes `client_computed_stats: true`, `apm_integration_test.rs:43`) - and `Config.compute_trace_stats_on_extension`. - - T3.1: `client_computed_stats=true` โ†’ captured `AgentPayload` span meta has `_dd.compute_stats` **absent**. - - T3.2 control: absent when `compute_on_extension=true` OR `client_computed_stats=true`; `"1"` for the neither row. - - T3.3: stats suppression โ€” empty `stats_payloads()` when `client_computed_stats=true`; one payload for - `(compute_on_extension=true, client_computed_stats=false)`. - - T3.4: combined path โ€” trace meta absent AND zero stats payloads. - -## Validation - -```bash -cd bottlecap -cargo fmt --all -RUSTFLAGS="-D warnings" cargo clippy --workspace --all-targets --features default -cargo nextest run --workspace -``` - -## โš ๏ธ Risk carried from the ticket (not solved by this code) - -Go/Java tracers flush client-side stats async, which often doesn't complete in Lambda before the -extension forwards data. With this fix those runtimes could see zero stats (extension skips + -backend told to skip + tracer never flushed). Full fix chosen anyway. -- [ ] **Gate manual E2E** on the exact golang-default/java cases that failed before - (`test_trace_aws_lambda_hits_metric`) prior to merge. From 003415f84a65bb4c6de38f03a413113dcc69c825 Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Wed, 3 Jun 2026 18:23:27 -0400 Subject: [PATCH 08/10] test(traces): move APMSVLS-487 Tier 0 header-parsing tests to the libdatadog bump PR The Tier 0 tests assert libdatadog parsing behavior (not the _dd.compute_stats feature), and libdatadog#2071 changes that behavior via the db05e1f -> 48da0d8 bump in PR #1244. Move them there so the bump carries its own behavioral coverage and the assertions don't need to assert opposite things on two branches. Feature branch keeps Tiers 1-3; rebase onto #1244 after it merges. --- bottlecap/src/traces/trace_agent.rs | 65 ----------------------------- 1 file changed, 65 deletions(-) diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index efe10852d..599fa50b9 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -774,68 +774,3 @@ fn success_response(message: &str) -> Response { debug!("{}", message); (StatusCode::OK, json!({"rate_by_service": {}}).to_string()).into_response() } - -#[cfg(test)] -mod tests { - //! Tier 0 (APMSVLS-487): lock the `client_computed_stats` parsing contract at the - //! `(&headers).into()` boundary that `handle_traces` uses (see `trace_agent.rs:506`). - //! - //! This is the bool the rest of the fix consumes. These tests pass on the current - //! libdatadog (rev `db05e1f`) with no production change โ€” they document the behavior - //! the higher tiers rely on, and guard against silent regressions when libdatadog is - //! bumped (e.g. once `DataDog/libdatadog#2071` lands; see the divergence rows below). - - use axum::http::{HeaderMap, HeaderValue}; - use libdd_trace_utils::trace_utils::TracerHeaderTags; - - /// Build a `HeaderMap` and convert it the same way `handle_traces` does. - fn parse_stats(value: Option<&str>) -> bool { - let mut headers = HeaderMap::new(); - if let Some(v) = value { - headers.insert( - "datadog-client-computed-stats", - HeaderValue::from_str(v).expect("valid header value"), - ); - } - let tags: TracerHeaderTags<'_> = (&headers).into(); - tags.client_computed_stats - } - - #[test] - fn truthy_values_trigger_the_fix_on_every_runtime() { - // Cross-runtime header values: ".NET/Java/PHP/Python" -> "true", "JS/Ruby/C++" -> "yes", - // "Go" -> "t", plus a canonical "1". All non-empty -> the fix must trigger. - for value in ["true", "yes", "t", "1"] { - assert!( - parse_stats(Some(value)), - "expected client_computed_stats == true for {value:?}" - ); - } - } - - #[test] - fn absent_or_empty_does_not_trigger_the_fix() { - assert!(!parse_stats(None), "absent header must be false"); - assert!(!parse_stats(Some("")), "empty-present header must be false"); - } - - #[test] - fn falsey_strings_currently_parse_as_true_known_libdatadog_divergence() { - // KNOWN DIVERGENCE (current libdatadog rev db05e1f): `client_computed_stats` is parsed as - // `!value.is_empty()`, so falsey literals like "false"/"0" resolve to `true` instead of - // `false` (the Go trace-agent's `isHeaderTrue`/`ParseBool` semantics). - // - // This is latent in real traffic: tracers signal by sending a truthy header and omit it - // otherwise, so these rows don't occur in practice. Tracked by the libdatadog follow-up - // PR `DataDog/libdatadog#2071`. When that lands and we bump libdatadog, flip these to - // `assert!(!...)`. - assert!( - parse_stats(Some("false")), - "documenting: \"false\" currently parses as true (libdatadog#2071)" - ); - assert!( - parse_stats(Some("0")), - "documenting: \"0\" currently parses as true (libdatadog#2071)" - ); - } -} From d2b126dfac26ad2fde68006270d643ae2cbffaec Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Wed, 3 Jun 2026 22:28:35 -0400 Subject: [PATCH 09/10] fix(traces): skip OTLP extension-side stats when tracer computed them MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The OTLP trace pipeline's stats-generation guard only checked compute_trace_stats_on_extension, so an OTLP request carrying Datadog-Client-Computed-Stats would still generate extension-side stats, double-counting against the tracer's client-side stats. Mirror the guard already used in send_processed_traces by also skipping when client_computed_stats is set. ๐Ÿค– Co-Authored-By: Claude Code --- bottlecap/src/otlp/agent.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/bottlecap/src/otlp/agent.rs b/bottlecap/src/otlp/agent.rs index ee648c7f2..9cb3f9024 100644 --- a/bottlecap/src/otlp/agent.rs +++ b/bottlecap/src/otlp/agent.rs @@ -58,6 +58,8 @@ impl TracePipeline { } let compute_trace_stats_on_extension = self.config.compute_trace_stats_on_extension; + // Capture before `tracer_header_tags` is moved into process_traces below. + let client_computed_stats = tracer_header_tags.client_computed_stats; let (send_data_builder, processed_traces) = self.trace_processor.process_traces( self.config.clone(), self.tags_provider.clone(), @@ -78,7 +80,10 @@ impl TracePipeline { // This needs to be after process_traces() because process_traces() // performs obfuscation, and we need to compute stats on the obfuscated traces. + // Skip extension-side stats generation when the tracer already computed stats + // client-side (Datadog-Client-Computed-Stats), to avoid double-counting. if compute_trace_stats_on_extension + && !client_computed_stats && let Err(err) = self.stats_generator.send(&processed_traces) { // Just log the error. We don't think trace stats are critical, so we don't want to From 79cd1aefa33b4da08b8a25f2686b226177d442ea Mon Sep 17 00:00:00 2001 From: Lucas Pimentel Date: Wed, 3 Jun 2026 22:36:38 -0400 Subject: [PATCH 10/10] refactor(traces): reuse COMPUTE_STATS_KEY constant in integration test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Promote COMPUTE_STATS_KEY from pub(crate) to pub so the apm_integration test crate can import it instead of re-declaring the "_dd.compute_stats" string literal, keeping a single source of truth for the tag key. ๐Ÿค– Co-Authored-By: Claude Code --- bottlecap/src/tags/lambda/tags.rs | 2 +- bottlecap/tests/apm_integration_test.rs | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/bottlecap/src/tags/lambda/tags.rs b/bottlecap/src/tags/lambda/tags.rs index cd6c4d69b..bd9663369 100644 --- a/bottlecap/src/tags/lambda/tags.rs +++ b/bottlecap/src/tags/lambda/tags.rs @@ -41,7 +41,7 @@ const SERVICE_KEY: &str = "service"; // ComputeStatsKey is the tag key indicating whether trace stats should be computed. // This is a per-span backend directive that is stamped on each span's meta by the trace // processor (see `trace_processor::ChunkProcessor`), NOT baked into the function tags here. -pub(crate) const COMPUTE_STATS_KEY: &str = "_dd.compute_stats"; +pub const COMPUTE_STATS_KEY: &str = "_dd.compute_stats"; // FunctionTagsKey is the tag key for a function's tags to be set on the top level tracepayload const FUNCTION_TAGS_KEY: &str = "_dd.tags.function"; // TODO(astuyve) decide what to do with the version diff --git a/bottlecap/tests/apm_integration_test.rs b/bottlecap/tests/apm_integration_test.rs index aea951ccd..b7213801f 100644 --- a/bottlecap/tests/apm_integration_test.rs +++ b/bottlecap/tests/apm_integration_test.rs @@ -19,6 +19,7 @@ use std::sync::Arc; use bottlecap::LAMBDA_RUNTIME_SLUG; use bottlecap::config::Config; +use bottlecap::tags::lambda::tags::COMPUTE_STATS_KEY; use bottlecap::tags::provider::Provider; use bottlecap::traces::http_client::create_client; use bottlecap::traces::stats_aggregator::StatsAggregator; @@ -283,8 +284,6 @@ async fn trace_payload_roundtrip_through_fake_intake() { // extension-side stats-generation guard. // --------------------------------------------------------------------------- -const COMPUTE_STATS_KEY: &str = "_dd.compute_stats"; - fn header_tags_with(client_computed_stats: bool) -> TracerHeaderTags<'static> { TracerHeaderTags { client_computed_stats,