diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/AdversarialMetricsBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/AdversarialMetricsBenchmark.java new file mode 100644 index 00000000000..634dea23358 --- /dev/null +++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/AdversarialMetricsBenchmark.java @@ -0,0 +1,153 @@ +package datadog.trace.common.metrics; + +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT; +import static java.util.concurrent.TimeUnit.SECONDS; + +import datadog.trace.api.WellKnownTags; +import datadog.trace.core.CoreSpan; +import datadog.trace.core.monitor.HealthMetrics; +import de.thetaphi.forbiddenapis.SuppressForbidden; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.LongAdder; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Adversarial JMH benchmark designed to stress the metrics subsystem's capacity bounds. + * + *

The metrics aggregator is bounded at every layer: + * + *

+ * + *

The benchmark hammers all of these simultaneously with 8 producer threads, unique labels per + * op (so the aggregate cache fills+evicts repeatedly), random durations across a wide range (so + * histograms accept many distinct bins), and random {@code error}/{@code topLevel} flags (so both + * histograms are exercised). After the run, drop counters are printed so you can see how the + * subsystem absorbed the burst. + * + *

What "OOM the metrics subsystem" would look like if the bounds break: producer-thread + * allocation would grow unbounded (snapshots faster than the inbox can drain produces dropped + * snapshots, not heap growth); aggregator-thread heap would grow if entries weren't capped or + * histograms grew past their dense-store limit. + */ +@State(Scope.Benchmark) +@Warmup(iterations = 2, time = 15, timeUnit = SECONDS) +@Measurement(iterations = 5, time = 15, timeUnit = SECONDS) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(SECONDS) +@Threads(8) +@Fork(value = 1) +public class AdversarialMetricsBenchmark { + + private ConflatingMetricsAggregator aggregator; + private CountingHealthMetrics health; + + @State(Scope.Thread) + public static class ThreadState { + int cursor; + } + + @Setup + public void setup() { + this.health = new CountingHealthMetrics(); + this.aggregator = + new ConflatingMetricsAggregator( + new WellKnownTags("", "", "", "", "", ""), + Collections.emptySet(), + new ConflatingMetricsAggregatorBenchmark.FixedAgentFeaturesDiscovery( + Collections.singleton("peer.hostname"), Collections.emptySet()), + this.health, + new ConflatingMetricsAggregatorBenchmark.NullSink(), + 2048, + 2048, + false); + this.aggregator.start(); + } + + @TearDown + @SuppressForbidden + public void tearDown() { + aggregator.close(); + // Counters accumulate across the trial (warmup + measurement iterations), since the + // CountingHealthMetrics instance is created once in @Setup and never reset. + System.err.println( + "[ADVERSARIAL] drops over the trial (8 threads, warmup + measurement combined):"); + System.err.println( + " onStatsInboxFull = " + + health.inboxFull.sum() + + " (snapshots dropped because the MPSC inbox was full)"); + System.err.println( + " onStatsAggregateDropped = " + + health.aggregateDropped.sum() + + " (snapshots dropped because the aggregate cache was full with no stale entry)"); + } + + @Benchmark + public void publish(ThreadState ts, Blackhole blackhole) { + int idx = ts.cursor++; + ThreadLocalRandom rng = ThreadLocalRandom.current(); + + // Mix indices so labels don't fall into linear order. Distinct labels exceed every reasonable + // working-set bound, so the aggregate cache evicts continuously and most ops force a fresh + // MetricKey construction on the consumer thread. + int scrambled = idx * 0x9E3779B1; // golden ratio multiplier + String service = "svc-" + (scrambled & 0xFFFF); + String operation = "op-" + ((scrambled >>> 8) & 0x3FFFF); + String resource = "res-" + ((scrambled ^ 0x5A5A5A) & 0xFFFFF); + String hostname = "host-" + ((scrambled >>> 12) & 0x7FFF); + boolean error = (idx & 7) == 0; + boolean topLevel = (idx & 3) == 0; + // Wide duration spread forces histogram bins to populate broadly. + long durationNanos = 1L + (rng.nextLong() & 0x3FFFFFFFL); // 1 ns .. ~1.07 s + + SimpleSpan span = + new SimpleSpan( + service, operation, resource, "web", true, topLevel, error, 0, durationNanos, 200); + span.setTag(SPAN_KIND, SPAN_KIND_CLIENT); + span.setTag("peer.hostname", hostname); + + List> trace = Collections.singletonList(span); + blackhole.consume(aggregator.publish(trace)); + } + + /** + * Counts what gets dropped. Uses {@link LongAdder} so the printed totals hold up under 8-way + * contention -- {@code volatile long ++} loses ~20% of updates here, which would mask the + * order-of-magnitude shape the bench is trying to surface (inbox-full vs aggregate-dropped). + */ + static final class CountingHealthMetrics extends HealthMetrics { + final LongAdder inboxFull = new LongAdder(); + final LongAdder aggregateDropped = new LongAdder(); + + @Override + public void onStatsInboxFull() { + inboxFull.increment(); + } + + @Override + public void onStatsAggregateDropped() { + aggregateDropped.increment(); + } + } +} diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/HighCardinalityPeerMetricsBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/HighCardinalityPeerMetricsBenchmark.java new file mode 100644 index 00000000000..67caaca6ced --- /dev/null +++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/HighCardinalityPeerMetricsBenchmark.java @@ -0,0 +1,107 @@ +package datadog.trace.common.metrics; + +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT; +import static java.util.concurrent.TimeUnit.SECONDS; + +import datadog.trace.api.WellKnownTags; +import datadog.trace.common.metrics.AdversarialMetricsBenchmark.CountingHealthMetrics; +import datadog.trace.core.CoreSpan; +import de.thetaphi.forbiddenapis.SuppressForbidden; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Cardinality-isolation companion to {@link AdversarialMetricsBenchmark}: only the {@code + * peer.hostname} tag value varies; {@code service}, {@code operation}, and {@code resource} are + * pinned to single values. Pairing this with the adversarial bench (all four dimensions + * high-cardinality) and {@link HighCardinalityResourceMetricsBenchmark} (only resource + * high-cardinality) lets you attribute any throughput delta to a specific axis. + * + *

This isolates the peer-tag-encoding hot path: {@code PEER_TAGS_CACHE} lookups, the per-tag + * UTF8 encoding of {@code "name:value"}, and the parallel-array capture inside the producer's + * {@code SpanSnapshot} build. With {@code 0x7FFF} (~32K) distinct hostnames the cache thrashes + * heavily and exceeds the default {@code tracerMetricsMaxAggregates=2048} so the LRU evicts + * continuously. + * + *

