diff --git a/dd-java-agent/instrumentation/graal/graal-native-image-20.0/src/main/java/datadog/trace/instrumentation/graal/nativeimage/NativeImageGeneratorRunnerInstrumentation.java b/dd-java-agent/instrumentation/graal/graal-native-image-20.0/src/main/java/datadog/trace/instrumentation/graal/nativeimage/NativeImageGeneratorRunnerInstrumentation.java index 405d3f35bc2..1ea3dcf3834 100644 --- a/dd-java-agent/instrumentation/graal/graal-native-image-20.0/src/main/java/datadog/trace/instrumentation/graal/nativeimage/NativeImageGeneratorRunnerInstrumentation.java +++ b/dd-java-agent/instrumentation/graal/graal-native-image-20.0/src/main/java/datadog/trace/instrumentation/graal/nativeimage/NativeImageGeneratorRunnerInstrumentation.java @@ -156,6 +156,7 @@ public static void onEnter(@Advice.Argument(value = 0, readOnly = false) String[ + "datadog.trace.instrumentation.reactivestreams.ReactiveStreamsAsyncResultExtension:build_time," + "datadog.trace.instrumentation.reactor.core.ReactorAsyncResultExtension:build_time," + "datadog.trace.instrumentation.rxjava2.RxJavaAsyncResultExtension:build_time," + + "datadog.trace.instrumentation.rxjava3.RxJavaAsyncResultExtension:build_time," + "datadog.trace.logging.ddlogger.DDLogger:build_time," + "datadog.trace.logging.ddlogger.DDLoggerFactory:build_time," + "datadog.trace.logging.ddlogger.DDLoggerFactory$HelperWrapper:build_time," diff --git a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/AsyncPropagatingDisableInstrumentation.java b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/AsyncPropagatingDisableInstrumentation.java index 1d543a705d2..b4efb0928fc 100644 --- a/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/AsyncPropagatingDisableInstrumentation.java +++ b/dd-java-agent/instrumentation/java/java-concurrent/java-concurrent-1.8/src/main/java/datadog/trace/instrumentation/java/concurrent/AsyncPropagatingDisableInstrumentation.java @@ -46,6 +46,16 @@ public AsyncPropagatingDisableInstrumentation() { private static final ElementMatcher REACTOR_DISABLED_TYPE_INITIALIZERS = namedOneOf("reactor.core.scheduler.SchedulerTask", "reactor.core.scheduler.WorkerTask"); + /** + * RxJava 3's AbstractDirectTask creates FINISHED/DISPOSED sentinel FutureTask instances in its + * static initializer. If that initializer runs while a trace is active (e.g. the first scheduled + * delay/timeout under a span), the executor instrumentation captures a continuation on those + * static singletons that is never cancelled, leaking the pending trace. Disable async propagation + * while the type initializer runs. (RxJava 2 has no equivalent class.) + */ + private static final ElementMatcher RXJAVA3_DISABLED_TYPE_INITIALIZERS = + named("io.reactivex.rxjava3.internal.schedulers.AbstractDirectTask"); + @Override public boolean onlyMatchKnownTypes() { return false; // known type list is not complete, so always expand search to consider hierarchy @@ -77,7 +87,8 @@ public String[] knownMatchingTypes() { "net.sf.ehcache.store.disk.DiskStorageFactory", "org.springframework.jms.listener.DefaultMessageListenerContainer", "org.apache.activemq.broker.TransactionBroker", - "com.mongodb.internal.connection.DefaultConnectionPool$AsyncWorkManager" + "com.mongodb.internal.connection.DefaultConnectionPool$AsyncWorkManager", + "io.reactivex.rxjava3.internal.schedulers.AbstractDirectTask" }; } @@ -88,7 +99,10 @@ public String hierarchyMarkerType() { @Override public ElementMatcher hierarchyMatcher() { - return RX_WORKERS.or(GRPC_MANAGED_CHANNEL).or(REACTOR_DISABLED_TYPE_INITIALIZERS); + return RX_WORKERS + .or(GRPC_MANAGED_CHANNEL) + .or(REACTOR_DISABLED_TYPE_INITIALIZERS) + .or(RXJAVA3_DISABLED_TYPE_INITIALIZERS); } @Override @@ -172,6 +186,8 @@ public void methodAdvice(MethodTransformer transformer) { advice); transformer.applyAdvice( isTypeInitializer().and(isDeclaredBy(REACTOR_DISABLED_TYPE_INITIALIZERS)), advice); + transformer.applyAdvice( + isTypeInitializer().and(isDeclaredBy(RXJAVA3_DISABLED_TYPE_INITIALIZERS)), advice); } public static class DisableAsyncAdvice { diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/build.gradle b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/build.gradle new file mode 100644 index 00000000000..50d136149d4 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/build.gradle @@ -0,0 +1,36 @@ +muzzle { + pass { + group = "io.reactivex.rxjava3" + module = "rxjava" + versions = "[3.0.0,)" + } + // Assert the rxjava3 advice never resolves against rxjava2 — the two namespaces + // must not overlap. rxjava3 references io.reactivex.rxjava3.core.*, absent from the + // rxjava2 artifact, so muzzle must fail to match it. + fail { + name = "rxjava2-must-not-match" + group = "io.reactivex.rxjava2" + module = "rxjava" + versions = "[2.0.0,)" + } +} + +apply from: "$rootDir/gradle/java.gradle" + +addTestSuiteForDir('latestDepTest', 'test') + +dependencies { + compileOnly group: 'org.reactivestreams', name: 'reactive-streams', version: '1.0.3' + compileOnly group: 'io.reactivex.rxjava3', name: 'rxjava', version: '3.0.0' + + testImplementation project(':dd-java-agent:instrumentation:datadog:tracing:trace-annotation') + testImplementation project(':dd-java-agent:instrumentation:opentelemetry:opentelemetry-annotations-1.20') + testImplementation group: 'io.reactivex.rxjava3', name: 'rxjava', version: '3.0.0' + testImplementation group: 'io.opentelemetry.instrumentation', name: 'opentelemetry-instrumentation-annotations', version: '1.28.0' + + // Load the rxjava2 instrumenter at test runtime to prove the two versions coexist on + // the agent without interference (it stays dormant with only rxjava3 on the classpath). + testRuntimeOnly project(':dd-java-agent:instrumentation:rxjava:rxjava-2.0') + + latestDepTestImplementation group: 'io.reactivex.rxjava3', name: 'rxjava', version: '+' +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/CompletableInstrumentation.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/CompletableInstrumentation.java new file mode 100644 index 00000000000..3af41d89fa8 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/CompletableInstrumentation.java @@ -0,0 +1,73 @@ +package datadog.trace.instrumentation.rxjava3; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import datadog.context.Context; +import datadog.context.ContextScope; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.CompletableObserver; +import net.bytebuddy.asm.Advice; + +public final class CompletableInstrumentation + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + + @Override + public String instrumentedType() { + return "io.reactivex.rxjava3.core.Completable"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureParentSpanAdvice"); + transformer.applyAdvice( + isMethod() + .and(named("subscribe")) + .and(takesArguments(1)) + .and(takesArgument(0, named("io.reactivex.rxjava3.core.CompletableObserver"))), + getClass().getName() + "$PropagateParentSpanAdvice"); + } + + public static class CaptureParentSpanAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onConstruct(@Advice.This final Completable completable) { + Context parentContext = Java8BytecodeBridge.getCurrentContext(); + if (parentContext != null && parentContext != Java8BytecodeBridge.getRootContext()) { + InstrumentationContext.get(Completable.class, Context.class) + .put(completable, parentContext); + } + } + } + + public static class PropagateParentSpanAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static ContextScope onSubscribe( + @Advice.This final Completable completable, + @Advice.Argument(value = 0, readOnly = false) CompletableObserver observer) { + if (observer != null) { + Context parentContext = + InstrumentationContext.get(Completable.class, Context.class).get(completable); + if (parentContext != null) { + // wrap the observer so spans from its events treat the captured span as their parent + observer = new TracingCompletableObserver(observer, parentContext); + // attach the context here in case additional observers are created during subscribe + return parentContext.attach(); + } + } + return null; + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void closeScope(@Advice.Enter final ContextScope scope) { + if (scope != null) { + scope.close(); + } + } + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/FlowableInstrumentation.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/FlowableInstrumentation.java new file mode 100644 index 00000000000..3b4b8a5c53d --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/FlowableInstrumentation.java @@ -0,0 +1,72 @@ +package datadog.trace.instrumentation.rxjava3; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import datadog.context.Context; +import datadog.context.ContextScope; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.FlowableSubscriber; +import net.bytebuddy.asm.Advice; + +public final class FlowableInstrumentation + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + + @Override + public String instrumentedType() { + return "io.reactivex.rxjava3.core.Flowable"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureParentSpanAdvice"); + transformer.applyAdvice( + isMethod() + .and(named("subscribe")) + .and(takesArguments(1)) + .and(takesArgument(0, named("io.reactivex.rxjava3.core.FlowableSubscriber"))), + getClass().getName() + "$PropagateParentSpanAdvice"); + } + + public static class CaptureParentSpanAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onConstruct(@Advice.This final Flowable flowable) { + Context parentContext = Java8BytecodeBridge.getCurrentContext(); + if (parentContext != null && parentContext != Java8BytecodeBridge.getRootContext()) { + InstrumentationContext.get(Flowable.class, Context.class).put(flowable, parentContext); + } + } + } + + public static class PropagateParentSpanAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static ContextScope onSubscribe( + @Advice.This final Flowable flowable, + @Advice.Argument(value = 0, readOnly = false) FlowableSubscriber subscriber) { + if (subscriber != null) { + Context parentContext = + InstrumentationContext.get(Flowable.class, Context.class).get(flowable); + if (parentContext != null) { + // wrap the subscriber so spans from its events treat the captured span as their parent + subscriber = new TracingSubscriber<>(subscriber, parentContext); + // attach the context here in case additional subscribers are created during subscribe + return parentContext.attach(); + } + } + return null; + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void closeScope(@Advice.Enter final ContextScope scope) { + if (scope != null) { + scope.close(); + } + } + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/MaybeInstrumentation.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/MaybeInstrumentation.java new file mode 100644 index 00000000000..49bf3e35acf --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/MaybeInstrumentation.java @@ -0,0 +1,70 @@ +package datadog.trace.instrumentation.rxjava3; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import datadog.context.Context; +import datadog.context.ContextScope; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.MaybeObserver; +import net.bytebuddy.asm.Advice; + +public final class MaybeInstrumentation + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + @Override + public String instrumentedType() { + return "io.reactivex.rxjava3.core.Maybe"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureParentSpanAdvice"); + transformer.applyAdvice( + isMethod() + .and(named("subscribe")) + .and(takesArguments(1)) + .and(takesArgument(0, named("io.reactivex.rxjava3.core.MaybeObserver"))), + getClass().getName() + "$PropagateParentSpanAdvice"); + } + + public static class CaptureParentSpanAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onConstruct(@Advice.This final Maybe maybe) { + Context parentContext = Java8BytecodeBridge.getCurrentContext(); + if (parentContext != null && parentContext != Java8BytecodeBridge.getRootContext()) { + InstrumentationContext.get(Maybe.class, Context.class).put(maybe, parentContext); + } + } + } + + public static class PropagateParentSpanAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static ContextScope onSubscribe( + @Advice.This final Maybe maybe, + @Advice.Argument(value = 0, readOnly = false) MaybeObserver observer) { + if (observer != null) { + Context parentContext = InstrumentationContext.get(Maybe.class, Context.class).get(maybe); + if (parentContext != null) { + // wrap the observer so spans from its events treat the captured span as their parent + observer = new TracingMaybeObserver<>(observer, parentContext); + // attach the context here in case additional observers are created during subscribe + return parentContext.attach(); + } + } + return null; + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void closeScope(@Advice.Enter final ContextScope scope) { + if (scope != null) { + scope.close(); + } + } + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/ObservableInstrumentation.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/ObservableInstrumentation.java new file mode 100644 index 00000000000..4548471fc1d --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/ObservableInstrumentation.java @@ -0,0 +1,71 @@ +package datadog.trace.instrumentation.rxjava3; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import datadog.context.Context; +import datadog.context.ContextScope; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Observer; +import net.bytebuddy.asm.Advice; + +public final class ObservableInstrumentation + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + @Override + public String instrumentedType() { + return "io.reactivex.rxjava3.core.Observable"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureParentSpanAdvice"); + transformer.applyAdvice( + isMethod() + .and(named("subscribe")) + .and(takesArguments(1)) + .and(takesArgument(0, named("io.reactivex.rxjava3.core.Observer"))), + getClass().getName() + "$PropagateParentSpanAdvice"); + } + + public static class CaptureParentSpanAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onConstruct(@Advice.This final Observable observable) { + Context parentContext = Java8BytecodeBridge.getCurrentContext(); + if (parentContext != null) { + InstrumentationContext.get(Observable.class, Context.class).put(observable, parentContext); + } + } + } + + public static class PropagateParentSpanAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static ContextScope onSubscribe( + @Advice.This final Observable observable, + @Advice.Argument(value = 0, readOnly = false) Observer observer) { + if (observer != null) { + Context parentContext = + InstrumentationContext.get(Observable.class, Context.class).get(observable); + if (parentContext != null && parentContext != Java8BytecodeBridge.getRootContext()) { + // wrap the observer so spans from its events treat the captured span as their parent + observer = new TracingObserver<>(observer, parentContext); + // attach the context here in case additional observers are created during subscribe + return parentContext.attach(); + } + } + return null; + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void closeScope(@Advice.Enter final ContextScope scope) { + if (scope != null) { + scope.close(); + } + } + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/RxJavaAsyncResultExtension.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/RxJavaAsyncResultExtension.java new file mode 100644 index 00000000000..26ad58cfcf3 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/RxJavaAsyncResultExtension.java @@ -0,0 +1,68 @@ +package datadog.trace.instrumentation.rxjava3; + +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.EagerHelper; +import datadog.trace.bootstrap.instrumentation.java.concurrent.AsyncResultExtension; +import datadog.trace.bootstrap.instrumentation.java.concurrent.AsyncResultExtensions; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Single; + +public class RxJavaAsyncResultExtension implements AsyncResultExtension, EagerHelper { + static { + AsyncResultExtensions.register(new RxJavaAsyncResultExtension()); + } + + /** + * Register the extension as an {@link AsyncResultExtension} using static class initialization. + *
+ * It uses an empty static method call to ensure the class loading and the one-time-only static + * class initialization. This will ensure this extension will only be registered once under {@link + * AsyncResultExtensions}. + */ + public static void init() {} + + @Override + public boolean supports(Class result) { + return Completable.class.isAssignableFrom(result) + || Maybe.class.isAssignableFrom(result) + || Single.class.isAssignableFrom(result) + || Observable.class.isAssignableFrom(result) + || Flowable.class.isAssignableFrom(result); + } + + @Override + public Object apply(Object result, AgentSpan span) { + if (result instanceof Completable) { + return ((Completable) result) + .doOnEvent(throwable -> onError(span, throwable)) + .doOnDispose(span::finish); + } else if (result instanceof Maybe) { + return ((Maybe) result) + .doOnEvent((o, throwable) -> onError(span, throwable)) + .doOnDispose(span::finish); + } else if (result instanceof Single) { + return ((Single) result) + .doOnEvent((o, throwable) -> onError(span, throwable)) + .doOnDispose(span::finish); + } else if (result instanceof Observable) { + return ((Observable) result) + .doOnComplete(span::finish) + .doOnError(throwable -> onError(span, throwable)) + .doOnDispose(span::finish); + } else if (result instanceof Flowable) { + return ((Flowable) result) + .doOnComplete(span::finish) + .doOnError(throwable -> onError(span, throwable)) + .doOnCancel(span::finish); + } + return null; + } + + private static void onError(AgentSpan span, Throwable throwable) { + span.addThrowable(throwable); + span.finish(); + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/RxJavaModule.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/RxJavaModule.java new file mode 100644 index 00000000000..842bd3b703c --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/RxJavaModule.java @@ -0,0 +1,51 @@ +package datadog.trace.instrumentation.rxjava3; + +import static java.util.Arrays.asList; + +import com.google.auto.service.AutoService; +import datadog.context.Context; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.agent.tooling.InstrumenterModule; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +@AutoService(InstrumenterModule.class) +public final class RxJavaModule extends InstrumenterModule.ContextTracking { + public RxJavaModule() { + super("rxjava", "rxjava-3"); + } + + @Override + public String[] helperClassNames() { + return new String[] { + packageName + ".TracingCompletableObserver", + packageName + ".TracingSubscriber", + packageName + ".TracingMaybeObserver", + packageName + ".TracingObserver", + packageName + ".RxJavaAsyncResultExtension", + packageName + ".TracingSingleObserver", + }; + } + + @Override + public Map contextStore() { + final Map store = new HashMap<>(); + store.put("io.reactivex.rxjava3.core.Flowable", Context.class.getName()); + store.put("io.reactivex.rxjava3.core.Completable", Context.class.getName()); + store.put("io.reactivex.rxjava3.core.Maybe", Context.class.getName()); + store.put("io.reactivex.rxjava3.core.Observable", Context.class.getName()); + store.put("io.reactivex.rxjava3.core.Single", Context.class.getName()); + return store; + } + + @Override + public List typeInstrumentations() { + return asList( + new CompletableInstrumentation(), + new FlowableInstrumentation(), + new MaybeInstrumentation(), + new ObservableInstrumentation(), + new SingleInstrumentation()); + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/SingleInstrumentation.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/SingleInstrumentation.java new file mode 100644 index 00000000000..6ed709e0b50 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/SingleInstrumentation.java @@ -0,0 +1,71 @@ +package datadog.trace.instrumentation.rxjava3; + +import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; +import static net.bytebuddy.matcher.ElementMatchers.isConstructor; +import static net.bytebuddy.matcher.ElementMatchers.isMethod; +import static net.bytebuddy.matcher.ElementMatchers.takesArgument; +import static net.bytebuddy.matcher.ElementMatchers.takesArguments; + +import datadog.context.Context; +import datadog.context.ContextScope; +import datadog.trace.agent.tooling.Instrumenter; +import datadog.trace.bootstrap.InstrumentationContext; +import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge; +import io.reactivex.rxjava3.core.Single; +import io.reactivex.rxjava3.core.SingleObserver; +import net.bytebuddy.asm.Advice; + +public final class SingleInstrumentation + implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { + + @Override + public String instrumentedType() { + return "io.reactivex.rxjava3.core.Single"; + } + + @Override + public void methodAdvice(MethodTransformer transformer) { + transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureParentSpanAdvice"); + transformer.applyAdvice( + isMethod() + .and(named("subscribe")) + .and(takesArguments(1)) + .and(takesArgument(0, named("io.reactivex.rxjava3.core.SingleObserver"))), + getClass().getName() + "$PropagateParentSpanAdvice"); + } + + public static class CaptureParentSpanAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) + public static void onConstruct(@Advice.This final Single single) { + Context parentContext = Java8BytecodeBridge.getCurrentContext(); + if (parentContext != null) { + InstrumentationContext.get(Single.class, Context.class).put(single, parentContext); + } + } + } + + public static class PropagateParentSpanAdvice { + @Advice.OnMethodEnter(suppress = Throwable.class) + public static ContextScope onSubscribe( + @Advice.This final Single single, + @Advice.Argument(value = 0, readOnly = false) SingleObserver observer) { + if (observer != null) { + Context parentContext = InstrumentationContext.get(Single.class, Context.class).get(single); + if (parentContext != null && parentContext != Java8BytecodeBridge.getRootContext()) { + // wrap the observer so spans from its events treat the captured span as their parent + observer = new TracingSingleObserver<>(observer, parentContext); + // attach the context here in case additional observers are created during subscribe + return parentContext.attach(); + } + } + return null; + } + + @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) + public static void closeScope(@Advice.Enter final ContextScope scope) { + if (scope != null) { + scope.close(); + } + } + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingCompletableObserver.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingCompletableObserver.java new file mode 100644 index 00000000000..8a0dd7254e1 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingCompletableObserver.java @@ -0,0 +1,38 @@ +package datadog.trace.instrumentation.rxjava3; + +import datadog.context.Context; +import datadog.context.ContextScope; +import io.reactivex.rxjava3.core.CompletableObserver; +import io.reactivex.rxjava3.disposables.Disposable; +import javax.annotation.Nonnull; + +/** Wrapper that makes sure spans from observer events treat the captured span as their parent. */ +public final class TracingCompletableObserver implements CompletableObserver { + private final CompletableObserver observer; + private final Context parentContext; + + public TracingCompletableObserver( + @Nonnull final CompletableObserver observer, @Nonnull final Context parentContext) { + this.observer = observer; + this.parentContext = parentContext; + } + + @Override + public void onSubscribe(final Disposable d) { + observer.onSubscribe(d); + } + + @Override + public void onError(final Throwable e) { + try (final ContextScope scope = parentContext.attach()) { + observer.onError(e); + } + } + + @Override + public void onComplete() { + try (final ContextScope scope = parentContext.attach()) { + observer.onComplete(); + } + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingMaybeObserver.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingMaybeObserver.java new file mode 100644 index 00000000000..0cbf34c61e4 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingMaybeObserver.java @@ -0,0 +1,45 @@ +package datadog.trace.instrumentation.rxjava3; + +import datadog.context.Context; +import datadog.context.ContextScope; +import io.reactivex.rxjava3.core.MaybeObserver; +import io.reactivex.rxjava3.disposables.Disposable; +import javax.annotation.Nonnull; + +/** Wrapper that makes sure spans from observer events treat the captured span as their parent. */ +public final class TracingMaybeObserver implements MaybeObserver { + private final MaybeObserver observer; + private final Context parentContext; + + public TracingMaybeObserver( + @Nonnull final MaybeObserver observer, @Nonnull final Context parentContext) { + this.observer = observer; + this.parentContext = parentContext; + } + + @Override + public void onSubscribe(final Disposable d) { + observer.onSubscribe(d); + } + + @Override + public void onSuccess(final T value) { + try (final ContextScope scope = parentContext.attach()) { + observer.onSuccess(value); + } + } + + @Override + public void onError(final Throwable e) { + try (final ContextScope scope = parentContext.attach()) { + observer.onError(e); + } + } + + @Override + public void onComplete() { + try (final ContextScope scope = parentContext.attach()) { + observer.onComplete(); + } + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingObserver.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingObserver.java new file mode 100644 index 00000000000..32018611cd1 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingObserver.java @@ -0,0 +1,43 @@ +package datadog.trace.instrumentation.rxjava3; + +import datadog.context.Context; +import datadog.context.ContextScope; +import io.reactivex.rxjava3.core.Observer; +import io.reactivex.rxjava3.disposables.Disposable; + +/** Wrapper that makes sure spans from observer events treat the captured span as their parent. */ +public final class TracingObserver implements Observer { + private final Observer observer; + private final Context parentContext; + + public TracingObserver(final Observer observer, final Context parentContext) { + this.observer = observer; + this.parentContext = parentContext; + } + + @Override + public void onSubscribe(final Disposable d) { + observer.onSubscribe(d); + } + + @Override + public void onNext(final T value) { + try (final ContextScope scope = parentContext.attach()) { + observer.onNext(value); + } + } + + @Override + public void onError(final Throwable e) { + try (final ContextScope scope = parentContext.attach()) { + observer.onError(e); + } + } + + @Override + public void onComplete() { + try (final ContextScope scope = parentContext.attach()) { + observer.onComplete(); + } + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingSingleObserver.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingSingleObserver.java new file mode 100644 index 00000000000..3e05d1124bc --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingSingleObserver.java @@ -0,0 +1,38 @@ +package datadog.trace.instrumentation.rxjava3; + +import datadog.context.Context; +import datadog.context.ContextScope; +import io.reactivex.rxjava3.core.SingleObserver; +import io.reactivex.rxjava3.disposables.Disposable; +import javax.annotation.Nonnull; + +/** Wrapper that makes sure spans from observer events treat the captured span as their parent. */ +public final class TracingSingleObserver implements SingleObserver { + private final SingleObserver observer; + private final Context parentContext; + + public TracingSingleObserver( + @Nonnull final SingleObserver observer, @Nonnull final Context parentContext) { + this.observer = observer; + this.parentContext = parentContext; + } + + @Override + public void onSubscribe(final Disposable d) { + observer.onSubscribe(d); + } + + @Override + public void onSuccess(final T value) { + try (final ContextScope scope = parentContext.attach()) { + observer.onSuccess(value); + } + } + + @Override + public void onError(final Throwable e) { + try (final ContextScope scope = parentContext.attach()) { + observer.onError(e); + } + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingSubscriber.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingSubscriber.java new file mode 100644 index 00000000000..49caa0e6ecf --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/main/java/datadog/trace/instrumentation/rxjava3/TracingSubscriber.java @@ -0,0 +1,45 @@ +package datadog.trace.instrumentation.rxjava3; + +import datadog.context.Context; +import datadog.context.ContextScope; +import io.reactivex.rxjava3.core.FlowableSubscriber; +import javax.annotation.Nonnull; +import org.reactivestreams.Subscription; + +/** Wrapper that makes sure spans from subscriber events treat the captured span as their parent. */ +public final class TracingSubscriber implements FlowableSubscriber { + private final FlowableSubscriber subscriber; + private final Context parentContext; + + public TracingSubscriber( + @Nonnull final FlowableSubscriber subscriber, @Nonnull final Context parentContext) { + this.subscriber = subscriber; + this.parentContext = parentContext; + } + + @Override + public void onSubscribe(final Subscription subscription) { + subscriber.onSubscribe(subscription); + } + + @Override + public void onNext(final T value) { + try (final ContextScope scope = parentContext.attach()) { + subscriber.onNext(value); + } + } + + @Override + public void onError(final Throwable e) { + try (final ContextScope scope = parentContext.attach()) { + subscriber.onError(e); + } + } + + @Override + public void onComplete() { + try (final ContextScope scope = parentContext.attach()) { + subscriber.onComplete(); + } + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/annotatedsample/RxJava3TracedMethods.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/annotatedsample/RxJava3TracedMethods.java new file mode 100644 index 00000000000..c2518ff1de7 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/annotatedsample/RxJava3TracedMethods.java @@ -0,0 +1,112 @@ +package annotatedsample; + +import static java.util.concurrent.TimeUnit.SECONDS; + +import io.opentelemetry.instrumentation.annotations.WithSpan; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Single; +import java.util.concurrent.CountDownLatch; + +public class RxJava3TracedMethods { + @WithSpan + public static Completable traceAsyncCompletable(CountDownLatch latch) { + return Completable.fromRunnable(() -> await(latch)); + } + + @WithSpan + public static Completable traceAsyncFailingCompletable( + CountDownLatch latch, Exception exception) { + return Completable.fromCallable( + () -> { + await(latch); + throw exception; + }); + } + + @WithSpan + public static Maybe traceAsyncMaybe(CountDownLatch latch) { + return Maybe.fromCallable( + () -> { + await(latch); + return "hello"; + }); + } + + @WithSpan + public static Maybe traceAsyncFailingMaybe(CountDownLatch latch, Exception exception) { + return Maybe.fromCallable( + () -> { + await(latch); + throw exception; + }); + } + + @WithSpan + public static Single traceAsyncSingle(CountDownLatch latch) { + return Single.fromCallable( + () -> { + await(latch); + return "hello"; + }); + } + + @WithSpan + public static Single traceAsyncFailingSingle(CountDownLatch latch, Exception exception) { + return Single.fromCallable( + () -> { + await(latch); + throw exception; + }); + } + + @WithSpan + public static Observable traceAsyncObservable(CountDownLatch latch) { + return Observable.fromCallable( + () -> { + await(latch); + return "hello"; + }); + } + + @WithSpan + public static Observable traceAsyncFailingObservable( + CountDownLatch latch, Exception exception) { + return Observable.fromCallable( + () -> { + await(latch); + throw exception; + }); + } + + @WithSpan + public static Flowable traceAsyncFlowable(CountDownLatch latch) { + return Flowable.fromCallable( + () -> { + await(latch); + return "hello"; + }); + } + + @WithSpan + public static Flowable traceAsyncFailingFlowable( + CountDownLatch latch, Exception exception) { + return Flowable.fromCallable( + () -> { + await(latch); + throw exception; + }); + } + + private static void await(CountDownLatch latch) { + try { + if (!latch.await(5, SECONDS)) { + throw new IllegalStateException("Latch still locked"); + } + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/testdog/trace/instrumentation/rxjava3/RxJava3InteropTest.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/testdog/trace/instrumentation/rxjava3/RxJava3InteropTest.java new file mode 100644 index 00000000000..a070dfb23ef --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/testdog/trace/instrumentation/rxjava3/RxJava3InteropTest.java @@ -0,0 +1,162 @@ +package testdog.trace.instrumentation.rxjava3; + +import static datadog.trace.agent.test.assertions.Matchers.validates; +import static datadog.trace.agent.test.assertions.SpanMatcher.span; +import static datadog.trace.agent.test.assertions.TagsMatcher.defaultTags; +import static datadog.trace.agent.test.assertions.TagsMatcher.tag; +import static datadog.trace.agent.test.assertions.TraceMatcher.SORT_BY_START_TIME; +import static datadog.trace.agent.test.assertions.TraceMatcher.trace; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; +import static org.junit.jupiter.api.Assertions.assertEquals; + +import datadog.trace.agent.test.AbstractInstrumentationTest; +import datadog.trace.agent.test.assertions.TagsMatcher; +import datadog.trace.api.Trace; +import datadog.trace.bootstrap.instrumentation.api.Tags; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.Single; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.function.Supplier; +import java.util.stream.Stream; +import org.junit.jupiter.api.Test; + +// NOTE: This test lives in the `testdog` package (not `datadog`) on purpose: the agent ignores +// `datadog.*` classes for instrumentation, so `@Trace`-annotated methods declared under `datadog.*` +// would never be instrumented. See RxJava3Test for the same convention. +// +// PURPOSE: investigate whether a Datadog trace context propagates through RxJava 3's Java 8 interop +// factory methods (fromCompletionStage / fromOptional / fromStream). There is no dedicated reactive +// instrumentation for these bridges; any propagation must come from the agent's +// concurrent/executor instrumentation. Each test asserts the ACTUAL observed behavior. +class RxJava3InteropTest extends AbstractInstrumentationTest { + + static { + // Async completion / scheduler hops can finish child spans after the local root is written, + // tripping strict trace write ordering checks. Mirror RxJava3Test. + testConfig.strictTraceWrites(false); + } + + // The component tag is stored as a UTF8BytesString, so compare by string content. + static TagsMatcher componentTrace() { + return tag(Tags.COMPONENT, validates(o -> "trace".equals(String.valueOf(o)))); + } + + static class Worker { + static long parentId; + + static int child(int i) { + return childTraced(i); + } + + @Trace(operationName = "child", resourceName = "child") + static int childTraced(int i) { + return i + 1; + } + + @Trace(operationName = "interop-parent", resourceName = "interop-parent") + static T runUnderParent(Supplier work) { + parentId = activeSpan().getSpanId(); + return work.get(); + } + } + + @Test + void fromCompletionStageSync() { + Integer result = + Worker.runUnderParent( + () -> + Single.fromCompletionStage(CompletableFuture.completedFuture(1)) + .map(Worker::child) + .blockingGet()); + assertEquals(2, result); + + assertTraces( + trace( + SORT_BY_START_TIME, + span().root().operationName("interop-parent").resourceName("interop-parent"), + span() + .childOf(Worker.parentId) + .operationName("child") + .resourceName("child") + .tags(componentTrace(), defaultTags()))); + } + + /** + * FINDING: context propagates even when the CompletableFuture is completed on another thread. + * There is no rxjava3 instrumentation for fromCompletionStage; propagation comes from the agent's + * concurrent/executor instrumentation, which carries the active context across the ForkJoinPool + * used by supplyAsync. blockingGet() runs the map() on the calling thread, where the + * interop-parent scope is still active, so the child span is a direct child of interop-parent. + */ + @Test + void fromCompletionStageAsync() { + Integer result = + Worker.runUnderParent( + () -> + Single.fromCompletionStage(CompletableFuture.supplyAsync(() -> 1)) + .map(Worker::child) + .blockingGet()); + assertEquals(2, result); + + assertTraces( + trace( + SORT_BY_START_TIME, + span().root().operationName("interop-parent").resourceName("interop-parent"), + span() + .childOf(Worker.parentId) + .operationName("child") + .resourceName("child") + .tags(componentTrace(), defaultTags()))); + } + + @Test + void fromOptional() { + Integer result = + Worker.runUnderParent( + () -> Maybe.fromOptional(Optional.of(1)).map(Worker::child).blockingGet()); + assertEquals(2, result); + + assertTraces( + trace( + SORT_BY_START_TIME, + span().root().operationName("interop-parent").resourceName("interop-parent"), + span() + .childOf(Worker.parentId) + .operationName("child") + .resourceName("child") + .tags(componentTrace(), defaultTags()))); + } + + /** + * FINDING: fromStream(2 elements) emits one child span per element (2 spans here), each a direct + * child of interop-parent. The map() runs synchronously on the subscribing thread under the + * active interop-parent scope, so no async/concurrent instrumentation is involved. + */ + @Test + void fromStream() { + List result = + Worker.runUnderParent( + () -> Flowable.fromStream(Stream.of(1, 2)).map(Worker::child).toList().blockingGet()); + assertEquals(2, result.size()); + assertEquals(2, result.get(0)); + assertEquals(3, result.get(1)); + + assertTraces( + trace( + SORT_BY_START_TIME, + span().root().operationName("interop-parent").resourceName("interop-parent"), + span() + .childOf(Worker.parentId) + .operationName("child") + .resourceName("child") + .tags(componentTrace(), defaultTags()), + span() + .childOf(Worker.parentId) + .operationName("child") + .resourceName("child") + .tags(componentTrace(), defaultTags()))); + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/testdog/trace/instrumentation/rxjava3/RxJava3ResultExtensionTest.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/testdog/trace/instrumentation/rxjava3/RxJava3ResultExtensionTest.java new file mode 100644 index 00000000000..833dd6ab5be --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/testdog/trace/instrumentation/rxjava3/RxJava3ResultExtensionTest.java @@ -0,0 +1,209 @@ +package testdog.trace.instrumentation.rxjava3; + +import static datadog.trace.agent.test.assertions.Matchers.validates; +import static datadog.trace.agent.test.assertions.SpanMatcher.span; +import static datadog.trace.agent.test.assertions.TagsMatcher.defaultTags; +import static datadog.trace.agent.test.assertions.TagsMatcher.error; +import static datadog.trace.agent.test.assertions.TagsMatcher.tag; +import static datadog.trace.agent.test.assertions.TraceMatcher.trace; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import annotatedsample.RxJava3TracedMethods; +import datadog.trace.agent.test.AbstractInstrumentationTest; +import datadog.trace.agent.test.assertions.SpanMatcher; +import datadog.trace.agent.test.assertions.TagsMatcher; +import datadog.trace.bootstrap.instrumentation.api.Tags; +import datadog.trace.junit.utils.config.WithConfig; +import io.reactivex.rxjava3.core.Completable; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.Observable; +import io.reactivex.rxjava3.core.Single; +import java.util.concurrent.CountDownLatch; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +@WithConfig(key = "trace.otel.enabled", value = "true") +@WithConfig(key = "integration.opentelemetry-annotations-1.20.enabled", value = "true") +class RxJava3ResultExtensionTest extends AbstractInstrumentationTest { + + static final String EXCEPTION_MESSAGE = "Test exception"; + + // The COMPONENT and SPAN_KIND tags are stored as UTF8BytesString, so we compare by string content + // rather than using is("...") which would fail the asymmetric String#equals(UTF8BytesString) + // check. + static TagsMatcher otelComponent() { + return tag(Tags.COMPONENT, validates(o -> "opentelemetry".equals(String.valueOf(o)))); + } + + static TagsMatcher internalSpanKind() { + return tag(Tags.SPAN_KIND, validates(o -> Tags.SPAN_KIND_INTERNAL.equals(String.valueOf(o)))); + } + + // The operation and resource names are stored as UTF8BytesString, so we compare by string content + // (CharSequence equality is asymmetric: String#equals(UTF8BytesString) is false). + static SpanMatcher otelSpan(String name) { + return span() + .operationName(java.util.regex.Pattern.compile(java.util.regex.Pattern.quote(name))) + .resourceName((CharSequence cs) -> name.contentEquals(cs)); + } + + /** + * The five reactive types exercised by the test, with their type-specific terminal operations. + */ + enum ReactiveType { + COMPLETABLE("Completable"), + MAYBE("Maybe"), + SINGLE("Single"), + OBSERVABLE("Observable"), + FLOWABLE("Flowable"); + + final String type; + + ReactiveType(String type) { + this.type = type; + } + + /** Runs the blocking terminal operation that drives the async result to completion. */ + void runTerminal(Object asyncType) { + switch (this) { + case COMPLETABLE: + ((Completable) asyncType).blockingAwait(); + break; + case MAYBE: + ((Maybe) asyncType).blockingGet(); + break; + case SINGLE: + ((Single) asyncType).blockingGet(); + break; + case OBSERVABLE: + ((Observable) asyncType).blockingLast(); + break; + case FLOWABLE: + ((Flowable) asyncType).blockingLast(); + break; + default: + throw new IllegalStateException("Unknown type: " + this); + } + } + + /** Subscribes and immediately disposes (cancels) the async result. */ + void subscribeAndDispose(Object asyncType) { + switch (this) { + case COMPLETABLE: + ((Completable) asyncType).subscribe().dispose(); + break; + case MAYBE: + ((Maybe) asyncType).subscribe().dispose(); + break; + case SINGLE: + ((Single) asyncType).subscribe().dispose(); + break; + case OBSERVABLE: + ((Observable) asyncType).subscribe().dispose(); + break; + case FLOWABLE: + ((Flowable) asyncType).subscribe().dispose(); + break; + default: + throw new IllegalStateException("Unknown type: " + this); + } + } + + Object traceAsync(CountDownLatch latch) { + switch (this) { + case COMPLETABLE: + return RxJava3TracedMethods.traceAsyncCompletable(latch); + case MAYBE: + return RxJava3TracedMethods.traceAsyncMaybe(latch); + case SINGLE: + return RxJava3TracedMethods.traceAsyncSingle(latch); + case OBSERVABLE: + return RxJava3TracedMethods.traceAsyncObservable(latch); + case FLOWABLE: + return RxJava3TracedMethods.traceAsyncFlowable(latch); + default: + throw new IllegalStateException("Unknown type: " + this); + } + } + + Object traceAsyncFailing(CountDownLatch latch, Exception exception) { + switch (this) { + case COMPLETABLE: + return RxJava3TracedMethods.traceAsyncFailingCompletable(latch, exception); + case MAYBE: + return RxJava3TracedMethods.traceAsyncFailingMaybe(latch, exception); + case SINGLE: + return RxJava3TracedMethods.traceAsyncFailingSingle(latch, exception); + case OBSERVABLE: + return RxJava3TracedMethods.traceAsyncFailingObservable(latch, exception); + case FLOWABLE: + return RxJava3TracedMethods.traceAsyncFailingFlowable(latch, exception); + default: + throw new IllegalStateException("Unknown type: " + this); + } + } + } + + @ParameterizedTest(name = "test WithSpan annotated async method {0}") + @EnumSource(ReactiveType.class) + void success(ReactiveType type) { + CountDownLatch latch = new CountDownLatch(1); + Object asyncType = type.traceAsync(latch); + + // The span must not be finished before the async result completes. + assertEquals(0, writer.size()); + + latch.countDown(); + type.runTerminal(asyncType); + + String method = "traceAsync" + type.type; + assertTraces( + trace( + otelSpan("RxJava3TracedMethods." + method) + .tags(defaultTags(), otelComponent(), internalSpanKind()))); + } + + @ParameterizedTest(name = "test WithSpan annotated async method failing {0}") + @EnumSource(ReactiveType.class) + void failing(ReactiveType type) { + CountDownLatch latch = new CountDownLatch(1); + IllegalStateException expectedException = new IllegalStateException(EXCEPTION_MESSAGE); + Object asyncType = type.traceAsyncFailing(latch, expectedException); + + assertEquals(0, writer.size()); + + latch.countDown(); + assertThrows(IllegalStateException.class, () -> type.runTerminal(asyncType)); + + String method = "traceAsyncFailing" + type.type; + assertTraces( + trace( + otelSpan("RxJava3TracedMethods." + method) + .error() + .tags( + defaultTags(), + otelComponent(), + internalSpanKind(), + error(IllegalStateException.class, EXCEPTION_MESSAGE)))); + } + + @ParameterizedTest(name = "test WithSpan annotated async method cancelled {0}") + @EnumSource(ReactiveType.class) + void cancelled(ReactiveType type) { + CountDownLatch latch = new CountDownLatch(1); + Object asyncType = type.traceAsync(latch); + + assertEquals(0, writer.size()); + + latch.countDown(); + type.subscribeAndDispose(asyncType); + + String method = "traceAsync" + type.type; + assertTraces( + trace( + otelSpan("RxJava3TracedMethods." + method) + .tags(defaultTags(), otelComponent(), internalSpanKind()))); + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/testdog/trace/instrumentation/rxjava3/RxJava3Test.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/testdog/trace/instrumentation/rxjava3/RxJava3Test.java new file mode 100644 index 00000000000..4c2dfbd32e4 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/testdog/trace/instrumentation/rxjava3/RxJava3Test.java @@ -0,0 +1,578 @@ +package testdog.trace.instrumentation.rxjava3; + +import static datadog.trace.agent.test.assertions.Matchers.validates; +import static datadog.trace.agent.test.assertions.SpanMatcher.span; +import static datadog.trace.agent.test.assertions.TagsMatcher.defaultTags; +import static datadog.trace.agent.test.assertions.TagsMatcher.error; +import static datadog.trace.agent.test.assertions.TagsMatcher.tag; +import static datadog.trace.agent.test.assertions.TraceMatcher.SORT_BY_START_TIME; +import static datadog.trace.agent.test.assertions.TraceMatcher.trace; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activeSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; + +import datadog.trace.agent.test.AbstractInstrumentationTest; +import datadog.trace.agent.test.assertions.SpanMatcher; +import datadog.trace.agent.test.assertions.TagsMatcher; +import datadog.trace.api.Trace; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import datadog.trace.bootstrap.instrumentation.api.Tags; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.core.Maybe; +import io.reactivex.rxjava3.core.Scheduler; +import io.reactivex.rxjava3.schedulers.Schedulers; +import java.util.Arrays; +import java.util.List; +import java.util.function.Supplier; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; + +// NOTE: This test lives in the `testdog` package (not `datadog`) on purpose: the agent ignores +// `datadog.*` classes for instrumentation, so `@Trace`-annotated methods declared under `datadog.*` +// would never be instrumented. See the java-lang-21 tests for the same convention. +class RxJava3Test extends AbstractInstrumentationTest { + + static { + // The reactive chains in these scenarios can finish child spans after the local root has been + // written (e.g. delayed/scheduled work), which trips strict trace write ordering checks. This + // mirrors the Groovy RxJava2Test which also disables strict trace writes for the same reason. + testConfig.strictTraceWrites(false); + } + + static final String EXCEPTION_MESSAGE = "test exception"; + + // The component tag is stored as a UTF8BytesString, so we compare by string content rather than + // using is("trace") which would fail the asymmetric String#equals(UTF8BytesString) check. + static TagsMatcher componentTrace() { + return tag(Tags.COMPONENT, validates(o -> "trace".equals(String.valueOf(o)))); + } + + /** + * Holds the {@code @Trace}-annotated methods used by the scenarios. The captured span ids are + * stored in static fields and read back by the asserting test methods to express cross-span + * parent relationships. + */ + static class Worker { + static long traceParentId; + static long publisherParentId; + static long intermediateId; + + static int addOne(int i) { + return addOneTraced(i); + } + + @Trace(operationName = "addOne", resourceName = "addOne") + static int addOneTraced(int i) { + return i + 1; + } + + static int addTwo(int i) { + return addTwoTraced(i); + } + + @Trace(operationName = "addTwo", resourceName = "addTwo") + static int addTwoTraced(int i) { + return i + 2; + } + + static Object throwException() { + throw new RuntimeException(EXCEPTION_MESSAGE); + } + + @Trace(operationName = "trace-parent", resourceName = "trace-parent") + @SuppressWarnings("unchecked") + static Object assemblePublisherUnderTrace(Supplier publisherSupplier) { + traceParentId = activeSpan().getSpanId(); + AgentSpan span = startSpan("test", "publisher-parent"); + publisherParentId = span.getSpanId(); + // After this activation, the operations below should be children of this span + AgentScope scope = activateSpan(span); + + Object publisher = publisherSupplier.get(); + try { + // Read all data from publisher + if (publisher instanceof Maybe) { + return ((Maybe) publisher).blockingGet(); + } else if (publisher instanceof Flowable) { + List list = ((Flowable) publisher).toList().blockingGet(); + return list.toArray(new Object[0]); + } + throw new RuntimeException("Unknown publisher: " + publisher); + } finally { + span.finish(); + scope.close(); + } + } + + @Trace(operationName = "trace-parent", resourceName = "trace-parent") + static void cancelUnderTrace(Supplier publisherSupplier) { + traceParentId = activeSpan().getSpanId(); + AgentSpan span = startSpan("test", "publisher-parent"); + publisherParentId = span.getSpanId(); + AgentScope scope = activateSpan(span); + + Object publisher = publisherSupplier.get(); + Flowable flowable; + if (publisher instanceof Maybe) { + flowable = ((Maybe) publisher).toFlowable(); + } else { + flowable = (Flowable) publisher; + } + + flowable.subscribe( + new Subscriber() { + @Override + public void onSubscribe(Subscription subscription) { + subscription.cancel(); + } + + @Override + public void onNext(Object t) {} + + @Override + public void onError(Throwable error) {} + + @Override + public void onComplete() {} + }); + + scope.close(); + span.finish(); + } + + @Trace(operationName = "trace-parent", resourceName = "trace-parent") + static Object runUnderTraceParent(Supplier work) { + traceParentId = activeSpan().getSpanId(); + return work.get(); + } + } + + // --- Publisher success --------------------------------------------------- + + static List publisherSuccessArgs() { + return Arrays.asList( + Arguments.of( + "basic maybe", + new Object[] {2}, + 1, + (Supplier) () -> Maybe.just(1).map(Worker::addOne)), + Arguments.of( + "two operations maybe", + new Object[] {4}, + 2, + (Supplier) () -> Maybe.just(2).map(Worker::addOne).map(Worker::addOne)), + Arguments.of( + "delayed maybe", + new Object[] {4}, + 1, + (Supplier) () -> Maybe.just(3).delay(100, MILLISECONDS).map(Worker::addOne)), + Arguments.of( + "delayed twice maybe", + new Object[] {6}, + 2, + (Supplier) + () -> + Maybe.just(4) + .delay(100, MILLISECONDS) + .map(Worker::addOne) + .delay(100, MILLISECONDS) + .map(Worker::addOne)), + Arguments.of( + "basic flowable", + new Object[] {6, 7}, + 2, + (Supplier) + () -> Flowable.fromIterable(Arrays.asList(5, 6)).map(Worker::addOne)), + Arguments.of( + "two operations flowable", + new Object[] {8, 9}, + 4, + (Supplier) + () -> + Flowable.fromIterable(Arrays.asList(6, 7)) + .map(Worker::addOne) + .map(Worker::addOne)), + Arguments.of( + "delayed flowable", + new Object[] {8, 9}, + 2, + (Supplier) + () -> + Flowable.fromIterable(Arrays.asList(7, 8)) + .delay(100, MILLISECONDS) + .map(Worker::addOne)), + Arguments.of( + "delayed twice flowable", + new Object[] {10, 11}, + 4, + (Supplier) + () -> + Flowable.fromIterable(Arrays.asList(8, 9)) + .delay(100, MILLISECONDS) + .map(Worker::addOne) + .delay(100, MILLISECONDS) + .map(Worker::addOne)), + Arguments.of( + "maybe from callable", + new Object[] {12}, + 2, + (Supplier) + () -> Maybe.fromCallable(() -> Worker.addOne(10)).map(Worker::addOne))); + } + + @ParameterizedTest(name = "Publisher ''{0}'' test") + @MethodSource("publisherSuccessArgs") + void publisherSuccess(String name, Object[] expected, int workSpans, Supplier supplier) { + Object result = Worker.assemblePublisherUnderTrace(supplier); + + if (expected.length == 1) { + assertEquals(expected[0], result); + } else { + assertArrayEquals(expected, (Object[]) result); + } + + SpanMatcher[] matchers = new SpanMatcher[workSpans + 2]; + matchers[0] = + span() + .root() + .operationName("trace-parent") + .resourceName("trace-parent") + .tags(componentTrace(), defaultTags()); + matchers[1] = + span() + .id(Worker.publisherParentId) + .childOf(Worker.traceParentId) + .operationName("publisher-parent") + .resourceName("publisher-parent") + .tags(defaultTags()); + for (int i = 0; i < workSpans; i++) { + matchers[2 + i] = + span() + .childOf(Worker.publisherParentId) + .operationName("addOne") + .resourceName("addOne") + .tags(componentTrace(), defaultTags()); + } + + assertTraces(trace(SORT_BY_START_TIME, matchers)); + } + + // --- Publisher error ----------------------------------------------------- + + static List publisherErrorArgs() { + return Arrays.asList( + Arguments.of( + "maybe", (Supplier) () -> Maybe.error(new RuntimeException(EXCEPTION_MESSAGE))), + Arguments.of( + "flowable", + (Supplier) () -> Flowable.error(new RuntimeException(EXCEPTION_MESSAGE)))); + } + + @ParameterizedTest(name = "Publisher error ''{0}'' test") + @MethodSource("publisherErrorArgs") + void publisherError(String name, Supplier supplier) { + RuntimeException exception = + assertThrows(RuntimeException.class, () -> Worker.assemblePublisherUnderTrace(supplier)); + assertEquals(EXCEPTION_MESSAGE, exception.getMessage()); + + assertTraces( + trace( + SORT_BY_START_TIME, + span() + .root() + .operationName("trace-parent") + .resourceName("trace-parent") + .error() + .tags( + componentTrace(), + error(RuntimeException.class, EXCEPTION_MESSAGE), + defaultTags()), + // It's important that we don't attach errors at the reactive level so that we don't + // impact the spans on reactive integrations such as netty and lettuce. + span() + .id(Worker.publisherParentId) + .childOf(Worker.traceParentId) + .operationName("publisher-parent") + .resourceName("publisher-parent") + .tags(defaultTags()))); + } + + // --- Publisher step error ------------------------------------------------ + + static List publisherStepErrorArgs() { + return Arrays.asList( + Arguments.of( + "basic maybe failure", + 1, + (Supplier) + () -> Maybe.just(1).map(Worker::addOne).map(i -> Worker.throwException())), + Arguments.of( + "basic flowable failure", + 1, + (Supplier) + () -> + Flowable.fromIterable(Arrays.asList(5, 6)) + .map(Worker::addOne) + .map(i -> Worker.throwException()))); + } + + @ParameterizedTest(name = "Publisher step ''{0}'' test") + @MethodSource("publisherStepErrorArgs") + void publisherStepError(String name, int workSpans, Supplier supplier) { + RuntimeException exception = + assertThrows(RuntimeException.class, () -> Worker.assemblePublisherUnderTrace(supplier)); + assertEquals(EXCEPTION_MESSAGE, exception.getMessage()); + + SpanMatcher[] matchers = new SpanMatcher[workSpans + 2]; + matchers[0] = + span() + .root() + .operationName("trace-parent") + .resourceName("trace-parent") + .error() + .tags( + componentTrace(), error(RuntimeException.class, EXCEPTION_MESSAGE), defaultTags()); + matchers[1] = + span() + .id(Worker.publisherParentId) + .childOf(Worker.traceParentId) + .operationName("publisher-parent") + .resourceName("publisher-parent") + .tags(defaultTags()); + for (int i = 0; i < workSpans; i++) { + matchers[2 + i] = + span() + .childOf(Worker.publisherParentId) + .operationName("addOne") + .resourceName("addOne") + .tags(componentTrace(), defaultTags()); + } + + assertTraces(trace(SORT_BY_START_TIME, matchers)); + } + + // --- Cancel -------------------------------------------------------------- + + static List cancelArgs() { + return Arrays.asList( + Arguments.of("basic maybe", (Supplier) () -> Maybe.just(1)), + Arguments.of( + "basic flowable", (Supplier) () -> Flowable.fromIterable(Arrays.asList(5, 6)))); + } + + @ParameterizedTest(name = "Publisher ''{0}'' cancel") + @MethodSource("cancelArgs") + void cancel(String name, Supplier supplier) { + Worker.cancelUnderTrace(supplier); + + assertTraces( + trace( + SORT_BY_START_TIME, + span() + .root() + .operationName("trace-parent") + .resourceName("trace-parent") + .tags(componentTrace(), defaultTags()), + span() + .id(Worker.publisherParentId) + .childOf(Worker.traceParentId) + .operationName("publisher-parent") + .resourceName("publisher-parent") + .tags(defaultTags()))); + } + + // --- Chain spans correct parent ------------------------------------------ + + static List chainParentArgs() { + return Arrays.asList( + Arguments.of( + "basic maybe", + 3, + (Supplier) + () -> + Maybe.just(1) + .map(Worker::addOne) + .map(Worker::addOne) + .concatWith(Maybe.just(1).map(Worker::addOne))), + Arguments.of( + "basic flowable", + 5, + (Supplier) + () -> + Flowable.fromIterable(Arrays.asList(5, 6)) + .map(Worker::addOne) + .map(Worker::addOne) + .concatWith(Maybe.just(1).map(Worker::addOne).toFlowable()))); + } + + @ParameterizedTest(name = "Publisher chain spans have the correct parent for ''{0}''") + @MethodSource("chainParentArgs") + void chainParent(String name, int workSpans, Supplier supplier) { + Worker.assemblePublisherUnderTrace(supplier); + + SpanMatcher[] matchers = new SpanMatcher[workSpans + 2]; + matchers[0] = + span() + .root() + .operationName("trace-parent") + .resourceName("trace-parent") + .tags(componentTrace(), defaultTags()); + matchers[1] = + span() + .id(Worker.publisherParentId) + .childOf(Worker.traceParentId) + .operationName("publisher-parent") + .resourceName("publisher-parent") + .tags(defaultTags()); + for (int i = 0; i < workSpans; i++) { + matchers[2 + i] = + span() + .childOf(Worker.publisherParentId) + .operationName("addOne") + .resourceName("addOne") + .tags(componentTrace(), defaultTags()); + } + + assertTraces(trace(SORT_BY_START_TIME, matchers)); + } + + // --- Correct parents from subscription time (blockingGet) ---------------- + + @Test + void correctParentsFromSubscriptionTimeBlockingGet() { + Maybe maybe = Maybe.just(42).map(Worker::addOne).map(Worker::addTwo); + + Worker.runUnderTraceParent( + () -> { + maybe.blockingGet(); + return null; + }); + + assertTraces( + trace( + SORT_BY_START_TIME, + span().root().operationName("trace-parent").resourceName("trace-parent"), + span() + .childOf(Worker.traceParentId) + .operationName("addOne") + .tags(componentTrace(), defaultTags()), + span() + .childOf(Worker.traceParentId) + .operationName("addTwo") + .tags(componentTrace(), defaultTags()))); + } + + // --- Correct parents from subscription time (intermediate span) ---------- + + static List subscriptionTimeIntermediateArgs() { + return Arrays.asList( + Arguments.of("basic maybe", 1, (Supplier) () -> Maybe.just(1).map(Worker::addOne)), + Arguments.of( + "basic flowable", + 2, + (Supplier) + () -> Flowable.fromIterable(Arrays.asList(1, 2)).map(Worker::addOne))); + } + + @ParameterizedTest( + name = "Publisher chain spans have the correct parents from subscription time ''{0}''") + @MethodSource("subscriptionTimeIntermediateArgs") + @SuppressWarnings("unchecked") + void correctParentsFromSubscriptionTime(String name, int workItems, Supplier supplier) { + Worker.assemblePublisherUnderTrace( + () -> { + // The "add one" operations in the publisher created here should be children of the + // publisher-parent, while the "add two" operations should be children of the + // intermediate. + Object publisher = supplier.get(); + + AgentSpan intermediate = startSpan("test", "intermediate"); + Worker.intermediateId = intermediate.getSpanId(); + AgentScope scope = activateSpan(intermediate); + try { + if (publisher instanceof Maybe) { + return ((Maybe) publisher).map(Worker::addTwo); + } else if (publisher instanceof Flowable) { + return ((Flowable) publisher).map(Worker::addTwo); + } + throw new IllegalStateException("Unknown publisher type"); + } finally { + intermediate.finish(); + scope.close(); + } + }); + + SpanMatcher[] matchers = new SpanMatcher[3 + 2 * workItems]; + matchers[0] = + span() + .root() + .operationName("trace-parent") + .resourceName("trace-parent") + .tags(componentTrace(), defaultTags()); + matchers[1] = + span() + .id(Worker.publisherParentId) + .childOf(Worker.traceParentId) + .operationName("publisher-parent") + .resourceName("publisher-parent") + .tags(defaultTags()); + matchers[2] = + span() + .id(Worker.intermediateId) + .childOf(Worker.publisherParentId) + .operationName("intermediate") + .resourceName("intermediate") + .tags(defaultTags()); + for (int i = 0; i < 2 * workItems; i += 2) { + matchers[3 + i] = + span() + .childOf(Worker.publisherParentId) + .operationName("addOne") + .tags(componentTrace(), defaultTags()); + matchers[4 + i] = + span() + .childOf(Worker.publisherParentId) + .operationName("addTwo") + .tags(componentTrace(), defaultTags()); + } + + assertTraces(trace(SORT_BY_START_TIME, matchers)); + } + + // --- Schedulers ---------------------------------------------------------- + + static List schedulerArgs() { + return Arrays.asList( + Arguments.of("new-thread", Schedulers.newThread()), + Arguments.of("computation", Schedulers.computation()), + Arguments.of("single", Schedulers.single()), + Arguments.of("trampoline", Schedulers.trampoline())); + } + + @ParameterizedTest(name = "Flowables produce the right number of results on ''{0}'' scheduler") + @MethodSource("schedulerArgs") + void schedulers(String schedulerName, Scheduler scheduler) { + List values = + Flowable.fromIterable(Arrays.asList(1, 2, 3, 4)) + .parallel() + .runOn(scheduler) + .flatMap( + num -> + Maybe.just(num.toString() + " on " + Thread.currentThread().getName()) + .toFlowable()) + .sequential() + .toList() + .blockingGet(); + + assertEquals(4, values.size()); + } +} diff --git a/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/testdog/trace/instrumentation/rxjava3/SubscriptionTest.java b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/testdog/trace/instrumentation/rxjava3/SubscriptionTest.java new file mode 100644 index 00000000000..441338f78f9 --- /dev/null +++ b/dd-java-agent/instrumentation/rxjava/rxjava-3.0/src/test/java/testdog/trace/instrumentation/rxjava3/SubscriptionTest.java @@ -0,0 +1,50 @@ +package testdog.trace.instrumentation.rxjava3; + +import static datadog.trace.agent.test.assertions.SpanMatcher.span; +import static datadog.trace.agent.test.assertions.TraceMatcher.SORT_BY_START_TIME; +import static datadog.trace.agent.test.assertions.TraceMatcher.trace; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.activateSpan; +import static datadog.trace.bootstrap.instrumentation.api.AgentTracer.startSpan; + +import datadog.trace.agent.test.AbstractInstrumentationTest; +import datadog.trace.bootstrap.instrumentation.api.AgentScope; +import datadog.trace.bootstrap.instrumentation.api.AgentSpan; +import io.reactivex.rxjava3.core.Maybe; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import org.junit.jupiter.api.Test; + +class SubscriptionTest extends AbstractInstrumentationTest { + + @Test + void subscriptionPropagatesParentSpan() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + + AgentSpan parent = startSpan("test", "parent"); + try (AgentScope scope = activateSpan(parent)) { + Maybe connection = Maybe.create(emitter -> emitter.onSuccess(new Connection())); + connection.subscribe( + c -> { + c.query(); + latch.countDown(); + }); + } finally { + parent.finish(); + } + latch.await(); + + assertTraces( + trace( + SORT_BY_START_TIME, + span().root().operationName("parent"), + span().childOfPrevious().operationName("Connection.query"))); + } + + static class Connection { + int query() { + AgentSpan span = startSpan("test", "Connection.query"); + span.finish(); + return new Random().nextInt(); + } + } +} diff --git a/metadata/supported-configurations.json b/metadata/supported-configurations.json index b70f1b960d3..d96dd8ae147 100644 --- a/metadata/supported-configurations.json +++ b/metadata/supported-configurations.json @@ -9545,6 +9545,14 @@ "aliases": ["DD_TRACE_INTEGRATION_RXJAVA_ENABLED", "DD_INTEGRATION_RXJAVA_ENABLED"] } ], + "DD_TRACE_RXJAVA_3_ENABLED": [ + { + "version": "A", + "type": "boolean", + "default": "true", + "aliases": ["DD_TRACE_INTEGRATION_RXJAVA_3_ENABLED", "DD_INTEGRATION_RXJAVA_3_ENABLED"] + } + ], "DD_TRACE_S3_ENABLED": [ { "version": "A", diff --git a/settings.gradle.kts b/settings.gradle.kts index 487f6275cf1..3cd650267b6 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -556,6 +556,7 @@ include( ":dd-java-agent:instrumentation:rs:jax-rs:jax-rs-client:jax-rs-client-2.0", ":dd-java-agent:instrumentation:rxjava:rxjava-1.0", ":dd-java-agent:instrumentation:rxjava:rxjava-2.0", + ":dd-java-agent:instrumentation:rxjava:rxjava-3.0", ":dd-java-agent:instrumentation:scala:scala-concurrent-2.8", ":dd-java-agent:instrumentation:scala:scala-promise:scala-promise-2.10", ":dd-java-agent:instrumentation:scala:scala-promise:scala-promise-2.13",