From 94b73e5803636b163dc84297cc00dff03682b634 Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Wed, 27 May 2026 11:45:00 +0200 Subject: [PATCH 1/7] Add lazy-transformer to KeyClassifier for deferred header decoding --- .../instrumentation/api/AgentPropagation.java | 19 +++++ .../api/KeyClassifierTest.java | 78 +++++++++++++++++++ 2 files changed, 97 insertions(+) create mode 100644 internal-api/src/test/java/datadog/trace/bootstrap/instrumentation/api/KeyClassifierTest.java diff --git a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentPropagation.java b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentPropagation.java index 586b2f08612..094dc7e9fbd 100644 --- a/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentPropagation.java +++ b/internal-api/src/main/java/datadog/trace/bootstrap/instrumentation/api/AgentPropagation.java @@ -10,6 +10,7 @@ import datadog.context.propagation.Concern; import datadog.context.propagation.Propagators; import java.util.function.BiConsumer; +import java.util.function.Function; import javax.annotation.ParametersAreNonnullByDefault; public final class AgentPropagation { @@ -38,6 +39,24 @@ public static AgentSpanContext.Extracted extractContextAndGetSpanContext( public interface KeyClassifier { boolean accept(String key, String value); + + /** + * Variant of {@link #accept(String, String)} for carriers that store header values in a raw + * form (e.g. {@code byte[]}) and want to defer string conversion until after the key is known + * to be relevant. + * + *

The default implementation applies {@code transformer} eagerly and delegates to {@link + * #accept(String, String)}, so existing classifiers work without any changes. + * + * @param key the header name + * @param value the raw header value, in whatever form the carrier provides + * @param transformer converts {@code value} to a string; called at most once by the default + * implementation + * @return {@code false} to stop iteration, {@code true} to continue + */ + default boolean accept(String key, T value, Function transformer) { + return accept(key, transformer.apply(value)); + } } public interface ContextVisitor extends CarrierVisitor { diff --git a/internal-api/src/test/java/datadog/trace/bootstrap/instrumentation/api/KeyClassifierTest.java b/internal-api/src/test/java/datadog/trace/bootstrap/instrumentation/api/KeyClassifierTest.java new file mode 100644 index 00000000000..35e085a7892 --- /dev/null +++ b/internal-api/src/test/java/datadog/trace/bootstrap/instrumentation/api/KeyClassifierTest.java @@ -0,0 +1,78 @@ +package datadog.trace.bootstrap.instrumentation.api; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.nio.charset.StandardCharsets; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import org.junit.jupiter.api.Test; + +class KeyClassifierTest { + + static class RecordingClassifier implements AgentPropagation.KeyClassifier { + String lastKey; + String lastValue; + boolean returnValue; + + RecordingClassifier(boolean returnValue) { + this.returnValue = returnValue; + } + + @Override + public boolean accept(String key, String value) { + lastKey = key; + lastValue = value; + return returnValue; + } + } + + @Test + void defaultTransformerMethodAppliesTransformerAndDelegates() { + RecordingClassifier classifier = new RecordingClassifier(true); + + boolean result = + classifier.accept( + "my-key", + "raw".getBytes(StandardCharsets.UTF_8), + bytes -> new String(bytes, StandardCharsets.UTF_8)); + + assertEquals("my-key", classifier.lastKey); + assertEquals("raw", classifier.lastValue); + assertTrue(result); + } + + @Test + void transformerIsCalledExactlyOnce() { + AtomicInteger callCount = new AtomicInteger(0); + AtomicReference transformed = new AtomicReference<>(); + + AgentPropagation.KeyClassifier classifier = + (key, value) -> { + transformed.set(value); + return true; + }; + + classifier.accept( + "key", + "input", + v -> { + callCount.incrementAndGet(); + return v.toUpperCase(); + }); + + assertEquals(1, callCount.get()); + assertEquals("INPUT", transformed.get()); + } + + @Test + void existingAcceptStringStringContractUnchanged() { + RecordingClassifier classifier = new RecordingClassifier(true); + + boolean result = classifier.accept("trace-id", "abc123"); + + assertEquals("trace-id", classifier.lastKey); + assertEquals("abc123", classifier.lastValue); + assertTrue(result); + } +} From 5c36e3073a45d4dfb2146dcf2203a9fe95a05d9e Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Wed, 27 May 2026 16:59:01 +0200 Subject: [PATCH 2/7] Add safe extraction for malformed base64 kafka headers --- .../kafka_clients/TextMapExtractAdapter.java | 29 ++++++++---- .../test/groovy/KafkaClientTestBase.groovy | 26 +++++++++- .../groovy/TextMapExtractAdapterTest.groovy | 26 +++++++++- .../TextMapExtractAdapter.java | 29 ++++++++---- .../test/groovy/KafkaClientTestBase.groovy | 26 +++++++++- .../groovy/TextMapExtractAdapterTest.groovy | 26 +++++++++- .../java/datadog/trace/api/Functions.java | 29 ++++++++++++ .../trace/api/FunctionsBase64Test.java | 47 +++++++++++++++++++ 8 files changed, 216 insertions(+), 22 deletions(-) create mode 100644 internal-api/src/test/java/datadog/trace/api/FunctionsBase64Test.java diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TextMapExtractAdapter.java b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TextMapExtractAdapter.java index 4f2c178e0c1..1c63742b789 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TextMapExtractAdapter.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TextMapExtractAdapter.java @@ -1,13 +1,17 @@ package datadog.trace.instrumentation.kafka_clients; +import static datadog.trace.api.Functions.UTF8_BYTES_TO_STRING; +import static datadog.trace.api.telemetry.LogCollector.EXCLUDE_TELEMETRY; import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.KAFKA_PRODUCED_KEY; import static java.nio.charset.StandardCharsets.UTF_8; import datadog.trace.api.Config; +import datadog.trace.api.Functions; import datadog.trace.bootstrap.instrumentation.api.AgentPropagation; import datadog.trace.bootstrap.instrumentation.api.AgentPropagation.ContextVisitor; import java.nio.ByteBuffer; import java.util.Base64; +import java.util.function.Function; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.slf4j.Logger; @@ -19,10 +23,17 @@ public class TextMapExtractAdapter implements ContextVisitor { public static final TextMapExtractAdapter GETTER = new TextMapExtractAdapter(Config.get().isKafkaClientBase64DecodingEnabled()); - private final Base64.Decoder base64; + private final Function headerValueTransformer; + private final Base64.Decoder decoder; - public TextMapExtractAdapter(boolean base64DecodeHeaders) { - this.base64 = base64DecodeHeaders ? Base64.getDecoder() : null; + public TextMapExtractAdapter(boolean decodeBase64Headers) { + if (decodeBase64Headers) { + this.headerValueTransformer = Functions.base64Decode(UTF_8); + this.decoder = Base64.getDecoder(); + } else { + this.headerValueTransformer = UTF8_BYTES_TO_STRING; + this.decoder = null; + } } @Override @@ -33,10 +44,12 @@ public void forEachKey(Headers carrier, AgentPropagation.KeyClassifier classifie if (null == value) { continue; } - if (base64 != null) { - value = base64.decode(value); + String decoded = headerValueTransformer.apply(value); + if (decoded == null) { + log.debug(EXCLUDE_TELEMETRY, "Failed to Base64-decode Kafka header '{}', skipping", key); + continue; } - if (!classifier.accept(key, new String(value, UTF_8))) { + if (!classifier.accept(key, decoded)) { return; } } @@ -47,11 +60,11 @@ public long extractTimeInQueueStart(Headers carrier) { if (null != header) { try { ByteBuffer buf = ByteBuffer.allocate(8); - buf.put(base64 != null ? base64.decode(header.value()) : header.value()); + buf.put(decoder != null ? decoder.decode(header.value()) : header.value()); buf.flip(); return buf.getLong(); } catch (Exception e) { - log.debug("Unable to get kafka produced time", e); + log.debug(EXCLUDE_TELEMETRY, "Unable to get kafka produced time", e); } } return 0; diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy index 0c3c022201b..7932e8c01be 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy @@ -1,6 +1,7 @@ import datadog.trace.api.datastreams.DataStreamsTags import datadog.trace.api.datastreams.DataStreamsTransactionExtractor import datadog.trace.api.config.TraceInstrumentationConfig +import datadog.trace.api.config.TracerConfig import datadog.trace.instrumentation.kafka_common.ClusterIdHolder import static datadog.trace.agent.test.utils.TraceUtils.basicSpan @@ -1544,9 +1545,32 @@ class KafkaClientDataStreamsDisabledForkedTest extends KafkaClientTestBase { } class KafkaClientContextSwapForkedTest extends KafkaClientV0ForkedTest { - @Override void configurePreAgent() { super.configurePreAgent() injectSysConfig(TraceInstrumentationConfig.LEGACY_CONTEXT_MANAGER_ENABLED, "false") } } + +class KafkaClientBadBase64HeaderForkedTest extends KafkaClientV0ForkedTest { + def "producer span is created when message carries non-Base64 headers and base64 decoding is enabled"() { + setup: + injectSysConfig(TraceInstrumentationConfig.KAFKA_CLIENT_BASE64_DECODING_ENABLED, "true") + injectSysConfig(TracerConfig.HEADER_TAGS, "x-custom-header:my.custom.tag") + def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) + def producer = new KafkaProducer(senderProps, new StringSerializer(), new StringSerializer()) + + when: + def headers = new RecordHeaders([ + new RecordHeader("x-custom-header", "not-valid-base64!@#".getBytes(StandardCharsets.UTF_8)), + new RecordHeader("x-another-header", "also-not-base64!!".getBytes(StandardCharsets.UTF_8)) + ]) + producer.send(new ProducerRecord<>(SHARED_TOPIC, 0, null, "hello", headers)).get() + + then: + TEST_WRITER.waitForTraces(1) + !TEST_WRITER.isEmpty() + + cleanup: + producer.close() + } +} diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/TextMapExtractAdapterTest.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/TextMapExtractAdapterTest.groovy index 666a0d8357a..b2a8feb66da 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/TextMapExtractAdapterTest.groovy +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/TextMapExtractAdapterTest.groovy @@ -2,12 +2,11 @@ import com.google.common.io.BaseEncoding import datadog.trace.agent.test.InstrumentationSpecification import datadog.trace.bootstrap.instrumentation.api.AgentPropagation import datadog.trace.instrumentation.kafka_clients.TextMapExtractAdapter +import java.nio.charset.StandardCharsets import org.apache.kafka.common.header.Headers import org.apache.kafka.common.header.internals.RecordHeader import org.apache.kafka.common.header.internals.RecordHeaders -import java.nio.charset.StandardCharsets - class TextMapExtractAdapterTest extends InstrumentationSpecification { def "check can decode base64 mangled headers"() { @@ -32,4 +31,27 @@ class TextMapExtractAdapterTest extends InstrumentationSpecification { where: base64Decode << [true, false] } + + def "invalid base64 header is skipped and subsequent valid headers are still processed"() { + given: + def validBase64 = BaseEncoding.base64().encode("bar".getBytes(StandardCharsets.UTF_8)) + Headers headers = new RecordHeaders([ + new RecordHeader("bad-key", "not-valid-base64!@#".getBytes(StandardCharsets.UTF_8)), + new RecordHeader("good-key", validBase64.getBytes(StandardCharsets.UTF_8)) + ]) + TextMapExtractAdapter adapter = new TextMapExtractAdapter(true) + when: + Map extracted = [:] + adapter.forEachKey(headers, new AgentPropagation.KeyClassifier() { + @Override + boolean accept(String key, String value) { + extracted[key] = value + return true + } + }) + then: + noExceptionThrown() + !extracted.containsKey("bad-key") + extracted["good-key"] == "bar" + } } diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TextMapExtractAdapter.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TextMapExtractAdapter.java index a7523885e60..923bf259a16 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TextMapExtractAdapter.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TextMapExtractAdapter.java @@ -1,12 +1,16 @@ package datadog.trace.instrumentation.kafka_clients38; +import static datadog.trace.api.Functions.UTF8_BYTES_TO_STRING; +import static datadog.trace.api.telemetry.LogCollector.EXCLUDE_TELEMETRY; import static java.nio.charset.StandardCharsets.UTF_8; import datadog.trace.api.Config; +import datadog.trace.api.Functions; import datadog.trace.bootstrap.instrumentation.api.AgentPropagation; import datadog.trace.bootstrap.instrumentation.api.AgentPropagation.ContextVisitor; import java.nio.ByteBuffer; import java.util.Base64; +import java.util.function.Function; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; import org.slf4j.Logger; @@ -18,10 +22,17 @@ public class TextMapExtractAdapter implements ContextVisitor { public static final TextMapExtractAdapter GETTER = new TextMapExtractAdapter(Config.get().isKafkaClientBase64DecodingEnabled()); - private final Base64.Decoder base64; + private final Function headerValueTransformer; + private final Base64.Decoder decoder; - public TextMapExtractAdapter(boolean base64DecodeHeaders) { - this.base64 = base64DecodeHeaders ? Base64.getDecoder() : null; + public TextMapExtractAdapter(boolean decodeBase64Headers) { + if (decodeBase64Headers) { + this.headerValueTransformer = Functions.base64Decode(UTF_8); + this.decoder = Base64.getDecoder(); + } else { + this.headerValueTransformer = UTF8_BYTES_TO_STRING; + this.decoder = null; + } } @Override @@ -32,10 +43,12 @@ public void forEachKey(Headers carrier, AgentPropagation.KeyClassifier classifie if (null == value) { continue; } - if (base64 != null) { - value = base64.decode(value); + String decoded = headerValueTransformer.apply(value); + if (decoded == null) { + log.debug(EXCLUDE_TELEMETRY, "Failed to Base64-decode Kafka header '{}', skipping", key); + continue; } - if (!classifier.accept(key, new String(value, UTF_8))) { + if (!classifier.accept(key, decoded)) { return; } } @@ -46,11 +59,11 @@ public long extractTimeInQueueStart(Headers carrier) { if (null != header) { try { ByteBuffer buf = ByteBuffer.allocate(8); - buf.put(base64 != null ? base64.decode(header.value()) : header.value()); + buf.put(decoder != null ? decoder.decode(header.value()) : header.value()); buf.flip(); return buf.getLong(); } catch (Exception e) { - log.debug("Unable to get kafka produced time", e); + log.debug(EXCLUDE_TELEMETRY, "Unable to get kafka produced time", e); } } return 0; diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy index 0e10388a8d2..788b54ba9f4 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy @@ -2,6 +2,7 @@ import datadog.trace.agent.test.asserts.TraceAssert import datadog.trace.agent.test.naming.VersionedNamingTestBase import datadog.trace.api.Config import datadog.trace.api.config.TraceInstrumentationConfig +import datadog.trace.api.config.TracerConfig import datadog.trace.api.DDTags import datadog.trace.api.datastreams.DataStreamsTags import datadog.trace.bootstrap.instrumentation.api.InstrumentationTags @@ -1210,9 +1211,32 @@ class KafkaClientDataStreamsDisabledForkedTest extends KafkaClientTestBase { } class KafkaClientContextSwapForkedTest extends KafkaClientV0ForkedTest { - @Override void configurePreAgent() { super.configurePreAgent() injectSysConfig(TraceInstrumentationConfig.LEGACY_CONTEXT_MANAGER_ENABLED, "false") } } + +class KafkaClientBadBase64HeaderForkedTest extends KafkaClientV0ForkedTest { + def "producer span is created when message carries non-Base64 headers and base64 decoding is enabled"() { + setup: + injectSysConfig(TraceInstrumentationConfig.KAFKA_CLIENT_BASE64_DECODING_ENABLED, "true") + injectSysConfig(TracerConfig.HEADER_TAGS, "x-custom-header:my.custom.tag") + def producerProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString()) + def producer = new KafkaProducer(producerProps, new StringSerializer(), new StringSerializer()) + + when: + def headers = new RecordHeaders([ + new RecordHeader("x-custom-header", "not-valid-base64!@#".getBytes(StandardCharsets.UTF_8)), + new RecordHeader("x-another-header", "also-not-base64!!".getBytes(StandardCharsets.UTF_8)) + ]) + producer.send(new ProducerRecord<>(SHARED_TOPIC, 0, null, "hello", headers)).get() + + then: + TEST_WRITER.waitForTraces(1) + !TEST_WRITER.isEmpty() + + cleanup: + producer.close() + } +} diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/TextMapExtractAdapterTest.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/TextMapExtractAdapterTest.groovy index 103e2ed95ba..e0d155f588e 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/TextMapExtractAdapterTest.groovy +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/TextMapExtractAdapterTest.groovy @@ -2,12 +2,11 @@ import com.google.common.io.BaseEncoding import datadog.trace.agent.test.InstrumentationSpecification import datadog.trace.bootstrap.instrumentation.api.AgentPropagation import datadog.trace.instrumentation.kafka_clients38.TextMapExtractAdapter +import java.nio.charset.StandardCharsets import org.apache.kafka.common.header.Headers import org.apache.kafka.common.header.internals.RecordHeader import org.apache.kafka.common.header.internals.RecordHeaders -import java.nio.charset.StandardCharsets - class TextMapExtractAdapterTest extends InstrumentationSpecification { def "check can decode base64 mangled headers"() { @@ -32,4 +31,27 @@ class TextMapExtractAdapterTest extends InstrumentationSpecification { where: base64Decode << [true, false] } + + def "invalid base64 header is skipped and subsequent valid headers are still processed"() { + given: + def validBase64 = BaseEncoding.base64().encode("bar".getBytes(StandardCharsets.UTF_8)) + Headers headers = new RecordHeaders([ + new RecordHeader("bad-key", "not-valid-base64!@#".getBytes(StandardCharsets.UTF_8)), + new RecordHeader("good-key", validBase64.getBytes(StandardCharsets.UTF_8)) + ]) + TextMapExtractAdapter adapter = new TextMapExtractAdapter(true) + when: + Map extracted = [:] + adapter.forEachKey(headers, new AgentPropagation.KeyClassifier() { + @Override + boolean accept(String key, String value) { + extracted[key] = value + return true + } + }) + then: + noExceptionThrown() + !extracted.containsKey("bad-key") + extracted["good-key"] == "bar" + } } diff --git a/internal-api/src/main/java/datadog/trace/api/Functions.java b/internal-api/src/main/java/datadog/trace/api/Functions.java index 42c030316d9..623b74f9680 100644 --- a/internal-api/src/main/java/datadog/trace/api/Functions.java +++ b/internal-api/src/main/java/datadog/trace/api/Functions.java @@ -1,11 +1,14 @@ package datadog.trace.api; +import static java.nio.charset.StandardCharsets.UTF_8; import static java.util.function.Function.identity; import datadog.trace.bootstrap.instrumentation.api.UTF8BytesString; import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodType; +import java.nio.charset.Charset; +import java.util.Base64; import java.util.Locale; import java.util.function.BiFunction; import java.util.function.Function; @@ -174,4 +177,30 @@ public T apply(Object input) { } } } + + public static final Function UTF8_BYTES_TO_STRING = + bytes -> new String(bytes, UTF_8); + + public static Function base64Decode(Charset charset) { + return new Base64Decode(charset); + } + + private static final class Base64Decode implements Function { + private final Base64.Decoder decoder; + private final Charset charset; + + private Base64Decode(Charset charset) { + this.decoder = Base64.getDecoder(); + this.charset = charset; + } + + @Override + public String apply(byte[] bytes) { + try { + return new String(decoder.decode(bytes), charset); + } catch (final Exception ignored) { + return null; + } + } + } } diff --git a/internal-api/src/test/java/datadog/trace/api/FunctionsBase64Test.java b/internal-api/src/test/java/datadog/trace/api/FunctionsBase64Test.java new file mode 100644 index 00000000000..a591cd27a1b --- /dev/null +++ b/internal-api/src/test/java/datadog/trace/api/FunctionsBase64Test.java @@ -0,0 +1,47 @@ +package datadog.trace.api; + +import static datadog.trace.api.Functions.UTF8_BYTES_TO_STRING; +import static java.nio.charset.StandardCharsets.UTF_8; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; + +import java.util.Base64; +import java.util.function.Function; +import org.junit.jupiter.api.Test; + +class FunctionsBase64Test { + + @Test + void utf8BytesToStringConvertsBytes() { + byte[] bytes = "hello".getBytes(UTF_8); + assertEquals("hello", UTF8_BYTES_TO_STRING.apply(bytes)); + } + + @Test + void base64DecodeDecodesValidInput() { + String original = "x-datadog-trace-id"; + byte[] encoded = Base64.getEncoder().encode(original.getBytes(UTF_8)); + + Function decoder = Functions.base64Decode(UTF_8); + assertEquals(original, decoder.apply(encoded)); + } + + @Test + void base64DecodeReturnsNullForInvalidBase64() { + Function decoder = Functions.base64Decode(UTF_8); + assertNull(decoder.apply("not-valid-base64!@#".getBytes(UTF_8))); + } + + @Test + void base64DecodeReturnsNullForUrlSafeChars() { + // URL-safe Base64 uses '-' and '_' which the standard decoder rejects + Function decoder = Functions.base64Decode(UTF_8); + assertNull(decoder.apply("abc-def_ghi".getBytes(UTF_8))); + } + + @Test + void base64DecodeInstanceIsNotNull() { + assertNotNull(Functions.base64Decode(UTF_8)); + } +} From a45f0decc379fe93e5f4ae6d04e8f403f21b1b37 Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Thu, 28 May 2026 17:20:38 +0200 Subject: [PATCH 3/7] Apply suggestions from code review Co-authored-by: Brice Dutheil --- .../src/test/groovy/KafkaClientTestBase.groovy | 11 ++++++++--- .../src/test/groovy/KafkaClientTestBase.groovy | 11 ++++++++--- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy index 7932e8c01be..c694f2a2098 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy @@ -1552,10 +1552,15 @@ class KafkaClientContextSwapForkedTest extends KafkaClientV0ForkedTest { } class KafkaClientBadBase64HeaderForkedTest extends KafkaClientV0ForkedTest { - def "producer span is created when message carries non-Base64 headers and base64 decoding is enabled"() { - setup: + @Override + void configurePreAgent() { + super.configurePreAgent() injectSysConfig(TraceInstrumentationConfig.KAFKA_CLIENT_BASE64_DECODING_ENABLED, "true") injectSysConfig(TracerConfig.HEADER_TAGS, "x-custom-header:my.custom.tag") + } + + def "producer span is created when message carries non-Base64 headers and base64 decoding is enabled"() { + setup: def senderProps = KafkaTestUtils.senderProps(embeddedKafka.getBrokersAsString()) def producer = new KafkaProducer(senderProps, new StringSerializer(), new StringSerializer()) @@ -1571,6 +1576,6 @@ class KafkaClientBadBase64HeaderForkedTest extends KafkaClientV0ForkedTest { !TEST_WRITER.isEmpty() cleanup: - producer.close() + producer?.close() } } diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy index 788b54ba9f4..973205eb55a 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy @@ -1218,10 +1218,15 @@ class KafkaClientContextSwapForkedTest extends KafkaClientV0ForkedTest { } class KafkaClientBadBase64HeaderForkedTest extends KafkaClientV0ForkedTest { - def "producer span is created when message carries non-Base64 headers and base64 decoding is enabled"() { - setup: + @Override + void configurePreAgent() { + super.configurePreAgent() injectSysConfig(TraceInstrumentationConfig.KAFKA_CLIENT_BASE64_DECODING_ENABLED, "true") injectSysConfig(TracerConfig.HEADER_TAGS, "x-custom-header:my.custom.tag") + } + + def "producer span is created when message carries non-Base64 headers and base64 decoding is enabled"() { + setup: def producerProps = KafkaTestUtils.producerProps(embeddedKafka.getBrokersAsString()) def producer = new KafkaProducer(producerProps, new StringSerializer(), new StringSerializer()) @@ -1237,6 +1242,6 @@ class KafkaClientBadBase64HeaderForkedTest extends KafkaClientV0ForkedTest { !TEST_WRITER.isEmpty() cleanup: - producer.close() + producer?.close() } } From 9bba2270ee551085e0e57660f1f7c8d674f65358 Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Thu, 28 May 2026 21:43:25 +0200 Subject: [PATCH 4/7] Only test the issue --- .../src/test/groovy/KafkaClientTestBase.groovy | 8 ++++++-- .../src/test/groovy/KafkaClientTestBase.groovy | 16 ++++++++++++++-- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy index c694f2a2098..2325ebf4d16 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy @@ -9,6 +9,7 @@ import static datadog.trace.agent.test.utils.TraceUtils.runUnderTrace import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.isAsyncPropagationEnabled +import datadog.trace.agent.test.InstrumentationSpecification import datadog.trace.agent.test.asserts.TraceAssert import datadog.trace.agent.test.naming.VersionedNamingTestBase import datadog.trace.api.Config @@ -1551,7 +1552,10 @@ class KafkaClientContextSwapForkedTest extends KafkaClientV0ForkedTest { } } -class KafkaClientBadBase64HeaderForkedTest extends KafkaClientV0ForkedTest { +class KafkaClientBadBase64HeaderForkedTest extends InstrumentationSpecification { + @Rule + KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, KafkaClientTestBase.SHARED_TOPIC) + @Override void configurePreAgent() { super.configurePreAgent() @@ -1569,7 +1573,7 @@ class KafkaClientBadBase64HeaderForkedTest extends KafkaClientV0ForkedTest { new RecordHeader("x-custom-header", "not-valid-base64!@#".getBytes(StandardCharsets.UTF_8)), new RecordHeader("x-another-header", "also-not-base64!!".getBytes(StandardCharsets.UTF_8)) ]) - producer.send(new ProducerRecord<>(SHARED_TOPIC, 0, null, "hello", headers)).get() + producer.send(new ProducerRecord<>(KafkaClientTestBase.SHARED_TOPIC, 0, null, "hello", headers)).get() then: TEST_WRITER.waitForTraces(1) diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy index 973205eb55a..5d505c5398c 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/test/groovy/KafkaClientTestBase.groovy @@ -1,3 +1,4 @@ +import datadog.trace.agent.test.InstrumentationSpecification import datadog.trace.agent.test.asserts.TraceAssert import datadog.trace.agent.test.naming.VersionedNamingTestBase import datadog.trace.api.Config @@ -1217,7 +1218,18 @@ class KafkaClientContextSwapForkedTest extends KafkaClientV0ForkedTest { } } -class KafkaClientBadBase64HeaderForkedTest extends KafkaClientV0ForkedTest { +class KafkaClientBadBase64HeaderForkedTest extends InstrumentationSpecification { + EmbeddedKafkaBroker embeddedKafka + + def setup() { + embeddedKafka = new EmbeddedKafkaKraftBroker(1, 2, KafkaClientTestBase.SHARED_TOPIC) + embeddedKafka.afterPropertiesSet() + } + + def cleanup() { + embeddedKafka.destroy() + } + @Override void configurePreAgent() { super.configurePreAgent() @@ -1235,7 +1247,7 @@ class KafkaClientBadBase64HeaderForkedTest extends KafkaClientV0ForkedTest { new RecordHeader("x-custom-header", "not-valid-base64!@#".getBytes(StandardCharsets.UTF_8)), new RecordHeader("x-another-header", "also-not-base64!!".getBytes(StandardCharsets.UTF_8)) ]) - producer.send(new ProducerRecord<>(SHARED_TOPIC, 0, null, "hello", headers)).get() + producer.send(new ProducerRecord<>(KafkaClientTestBase.SHARED_TOPIC, 0, null, "hello", headers)).get() then: TEST_WRITER.waitForTraces(1) From d661d93b2719cbb49e8e6ea7fda9b0230115007b Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Mon, 1 Jun 2026 12:55:23 +0200 Subject: [PATCH 5/7] always use UTF8 --- .../kafka_clients/TextMapExtractAdapter.java | 5 ++- .../TextMapExtractAdapter.java | 5 ++- .../java/datadog/trace/api/Functions.java | 31 +++++-------------- .../trace/api/FunctionsBase64Test.java | 16 ++++------ 4 files changed, 18 insertions(+), 39 deletions(-) diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TextMapExtractAdapter.java b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TextMapExtractAdapter.java index 1c63742b789..429038773ba 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TextMapExtractAdapter.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/main/java/datadog/trace/instrumentation/kafka_clients/TextMapExtractAdapter.java @@ -1,12 +1,11 @@ package datadog.trace.instrumentation.kafka_clients; +import static datadog.trace.api.Functions.BASE64_DECODE; import static datadog.trace.api.Functions.UTF8_BYTES_TO_STRING; import static datadog.trace.api.telemetry.LogCollector.EXCLUDE_TELEMETRY; import static datadog.trace.instrumentation.kafka_clients.KafkaDecorator.KAFKA_PRODUCED_KEY; -import static java.nio.charset.StandardCharsets.UTF_8; import datadog.trace.api.Config; -import datadog.trace.api.Functions; import datadog.trace.bootstrap.instrumentation.api.AgentPropagation; import datadog.trace.bootstrap.instrumentation.api.AgentPropagation.ContextVisitor; import java.nio.ByteBuffer; @@ -28,7 +27,7 @@ public class TextMapExtractAdapter implements ContextVisitor { public TextMapExtractAdapter(boolean decodeBase64Headers) { if (decodeBase64Headers) { - this.headerValueTransformer = Functions.base64Decode(UTF_8); + this.headerValueTransformer = BASE64_DECODE; this.decoder = Base64.getDecoder(); } else { this.headerValueTransformer = UTF8_BYTES_TO_STRING; diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TextMapExtractAdapter.java b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TextMapExtractAdapter.java index 923bf259a16..30b35216a4b 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TextMapExtractAdapter.java +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-3.8/src/main/java17/datadog/trace/instrumentation/kafka_clients38/TextMapExtractAdapter.java @@ -1,11 +1,10 @@ package datadog.trace.instrumentation.kafka_clients38; +import static datadog.trace.api.Functions.BASE64_DECODE; import static datadog.trace.api.Functions.UTF8_BYTES_TO_STRING; import static datadog.trace.api.telemetry.LogCollector.EXCLUDE_TELEMETRY; -import static java.nio.charset.StandardCharsets.UTF_8; import datadog.trace.api.Config; -import datadog.trace.api.Functions; import datadog.trace.bootstrap.instrumentation.api.AgentPropagation; import datadog.trace.bootstrap.instrumentation.api.AgentPropagation.ContextVisitor; import java.nio.ByteBuffer; @@ -27,7 +26,7 @@ public class TextMapExtractAdapter implements ContextVisitor { public TextMapExtractAdapter(boolean decodeBase64Headers) { if (decodeBase64Headers) { - this.headerValueTransformer = Functions.base64Decode(UTF_8); + this.headerValueTransformer = BASE64_DECODE; this.decoder = Base64.getDecoder(); } else { this.headerValueTransformer = UTF8_BYTES_TO_STRING; diff --git a/internal-api/src/main/java/datadog/trace/api/Functions.java b/internal-api/src/main/java/datadog/trace/api/Functions.java index 623b74f9680..731cd711a9a 100644 --- a/internal-api/src/main/java/datadog/trace/api/Functions.java +++ b/internal-api/src/main/java/datadog/trace/api/Functions.java @@ -7,7 +7,6 @@ import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodType; -import java.nio.charset.Charset; import java.util.Base64; import java.util.Locale; import java.util.function.BiFunction; @@ -181,26 +180,12 @@ public T apply(Object input) { public static final Function UTF8_BYTES_TO_STRING = bytes -> new String(bytes, UTF_8); - public static Function base64Decode(Charset charset) { - return new Base64Decode(charset); - } - - private static final class Base64Decode implements Function { - private final Base64.Decoder decoder; - private final Charset charset; - - private Base64Decode(Charset charset) { - this.decoder = Base64.getDecoder(); - this.charset = charset; - } - - @Override - public String apply(byte[] bytes) { - try { - return new String(decoder.decode(bytes), charset); - } catch (final Exception ignored) { - return null; - } - } - } + public static final Function BASE64_DECODE = + bytes -> { + try { + return new String(Base64.getDecoder().decode(bytes), UTF_8); + } catch (final Exception ignored) { + return null; + } + }; } diff --git a/internal-api/src/test/java/datadog/trace/api/FunctionsBase64Test.java b/internal-api/src/test/java/datadog/trace/api/FunctionsBase64Test.java index a591cd27a1b..7330e9a5538 100644 --- a/internal-api/src/test/java/datadog/trace/api/FunctionsBase64Test.java +++ b/internal-api/src/test/java/datadog/trace/api/FunctionsBase64Test.java @@ -1,5 +1,6 @@ package datadog.trace.api; +import static datadog.trace.api.Functions.BASE64_DECODE; import static datadog.trace.api.Functions.UTF8_BYTES_TO_STRING; import static java.nio.charset.StandardCharsets.UTF_8; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -7,7 +8,6 @@ import static org.junit.jupiter.api.Assertions.assertNull; import java.util.Base64; -import java.util.function.Function; import org.junit.jupiter.api.Test; class FunctionsBase64Test { @@ -22,26 +22,22 @@ void utf8BytesToStringConvertsBytes() { void base64DecodeDecodesValidInput() { String original = "x-datadog-trace-id"; byte[] encoded = Base64.getEncoder().encode(original.getBytes(UTF_8)); - - Function decoder = Functions.base64Decode(UTF_8); - assertEquals(original, decoder.apply(encoded)); + assertEquals(original, BASE64_DECODE.apply(encoded)); } @Test void base64DecodeReturnsNullForInvalidBase64() { - Function decoder = Functions.base64Decode(UTF_8); - assertNull(decoder.apply("not-valid-base64!@#".getBytes(UTF_8))); + assertNull(BASE64_DECODE.apply("not-valid-base64!@#".getBytes(UTF_8))); } @Test void base64DecodeReturnsNullForUrlSafeChars() { // URL-safe Base64 uses '-' and '_' which the standard decoder rejects - Function decoder = Functions.base64Decode(UTF_8); - assertNull(decoder.apply("abc-def_ghi".getBytes(UTF_8))); + assertNull(BASE64_DECODE.apply("abc-def_ghi".getBytes(UTF_8))); } @Test - void base64DecodeInstanceIsNotNull() { - assertNotNull(Functions.base64Decode(UTF_8)); + void base64DecodeIsNotNull() { + assertNotNull(BASE64_DECODE); } } From 02c89f0830f141eb9013d0a02d7700197f37ddc1 Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Mon, 1 Jun 2026 14:06:30 +0200 Subject: [PATCH 6/7] Add missing import --- .../src/test/groovy/KafkaClientTestBase.groovy | 1 + 1 file changed, 1 insertion(+) diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy index 2325ebf4d16..a9980a8e6eb 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy @@ -42,6 +42,7 @@ import org.springframework.kafka.listener.MessageListener import org.springframework.kafka.test.rule.KafkaEmbedded import org.springframework.kafka.test.utils.ContainerTestUtils import org.springframework.kafka.test.utils.KafkaTestUtils +import org.junit.Rule import spock.lang.Shared import java.util.concurrent.ExecutionException From 46edb6e13d0c03b5d6327bbb6d7e8af2de7a8356 Mon Sep 17 00:00:00 2001 From: Andrea Marziali Date: Mon, 1 Jun 2026 14:58:09 +0200 Subject: [PATCH 7/7] do not use rule --- .../src/test/groovy/KafkaClientTestBase.groovy | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy index a9980a8e6eb..8fd96e55745 100644 --- a/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy +++ b/dd-java-agent/instrumentation/kafka/kafka-clients-0.11/src/test/groovy/KafkaClientTestBase.groovy @@ -42,7 +42,6 @@ import org.springframework.kafka.listener.MessageListener import org.springframework.kafka.test.rule.KafkaEmbedded import org.springframework.kafka.test.utils.ContainerTestUtils import org.springframework.kafka.test.utils.KafkaTestUtils -import org.junit.Rule import spock.lang.Shared import java.util.concurrent.ExecutionException @@ -1554,8 +1553,16 @@ class KafkaClientContextSwapForkedTest extends KafkaClientV0ForkedTest { } class KafkaClientBadBase64HeaderForkedTest extends InstrumentationSpecification { - @Rule - KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, KafkaClientTestBase.SHARED_TOPIC) + KafkaEmbedded embeddedKafka + + def setup() { + embeddedKafka = new KafkaEmbedded(1, true, KafkaClientTestBase.SHARED_TOPIC) + embeddedKafka.before() + } + + def cleanup() { + embeddedKafka?.after() + } @Override void configurePreAgent() {