Random {@code error}/{@code topLevel}/duration to keep histogram load comparable; only the + * cardinality profile changes. + */ +@State(Scope.Benchmark) +@Warmup(iterations = 2, time = 15, timeUnit = SECONDS) +@Measurement(iterations = 5, time = 15, timeUnit = SECONDS) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(SECONDS) +@Threads(8) +@Fork(value = 1) +public class HighCardinalityPeerMetricsBenchmark { + + private ConflatingMetricsAggregator aggregator; + private CountingHealthMetrics health; + + @State(Scope.Thread) + public static class ThreadState { + int cursor; + } + + @Setup + public void setup() { + this.health = new CountingHealthMetrics(); + this.aggregator = + new ConflatingMetricsAggregator( + new WellKnownTags("", "", "", "", "", ""), + Collections.emptySet(), + new ConflatingMetricsAggregatorBenchmark.FixedAgentFeaturesDiscovery( + Collections.singleton("peer.hostname"), Collections.emptySet()), + this.health, + new ConflatingMetricsAggregatorBenchmark.NullSink(), + 2048, + 2048, + false); + this.aggregator.start(); + } + + @TearDown + @SuppressForbidden + public void tearDown() { + aggregator.close(); + System.err.println( + "[HIGH_CARD_PEER] drops over the trial (8 threads, warmup + measurement combined):"); + System.err.println(" onStatsInboxFull = " + health.inboxFull.sum()); + System.err.println(" onStatsAggregateDropped = " + health.aggregateDropped.sum()); + } + + @Benchmark + public void publish(ThreadState ts, Blackhole blackhole) { + int idx = ts.cursor++; + ThreadLocalRandom rng = ThreadLocalRandom.current(); + + int scrambled = idx * 0x9E3779B1; + String hostname = "host-" + ((scrambled >>> 12) & 0x7FFF); + boolean error = (idx & 7) == 0; + boolean topLevel = (idx & 3) == 0; + long durationNanos = 1L + (rng.nextLong() & 0x3FFFFFFFL); + + SimpleSpan span = + new SimpleSpan("svc", "op", "res", "web", true, topLevel, error, 0, durationNanos, 200); + span.setTag(SPAN_KIND, SPAN_KIND_CLIENT); + span.setTag("peer.hostname", hostname); + + List> trace = Collections.singletonList(span); + blackhole.consume(aggregator.publish(trace)); + } +} diff --git a/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/HighCardinalityResourceMetricsBenchmark.java b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/HighCardinalityResourceMetricsBenchmark.java new file mode 100644 index 00000000000..5ae8c3a715f --- /dev/null +++ b/dd-trace-core/src/jmh/java/datadog/trace/common/metrics/HighCardinalityResourceMetricsBenchmark.java @@ -0,0 +1,103 @@ +package datadog.trace.common.metrics; + +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; +import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND_CLIENT; +import static java.util.concurrent.TimeUnit.SECONDS; + +import datadog.trace.api.WellKnownTags; +import datadog.trace.common.metrics.AdversarialMetricsBenchmark.CountingHealthMetrics; +import datadog.trace.core.CoreSpan; +import de.thetaphi.forbiddenapis.SuppressForbidden; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ThreadLocalRandom; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Threads; +import org.openjdk.jmh.annotations.Warmup; +import org.openjdk.jmh.infra.Blackhole; + +/** + * Cardinality-isolation companion to {@link AdversarialMetricsBenchmark}: only the {@code resource} + * dimension varies; {@code service}, {@code operation}, and {@code peer.hostname} are pinned to + * single values. Pairing this with the adversarial bench (all four dimensions high-cardinality) and + * {@link HighCardinalityPeerMetricsBenchmark} (only peer-tag high-cardinality) lets you attribute + * any throughput delta to a specific axis. + * + *

Same shape as the adversarial bench -- 8 producer threads, {@code 0xFFFFF} (~1M) distinct + * resource values which exceeds the default {@code tracerMetricsMaxAggregates=2048}, so the LRU + * cache evicts continuously. Random {@code error}/{@code topLevel}/duration to keep histogram load + * comparable; only the cardinality profile changes. + */ +@State(Scope.Benchmark) +@Warmup(iterations = 2, time = 15, timeUnit = SECONDS) +@Measurement(iterations = 5, time = 15, timeUnit = SECONDS) +@BenchmarkMode(Mode.Throughput) +@OutputTimeUnit(SECONDS) +@Threads(8) +@Fork(value = 1) +public class HighCardinalityResourceMetricsBenchmark { + + private ConflatingMetricsAggregator aggregator; + private CountingHealthMetrics health; + + @State(Scope.Thread) + public static class ThreadState { + int cursor; + } + + @Setup + public void setup() { + this.health = new CountingHealthMetrics(); + this.aggregator = + new ConflatingMetricsAggregator( + new WellKnownTags("", "", "", "", "", ""), + Collections.emptySet(), + new ConflatingMetricsAggregatorBenchmark.FixedAgentFeaturesDiscovery( + Collections.singleton("peer.hostname"), Collections.emptySet()), + this.health, + new ConflatingMetricsAggregatorBenchmark.NullSink(), + 2048, + 2048, + false); + this.aggregator.start(); + } + + @TearDown + @SuppressForbidden + public void tearDown() { + aggregator.close(); + System.err.println( + "[HIGH_CARD_RESOURCE] drops over the trial (8 threads, warmup + measurement combined):"); + System.err.println(" onStatsInboxFull = " + health.inboxFull.sum()); + System.err.println(" onStatsAggregateDropped = " + health.aggregateDropped.sum()); + } + + @Benchmark + public void publish(ThreadState ts, Blackhole blackhole) { + int idx = ts.cursor++; + ThreadLocalRandom rng = ThreadLocalRandom.current(); + + int scrambled = idx * 0x9E3779B1; + String resource = "res-" + ((scrambled ^ 0x5A5A5A) & 0xFFFFF); + boolean error = (idx & 7) == 0; + boolean topLevel = (idx & 3) == 0; + long durationNanos = 1L + (rng.nextLong() & 0x3FFFFFFFL); + + SimpleSpan span = + new SimpleSpan("svc", "op", resource, "web", true, topLevel, error, 0, durationNanos, 200); + span.setTag(SPAN_KIND, SPAN_KIND_CLIENT); + span.setTag("peer.hostname", "localhost"); + + List> trace = Collections.singletonList(span); + blackhole.consume(aggregator.publish(trace)); + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java index 478ff520a37..dba66a5ab9c 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/AggregateMetric.java @@ -46,6 +46,27 @@ public AggregateMetric recordDurations(int count, AtomicLongArray durations) { return this; } + /** + * Records a single hit. {@code tagAndDuration} carries the duration nanos with optional {@link + * #ERROR_TAG} / {@link #TOP_LEVEL_TAG} bits OR-ed in. + */ + public AggregateMetric recordOneDuration(long tagAndDuration) { + ++hitCount; + if ((tagAndDuration & TOP_LEVEL_TAG) == TOP_LEVEL_TAG) { + tagAndDuration ^= TOP_LEVEL_TAG; + ++topLevelCount; + } + if ((tagAndDuration & ERROR_TAG) == ERROR_TAG) { + tagAndDuration ^= ERROR_TAG; + errorLatencies.accept(tagAndDuration); + ++errorCount; + } else { + okLatencies.accept(tagAndDuration); + } + duration += tagAndDuration; + return this; + } + public int getErrorCount() { return errorCount; } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java index 8a69dbc6e56..9998c21ed0b 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Aggregator.java @@ -1,16 +1,26 @@ package datadog.trace.common.metrics; +import static datadog.trace.api.Functions.UTF8_ENCODE; +import static datadog.trace.common.metrics.ConflatingMetricsAggregator.PEER_TAGS_CACHE; +import static datadog.trace.common.metrics.ConflatingMetricsAggregator.PEER_TAGS_CACHE_ADDER; +import static datadog.trace.common.metrics.ConflatingMetricsAggregator.SERVICE_NAMES; +import static datadog.trace.common.metrics.ConflatingMetricsAggregator.SPAN_KINDS; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import datadog.trace.api.Pair; +import datadog.trace.api.cache.DDCache; +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import datadog.trace.common.metrics.SignalItem.StopSignal; import datadog.trace.core.monitor.HealthMetrics; import datadog.trace.core.util.LRUCache; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.ArrayList; +import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeUnit; +import java.util.function.Function; import org.jctools.queues.MessagePassingQueue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,11 +31,8 @@ final class Aggregator implements Runnable { private static final Logger log = LoggerFactory.getLogger(Aggregator.class); - private final MessagePassingQueue batchPool; private final MessagePassingQueue inbox; private final LRUCache aggregates; - private final ConcurrentMap pending; - private final Set commonKeys; private final MetricWriter writer; // the reporting interval controls how much history will be buffered // when the agent is unresponsive (only 10 pending requests will be @@ -34,6 +41,15 @@ final class Aggregator implements Runnable { private final long sleepMillis; + /** + * Per-cycle hook run on the aggregator thread at the start of each report cycle, before the + * flush. Used by {@link ConflatingMetricsAggregator} to reconcile its cached peer-tag schema + * against {@link datadog.communication.ddagent.DDAgentFeaturesDiscovery}; running before the + * flush guarantees that any test awaiting {@code writer.finishBucket()} observes the schema in + * its post-reconcile state. May be {@code null}. + */ + private final Runnable onReportCycle; + @SuppressFBWarnings( value = "AT_STALE_THREAD_WRITE_OF_PRIMITIVE", justification = "the field is confined to the agent thread running the Aggregator") @@ -41,51 +57,56 @@ final class Aggregator implements Runnable { Aggregator( MetricWriter writer, - MessagePassingQueue batchPool, MessagePassingQueue inbox, - ConcurrentMap pending, - final Set commonKeys, int maxAggregates, long reportingInterval, TimeUnit reportingIntervalTimeUnit, - HealthMetrics healthMetrics) { + HealthMetrics healthMetrics, + Runnable onReportCycle) { this( writer, - batchPool, inbox, - pending, - commonKeys, maxAggregates, reportingInterval, reportingIntervalTimeUnit, DEFAULT_SLEEP_MILLIS, - healthMetrics); + healthMetrics, + onReportCycle); } Aggregator( MetricWriter writer, - MessagePassingQueue batchPool, MessagePassingQueue inbox, - ConcurrentMap pending, - final Set commonKeys, int maxAggregates, long reportingInterval, TimeUnit reportingIntervalTimeUnit, long sleepMillis, - HealthMetrics healthMetrics) { + HealthMetrics healthMetrics, + Runnable onReportCycle) { this.writer = writer; - this.batchPool = batchPool; this.inbox = inbox; - this.commonKeys = commonKeys; this.aggregates = new LRUCache<>( - new CommonKeyCleaner(commonKeys, healthMetrics), - maxAggregates * 4 / 3, - 0.75f, - maxAggregates); - this.pending = pending; + new AggregateExpiry(healthMetrics), maxAggregates * 4 / 3, 0.75f, maxAggregates); this.reportingIntervalNanos = reportingIntervalTimeUnit.toNanos(reportingInterval); this.sleepMillis = sleepMillis; + this.onReportCycle = onReportCycle; + } + + private static final class AggregateExpiry + implements LRUCache.ExpiryListener { + private final HealthMetrics healthMetrics; + + AggregateExpiry(HealthMetrics healthMetrics) { + this.healthMetrics = healthMetrics; + } + + @Override + public void accept(Map.Entry expired) { + if (expired.getValue().getHitCount() > 0) { + healthMetrics.onStatsAggregateDropped(); + } + } } public void clearAggregates() { @@ -129,21 +150,87 @@ public void accept(InboxItem item) { } else { signal.ignore(); } - } else if (item instanceof Batch && !stopped) { - Batch batch = (Batch) item; - MetricKey key = batch.getKey(); - // important that it is still *this* batch pending, must not remove otherwise - pending.remove(key, batch); + } else if (item instanceof SpanSnapshot && !stopped) { + SpanSnapshot snapshot = (SpanSnapshot) item; + MetricKey key = buildMetricKey(snapshot); AggregateMetric aggregate = aggregates.computeIfAbsent(key, k -> new AggregateMetric()); - batch.contributeTo(aggregate); + aggregate.recordOneDuration(snapshot.tagAndDuration); dirty = true; - // return the batch for reuse - batchPool.offer(batch); } } } + private static MetricKey buildMetricKey(SpanSnapshot s) { + return new MetricKey( + s.resourceName, + SERVICE_NAMES.computeIfAbsent(s.serviceName, UTF8_ENCODE), + s.operationName, + s.serviceNameSource, + s.spanType, + s.httpStatusCode, + s.synthetic, + s.traceRoot, + SPAN_KINDS.computeIfAbsent(s.spanKind, UTF8BytesString::create), + materializePeerTags(s.peerTagSchema, s.peerTagValues), + s.httpMethod, + s.httpEndpoint, + s.grpcStatusCode); + } + + /** + * Encodes the per-span peer-tag values into the {@code List} the {@link + * MetricKey} consumes. Reads name/value pairs at the same index from the schema's names and the + * snapshot's values; null value slots are skipped (the span didn't set that peer tag). + */ + private static List materializePeerTags(PeerTagSchema schema, String[] values) { + if (schema == null || values == null) { + return Collections.emptyList(); + } + String[] names = schema.names; + int n = names.length; + // First pass: count how many tags fired and remember the first index. The single-entry case + // is common (e.g. INTERNAL spans only emit base.service) and gets a singletonList to avoid an + // ArrayList allocation on the hot path. + int firstHit = -1; + int hitCount = 0; + for (int i = 0; i < n; i++) { + if (values[i] != null) { + if (hitCount == 0) { + firstHit = i; + } + hitCount++; + } + } + if (hitCount == 0) { + return Collections.emptyList(); + } + if (hitCount == 1) { + return Collections.singletonList(encodePeerTag(names[firstHit], values[firstHit])); + } + List tags = new ArrayList<>(hitCount); + for (int i = firstHit; i < n; i++) { + if (values[i] != null) { + tags.add(encodePeerTag(names[i], values[i])); + } + } + return tags; + } + + private static UTF8BytesString encodePeerTag(String name, String value) { + final Pair, Function> + cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(name, PEER_TAGS_CACHE_ADDER); + return cacheAndCreator.getLeft().computeIfAbsent(value, cacheAndCreator.getRight()); + } + private void report(long when, SignalItem signal) { + // Per-cycle hook on the aggregator thread -- used by ClientStatsAggregator to reconcile the + // cached peer-tag schema against feature discovery. Runs before the flush so any test that + // awaits writer.finishBucket() observes the schema in its post-reconcile state, and so + // subsequent producer publishes (which may happen as soon as the flush completes) see the new + // schema without an additional handoff. + if (onReportCycle != null) { + onReportCycle.run(); + } boolean skipped = true; if (dirty) { try { @@ -177,7 +264,6 @@ private void expungeStaleAggregates() { AggregateMetric metric = pair.getValue(); if (metric.getHitCount() == 0) { it.remove(); - commonKeys.remove(pair.getKey()); } } } @@ -185,24 +271,4 @@ private void expungeStaleAggregates() { private long wallClockTime() { return MILLISECONDS.toNanos(System.currentTimeMillis()); } - - private static final class CommonKeyCleaner - implements LRUCache.ExpiryListener { - - private final Set commonKeys; - private final HealthMetrics healthMetrics; - - private CommonKeyCleaner(Set commonKeys, HealthMetrics healthMetrics) { - this.commonKeys = commonKeys; - this.healthMetrics = healthMetrics; - } - - @Override - public void accept(Map.Entry expired) { - commonKeys.remove(expired.getKey()); - if (expired.getValue().getHitCount() > 0) { - healthMetrics.onStatsAggregateDropped(); - } - } - } } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Batch.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/Batch.java deleted file mode 100644 index 5f103805e98..00000000000 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/Batch.java +++ /dev/null @@ -1,90 +0,0 @@ -package datadog.trace.common.metrics; - -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicLongArray; - -/** - * This is a thread-safe container for partial conflating and accumulating partial aggregates on the - * same key. - * - *

Updates to an already consumed batch are rejected. - * - *

A batch can currently take at most 64 values. Attempts to add the 65th update will be - * rejected. - */ -public final class Batch implements InboxItem { - - private static final int MAX_BATCH_SIZE = 64; - private static final AtomicIntegerFieldUpdater COUNT = - AtomicIntegerFieldUpdater.newUpdater(Batch.class, "count"); - private static final AtomicIntegerFieldUpdater COMMITTED = - AtomicIntegerFieldUpdater.newUpdater(Batch.class, "committed"); - - /** - * This counter has two states: - * - *

    - *
  1. negative: the batch has been used, must not add values - *
  2. otherwise: the number of values added to the batch - *
- */ - private volatile int count = 0; - - /** incremented when a duration has been added. */ - private volatile int committed = 0; - - private MetricKey key; - private final AtomicLongArray durations; - - Batch(MetricKey key) { - this(new AtomicLongArray(MAX_BATCH_SIZE)); - this.key = key; - } - - Batch() { - this(new AtomicLongArray(MAX_BATCH_SIZE)); - } - - private Batch(AtomicLongArray durations) { - this.durations = durations; - } - - public MetricKey getKey() { - return key; - } - - public Batch reset(MetricKey key) { - this.key = key; - COUNT.lazySet(this, 0); - return this; - } - - public boolean isUsed() { - return count < 0; - } - - public boolean add(long tag, long durationNanos) { - // technically this would be wrong if there were 2^31 unsuccessful - // attempts to add a value, but this an acceptable risk - int position = COUNT.getAndIncrement(this); - if (position >= 0 && position < durations.length()) { - durations.set(position, tag | durationNanos); - COMMITTED.getAndIncrement(this); - return true; - } - return false; - } - - public void contributeTo(AggregateMetric aggregate) { - int count = Math.min(COUNT.getAndSet(this, Integer.MIN_VALUE), MAX_BATCH_SIZE); - if (count >= 0) { - // wait for the duration to have been set. - // note this mechanism only supports a single reader - while (committed != count) { - Thread.yield(); - } - COMMITTED.lazySet(this, 0); - aggregate.recordDurations(count, durations); - } - } -} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java index 69f1932f2d1..dc5d698bcc1 100644 --- a/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/ConflatingMetricsAggregator.java @@ -2,8 +2,6 @@ import static datadog.communication.ddagent.DDAgentFeaturesDiscovery.V06_METRICS_ENDPOINT; import static datadog.trace.api.DDSpanTypes.RPC; -import static datadog.trace.api.DDTags.BASE_SERVICE; -import static datadog.trace.api.Functions.UTF8_ENCODE; import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_ENDPOINT; import static datadog.trace.bootstrap.instrumentation.api.Tags.HTTP_METHOD; import static datadog.trace.bootstrap.instrumentation.api.Tags.SPAN_KIND; @@ -14,9 +12,6 @@ import static datadog.trace.util.AgentThreadFactory.AgentThread.METRICS_AGGREGATOR; import static datadog.trace.util.AgentThreadFactory.THREAD_JOIN_TIMOUT_MS; import static datadog.trace.util.AgentThreadFactory.newAgentThread; -import static java.util.Collections.emptyList; -import static java.util.Collections.singletonList; -import static java.util.Collections.singletonMap; import static java.util.concurrent.TimeUnit.SECONDS; import datadog.common.queue.Queues; @@ -36,12 +31,12 @@ import datadog.trace.core.SpanKindFilter; import datadog.trace.core.monitor.HealthMetrics; import datadog.trace.util.AgentTaskScheduler; -import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -54,20 +49,18 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve private static final Logger log = LoggerFactory.getLogger(ConflatingMetricsAggregator.class); private static final Map DEFAULT_HEADERS = - singletonMap(DDAgentApi.DATADOG_META_TRACER_VERSION, DDTraceCoreInfo.VERSION); + Collections.singletonMap(DDAgentApi.DATADOG_META_TRACER_VERSION, DDTraceCoreInfo.VERSION); - private static final DDCache SERVICE_NAMES = - DDCaches.newFixedSizeCache(32); + static final DDCache SERVICE_NAMES = DDCaches.newFixedSizeCache(32); - private static final DDCache SPAN_KINDS = - DDCaches.newFixedSizeCache(16); - private static final DDCache< + static final DDCache SPAN_KINDS = DDCaches.newFixedSizeCache(16); + static final DDCache< String, Pair, Function>> PEER_TAGS_CACHE = DDCaches.newFixedSizeCache( 64); // it can be unbounded since those values are returned by the agent and should be // under control. 64 entries is enough in this case to contain all the peer tags. - private static final Function< + static final Function< String, Pair, Function>> PEER_TAGS_CACHE_ADDER = key -> @@ -91,9 +84,6 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve SpanKindFilter.builder().includeInternal().build(); private final Set ignoredResources; - private final MessagePassingQueue batchPool; - private final ConcurrentHashMap pending; - private final ConcurrentHashMap keys; private final Thread thread; private final MessagePassingQueue inbox; private final Sink sink; @@ -104,6 +94,23 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve private final HealthMetrics healthMetrics; private final boolean includeEndpointInMetrics; + /** + * Cached peer-aggregation schema. Producers read this reference once per trace and pass it + * through to the consumer in {@link SpanSnapshot}; they never inspect the schema's discovery + * state or rebuild it. Reconciliation is the aggregator thread's job: {@link + * #reconcilePeerTagSchema()} compares the schema's {@link PeerTagSchema#state} against {@link + * DDAgentFeaturesDiscovery#state()} once per reporting cycle and either updates the state in + * place (when the tag set is unchanged) or swaps in a freshly-built schema. + * + *

{@code null} only on the bootstrap window before {@link #bootstrapPeerTagSchema()} runs on + * the first publish. + * + *

{@code volatile} so the consumer's reconcile-time replacement is visible to producer + * threads; the schema's own internal mutable state ({@link PeerTagSchema#state}) is exercised + * only on the aggregator thread. + */ + private volatile PeerTagSchema cachedPeerTagSchema; + private volatile AgentTaskScheduler.Scheduled cancellation; public ConflatingMetricsAggregator( @@ -187,23 +194,18 @@ public ConflatingMetricsAggregator( this.ignoredResources = ignoredResources; this.includeEndpointInMetrics = includeEndpointInMetrics; this.inbox = Queues.mpscArrayQueue(queueSize); - this.batchPool = Queues.spmcArrayQueue(maxAggregates); - this.pending = new ConcurrentHashMap<>(maxAggregates * 4 / 3); - this.keys = new ConcurrentHashMap<>(); this.features = features; this.healthMetrics = healthMetric; this.sink = sink; this.aggregator = new Aggregator( metricWriter, - batchPool, inbox, - pending, - keys.keySet(), maxAggregates, reportingInterval, timeUnit, - healthMetric); + healthMetric, + this::reconcilePeerTagSchema); this.thread = newAgentThread(METRICS_AGGREGATOR, aggregator); this.reportingInterval = reportingInterval; this.reportingIntervalTimeUnit = timeUnit; @@ -311,6 +313,19 @@ private boolean shouldComputeMetric(CoreSpan span, boolean isTopLevel) { } private boolean publish(CoreSpan span, boolean isTopLevel) { + // Error decision drives force-keep sampling regardless of whether the snapshot gets queued. + boolean error = span.getError() > 0; + + // Fast-path the inbox-full case before any tag extraction or snapshot allocation. size() is + // approximate on jctools' MPSC queue but that's fine: if we under-estimate, we fall through + // and let inbox.offer be the source of truth (existing behavior); if we over-estimate, we + // drop a snapshot that would have fit -- acceptable, onStatsInboxFull was going to fire + // imminently anyway. + if (inbox.size() >= inbox.capacity()) { + healthMetrics.onStatsInboxFull(); + return error; + } + // Extract HTTP method and endpoint only if the feature is enabled String httpMethod = null; String httpEndpoint = null; @@ -330,97 +345,149 @@ private boolean publish(CoreSpan span, boolean isTopLevel) { // CharSequence default keeps unsafeGetTag's generic at CharSequence so UTF8BytesString // tag values don't trigger a ClassCastException on the String assignment. final String spanKind = span.unsafeGetTag(SPAN_KIND, (CharSequence) "").toString(); - MetricKey newKey = - new MetricKey( + + long tagAndDuration = + span.getDurationNano() | (error ? ERROR_TAG : 0L) | (isTopLevel ? TOP_LEVEL_TAG : 0L); + + PeerTagSchema peerTagSchema = peerTagSchemaFor(span); + String[] peerTagValues = + peerTagSchema == null ? null : capturePeerTagValues(span, peerTagSchema); + if (peerTagValues == null) { + // No tags fired -- drop the schema reference so the consumer doesn't bother iterating an + // all-null array. + peerTagSchema = null; + } + + SpanSnapshot snapshot = + new SpanSnapshot( span.getResourceName(), - SERVICE_NAMES.computeIfAbsent(span.getServiceName(), UTF8_ENCODE), + span.getServiceName(), span.getOperationName(), span.getServiceNameSource(), spanType, span.getHttpStatusCode(), isSynthetic(span), span.getParentId() == 0, - SPAN_KINDS.computeIfAbsent( - spanKind, UTF8BytesString::create), // save repeated utf8 conversions - getPeerTags(span), + spanKind, + peerTagSchema, + peerTagValues, httpMethod, httpEndpoint, - grpcStatusCode); - MetricKey key = keys.putIfAbsent(newKey, newKey); - if (null == key) { - key = newKey; + grpcStatusCode, + tagAndDuration); + if (!inbox.offer(snapshot)) { + healthMetrics.onStatsInboxFull(); } - long tag = (span.getError() > 0 ? ERROR_TAG : 0L) | (isTopLevel ? TOP_LEVEL_TAG : 0L); - long durationNanos = span.getDurationNano(); - Batch batch = pending.get(key); - if (null != batch) { - // there is a pending batch, try to win the race to add to it - // returning false means that either the batch can't take any - // more data, or it has already been consumed - if (batch.add(tag, durationNanos)) { - // added to a pending batch prior to consumption, - // so skip publishing to the queue (we also know - // the key isn't rare enough to override the sampler) - return false; - } - // recycle the older key - key = batch.getKey(); - } - batch = newBatch(key); - batch.add(tag, durationNanos); - // overwrite the last one if present, it was already full - // or had been consumed by the time we tried to add to it - pending.put(key, batch); - // must offer to the queue after adding to pending - inbox.offer(batch); // force keep keys if there are errors - return span.getError() > 0; + return error; } - private List getPeerTags(CoreSpan span) { + /** + * Picks the peer-tag schema for a span. For internal-kind spans we always use the static {@link + * PeerTagSchema#INTERNAL} singleton (one entry for {@code base.service}); for {@code + * client}/{@code producer}/{@code consumer} kinds we use the cached peer-aggregation schema + * synced from {@link DDAgentFeaturesDiscovery#peerTags()}. Other kinds get {@code null}. + */ + private PeerTagSchema peerTagSchemaFor(CoreSpan span) { if (span.isKind(PEER_AGGREGATION_KINDS)) { - final Set eligiblePeerTags = features.peerTags(); - List peerTags = null; - for (String peerTag : eligiblePeerTags) { - Object value = span.unsafeGetTag(peerTag); - if (value != null) { - final Pair, Function> - cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(peerTag, PEER_TAGS_CACHE_ADDER); - if (peerTags == null) { - peerTags = new ArrayList<>(eligiblePeerTags.size()); - } - peerTags.add( - cacheAndCreator - .getLeft() - .computeIfAbsent(value.toString(), cacheAndCreator.getRight())); - } - } - return peerTags == null ? emptyList() : peerTags; - } else if (span.isKind(INTERNAL_KIND)) { - // in this case only the base service should be aggregated if present - final Object baseService = span.unsafeGetTag(BASE_SERVICE); - if (baseService != null) { - final Pair, Function> - cacheAndCreator = PEER_TAGS_CACHE.computeIfAbsent(BASE_SERVICE, PEER_TAGS_CACHE_ADDER); - return singletonList( - cacheAndCreator - .getLeft() - .computeIfAbsent(baseService.toString(), cacheAndCreator.getRight())); + PeerTagSchema schema = cachedPeerTagSchema; + if (schema == null) { + schema = bootstrapPeerTagSchema(); } + return schema.size() > 0 ? schema : null; } - return emptyList(); + if (span.isKind(INTERNAL_KIND)) { + return PeerTagSchema.INTERNAL; + } + return null; } - private static boolean isSynthetic(CoreSpan span) { - return span.getOrigin() != null && SYNTHETICS_ORIGIN.equals(span.getOrigin().toString()); + /** + * One-time producer-side bootstrap of {@link #cachedPeerTagSchema}. Synchronized double-check + * guards against two producers racing on the very first publish; after this returns, {@code + * cachedPeerTagSchema} is non-null forever and the aggregator thread is the sole subsequent + * mutator (see {@link #reconcilePeerTagSchema()}). + */ + private synchronized PeerTagSchema bootstrapPeerTagSchema() { + PeerTagSchema cached = cachedPeerTagSchema; + if (cached != null) { + return cached; + } + PeerTagSchema schema = buildPeerTagSchema(); + cachedPeerTagSchema = schema; + return schema; + } + + /** + * Builds a fresh {@link PeerTagSchema} from the current state of feature discovery. + * + *

Read order matters: {@code DDAgentFeaturesDiscovery} exposes {@code peerTags()} and {@code + * state()} as two separate accessors, each reading its volatile {@code discoveryState} + * independently. If a discovery refresh interleaves between the two reads, we want to be left + * with a schema whose embedded state is *stale* relative to its tag set rather than the other way + * around -- that way the next reconcile sees a state mismatch and re-runs the deep compare to + * pick up the change, instead of short-circuiting on a too-fresh state and missing it. + * + *

So read {@code state()} first, then {@code peerTags()}. + */ + private PeerTagSchema buildPeerTagSchema() { + String state = features.state(); + Set names = features.peerTags(); + return PeerTagSchema.of(names == null ? Collections.emptySet() : names, state); } - private Batch newBatch(MetricKey key) { - Batch batch = batchPool.poll(); - if (null == batch) { - return new Batch(key); + /** + * Reconciles {@link #cachedPeerTagSchema} with the latest feature discovery. Runs on the + * aggregator thread once per reporting cycle via the reset hook passed to {@link Aggregator}. + * Cheap fast path: an equality check against the cached schema's embedded {@link + * DDAgentFeaturesDiscovery#state()} hash short-circuits when discovery's response hasn't changed + * since the schema was built. On mismatch, a set compare distinguishes "discovery response + * changed but peer tags are the same" (just update the cached state in place) from "tags actually + * changed" (build a new schema and swap the volatile reference). + */ + private void reconcilePeerTagSchema() { + PeerTagSchema cached = cachedPeerTagSchema; + if (cached == null) { + // First reset before the first publish -- producer-side bootstrap hasn't run yet. + return; + } + String latestState = features.state(); + if (Objects.equals(cached.state, latestState)) { + return; } - return batch.reset(key); + Set latestNames = features.peerTags(); + Set normalized = latestNames == null ? Collections.emptySet() : latestNames; + if (cached.hasSameTagsAs(normalized)) { + cached.state = latestState; + } else { + cachedPeerTagSchema = PeerTagSchema.of(normalized, latestState); + } + } + + /** + * Captures the span's peer-tag values into a {@code String[]} parallel to {@code schema.names}. + * Slots remain {@code null} for tags the span didn't set; the array itself is lazily allocated on + * the first hit so spans that fire no peer tags pay zero allocation. Returns {@code null} when + * none of the configured peer tags are set on the span. + */ + private static String[] capturePeerTagValues(CoreSpan span, PeerTagSchema schema) { + String[] names = schema.names; + int n = names.length; + String[] values = null; + for (int i = 0; i < n; i++) { + Object v = span.unsafeGetTag(names[i]); + if (v != null) { + if (values == null) { + values = new String[n]; + } + values[i] = v.toString(); + } + } + return values; + } + + private static boolean isSynthetic(CoreSpan span) { + return span.getOrigin() != null && SYNTHETICS_ORIGIN.equals(span.getOrigin().toString()); } public void stop() { @@ -465,8 +532,6 @@ private void disable() { features.discover(); if (!features.supportsMetrics()) { log.debug("Disabling metric reporting because an agent downgrade was detected"); - this.pending.clear(); - this.batchPool.clear(); this.inbox.clear(); this.aggregator.clearAggregates(); } diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java new file mode 100644 index 00000000000..4821d1b33a4 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/PeerTagSchema.java @@ -0,0 +1,94 @@ +package datadog.trace.common.metrics; + +import static datadog.trace.api.DDTags.BASE_SERVICE; + +import datadog.communication.ddagent.DDAgentFeaturesDiscovery; +import java.util.Set; + +/** + * Names of the peer-tags eligible for client-stats aggregation, packed into a flat {@code String[]} + * for parallel-array access by producers and the aggregator thread. + * + *

This is the minimal carrier shape used by {@link SpanSnapshot}: the producer captures per-span + * values into a {@code String[]} parallel to {@link #names}, and the aggregator reconstructs the + * encoded {@code tag:value} pairs from the same name index. It replaces the prior "flat pairs" + * {@code [name0, value0, name1, value1, ...]} layout, which forced a worst-case allocation + + * trim-and-copy on every span. + * + *

Two schemas exist: + * + *

    + *
  • {@link #INTERNAL} -- a singleton with one entry for {@code base.service}, used for + * internal-kind spans where only the base service is aggregated. + *
  • A peer-aggregation schema built via {@link #of(Set, String)} for {@code client}/{@code + * producer}/{@code consumer} spans. {@link ConflatingMetricsAggregator} caches the most + * recently built schema and reconciles it on the aggregator thread once per reporting cycle + * by comparing {@link #state} against {@link DDAgentFeaturesDiscovery#state()}. + *
+ * + *

This class deliberately has no cardinality limiters -- callers that need those layer them on + * top. + * + *

Thread-safety: {@link #names} is final and safe to read from any thread. {@link #state} + * is exercised only on the aggregator thread (read and updated in reconciliation); producer threads + * access the schema only through the volatile {@code cachedPeerTagSchema} reference in {@link + * ConflatingMetricsAggregator}. + */ +final class PeerTagSchema { + + /** Singleton schema for internal-kind spans -- only {@code base.service}. */ + static final PeerTagSchema INTERNAL = + // INTERNAL is never reconciled, so the state value is irrelevant. + new PeerTagSchema(new String[] {BASE_SERVICE}, null); + + final String[] names; + + /** + * The {@code DDAgentFeaturesDiscovery.state()} hash this schema was built from. The aggregator + * thread reads and updates this once per reporting cycle when reconciling against the latest + * discovery; producer threads never touch it. Plain (non-volatile, non-final) because the + * aggregator is the sole reader/writer. May be {@code null} before discovery has produced a + * response. + */ + String state; + + private PeerTagSchema(String[] names, String state) { + this.names = names; + this.state = state; + } + + /** Builds a schema for the given peer-tag names. Order is determined by the {@link Set}. */ + static PeerTagSchema of(Set tags, String state) { + return new PeerTagSchema(tags.toArray(new String[0]), state); + } + + /** + * Test-only factory that takes the names array directly so tests can build a schema in a specific + * order without going through a {@link Set}. + */ + static PeerTagSchema testSchema(String[] names) { + return new PeerTagSchema(names, null); + } + + /** + * Whether this schema's tag names exactly match {@code other}. Used by the aggregator's reconcile + * path: when a feature discovery refresh changes {@link DDAgentFeaturesDiscovery#state()} but the + * resulting set is unchanged, the aggregator can keep this schema and just update {@link #state} + * instead of rebuilding. + */ + boolean hasSameTagsAs(Set other) { + if (this.names.length != other.size()) { + return false; + } + for (String name : this.names) { + if (!other.contains(name)) { + return false; + } + } + return true; + } + + int size() { + return names.length; + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java new file mode 100644 index 00000000000..eb9b741cea6 --- /dev/null +++ b/dd-trace-core/src/main/java/datadog/trace/common/metrics/SpanSnapshot.java @@ -0,0 +1,75 @@ +package datadog.trace.common.metrics; + +/** + * Immutable per-span value posted from the producer to the aggregator thread. Carries the raw + * inputs the aggregator needs to build a {@link MetricKey} and update an {@link AggregateMetric}. + * + *

All cache-canonicalization (service-name, span-kind, peer-tag string interning) happens on the + * aggregator thread; the producer just shuffles references. + */ +final class SpanSnapshot implements InboxItem { + + final CharSequence resourceName; + final String serviceName; + final CharSequence operationName; + final CharSequence serviceNameSource; + final CharSequence spanType; + final short httpStatusCode; + final boolean synthetic; + final boolean traceRoot; + final String spanKind; + + /** + * Schema for {@link #peerTagValues}. {@code null} when the span has no peer tags. The schema + * carries the names in parallel-array form; {@code peerTagValues} holds the per-span tag values + * at the same indices. + */ + final PeerTagSchema peerTagSchema; + + /** + * Peer tag values captured from the span, parallel to {@code peerTagSchema.names}. A {@code null} + * entry means the span didn't have that peer tag set. {@code null} (the whole array) when {@link + * #peerTagSchema} is {@code null}. + */ + final String[] peerTagValues; + + final String httpMethod; + final String httpEndpoint; + final String grpcStatusCode; + + /** Duration in nanoseconds, OR-ed with {@code ERROR_TAG} / {@code TOP_LEVEL_TAG} as needed. */ + final long tagAndDuration; + + SpanSnapshot( + CharSequence resourceName, + String serviceName, + CharSequence operationName, + CharSequence serviceNameSource, + CharSequence spanType, + short httpStatusCode, + boolean synthetic, + boolean traceRoot, + String spanKind, + PeerTagSchema peerTagSchema, + String[] peerTagValues, + String httpMethod, + String httpEndpoint, + String grpcStatusCode, + long tagAndDuration) { + this.resourceName = resourceName; + this.serviceName = serviceName; + this.operationName = operationName; + this.serviceNameSource = serviceNameSource; + this.spanType = spanType; + this.httpStatusCode = httpStatusCode; + this.synthetic = synthetic; + this.traceRoot = traceRoot; + this.spanKind = spanKind; + this.peerTagSchema = peerTagSchema; + this.peerTagValues = peerTagValues; + this.httpMethod = httpMethod; + this.httpEndpoint = httpEndpoint; + this.grpcStatusCode = grpcStatusCode; + this.tagAndDuration = tagAndDuration; + } +} diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java index 257d887029b..d1c7fe126b4 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/HealthMetrics.java @@ -93,6 +93,11 @@ public void onClientStatDowngraded() {} public void onStatsAggregateDropped() {} + /** + * Reports a single span whose stats snapshot was dropped because the aggregator inbox was full. + */ + public void onStatsInboxFull() {} + /** * @return Human-readable summary of the current health metrics. */ diff --git a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java index 2df54241e56..db384a7e42e 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/monitor/TracerHealthMetrics.java @@ -98,6 +98,7 @@ public class TracerHealthMetrics extends HealthMetrics implements AutoCloseable private final LongAdder clientStatsDowngrades = new LongAdder(); private final LongAdder statsAggregateDropped = new LongAdder(); + private final LongAdder statsInboxFull = new LongAdder(); private final StatsDClient statsd; private final long interval; @@ -357,6 +358,11 @@ public void onStatsAggregateDropped() { statsAggregateDropped.increment(); } + @Override + public void onStatsInboxFull() { + statsInboxFull.increment(); + } + @Override public void close() { if (null != cancellation) { @@ -374,8 +380,9 @@ private static class Flush implements AgentTaskScheduler.Task= 99 okLatencies.getMaxValue() <= 5 } - - def "consistent under concurrent attempts to read and write"() { - given: - AggregateMetric aggregate = new AggregateMetric() - MetricKey key = new MetricKey("foo", "bar", "qux", null, "type", 0, false, true, "corge", [UTF8BytesString.create("grault:quux")], null, null, null) - BlockingDeque queue = new LinkedBlockingDeque<>(1000) - ExecutorService reader = Executors.newSingleThreadExecutor() - int writerCount = 10 - ExecutorService writers = Executors.newFixedThreadPool(writerCount) - CountDownLatch readerLatch = new CountDownLatch(1) - CountDownLatch writerLatch = new CountDownLatch(writerCount) - CountDownLatch queueEmptyLatch = new CountDownLatch(1) - - AtomicInteger written = new AtomicInteger(0) - - when: - for (int i = 0; i < writerCount; ++i) { - writers.submit({ - readerLatch.await() - for (int j = 0; j < 10_000; ++j) { - Batch batch = queue.peekLast() - if (batch?.add(0L, 1)) { - written.incrementAndGet() - } else { - queue.offer(new Batch().reset(key)) - } - } - writerLatch.countDown() - }) - } - def future = reader.submit({ - readerLatch.countDown() - while (!Thread.currentThread().isInterrupted()) { - Batch batch = queue.poll(100, TimeUnit.MILLISECONDS) - if (null == batch && writerLatch.count == 0) { - queueEmptyLatch.countDown() - } else if (null != batch) { - batch.contributeTo(aggregate) - } - } - }) - assert writerLatch.await(10, TimeUnit.SECONDS) - // Wait here until we know that the queue is empty - assert queueEmptyLatch.await(10, TimeUnit.SECONDS) - future.cancel(true) - - then: - aggregate.getHitCount() == written.get() - } } diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy index 962ad2ce892..a95f6bcbdbc 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/ConflatingMetricAggregatorTest.groovy @@ -255,29 +255,44 @@ class ConflatingMetricAggregatorTest extends DDSpecification { def "should create bucket for each set of peer tags"() { setup: + // Peer-tag schema is reconciled with feature discovery once per reporting cycle (on the + // aggregator thread, in the post-report hook), not per-span on the producer. Drive two + // reporting cycles with different peerTags() configurations to verify the aggregator buckets + // each cycle by the schema that was current at publish time. MetricWriter writer = Mock(MetricWriter) Sink sink = Stub(Sink) DDAgentFeaturesDiscovery features = Mock(DDAgentFeaturesDiscovery) features.supportsMetrics() >> true - features.peerTags() >>> [["country"], ["country", "georegion"],] + features.peerTags() >>> [["country"], ["country", "georegion"]] + // Bump the discovered state hash so reconcile during report cycle 1 sees a mismatch and + // rebuilds the schema for span 2. Three calls: bootstrap (span1's publish), reconcile-during- + // report-1 (mismatch -> rebuild + 2nd peerTags() call), reconcile-during-report-2 (no change). + features.state() >>> ["state-1", "state-2", "state-2"] ConflatingMetricsAggregator aggregator = new ConflatingMetricsAggregator(empty, features, HealthMetrics.NO_OP, sink, writer, 10, queueSize, reportingInterval, SECONDS, false) aggregator.start() - when: - CountDownLatch latch = new CountDownLatch(1) + when: "cycle 1 -- peerTags=[country]" + CountDownLatch latch1 = new CountDownLatch(1) aggregator.publish([ new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK) - .setTag(SPAN_KIND, "client").setTag("country", "france").setTag("georegion", "europe"), + .setTag(SPAN_KIND, "client").setTag("country", "france").setTag("georegion", "europe") + ]) + aggregator.report() + def cycle1Triggered = latch1.await(2, SECONDS) + + and: "cycle 2 -- reconcile picks up peerTags=[country, georegion]" + CountDownLatch latch2 = new CountDownLatch(1) + aggregator.publish([ new SimpleSpan("service", "operation", "resource", "type", true, false, false, 0, 100, HTTP_OK) .setTag(SPAN_KIND, "client").setTag("country", "france").setTag("georegion", "europe") ]) aggregator.report() - def latchTriggered = latch.await(2, SECONDS) + def cycle2Triggered = latch2.await(2, SECONDS) then: - latchTriggered - 1 * writer.startBucket(2, _, _) + cycle1Triggered + cycle2Triggered 1 * writer.add( new MetricKey( "resource", @@ -314,7 +329,7 @@ class ConflatingMetricAggregatorTest extends DDSpecification { ), { AggregateMetric aggregateMetric -> aggregateMetric.getHitCount() == 1 && aggregateMetric.getTopLevelCount() == 0 && aggregateMetric.getDuration() == 100 }) - 1 * writer.finishBucket() >> { latch.countDown() } + 2 * writer.finishBucket() >> { latch1.countDown(); latch2.countDown() } cleanup: aggregator.close() diff --git a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy index bfc1ee2f4e7..2fd8554d499 100644 --- a/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy +++ b/dd-trace-core/src/test/groovy/datadog/trace/common/metrics/SimpleSpan.groovy @@ -2,8 +2,11 @@ package datadog.trace.common.metrics import datadog.trace.api.DDSpanId import datadog.trace.api.DDTraceId +import datadog.trace.bootstrap.instrumentation.api.Tags import datadog.trace.core.CoreSpan +import datadog.trace.core.DDSpanContext import datadog.trace.core.MetadataConsumer +import datadog.trace.core.SpanKindFilter class SimpleSpan implements CoreSpan { @@ -24,6 +27,8 @@ class SimpleSpan implements CoreSpan { private final Map tags = [:] + private byte spanKindOrdinal = 0 // SPAN_KIND_UNSET + SimpleSpan( String serviceName, String operationName, @@ -171,6 +176,9 @@ class SimpleSpan implements CoreSpan { @Override SimpleSpan setTag(String tag, Object value) { tags.put(tag, value) + if (Tags.SPAN_KIND == tag) { + spanKindOrdinal = DDSpanContext.spanKindOrdinalOf(value == null ? null : value.toString()) + } return this } @@ -211,6 +219,11 @@ class SimpleSpan implements CoreSpan { return false } + @Override + boolean isKind(SpanKindFilter filter) { + return filter.matches(spanKindOrdinal) + } + @Override CharSequence getType() { return type diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBootstrapTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBootstrapTest.java new file mode 100644 index 00000000000..ef07e0fbc19 --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorBootstrapTest.java @@ -0,0 +1,345 @@ +package datadog.trace.common.metrics; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import datadog.communication.ddagent.DDAgentFeaturesDiscovery; +import datadog.trace.bootstrap.instrumentation.api.Tags; +import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; +import datadog.trace.core.CoreSpan; +import datadog.trace.core.SpanKindFilter; +import datadog.trace.core.monitor.HealthMetrics; +import java.util.Arrays; +import java.util.Collections; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +/** + * Coverage for the {@code ConflatingMetricsAggregator} peer-tag schema bootstrap and reconcile + * paths. + * + *

    + *
  • {@link #bootstrapHappensOnceOnFirstPublish()} -- verifies the synchronized producer-side + * bootstrap runs exactly once and is skipped on subsequent publishes. + *
  • {@link #reconcileSkipsDeepCompareWhenStateMatches()} -- verifies the aggregator-thread + * reconcile's state-only fast path: when the cached schema's {@code state} matches {@code + * features.state()}, reconcile returns without calling {@code features.peerTags()}. + *
  • {@link #reconcileSurvivesStateChangeWhenTagsUnchanged()} -- verifies that when the + * discovery state hash changes but the tag set is identical, the schema continues to function + * correctly across cycles. + *
  • {@link #reconcileSwapsSchemaWhenTagSetChanges()} -- verifies the slow-path swap branch: + * when discovery refreshes with a new tag set, the cached schema is replaced and subsequent + * publishes see the new tags. + *
+ */ +class ConflatingMetricsAggregatorBootstrapTest { + + @Test + void bootstrapHappensOnceOnFirstPublish() { + // Producer-side bootstrap is synchronized; we want to confirm only the first publish + // queries features and subsequent publishes hit the cached schema. + HealthMetrics healthMetrics = mock(HealthMetrics.class); + MetricWriter writer = mock(MetricWriter.class); + Sink sink = mock(Sink.class); + DDAgentFeaturesDiscovery features = mock(DDAgentFeaturesDiscovery.class); + when(features.supportsMetrics()).thenReturn(true); + when(features.peerTags()).thenReturn(Collections.singleton("peer.hostname")); + when(features.state()).thenReturn("state-1"); + + ConflatingMetricsAggregator aggregator = + new ConflatingMetricsAggregator( + Collections.emptySet(), + features, + healthMetrics, + sink, + writer, + /* maxAggregates */ 16, + /* queueSize */ 64, + /* reportingInterval */ 10, + SECONDS, + /* includeEndpointInMetrics */ false); + + // Do not start the aggregator thread -- reconcile must not run, only bootstrap. + aggregator.publish(Collections.>singletonList(peerAggregationSpan())); + aggregator.publish(Collections.>singletonList(peerAggregationSpan())); + aggregator.publish(Collections.>singletonList(peerAggregationSpan())); + + // Bootstrap is the only path that queries features for peer-tag schema, and it runs + // exactly once across three publishes. + verify(features, times(1)).peerTags(); + verify(features, times(1)).state(); + aggregator.close(); + } + + @Test + void reconcileSkipsDeepCompareWhenStateMatches() throws Exception { + // Two reporting cycles with the same (mocked-constant) discovery state -- the second + // reconcile must short-circuit on the state compare and avoid touching peerTags(). + HealthMetrics healthMetrics = mock(HealthMetrics.class); + MetricWriter writer = mock(MetricWriter.class); + Sink sink = mock(Sink.class); + DDAgentFeaturesDiscovery features = mock(DDAgentFeaturesDiscovery.class); + when(features.supportsMetrics()).thenReturn(true); + when(features.peerTags()).thenReturn(Collections.singleton("peer.hostname")); + when(features.state()).thenReturn("state-1"); + + ConflatingMetricsAggregator aggregator = + new ConflatingMetricsAggregator( + Collections.emptySet(), + features, + healthMetrics, + sink, + writer, + /* maxAggregates */ 16, + /* queueSize */ 64, + /* reportingInterval */ 10, + SECONDS, + /* includeEndpointInMetrics */ false); + aggregator.start(); + try { + CountDownLatch cycle1 = new CountDownLatch(1); + CountDownLatch cycle2 = new CountDownLatch(1); + // Both reports flush a bucket; the cycle1/cycle2 countdowns synchronize the test thread + // with the aggregator thread's per-cycle completion. + org.mockito.Mockito.doAnswer( + invocation -> { + cycle1.countDown(); + return null; + }) + .doAnswer( + invocation -> { + cycle2.countDown(); + return null; + }) + .when(writer) + .finishBucket(); + + aggregator.publish(Collections.>singletonList(peerAggregationSpan())); + aggregator.report(); + assertTrue(cycle1.await(2, SECONDS)); + + aggregator.publish(Collections.>singletonList(peerAggregationSpan())); + aggregator.report(); + assertTrue(cycle2.await(2, SECONDS)); + + // peerTags() is called only by bootstrap; both reconciles short-circuit on the state + // fast path (cached state == features.state() == "state-1"), so neither reconcile reaches + // the deep set compare. Total peerTags() calls: 1. + verify(features, times(1)).peerTags(); + // state() is called by bootstrap (1) + each reconcile (2) = 3 total. + verify(features, times(3)).state(); + } finally { + aggregator.close(); + } + } + + @Test + void reconcileSurvivesStateChangeWhenTagsUnchanged() throws Exception { + // Behavioral cross-check on the "set is unchanged, just update state" branch: discovery + // refreshes (state hash moves) but the underlying tag set is identical. The aggregator must + // continue producing valid buckets for the same logical peer tag across cycles. + HealthMetrics healthMetrics = mock(HealthMetrics.class); + MetricWriter writer = mock(MetricWriter.class); + Sink sink = mock(Sink.class); + DDAgentFeaturesDiscovery features = mock(DDAgentFeaturesDiscovery.class); + when(features.supportsMetrics()).thenReturn(true); + // peerTags() returns content-equal sets across calls -- the reconcile slow path's + // hasSameTagsAs check should return true. + when(features.peerTags()) + .thenReturn(new LinkedHashSet<>(Collections.singleton("peer.hostname"))) + .thenReturn(new LinkedHashSet<>(Collections.singleton("peer.hostname"))) + .thenReturn(new LinkedHashSet<>(Collections.singleton("peer.hostname"))); + // State hash changes every reconcile -- forces reconcile into the slow path each time. + when(features.state()).thenReturn("state-1", "state-2", "state-3"); + + ConflatingMetricsAggregator aggregator = + new ConflatingMetricsAggregator( + Collections.emptySet(), + features, + healthMetrics, + sink, + writer, + /* maxAggregates */ 16, + /* queueSize */ 64, + /* reportingInterval */ 10, + SECONDS, + /* includeEndpointInMetrics */ false); + aggregator.start(); + try { + CountDownLatch cycle1 = new CountDownLatch(1); + CountDownLatch cycle2 = new CountDownLatch(1); + org.mockito.Mockito.doAnswer( + invocation -> { + cycle1.countDown(); + return null; + }) + .doAnswer( + invocation -> { + cycle2.countDown(); + return null; + }) + .when(writer) + .finishBucket(); + + aggregator.publish(Collections.>singletonList(peerAggregationSpan())); + aggregator.report(); + assertTrue(cycle1.await(2, SECONDS)); + + aggregator.publish(Collections.>singletonList(peerAggregationSpan())); + aggregator.report(); + assertTrue(cycle2.await(2, SECONDS)); + + // Both cycles flushed (both latches counted down via writer.finishBucket). The schema kept + // producing buckets across the state-hash changes; if the schema had been broken by the + // update-in-place path, the second cycle's flush would not have happened. + verify(writer, times(2)).finishBucket(); + // Bootstrap (1) + two reconciles (2) -- each reconcile saw a state mismatch and went + // through the deep compare, calling peerTags() once = 3 total. + verify(features, times(3)).peerTags(); + verify(features, atLeastOnce()).state(); + } finally { + aggregator.close(); + } + } + + @Test + void reconcileSwapsSchemaWhenTagSetChanges() throws Exception { + // The reconcile slow-path's swap branch: discovery refreshes the state AND the tag set + // grows. Cached schema is rebuilt and the volatile reference points at the new schema. + // Verification is end-to-end -- we look at the MetricKey the writer receives. Pre-swap the + // span snapshot was pinned to the old schema so only peer.hostname appears; post-swap a new + // publish reads the new schema and the next flush carries both peer tags. + HealthMetrics healthMetrics = mock(HealthMetrics.class); + MetricWriter writer = mock(MetricWriter.class); + Sink sink = mock(Sink.class); + DDAgentFeaturesDiscovery features = mock(DDAgentFeaturesDiscovery.class); + when(features.supportsMetrics()).thenReturn(true); + // peerTags() shape evolves across calls: + // - bootstrap reads {peer.hostname} + // - cycle 1 reconcile slow-path reads {peer.hostname, peer.service} + // - cycle 2 reconcile is state fast-path (no peerTags call) + when(features.peerTags()) + .thenReturn(Collections.singleton("peer.hostname")) + .thenReturn(new LinkedHashSet<>(Arrays.asList("peer.hostname", "peer.service"))); + // state() evolves: bootstrap = "state-1", then changes to "state-2" for cycle 1's reconcile + // (mismatch -> slow path), stable at "state-2" for cycle 2's reconcile (match -> fast path). + when(features.state()).thenReturn("state-1", "state-2", "state-2"); + + ConflatingMetricsAggregator aggregator = + new ConflatingMetricsAggregator( + Collections.emptySet(), + features, + healthMetrics, + sink, + writer, + /* maxAggregates */ 16, + /* queueSize */ 64, + /* reportingInterval */ 10, + SECONDS, + /* includeEndpointInMetrics */ false); + aggregator.start(); + try { + CountDownLatch cycle1 = new CountDownLatch(1); + CountDownLatch cycle2 = new CountDownLatch(1); + org.mockito.Mockito.doAnswer( + invocation -> { + cycle1.countDown(); + return null; + }) + .doAnswer( + invocation -> { + cycle2.countDown(); + return null; + }) + .when(writer) + .finishBucket(); + + // Publish 1: snapshot pinned to the original {peer.hostname} schema. cycle 1's reconcile + // will swap the cached schema BEFORE the flush, but this snapshot is already pinned so its + // MetricKey will still carry only peer.hostname. + aggregator.publish( + Collections.>singletonList(peerAggregationSpanWithBothPeerTags())); + aggregator.report(); + assertTrue(cycle1.await(2, SECONDS)); + + // Publish 2: now reads the post-swap schema {peer.hostname, peer.service} so the snapshot + // captures both tag values. cycle 2's reconcile short-circuits on timestamp match. + aggregator.publish( + Collections.>singletonList(peerAggregationSpanWithBothPeerTags())); + aggregator.report(); + assertTrue(cycle2.await(2, SECONDS)); + + // Capture every (MetricKey, AggregateMetric) the writer saw across both cycles. Pre-swap + // snapshot has 1 peer tag, post-swap has 2. + ArgumentCaptor keyCaptor = ArgumentCaptor.forClass(MetricKey.class); + verify(writer, times(2)).add(keyCaptor.capture(), any(AggregateMetric.class)); + List keys = keyCaptor.getAllValues(); + assertEquals( + Collections.singletonList(UTF8BytesString.create("peer.hostname:localhost")), + keys.get(0).getPeerTags(), + "pre-swap snapshot should encode only peer.hostname"); + assertEquals( + Arrays.asList( + UTF8BytesString.create("peer.hostname:localhost"), + UTF8BytesString.create("peer.service:billing")), + keys.get(1).getPeerTags(), + "post-swap snapshot should encode both peer.hostname and peer.service"); + + // Bootstrap (1) + cycle 1 slow-path (1) -- cycle 2 is fast-path so doesn't reach peerTags(). + verify(features, times(2)).peerTags(); + verify(features, atLeastOnce()).state(); + } finally { + aggregator.close(); + } + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private static CoreSpan peerAggregationSpan() { + CoreSpan span = mock(CoreSpan.class); + when(span.isMeasured()).thenReturn(false); + when(span.isTopLevel()).thenReturn(true); + // Return true for any SpanKindFilter -- shouldComputeMetric will see METRICS_ELIGIBLE_KINDS + // match, and peerTagSchemaFor will see PEER_AGGREGATION_KINDS match (checked first), which + // routes the span through the bootstrap path. + when(span.isKind(any(SpanKindFilter.class))).thenReturn(true); + when(span.getLongRunningVersion()).thenReturn(0); + when(span.getDurationNano()).thenReturn(100L); + when(span.getError()).thenReturn(0); + when(span.getResourceName()).thenReturn("resource"); + when(span.getServiceName()).thenReturn("svc"); + when(span.getOperationName()).thenReturn("op"); + when(span.getServiceNameSource()).thenReturn(null); + when(span.getType()).thenReturn("web"); + when(span.getHttpStatusCode()).thenReturn((short) 200); + when(span.getParentId()).thenReturn(0L); + when(span.getOrigin()).thenReturn(null); + when(span.unsafeGetTag(eq(Tags.SPAN_KIND), any(CharSequence.class))).thenReturn("client"); + // peer.hostname tag is set so capturePeerTagValues fires for the bootstrapped schema. + when(span.unsafeGetTag("peer.hostname")).thenReturn("localhost"); + return span; + } + + /** + * Variant of {@link #peerAggregationSpan()} that sets both {@code peer.hostname} and {@code + * peer.service}. Used by {@link #reconcileSwapsSchemaWhenTagSetChanges()} where the schema + * evolves from {@code {peer.hostname}} to {@code {peer.hostname, peer.service}} mid-test, and the + * post-swap snapshot must be able to capture the newly-relevant tag value. + */ + @SuppressWarnings({"rawtypes", "unchecked"}) + private static CoreSpan peerAggregationSpanWithBothPeerTags() { + CoreSpan span = peerAggregationSpan(); + when(span.unsafeGetTag("peer.service")).thenReturn("billing"); + return span; + } +} diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorInboxFullTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorInboxFullTest.java new file mode 100644 index 00000000000..f4e4c2da253 --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/ConflatingMetricsAggregatorInboxFullTest.java @@ -0,0 +1,84 @@ +package datadog.trace.common.metrics; + +import static java.util.concurrent.TimeUnit.SECONDS; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import datadog.communication.ddagent.DDAgentFeaturesDiscovery; +import datadog.trace.bootstrap.instrumentation.api.Tags; +import datadog.trace.core.CoreSpan; +import datadog.trace.core.SpanKindFilter; +import datadog.trace.core.monitor.HealthMetrics; +import java.util.Collections; +import org.junit.jupiter.api.Test; + +/** + * Coverage for the inbox-full fast-path in {@code ConflatingMetricsAggregator.publish}: when the + * producer-side inbox is at capacity, the next {@code publish} call short-circuits before any tag + * extraction or {@code SpanSnapshot} allocation and reports {@code onStatsInboxFull()} to health + * metrics. + */ +class ConflatingMetricsAggregatorInboxFullTest { + + @Test + void publishFiresOnStatsInboxFullOnceInboxIsAtCapacity() { + HealthMetrics healthMetrics = mock(HealthMetrics.class); + MetricWriter writer = mock(MetricWriter.class); + Sink sink = mock(Sink.class); + DDAgentFeaturesDiscovery features = mock(DDAgentFeaturesDiscovery.class); + when(features.supportsMetrics()).thenReturn(true); + when(features.peerTags()).thenReturn(Collections.emptySet()); + + // Small inbox; jctools MPSC array queue rounds up to the next power of two, so use a power of + // two directly. Note: we deliberately do NOT call aggregator.start() so the consumer thread + // never drains -- snapshots accumulate in the inbox until capacity, then the next publish hits + // the size-vs-capacity fast path. + int queueSize = 8; + ConflatingMetricsAggregator aggregator = + new ConflatingMetricsAggregator( + Collections.emptySet(), + features, + healthMetrics, + sink, + writer, + /* maxAggregates */ 16, + queueSize, + /* reportingInterval */ 10, + SECONDS, + /* includeEndpointInMetrics */ false); + + // Publish well past capacity. The first `queueSize` calls land in the inbox; subsequent calls + // see size >= capacity and hit the fast path. + for (int i = 0; i < queueSize * 4; i++) { + aggregator.publish(Collections.>singletonList(metricsEligibleSpan())); + } + + verify(healthMetrics, atLeastOnce()).onStatsInboxFull(); + aggregator.close(); + } + + @SuppressWarnings({"rawtypes", "unchecked"}) + private static CoreSpan metricsEligibleSpan() { + CoreSpan span = mock(CoreSpan.class); + when(span.isMeasured()).thenReturn(false); + when(span.isTopLevel()).thenReturn(true); + when(span.isKind(any(SpanKindFilter.class))).thenReturn(false); + when(span.getLongRunningVersion()).thenReturn(0); + when(span.getDurationNano()).thenReturn(100L); + when(span.getError()).thenReturn(0); + when(span.getResourceName()).thenReturn("resource"); + when(span.getServiceName()).thenReturn("svc"); + when(span.getOperationName()).thenReturn("op"); + when(span.getServiceNameSource()).thenReturn(null); + when(span.getType()).thenReturn("web"); + when(span.getHttpStatusCode()).thenReturn((short) 200); + when(span.getParentId()).thenReturn(0L); + when(span.getOrigin()).thenReturn(null); + when(span.unsafeGetTag(eq(Tags.SPAN_KIND), any(CharSequence.class))).thenReturn("client"); + return span; + } +} diff --git a/dd-trace-core/src/test/java/datadog/trace/common/metrics/PeerTagSchemaTest.java b/dd-trace-core/src/test/java/datadog/trace/common/metrics/PeerTagSchemaTest.java new file mode 100644 index 00000000000..7d818a2686b --- /dev/null +++ b/dd-trace-core/src/test/java/datadog/trace/common/metrics/PeerTagSchemaTest.java @@ -0,0 +1,93 @@ +package datadog.trace.common.metrics; + +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashSet; +import java.util.Set; +import org.junit.jupiter.api.Test; + +/** + * Unit tests for {@link PeerTagSchema}. Covers the {@link PeerTagSchema#hasSameTagsAs(Set)} + * predicate that drives the aggregator's reconcile fast/slow path split, the factory shapes, and + * the {@link PeerTagSchema#INTERNAL} singleton. + */ +class PeerTagSchemaTest { + + @Test + void ofBuildsSchemaFromSetWithState() { + Set tags = new LinkedHashSet<>(Arrays.asList("peer.hostname", "peer.service")); + PeerTagSchema schema = PeerTagSchema.of(tags, "abc123"); + + assertArrayEquals(new String[] {"peer.hostname", "peer.service"}, schema.names); + assertEquals("abc123", schema.state); + assertEquals(2, schema.size()); + } + + @Test + void ofHandlesEmptySet() { + PeerTagSchema schema = PeerTagSchema.of(Collections.emptySet(), null); + + assertEquals(0, schema.size()); + assertEquals(0, schema.names.length); + assertNull(schema.state); + } + + @Test + void internalSingletonCarriesBaseService() { + assertEquals(1, PeerTagSchema.INTERNAL.size()); + assertEquals("_dd.base_service", PeerTagSchema.INTERNAL.names[0]); + } + + @Test + void hasSameTagsAsReturnsTrueForExactMatch() { + PeerTagSchema schema = + PeerTagSchema.of( + new LinkedHashSet<>(Arrays.asList("peer.hostname", "peer.service")), "state-1"); + + // Same content via a different Set reference -- this is the case the reconcile fast-path + // depends on (Set returned from a fresh discovery cycle is content-equal to the prior one). + Set equivalentSet = new HashSet<>(Arrays.asList("peer.service", "peer.hostname")); + assertTrue(schema.hasSameTagsAs(equivalentSet)); + } + + @Test + void hasSameTagsAsReturnsFalseWhenSetGrew() { + PeerTagSchema schema = + PeerTagSchema.of(Collections.singleton("peer.hostname"), "state-1"); + + Set larger = new HashSet<>(Arrays.asList("peer.hostname", "peer.service")); + assertFalse(schema.hasSameTagsAs(larger)); + } + + @Test + void hasSameTagsAsReturnsFalseWhenSetShrank() { + PeerTagSchema schema = + PeerTagSchema.of( + new LinkedHashSet<>(Arrays.asList("peer.hostname", "peer.service")), "state-1"); + + assertFalse(schema.hasSameTagsAs(Collections.singleton("peer.hostname"))); + } + + @Test + void hasSameTagsAsReturnsFalseWhenContentDifferent() { + PeerTagSchema schema = + PeerTagSchema.of(Collections.singleton("peer.hostname"), "state-1"); + + assertFalse(schema.hasSameTagsAs(Collections.singleton("peer.service"))); + } + + @Test + void hasSameTagsAsHandlesEmpty() { + PeerTagSchema empty = PeerTagSchema.of(Collections.emptySet(), "state-1"); + + assertTrue(empty.hasSameTagsAs(Collections.emptySet())); + assertFalse(empty.hasSameTagsAs(Collections.singleton("peer.hostname"))); + } +} diff --git a/dd-trace-core/src/test/java/datadog/trace/core/monitor/HealthMetricsTest.java b/dd-trace-core/src/test/java/datadog/trace/core/monitor/HealthMetricsTest.java index 670c4cda113..2f9ac1ea7da 100644 --- a/dd-trace-core/src/test/java/datadog/trace/core/monitor/HealthMetricsTest.java +++ b/dd-trace-core/src/test/java/datadog/trace/core/monitor/HealthMetricsTest.java @@ -398,6 +398,19 @@ void testOnStatsAggregateDropped() throws InterruptedException { verifyNoMoreInteractions(statsD); } + @Test + void testOnStatsInboxFull() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + try (TracerHealthMetrics healthMetrics = + new TracerHealthMetrics(new Latched(statsD, latch), 100, TimeUnit.MILLISECONDS)) { + healthMetrics.start(); + healthMetrics.onStatsInboxFull(); + assertTrue(latch.await(5, TimeUnit.SECONDS)); + } + verify(statsD).count("stats.dropped_aggregates", 1L, "reason:inbox_full"); + verifyNoMoreInteractions(statsD); + } + private static class Latched implements StatsDClient { private final StatsDClient delegate; private final CountDownLatch latch; diff --git a/internal-api/src/main/java/datadog/trace/api/Config.java b/internal-api/src/main/java/datadog/trace/api/Config.java index a463887f61a..6b9e38e2db3 100644 --- a/internal-api/src/main/java/datadog/trace/api/Config.java +++ b/internal-api/src/main/java/datadog/trace/api/Config.java @@ -809,6 +809,16 @@ public class Config { private static final Pattern COLON = Pattern.compile(":"); + // Historical conflating-Batch size; used to translate TRACER_METRICS_MAX_PENDING (configured in + // legacy batch units) into the new per-SpanSnapshot inbox capacity. + private static final int LEGACY_BATCH_SIZE = 64; + + // Practical upper bound on Object[] allocations. Sits a few bytes below Integer.MAX_VALUE + // because the JVM reserves header slack on array allocations; matches the JDK's own + // {@code java.util.ArraysSupport.SOFT_MAX_ARRAY_LENGTH} convention. Used to clamp computed + // capacities that feed into array-backed collections. + private static final int MAX_SAFE_ARRAY_SIZE = Integer.MAX_VALUE - 8; + private final InstrumenterConfig instrumenterConfig; private final long startTimeMillis = System.currentTimeMillis(); @@ -2173,7 +2183,22 @@ private Config(final ConfigProvider configProvider, final InstrumenterConfig ins tracerMetricsBufferingEnabled = configProvider.getBoolean(TRACER_METRICS_BUFFERING_ENABLED, false); tracerMetricsMaxAggregates = configProvider.getInteger(TRACER_METRICS_MAX_AGGREGATES, 2048); - tracerMetricsMaxPending = configProvider.getInteger(TRACER_METRICS_MAX_PENDING, 2048); + /* + * TRACER_METRICS_MAX_PENDING historically counted conflating Batch slots (~64 spans per batch + * via Batch.MAX_BATCH_SIZE). The inbox now holds 1 SpanSnapshot per metrics-eligible span, so + * we multiply the configured value by the legacy batch size to preserve the effective + * span-throughput capacity of the prior default *and* of any existing customer override + * (e.g. a configured 4096 still means "~262144 spans before drops", same as before). ~100 B + * per SpanSnapshot * 131072 ≈ 13 MB worst-case heap floor at the default. + * + * Long-promote the multiplication and clamp to MAX_SAFE_ARRAY_SIZE so an absurd customer + * override (>= ~33M) can't silently wrap to a negative int. MAX_SAFE_ARRAY_SIZE sits a few + * bytes below Integer.MAX_VALUE because the JVM reserves header slack on array allocations; + * see java.util.ArraysSupport.SOFT_MAX_ARRAY_LENGTH for the same convention. + */ + long requestedMaxPending = + (long) configProvider.getInteger(TRACER_METRICS_MAX_PENDING, 2048) * LEGACY_BATCH_SIZE; + tracerMetricsMaxPending = (int) Math.min(requestedMaxPending, MAX_SAFE_ARRAY_SIZE); reportHostName = configProvider.getBoolean(TRACE_REPORT_HOSTNAME, DEFAULT_TRACE_REPORT_HOSTNAME);