Skip to content

Defer MetricKey construction to the aggregator thread#11381

Open
dougqh wants to merge 25 commits into
masterfrom
dougqh/conflating-metrics-background-work
Open

Defer MetricKey construction to the aggregator thread#11381
dougqh wants to merge 25 commits into
masterfrom
dougqh/conflating-metrics-background-work

Conversation

@dougqh
Copy link
Copy Markdown
Contributor

@dougqh dougqh commented May 15, 2026

What Does This Do

Moves the per-span MetricKey construction, cache lookups, and aggregation off the producer thread into the existing aggregator thread, replacing the Batch-based conflation pipeline with a thin per-span SpanSnapshot posted to the inbox.

Motivation

Incremental step towards using a lighter weight structure for metrics.

In the subsequent PR, I intend to switch to a simplified hash table that isn't thread-safe.

The simplified hashtable uses custom entries that that will allow us to avoid the MetricKey construction on look-up,
but given that the simple hashtable isn't thread-safe we need to move the work to the consumer thread first.

Additional Notes

What the producer does now (per span)

  • filter (shouldComputeMetric, resource-ignored, longRunning)
  • collect tag values into a SpanSnapshot (one allocation per span)
  • inbox.offer(snapshot) + return error flag for forceKeep

What moved off the producer

  • MetricKey construction and its hash computation
  • SERVICE_NAMES.computeIfAbsent (UTF8 encoding of service name)
  • SPAN_KINDS.computeIfAbsent (UTF8 encoding of span.kind)
  • PEER_TAGS_CACHE lookups (peer-tag name+value UTF8 encoding)
  • pending / keys ConcurrentHashMap operations
  • Batch pooling, atomic ops, contributeTo

Removed entirely

  • Batch.java -- the aggregator's existing LRUCache<MetricKey, AggregateMetric> IS the conflation point now
  • pending ConcurrentHashMap<MetricKey, Batch>
  • keys ConcurrentHashMap<MetricKey, MetricKey> (canonical dedup)
  • batchPool MessagePassingQueue<Batch>
  • CommonKeyCleaner's keys.keySet() tracking; AggregateExpiry now just reports LRU drops to health metrics

Added

  • SpanSnapshot: immutable value carrying the raw MetricKey inputs + a tagAndDuration long (duration OR-ed with ERROR_TAG / TOP_LEVEL_TAG).
  • AggregateMetric.recordOneDuration(long) -- single-hit equivalent of the existing recordDurations(int, AtomicLongArray).
  • PeerTagSchema: slim carrier of the eligible peer-tag names as a String[]. Cached on ConflatingMetricsAggregator and re-checked by reference equality of features.peerTags() -- producer fast path is one volatile read + a reference compare, no allocation in steady state. The producer captures values into a String[] parallel to schema.names (lazy-allocated, only when at least one peer tag fires); the aggregator reconstructs the "name:value" UTF8 encoding from the parallel arrays on its own thread. Replaces the previous flat [name0, value0, name1, value1, ...] layout, which forced a worst-case allocation + trim-and-copy on every span. Resolves @sarahchen6's review comment on extractPeerTagPairs.
  • HealthMetrics.onStatsInboxFull() + a TracerHealthMetrics counter reported as stats.dropped_aggregates{reason:inbox_full} -- parallel to the existing reason:lru_eviction. Without conflation the producer can lose snapshots when the bounded MPSC queue is full; this makes that visible without silencing it.

Benchmark results (1 fork × 5 iter × 10s, 2 warmup × 10s)

ConflatingMetricsAggregatorDDSpanBenchmark (64 client-kind DDSpans per op):

avgt (µs/op) CI (99.9%)
master (4f1ea4ea8e) 6.132 ± 0.148 [5.984, 6.279]
this PR (e455801bf1) 0.667 ± 0.018 [0.650, 0.685]

~9.2× faster than master on the production DDSpan path. CIs don't overlap, run stdev is tight (master 0.038, this PR 0.005) -- the signal is unambiguously real.

The headline isn't all from one change: it's the cumulative effect of the producer/consumer split (canonicalization moved off the hot path), the cached span-kind ordinal, the inbox-full fast-path check, and the slim PeerTagSchema refactor described above.

Caveat on the DDSpan bench numbers

Without conflation, the producer pushes 1 inbox item per span instead of ~1 per 64. At this bench's synthetic rate the consumer can't keep up and inbox.offer drops to the new onStatsInboxFull counter -- the DDSpan numbers above measure producer publish() latency only. The adversarial benchmark below covers the consumer-pressure side.

Adversarial benchmark (8 producer threads, 2×15s warmup + 5×15s, 1 fork)

AdversarialMetricsBenchmark (high-cardinality (service, operation, resource, peer.hostname) per op, random durations across 1ns–1s, random error/topLevel flags). Designed to saturate every capacity bound at once.

master this PR
Throughput avg (ops/s) 395,806 ± 2,619,133 4,889,660 ± 390,175
Per-iteration progression (ops/s) warmup 2,536,145 → 205,314 → 95,888 → 47,301 → 24,378 4,886,778 → 4,875,195 → 4,731,827 → 4,959,992 → 4,994,511
Stdev (ops/s) 680,180 101,327
Total spans published over the run 111,041,671 302,229,471
onStatsInboxFull (drops at handoff) n/a (no inbox on master) 199,862,634
onStatsAggregateDropped 11,642,039 84,002,323

~12× faster on average, but the shape of the per-iteration numbers is the more important story: master degrades monotonically (warmup ~2.5M ops/s → final 24K ops/s, a ~100× collapse) while this PR stays flat (~4.7M–5.0M ops/s on every iteration). That's the signature of the old Batch-pool exhausting under cap pressure -- once batches can't be allocated, every producer publish bottlenecks on the pool.

The drop-counter shape is also the expected one for this PR: inbox-full drops (200M) dominate aggregate-cache drops (84M), confirming that backpressure shows up at the producer→consumer handoff first, protecting the consumer from a workload it physically can't service.

TRACER_METRICS_MAX_PENDING semantic preserved (amarziali's review)

The configured maxPending historically counted conflating Batch slots (~64 spans per batch via Batch.MAX_BATCH_SIZE); the new inbox holds 1 SpanSnapshot per slot. Config.java now multiplies the configured value by the legacy batch size so pre-existing customer overrides keep delivering the same effective span-throughput capacity (e.g. a configured 4096 still means ~262K spans before drops). Default stays at 2048 logical → 131K snapshot slots, identical to the prior 2048 batches × 64 spans.

Performance characteristics

A couple of points worth being explicit about so the bench numbers above aren't read as more than they are:

  • Where the win comes from on the producer side. Master's per-span producer allocation is ~1 MetricKey + 1/64 Batch ≈ 116 B; this PR allocates ~1 SpanSnapshot ≈ 120 B. Allocation is essentially unchanged. The producer-side speedup is from removing the conflating-Batch atomic dance and the pending / keys CHM lookups — not from less GC.
  • What's still left on the consumer side. Every snapshot drained from the inbox still allocates a fresh MetricKey to do the LRUCache.computeIfAbsent lookup, even on hit. At the adversarial bench's drain rate that's ~400 MB/s of nursery garbage on the aggregator thread. It's not a bottleneck in practice (the bench sustains 5M ops/s on this branch), but the next PR in the stack (Update client-side stats to use light weight Hashtable #11382) is exactly the optimization that eliminates this allocation by consolidating key+value into AggregateEntry and replacing LRUCache<MetricKey, AggregateMetric> with a hashtable that probes by SpanSnapshot directly. So this PR is an intermediate perf point on the path to the bigger win.
  • What the adversarial drop counters mean. The 200M inbox-full drops in the adversarial run are the new producer-side fast-path (inbox.size() >= inbox.capacity()) shedding load before tag extraction and SpanSnapshot allocation. That's the design intent, not aggregator-thread failure — the aggregator still completed all 5 report cycles cleanly. Dropping at the producer fast-path is strictly cheaper than letting work queue up deeper in the pipeline.

Test plan

  • ./gradlew :dd-trace-core:test --tests 'datadog.trace.common.metrics.*' passes
  • ./gradlew :dd-trace-core:test --tests 'datadog.trace.core.monitor.*' passes
  • ./gradlew :dd-trace-core:compileJava :dd-trace-core:compileTestGroovy :dd-trace-core:compileJmhJava :dd-trace-core:compileTraceAgentTestGroovy all green
  • ./gradlew spotlessCheck clean
  • CI muzzle / integration suites
  • Validate stats.dropped_aggregates{reason:inbox_full} reports as expected under a synthetic high-load run (not in the JMH bench)

🤖 Generated with Claude Code

dougqh and others added 7 commits May 15, 2026 12:06
ConflatingMetricsAggregator.publish does a handful of redundant operations on
every span. None individually is large; together they show as ~2.5% on the
existing JMH benchmark once the benchmark actually exercises span.kind.

- dedup span.isTopLevel(): publish() reads it into a local, then shouldComputeMetric
  read it again. Pass the cached value in.
- resolve spanKind to String once: master called toString() twice per span (once
  inside spanKindEligible, once at the getPeerTags call site) and used HashSet
  contains on a CharSequence (which routes through equals on String). Normalize
  to String up front and reuse.
- lazy-allocate the peer-tag list: getPeerTags() always allocated an ArrayList
  sized to features.peerTags() even when the span had none of those tags set.
  Defer allocation until the first match; return Collections.emptyList() when
  none hit. MetricKey already treats null/empty peerTags as emptyList, so no
  behavior change.

Drop the spanKindEligible helper — the HashSet.contains call inlines fine in
shouldComputeMetric.

Update the JMH benchmark to set span.kind=client on every span. Without it the
filter path short-circuits before the peer-tag and toString work, so the wins
above aren't measurable. With it:

  baseline   6.755 us/op (CI [6.560, 6.950], stdev 0.129)
  optimized  6.585 us/op (CI [6.536, 6.634], stdev 0.033)

2 forks x 5 iterations x 15s. ~2.5% mean improvement and much tighter variance
fork-to-fork.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Introduce SpanKindFilter -- a tiny builder-built immutable filter whose state
is an int bitmask indexed by the span.kind ordinals already cached on
DDSpanContext. Each include* on the builder sets one bit (1 << ordinal); the
runtime check is a single AND against (1 << span's ordinal).

CoreSpan.isKind(SpanKindFilter) is the new entry point. DDSpan overrides it
to do the bit-test directly against the cached ordinal -- no virtual call,
no tag-map lookup. The two existing test-only CoreSpan impls (SimpleSpan
and TraceGenerator.PojoSpan, the latter in two source sets) implement isKind
by reading the span.kind tag and delegating to SpanKindFilter.matches(String),
which converts via DDSpanContext.spanKindOrdinalOf and does the same AND.

Refactor: DDSpanContext.setSpanKindOrdinal(String) now delegates to a new
package-private static spanKindOrdinalOf(String) so the same string-to-ordinal
mapping serves both the tag interceptor path and SpanKindFilter.matches.

This is groundwork -- nothing in the codebase calls isKind yet. The next
commit will replace the HashSet-based eligibility checks in
ConflatingMetricsAggregator with SpanKindFilter instances.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replace the two ELIGIBLE_SPAN_KINDS_FOR_* HashSet<String> constants and the
SPAN_KIND_INTERNAL.equals check with three SpanKindFilter instances:
METRICS_ELIGIBLE_KINDS, PEER_AGGREGATION_KINDS, INTERNAL_KIND. Eligibility
checks now go through span.isKind(filter), which on DDSpan is a volatile
byte read against the already-cached span.kind ordinal plus a single bit-test.

Also defer the span.kind tag read: previously read at the top of the publish
loop and threaded through both shouldComputeMetric and the inner publish.
isKind no longer needs the string, so the read can move down into the inner
publish where it's still needed for the SPAN_KINDS cache key / MetricKey.

Supporting changes:

- DDSpanContext.spanKindOrdinalOf(String) is now public so non-DDSpan CoreSpan
  impls can compute the ordinal at tag-write time.
- SpanKindFilter gains a public matches(byte) fast-path overload that callers
  with a pre-computed ordinal use directly.
- SimpleSpan caches the ordinal in setTag(SPAN_KIND, ...), mirroring what
  TagInterceptor does for DDSpanContext, and its isKind now hits the byte
  fast path. Without this, the JMH benchmark (which uses SimpleSpan) would
  re-derive the ordinal on every isKind call and overstate the cost.

Benchmark on the bench updated last commit (kind=client on every span,
4 forks x 5 iter x 15s):

  prior commit  6.585 ± 0.049 us/op
  this commit   6.903 ± 0.096 us/op

The slight regression is a SimpleSpan-via-groovy-dispatch artifact -- the
interface call to isKind through CoreSpan, then through SimpleSpan, then
through SpanKindFilter.matches, doesn't fold as aggressively as a HashSet
contains on a static field. In production DDSpan.isKind inlines to a context
field read + ordinal byte read + bit-test, so the production path is faster
than the prior HashSet approach. A DDSpan-based benchmark would show this;
the existing SimpleSpan-based one doesn't.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
The existing ConflatingMetricsAggregatorBenchmark uses SimpleSpan, a groovy
mock. That's enough for measuring queue/CHM/MetricKey work, but it conceals
the production cost of CoreSpan.isKind: SimpleSpan's isKind goes through
groovy interface dispatch into SpanKindFilter.matches, while DDSpan.isKind
inlines to a context byte-read + bit-test.

This new benchmark uses real DDSpan instances created through a CoreTracer
(with a NoopWriter so finishing doesn't reach the agent). Same shape as the
SimpleSpan bench (64-span trace, span.kind=client, peer.hostname set).

Numbers (2 forks x 5 iter x 15s):

  master:        6.428 +- 0.189 us/op  (HashSet eligibility checks)
  this branch:   6.343 +- 0.115 us/op  (SpanKindFilter bitmask)

About 1.3% faster on the production path. The SimpleSpan benchmark in the
same conditions shows a ~2.2% slowdown -- the mock's dispatch shape gives a
misleading signal.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Make SpanKindFilter.kindMask and its constructor private now that DDSpan.isKind
no longer needs direct field access -- it delegates to SpanKindFilter.matches(byte).

The Builder.build() in the same outer class still constructs instances via the
private constructor.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Replace the producer-side conflation pipeline with a thin per-span SpanSnapshot
posted to the existing aggregator thread. The aggregator now builds the
MetricKey, does the SERVICE_NAMES / SPAN_KINDS / PEER_TAGS_CACHE lookups, and
updates the AggregateMetric directly -- all off the producer's hot path.

What the producer does now, per span:

  - filter (shouldComputeMetric, resource-ignored, longRunning)
  - collect tag values into a SpanSnapshot (1 allocation per span)
  - inbox.offer(snapshot) + return error flag for forceKeep

What moved off the producer:

  - MetricKey construction and its hash computation
  - SERVICE_NAMES.computeIfAbsent (UTF8 encoding of service name)
  - SPAN_KINDS.computeIfAbsent (UTF8 encoding of span.kind)
  - PEER_TAGS_CACHE lookups (peer-tag name+value UTF8 encoding)
  - pending/keys ConcurrentHashMap operations
  - Batch pooling, batch atomic ops, batch contributeTo

Removed entirely:

  - Batch.java -- the conflation primitive is no longer needed; the
    aggregator's existing LRUCache<MetricKey, AggregateMetric> IS the
    conflation point now.
  - pending ConcurrentHashMap<MetricKey, Batch>
  - keys ConcurrentHashMap<MetricKey, MetricKey> (canonical dedup)
  - batchPool MessagePassingQueue<Batch>
  - The CommonKeyCleaner role of tracking keys.keySet() on LRU eviction --
    AggregateExpiry now just reports drops to healthMetrics.

Added:

  - SpanSnapshot: immutable value carrying the raw MetricKey inputs + a
    tagAndDuration long (duration | ERROR_TAG | TOP_LEVEL_TAG).
  - AggregateMetric.recordOneDuration(long tagAndDuration) -- the single-hit
    equivalent of the existing recordDurations(int, AtomicLongArray).
  - Peer-tag values flow through the snapshot as a flattened String[] of
    [name0, value0, name1, value1, ...]; the aggregator encodes them through
    PEER_TAGS_CACHE on its own thread.

Benchmark results (2 forks x 5 iter x 15s):

  ConflatingMetricsAggregatorDDSpanBenchmark
    prior commit  6.343 +- 0.115 us/op
    this commit   2.506 +- 0.044 us/op  (~60% faster)

  ConflatingMetricsAggregatorBenchmark (SimpleSpan)
    prior commit  6.585 +- 0.049 us/op
    this commit   3.116 +- 0.032 us/op  (~53% faster)

Caveat on the benchmark: without conflation, the producer pushes 1 inbox
item per span instead of ~1 per 64. At the benchmark's synthetic rate the
consumer can't keep up and inbox.offer silently drops. The numbers measure
producer publish() latency only; consumer throughput at realistic span rates
is a follow-up to validate. Tuning maxPending matters more in this design.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
With the per-span SpanSnapshot inbox path, the producer can lose snapshots
when the bounded MPSC queue is full -- silently, since inbox.offer() returns
a boolean we previously ignored. The conflating-Batch design used to absorb
~64x more producer pressure per inbox slot, so this is a new failure mode
worth surfacing.

Wire it through the existing HealthMetrics path:

- HealthMetrics.onStatsInboxFull() (no-op default).
- TracerHealthMetrics gets a statsInboxFull LongAdder and a new reason tag
  reason:inbox_full reported under the same stats.dropped_aggregates metric
  used for LRU evictions. Two LongAdders, two tagged time series.
- ConflatingMetricsAggregator.publish increments the counter when
  inbox.offer(snapshot) returns false.

This doesn't fix the drop -- tuning maxPending and/or building producer-side
batching are the actual fixes. But it makes the failure visible in the same
place ops already watches.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@dougqh dougqh added type: enhancement Enhancements and improvements comp: core Tracer core tag: performance Performance related changes tag: no release notes Changes to exclude from release notes comp: metrics Metrics tag: ai generated Largely based on code generated by an AI or LLM labels May 15, 2026
@dougqh dougqh marked this pull request as ready for review May 18, 2026 15:37
@dougqh dougqh requested a review from a team as a code owner May 18, 2026 15:37
@dougqh dougqh requested a review from mhlidd May 18, 2026 15:37
Copy link
Copy Markdown

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 950499c767

ℹ️ About Codex in GitHub

Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".

Comment on lines +514 to +518
reportIfChanged(
target.statsd,
"stats.dropped_aggregates",
target.statsInboxFull,
REASON_INBOX_FULL_TAG);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Resize health metric history for inbox-full counter

When statsInboxFull is nonzero this added 52nd reportIfChanged call indexes previousCounts[++countIndex], but previousCounts is still sized for the previous 51 counters. As a result the new reason:inbox_full metric is never emitted and every flush that reaches this call logs the resize warning instead; increase the array size alongside the new counter.

Useful? React with 👍 / 👎.

The new reason:inbox_full reportIfChanged call advances countIndex to 51,
but previousCounts was still sized for 51 counters (max index 50), so the
metric never emitted and the resize warning fired every flush. Bump the
array to 52 and add a regression test that exercises the flush path.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@datadog-official
Copy link
Copy Markdown
Contributor

datadog-official Bot commented May 19, 2026

Pipelines

Fix all issues with BitsAI

⚠️ Warnings

🚦 4 Pipeline jobs failed

DataDog/apm-reliability/dd-trace-java | check_base   View in Datadog   GitLab

🔧 Fix in code (Fix with Cursor). Execution failed for task ':dd-trace-core:forbiddenApisJmh'. Cause: Check for forbidden API calls failed.

DataDog/apm-reliability/dd-trace-java | spotless   View in Datadog   GitLab

🔧 Fix in code (Fix with Cursor). Spotless check failed due to formatting issues in communication module.

Run system tests | main / End-to-end #5 / akka-http 5   View in Datadog   GitHub Actions

🛟 This job is unlikely to succeed on retry. Please review your pipeline configuration. Error pull access denied for mcr.microsoft.com/mssql/server: repository does not exist or may require 'docker login'.

View all 4 failed jobs.

Useful? React with 👍 / 👎

This comment will be updated automatically if new data arrives.
🔗 Commit SHA: 5a4685f | Docs | Datadog PR Page | Give us feedback!

Base automatically changed from dougqh/conflating-metrics-producer-wins to master May 19, 2026 23:04
publish() previously did all of the tag extraction (peer-tag pairs,
HTTP method/endpoint, span kind, gRPC status) and the SpanSnapshot
allocation before calling inbox.offer; on a full inbox the offer
failed and everything became garbage.

Early-out with an approximate size() vs capacity() check up front. The
jctools MPSC queue's size() is best-effort but that's fine: under-
estimation falls through to the existing offer-as-source-of-truth
path, over-estimation drops a snapshot that would have fit (and
onStatsInboxFull was about to fire on the next span anyway).

error is computed first so the force-keep return is correct whether
or not the snapshot is built.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Copy link
Copy Markdown
Contributor

@sarahchen6 sarahchen6 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR looks good to me. IIUC there are no public / product-facing behavior changes except that efficiency is improved, but maybe Andrea or someone more familiar with expected CSS behavior can confirm this too 😅

Codex GPT-5.4 recommended expanding the test coverage with the suggestion below, but will leave it up to you on whether it's necessary...

Add one aggregator-level test for the new inbox-full wiring in DataDog/dd-
trace-java/dd-trace-core/src/main/java/datadog/trace/common/metrics/
ConflatingMetricsAggregator.java:301 and DataDog/dd-trace-java/dd-trace-core/
src/main/java/datadog/trace/common/metrics/
ConflatingMetricsAggregator.java:345. The existing HealthMetricsTest proves
the counter flushes correctly, but it does not prove the aggregator triggers
it from a real full-inbox condition.

}
if (count < pairs.length) {
String[] trimmed = new String[count];
System.arraycopy(pairs, 0, trimmed, 0, count);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

instead of trimming and copying at the end, this method could start by counting the total pairs and defining the right-sized array immediately? I'm not sure if this would make an actual performance difference though, especially with small peerTag sets

Copy link
Copy Markdown
Contributor Author

@dougqh dougqh May 20, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, that's a fair point. The next change in the stack reworks this part a fair amount to be more efficient and to apply per-tag cardinality limits.

I think I'll see if the structural parts of that change can be pulled up into this change without the cardinality limits. That would then keep master functioning as is -- before I start landing the significant behaviorial changes.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I pulled in the structural changes from further down the stack.

That change introduces a PeerTagsSchema to encapsulate the result from feature discovery.
Each reporting cycle checks that the PeerTagsSchema is up-to-date.
And then producers use the current PeerTagsSchema to extract the right values to include in the snapshot.

This allows the collections to sized just right for the associated PeerTagsSchema.

This design becomes more useful later in the stack where cardinality limiters are introduced per tag.

dougqh and others added 2 commits May 20, 2026 15:14
Addresses sarahchen6's review comment on ConflatingMetricsAggregator
extractPeerTagPairs: replaces the worst-case-allocation + trim-and-copy
flat-pairs layout with a parallel-array carrier.

- New PeerTagSchema: minimal carrier of String[] names. Two flavors -- a
  static INTERNAL singleton (one entry: base.service) for internal-kind
  spans, and per-discovery built schemas for client/producer/consumer
  spans. Deliberately no cardinality limiters or per-cycle state; that
  layers on top in a later PR.

- ConflatingMetricsAggregator: caches the peer-aggregation schema keyed
  on reference equality of features.peerTags() -- a single volatile read
  + a long compare on the steady-state producer hot path, no allocation.
  The producer now captures only a String[] of values parallel to the
  schema's names; the schema reference is carried on SpanSnapshot. The
  prior "build worst-case pairs then trim" code is gone.

- SpanSnapshot: replaces String[] peerTagPairs with PeerTagSchema +
  String[] peerTagValues. Producer drops the schema reference if no
  values fired so the consumer short-circuits on null.

- Aggregator.materializePeerTags: now reads name/value pairs at the same
  index from (schema.names, snapshot.peerTagValues). Counts hits once
  for exact-size allocation; preserves the singletonList fast path for
  the common one-entry case (e.g. internal-kind base.service).

Producer-side cost goes from "allocate String[2n] + walk + maybe trim"
to "single volatile read + walk + lazy String[n] only on first hit".

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@dougqh
Copy link
Copy Markdown
Contributor Author

dougqh commented May 20, 2026

PR looks good to me. IIUC there are no public / product-facing behavior changes except that efficiency is improved, but maybe Andrea or someone more familiar with expected CSS behavior can confirm this too 😅

Codex GPT-5.4 recommended expanding the test coverage with the suggestion below, but will leave it up to you on whether it's necessary...

Add one aggregator-level test for the new inbox-full wiring in DataDog/dd-
trace-java/dd-trace-core/src/main/java/datadog/trace/common/metrics/
ConflatingMetricsAggregator.java:301 and DataDog/dd-trace-java/dd-trace-core/
src/main/java/datadog/trace/common/metrics/
ConflatingMetricsAggregator.java:345. The existing HealthMetricsTest proves
the counter flushes correctly, but it does not prove the aggregator triggers
it from a real full-inbox condition.

Yes, my intention with the first changes in this stack is to leave behavior unchanged, and I'm certainly happy to add more tests. In doing this work, Claude and I did find that certain cases weren't exercised previously and there were a few latent bugs.

Comment on lines +183 to +184
if (values[i] != null) {
if (hitCount == 0) firstHit = i;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it can be folded in one single if

if (hitCount == 0 && values[i] != null) {

Comment thread dd-trace-core/src/test/groovy/datadog/trace/common/writer/TraceGenerator.groovy Outdated
}

SpanSnapshot snapshot =
new SpanSnapshot(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The old design carefully pooled batch objects to avoid cyclic repeated allocation. In this use case the pooling was effective to keep low the gc pressure. Is that missed or a follow up?

Copy link
Copy Markdown
Contributor Author

@dougqh dougqh May 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, the old architecture was creating GC & throughput problems.

The problem with the old approach was that you needed to allocate a MetricKey to perform the batching. And in the worst cases, the DDCaches would break down and we'd up end allocating constantly. So while the batching was well intentioned, there were still significant GC problems previously.

And the batching table was also creating another point of contention.

The benefit of this approach is that the allocation and contention in the application / producer threads is minimal. I do think a bit of batching could still make sense.

Claude suggest per-thread batch, but I think I'd prefer to just take advantage of the batching that's already being done by PendingTrace.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think my question was more about pooling those objects (SpanSnapshot) in analogy of the way Batch were objects were pooled to avoid allocations. They were not pooled with a cache but with a queue to avoid doing new ... each time. But perhaps is not necessary if those objects are really short lived

Copy link
Copy Markdown
Contributor Author

@dougqh dougqh May 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From the experiments, I've done I think pooling would be detrimental in this case.
The fast path for allocation is just a pointer bump of a thread local variable, so it is impossible to beat with any sort of non-trivial pool.

There can be a benefit on the slow path (e.g. GC) from reducing objects allocations, and this change does that just in a very different way.

1 - It skips the snapshot when the queue is already full
That's a critical improvement over the old approach that would keep creating MetricKeys (and UTF8ByteString-s)
2 - It avoids the extra allocation from the batching map

I think if we wanted to improve this further. We should have PendingTrace produce SpanSnapshots that are used by both the trace sending and metric sending. And we could pass the batch from PendingTrace through directly, so there's less contention on the queue.

Admittedly, the real pay off here comes in the next PR: #11382

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it can be kept for later no worries I wanted to keep it tracked because we're dismissing a mechanism we had in place before to reduce allocations. So the main objective of this comment was not to miss anything. Definetely not blocking here thanks for the details

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Further down in the PR stack, I had an adversarial benchmark that tries to break the metrics processing system. I had Claude pull that into this PR and run a comparison against master. The results are now included in the PR description.

As expected this branch performs better in spite of losing the batching and pooling.

if (current == cachedPeerTagsSource) {
return cachedPeerTagSchema;
}
return refreshPeerAggSchema(current);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if there is the need to check it each time? It would be more efficient to trigger from the other side

Copy link
Copy Markdown
Contributor Author

@dougqh dougqh May 21, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, maybe, I'm trying not touch the feature discovery side much.
Plus I'd rather update the schema each reporting cycle.

I do think this part needs some refinement. I had Claude port a simplified version of the solution from further down in the PR stack, but I think there's still some work to do on this PR.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reusing my answer from another thread, too...

I pulled in more of the structural changes from further down the stack.

That change introduces a PeerTagsSchema to encapsulate the result from feature discovery.
Each reporting cycle checks that the PeerTagsSchema is up-to-date.
And then producers use the current PeerTagsSchema to extract the right values to include in the snapshot.

This allows the collections to sized just right for the associated PeerTagsSchema.

This design becomes more useful later in the stack where cardinality limiters are introduced per tag.

Comment thread dd-trace-core/src/traceAgentTest/groovy/TraceGenerator.groovy Outdated
- Aggregator.materializePeerTags: fold the firstHit-discovery nested if
  into a single guarded post-increment (amarziali, #3279243138). One
  body line: `if (values[i] != null && hitCount++ == 0) firstHit = i;`.

- Drop redundant isKind(SpanKindFilter) overrides in both
  TraceGenerator.groovy files (amarziali, #3279264553 / #3279382648).
  CoreSpan.java:84 already supplies a default implementation that reads
  the same span.kind tag.

- Bump TRACER_METRICS_MAX_PENDING default from 2048 -> 131072 to address
  the capacity regression amarziali flagged (#3279378375). Without
  producer-side conflation, the inbox now holds 1 SpanSnapshot per
  metrics-eligible span instead of 1 conflated Batch per ~64 spans;
  restoring effective capacity parity (~2048 * ~64 = 131072) prevents a
  ~64x rise in inbox-full drops at the same span rate. ~100 B per
  SpanSnapshot puts the worst-case heap floor at ~13 MB -- bounded.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@amarziali amarziali removed the tag: no release notes Changes to exclude from release notes label May 21, 2026
dougqh and others added 3 commits May 21, 2026 10:27
Addresses PR #11381 review (amarziali, #3279325340 -- "Are the existing
tests covering this case?").

New ConflatingMetricsAggregatorInboxFullTest constructs the aggregator
with a small inbox (queueSize=8), deliberately does NOT call start() so
the consumer thread never drains, then publishes enough spans to
overflow the inbox. Verifies that healthMetrics.onStatsInboxFull() is
called at least once -- the fast-path's `inbox.size() >= inbox.capacity()`
short-circuit triggers when the producer-side queue is at capacity.

Test is Java + JUnit 5 + Mockito per the project convention for new
tests; uses a CoreSpan Mockito mock rather than the SimpleSpan Groovy
fixture so we don't depend on Groovy-then-Java compile order from the
test source set.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…read

Addresses amarziali's review comment #3279340181 ("It would be more
efficient to trigger from the other side"). The producer-side reference
compare on every publish goes away; the aggregator thread reconciles
the cached schema against feature discovery once per reporting cycle.

- DDAgentFeaturesDiscovery: expose getLastTimeDiscovered() so callers
  can detect a discovery refresh without copying the peerTags Set.

- PeerTagSchema: add `long lastTimeDiscovered` (plain, aggregator-only)
  and `hasSameTagsAs(Set)`. of(Set, long) takes the timestamp; INTERNAL
  uses a -1L sentinel since it's never reconciled.

- ConflatingMetricsAggregator:
  * Drop the cachedPeerTagsSource volatile and the per-publish reference
    compare.
  * Producer fast path is now `cachedPeerTagSchema` volatile read +
    null-check; first publish takes the one-time synchronized bootstrap.
  * Add reconcilePeerTagSchema() that runs once per cycle on the
    aggregator thread: fast-path timestamp compare, slow-path set
    compare, bump-in-place when the set is unchanged.

- Aggregator: new `Runnable onReportCycle` constructor parameter, run at
  the start of report() (before the flush, so any test awaiting
  writer.finishBucket() observes the schema in its post-reconcile state
  and so the next publish sees the new schema without a handoff).

- Update "should create bucket for each set of peer tags" to drive two
  reporting cycles separated by a report() that triggers reconcile. The
  old test relied on per-publish reference detection, which the new
  design intentionally doesn't preserve -- the schema is now stable
  within a cycle.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Addresses round-3 review nice-to-haves on PR #11381.

- PeerTagSchemaTest: unit coverage for hasSameTagsAs() (the predicate
  that drives the reconcile fast/slow path split), the of(Set, long)
  factory, and the INTERNAL singleton. The hasSameTagsAs cases include
  same-content-different-Set-reference (the case the reconcile fast path
  relies on after a discovery refresh) and content-mismatch in either
  direction.

- ConflatingMetricsAggregatorBootstrapTest: integration coverage for
  the producer-side bootstrap + aggregator-thread reconcile flow.
  * bootstrapHappensOnceOnFirstPublish -- three publishes against an
    un-started aggregator (no consumer thread, no reconciles); verifies
    features.peerTags() and features.getLastTimeDiscovered() are each
    called exactly once.
  * reconcileSkipsDeepCompareWhenTimestampMatches -- two cycles with
    constant features.getLastTimeDiscovered(); each post-report
    reconcile short-circuits on the timestamp fast path, so peerTags()
    is called only by bootstrap (1 total).
  * reconcileSurvivesTimestampBumpWhenTagsUnchanged -- timestamps bump
    every reconcile, forcing the slow set-compare path; the tag set
    stays identical, so the schema is preserved and continues to flush
    buckets correctly across cycles.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
dougqh added a commit that referenced this pull request May 21, 2026
…bility

The verify(writer).add(MetricKey, AggregateMetric) signature is unique
to #11381; downstream branches use AggregateEntry. Switching to
verify(writer, times(2)).finishBucket() keeps the same behavioral
guarantee (both cycles flushed) across the stack.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…bility

The verify(writer).add(MetricKey, AggregateMetric) signature is unique
to #11381; downstream branches use AggregateEntry. Switching to
verify(writer, times(2)).finishBucket() keeps the same behavioral
guarantee (both cycles flushed) across the stack.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
// SpanSnapshot per metrics-eligible span instead of 1 conflated Batch per ~64 spans -- without
// this bump customers would see ~64x more inbox-full drops at the same span rate. ~100 B per
// SpanSnapshot * 131072 ≈ 13 MB worst-case heap floor.
tracerMetricsMaxPending = configProvider.getInteger(TRACER_METRICS_MAX_PENDING, 131072);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have customers that might have set it (i.e. to 4096) but now the semantic changed. This should be carefully communicated since, even if the default is coherent, the previous overrides are not

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it would be best to just maintain the prior semantic and apply a factor to it to size the queue.

dougqh and others added 7 commits May 21, 2026 13:58
TRACER_METRICS_MAX_PENDING previously counted conflating Batch slots
(~64 spans each). The inbox now holds 1 SpanSnapshot per slot, so
multiply the configured value by LEGACY_BATCH_SIZE (64) to keep
pre-existing customer overrides delivering the same effective
span-throughput capacity. Default stays at 2048 logical -> 131072
snapshot slots, identical to the prior 2048 batches * 64 spans.

Also drops two unused datadog.trace.core.SpanKindFilter imports left
behind in TraceGenerator.groovy after the isKind() override was removed
in favor of the CoreSpan default implementation.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Ports the adversarial JMH benchmark from #11402 down to this branch so
we can compare #11381 vs master on a high-cardinality, high-throughput
workload. Adapted to use ConflatingMetricsAggregator (pre-rename) and
the FixedAgentFeaturesDiscovery / NullSink helpers already in
ConflatingMetricsAggregatorBenchmark.

8 producer threads hammer publish() with unique (service, operation,
resource, peer.hostname) per op so the aggregate cache fills+evicts
continuously and the inbox saturates. tearDown prints the drop
counters (inboxFull vs aggregateDropped) so the test verifies the
subsystem stayed bounded under attack.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Drop traceComputedCalls / totalSpansCounted: under 8-way contention
the volatile-long ++/+= pattern was losing ~20% of updates (296M
counted vs 245M reported), and the numbers duplicate signal JMH's
ops/s already provides.

Switch inboxFull / aggregateDropped to LongAdder so the printed drop
shape (the order-of-magnitude story the bench is built to tell) is
accurate under contention.

Replace the stale "both forks combined for this run" string with text
that matches the actual @fork(value=1) config and notes that counters
accumulate across warmup + measurement.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
buildPeerTagSchema previously read features.peerTags() before
features.getLastTimeDiscovered(). DDAgentFeaturesDiscovery exposes
those as two separate accessors against its volatile State -- a
state-swap interleaving could leave the cached schema tagged with a
NEWER timestamp than its names, after which the next reconcile
short-circuits on the timestamp compare and misses the tag-set update
until the next discovery refresh (~minute later).

Swap the read order so timestamp is captured first. With this
ordering, an interleaving leaves the schema OLDER than its names
instead -- the next reconcile sees a timestamp mismatch, runs the
deep compare, and self-heals on the very next cycle.

Also adds reconcileSwapsSchemaWhenTagSetChanges, which closes the
test gap on the slow-path swap branch
(cachedPeerTagSchema = PeerTagSchema.of(...)). End-to-end check via
the writer's captured MetricKeys: pre-swap snapshot carries only
peer.hostname, post-swap snapshot carries both peer.hostname and
peer.service.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Splits the `if (values[i] != null && hitCount++ == 0)` conjunction
into nested ifs. Same semantics, no codegen impact after JIT --
just visibly says what the loop is doing rather than relying on
post-increment-inside-conjunction. Closes amarziali's review thread
on this block.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Leftover from removing the isKind() override in TraceGenerator earlier
in this session -- I dropped the SpanKindFilter import but missed
datadog.trace.bootstrap.instrumentation.api.Tags, which is no longer
referenced in either file.

Resolves codenarcTest and codenarcTraceAgentTest UnusedImport
violations.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
return new Batch(key);
/**
* Reconciles {@link #cachedPeerTagSchema} with the latest feature discovery. Runs on the
* aggregator thread once per reporting cycle via the reset hook passed to {@link Aggregator}.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would it make more sense to "reconcile cachedPeerTagSchema with the latest feature discovery" when the feature discovery is updated in DDAgentFeaturesDiscovery.java instead of per reporting cycle? or maybe we want to minimize PeerTagSchema logic there 🤔

* </ul>
*
* <p>This class deliberately has no cardinality limiters or per-cycle state -- callers that need
* those layer them on top.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

small nit:

Suggested change
* those layer them on top.
* <p>This class deliberately has no cardinality limiters -- callers that need
* those layer them on top.

it looks like lastTimeDiscovered below is a per-cycle state

// (e.g. a configured 4096 still means "~262144 spans before drops", same as before). ~100 B
// per SpanSnapshot * 131072 ≈ 13 MB worst-case heap floor at the default.
tracerMetricsMaxPending =
configProvider.getInteger(TRACER_METRICS_MAX_PENDING, 2048) * LEGACY_BATCH_SIZE;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
configProvider.getInteger(TRACER_METRICS_MAX_PENDING, 2048) * LEGACY_BATCH_SIZE;
Math.multiplyExact(configProvider.getInteger(TRACER_METRICS_MAX_PENDING, 2048) * LEGACY_BATCH_SIZE);

Codex recommended using Math.multiplyExact() to prevent silent overflows... seems reasonable, but not sure how likely that is to happen

Copy link
Copy Markdown
Contributor

@sarahchen6 sarahchen6 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updates look reasonable to me!

dougqh added a commit that referenced this pull request May 21, 2026
…Tags

#11389 changed AggregateEntry.getPeerTags() from List<UTF8BytesString>
to UTF8BytesString[] for memory efficiency. The reconcile-swap test
cascaded down from #11381 needs assertArrayEquals against an array,
not assertEquals against a Collections.singletonList / Arrays.asList.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
dougqh added a commit that referenced this pull request May 21, 2026
The doc described an old design where the producer thread per-trace
read a peerTagsRevision() and rebuilt the cached PeerTagSchema under
a monitor. The actual implementation (cascaded from #11381) runs
reconcile once per report cycle on the aggregator thread via the
onReportCycle hook, keyed on getLastTimeDiscovered(). Producers do
nothing more than a volatile read of the cached schema.

Updates:
- Producer-side flow: drop the per-trace sync description; document
  the volatile-read steady state and the one-time synchronized
  bootstrap on first publish.
- New "Aggregator-side reconcile" section under "Reporting cadence
  and cardinality reset" describing the timestamp fast path, the
  same-tags slow path that preserves warm handlers, and the
  read-order race fix (timestamp before names).
- Memory and lifetime: replace peerTagsRevision pairing with the
  on-schema lastTimeDiscovered + per-aggregator-instance lifecycle.
- "Why the redesign" point 6: rewritten to describe the aggregator-
  thread reconcile rather than the producer-side revision check.

Resolves dougqh's open review thread about peerTagsRevision vs
lastTimeDiscovered.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

comp: core Tracer core comp: metrics Metrics tag: ai generated Largely based on code generated by an AI or LLM tag: performance Performance related changes type: enhancement Enhancements and improvements

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants