-
Notifications
You must be signed in to change notification settings - Fork 337
feat(rxjava3): add RxJava 3 context propagation instrumentation #11505
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. Weβll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
c9ce2fb
d877672
5f40db9
5b72730
d0591b2
00fa55a
a3779c2
66ab94b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,27 @@ | ||
| muzzle { | ||
| pass { | ||
| group = "io.reactivex.rxjava3" | ||
| module = "rxjava" | ||
| versions = "[3.0.0,)" | ||
| assertInverse = true | ||
| } | ||
| } | ||
|
|
||
| apply from: "$rootDir/gradle/java.gradle" | ||
|
|
||
| addTestSuiteForDir('latestDepTest', 'test') | ||
|
|
||
| dependencies { | ||
| compileOnly group: 'org.reactivestreams', name: 'reactive-streams', version: '1.0.3' | ||
| compileOnly group: 'io.reactivex.rxjava3', name: 'rxjava', version: '3.0.0' | ||
|
|
||
| testImplementation project(':dd-java-agent:instrumentation:datadog:tracing:trace-annotation') | ||
| testImplementation project(':dd-java-agent:instrumentation:opentelemetry:opentelemetry-annotations-1.20') | ||
|
|
||
| testImplementation group: 'io.reactivex.rxjava3', name: 'rxjava', version: '3.1.10' | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the minimum version to test is |
||
| testImplementation group: 'io.opentelemetry.instrumentation', name: 'opentelemetry-instrumentation-annotations', version: '1.28.0' | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would also add previous versions of this instrumentation in order to check for overlaps when testing |
||
|
|
||
| testImplementation libs.junit.jupiter | ||
|
|
||
| latestDepTestImplementation group: 'io.reactivex.rxjava3', name: 'rxjava', version: '3.+' | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,73 @@ | ||
| package datadog.trace.instrumentation.rxjava3; | ||
|
|
||
| import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; | ||
| import static net.bytebuddy.matcher.ElementMatchers.isConstructor; | ||
| import static net.bytebuddy.matcher.ElementMatchers.isMethod; | ||
| import static net.bytebuddy.matcher.ElementMatchers.takesArgument; | ||
| import static net.bytebuddy.matcher.ElementMatchers.takesArguments; | ||
|
|
||
| import datadog.context.Context; | ||
| import datadog.context.ContextScope; | ||
| import datadog.trace.agent.tooling.Instrumenter; | ||
| import datadog.trace.bootstrap.InstrumentationContext; | ||
| import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge; | ||
| import io.reactivex.rxjava3.core.Completable; | ||
| import io.reactivex.rxjava3.core.CompletableObserver; | ||
| import net.bytebuddy.asm.Advice; | ||
|
|
||
| public final class CompletableInstrumentation | ||
| implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { | ||
|
|
||
| @Override | ||
| public String instrumentedType() { | ||
| return "io.reactivex.rxjava3.core.Completable"; | ||
| } | ||
|
|
||
| @Override | ||
| public void methodAdvice(MethodTransformer transformer) { | ||
| transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureParentSpanAdvice"); | ||
| transformer.applyAdvice( | ||
| isMethod() | ||
| .and(named("subscribe")) | ||
| .and(takesArguments(1)) | ||
| .and(takesArgument(0, named("io.reactivex.rxjava3.core.CompletableObserver"))), | ||
| getClass().getName() + "$PropagateParentSpanAdvice"); | ||
| } | ||
|
|
||
| public static class CaptureParentSpanAdvice { | ||
| @Advice.OnMethodExit(suppress = Throwable.class) | ||
| public static void onConstruct(@Advice.This final Completable completable) { | ||
| Context parentContext = Java8BytecodeBridge.getCurrentContext(); | ||
| if (parentContext != null && parentContext != Java8BytecodeBridge.getRootContext()) { | ||
| InstrumentationContext.get(Completable.class, Context.class) | ||
| .put(completable, parentContext); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| public static class PropagateParentSpanAdvice { | ||
| @Advice.OnMethodEnter(suppress = Throwable.class) | ||
| public static ContextScope onSubscribe( | ||
| @Advice.This final Completable completable, | ||
| @Advice.Argument(value = 0, readOnly = false) CompletableObserver observer) { | ||
| if (observer != null) { | ||
| Context parentContext = | ||
| InstrumentationContext.get(Completable.class, Context.class).get(completable); | ||
| if (parentContext != null) { | ||
| // wrap the observer so spans from its events treat the captured span as their parent | ||
| observer = new TracingCompletableObserver(observer, parentContext); | ||
| // attach the context here in case additional observers are created during subscribe | ||
| return parentContext.attach(); | ||
| } | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) | ||
| public static void closeScope(@Advice.Enter final ContextScope scope) { | ||
| if (scope != null) { | ||
| scope.close(); | ||
| } | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,83 @@ | ||
| package datadog.trace.instrumentation.rxjava3; | ||
|
|
||
| import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; | ||
| import static net.bytebuddy.matcher.ElementMatchers.isConstructor; | ||
| import static net.bytebuddy.matcher.ElementMatchers.isMethod; | ||
| import static net.bytebuddy.matcher.ElementMatchers.takesArgument; | ||
| import static net.bytebuddy.matcher.ElementMatchers.takesArguments; | ||
|
|
||
| import datadog.context.Context; | ||
| import datadog.context.ContextScope; | ||
| import datadog.trace.agent.tooling.Instrumenter; | ||
| import datadog.trace.bootstrap.InstrumentationContext; | ||
| import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge; | ||
| import io.reactivex.rxjava3.core.Flowable; | ||
| import net.bytebuddy.asm.Advice; | ||
| import net.bytebuddy.implementation.bytecode.assign.Assigner; | ||
| import org.reactivestreams.Subscriber; | ||
|
|
||
| public final class FlowableInstrumentation | ||
| implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { | ||
|
|
||
| @Override | ||
| public String instrumentedType() { | ||
| return "io.reactivex.rxjava3.core.Flowable"; | ||
| } | ||
|
|
||
| @Override | ||
| public void methodAdvice(MethodTransformer transformer) { | ||
| transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureParentSpanAdvice"); | ||
| // Hook both the org.reactivestreams.Subscriber overload and the RxJava-3-specific | ||
| // FlowableSubscriber overload. DYNAMIC typing allows the same Advice class to write back a | ||
| // TracingSubscriber (which implements both interfaces) into either argument slot. | ||
| transformer.applyAdvice( | ||
| isMethod() | ||
| .and(named("subscribe")) | ||
| .and(takesArguments(1)) | ||
| .and(takesArgument(0, named("org.reactivestreams.Subscriber"))), | ||
| getClass().getName() + "$PropagateParentSpanAdvice"); | ||
|
Comment on lines
+33
to
+38
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. prise to have spotted both method. However it looks like that this version delegate to the method above that's public final. So that this can be avoided to be instrumented since it only delegates to the latter |
||
| transformer.applyAdvice( | ||
| isMethod() | ||
| .and(named("subscribe")) | ||
| .and(takesArguments(1)) | ||
| .and(takesArgument(0, named("io.reactivex.rxjava3.core.FlowableSubscriber"))), | ||
| getClass().getName() + "$PropagateParentSpanAdvice"); | ||
| } | ||
|
|
||
| public static class CaptureParentSpanAdvice { | ||
| @Advice.OnMethodExit(suppress = Throwable.class) | ||
| public static void onConstruct(@Advice.This final Flowable<?> flowable) { | ||
| Context parentContext = Java8BytecodeBridge.getCurrentContext(); | ||
| if (parentContext != null && parentContext != Java8BytecodeBridge.getRootContext()) { | ||
| InstrumentationContext.get(Flowable.class, Context.class).put(flowable, parentContext); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| public static class PropagateParentSpanAdvice { | ||
| @Advice.OnMethodEnter(suppress = Throwable.class) | ||
| public static ContextScope onSubscribe( | ||
| @Advice.This final Flowable<?> flowable, | ||
| @Advice.Argument(value = 0, readOnly = false, typing = Assigner.Typing.DYNAMIC) | ||
| Subscriber<?> subscriber) { | ||
| if (subscriber != null) { | ||
| Context parentContext = | ||
| InstrumentationContext.get(Flowable.class, Context.class).get(flowable); | ||
| if (parentContext != null) { | ||
| // wrap the subscriber so spans from its events treat the captured span as their parent | ||
| subscriber = new TracingSubscriber<>(subscriber, parentContext); | ||
| // attach the context here in case additional subscribers are created during subscribe | ||
| return parentContext.attach(); | ||
| } | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) | ||
| public static void closeScope(@Advice.Enter final ContextScope scope) { | ||
| if (scope != null) { | ||
| scope.close(); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,70 @@ | ||
| package datadog.trace.instrumentation.rxjava3; | ||
|
|
||
| import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; | ||
| import static net.bytebuddy.matcher.ElementMatchers.isConstructor; | ||
| import static net.bytebuddy.matcher.ElementMatchers.isMethod; | ||
| import static net.bytebuddy.matcher.ElementMatchers.takesArgument; | ||
| import static net.bytebuddy.matcher.ElementMatchers.takesArguments; | ||
|
|
||
| import datadog.context.Context; | ||
| import datadog.context.ContextScope; | ||
| import datadog.trace.agent.tooling.Instrumenter; | ||
| import datadog.trace.bootstrap.InstrumentationContext; | ||
| import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge; | ||
| import io.reactivex.rxjava3.core.Maybe; | ||
| import io.reactivex.rxjava3.core.MaybeObserver; | ||
| import net.bytebuddy.asm.Advice; | ||
|
|
||
| public final class MaybeInstrumentation | ||
| implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { | ||
| @Override | ||
| public String instrumentedType() { | ||
| return "io.reactivex.rxjava3.core.Maybe"; | ||
| } | ||
|
|
||
| @Override | ||
| public void methodAdvice(MethodTransformer transformer) { | ||
| transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureParentSpanAdvice"); | ||
| transformer.applyAdvice( | ||
| isMethod() | ||
| .and(named("subscribe")) | ||
| .and(takesArguments(1)) | ||
| .and(takesArgument(0, named("io.reactivex.rxjava3.core.MaybeObserver"))), | ||
| getClass().getName() + "$PropagateParentSpanAdvice"); | ||
| } | ||
|
|
||
| public static class CaptureParentSpanAdvice { | ||
| @Advice.OnMethodExit(suppress = Throwable.class) | ||
| public static void onConstruct(@Advice.This final Maybe<?> maybe) { | ||
| Context parentContext = Java8BytecodeBridge.getCurrentContext(); | ||
| if (parentContext != null && parentContext != Java8BytecodeBridge.getRootContext()) { | ||
| InstrumentationContext.get(Maybe.class, Context.class).put(maybe, parentContext); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| public static class PropagateParentSpanAdvice { | ||
| @Advice.OnMethodEnter(suppress = Throwable.class) | ||
| public static ContextScope onSubscribe( | ||
| @Advice.This final Maybe<?> maybe, | ||
| @Advice.Argument(value = 0, readOnly = false) MaybeObserver<?> observer) { | ||
| if (observer != null) { | ||
| Context parentContext = InstrumentationContext.get(Maybe.class, Context.class).get(maybe); | ||
| if (parentContext != null) { | ||
| // wrap the observer so spans from its events treat the captured span as their parent | ||
| observer = new TracingMaybeObserver<>(observer, parentContext); | ||
| // attach the context here in case additional observers are created during subscribe | ||
| return parentContext.attach(); | ||
| } | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) | ||
| public static void closeScope(@Advice.Enter final ContextScope scope) { | ||
| if (scope != null) { | ||
| scope.close(); | ||
| } | ||
| } | ||
| } | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,71 @@ | ||
| package datadog.trace.instrumentation.rxjava3; | ||
|
|
||
| import static datadog.trace.agent.tooling.bytebuddy.matcher.NameMatchers.named; | ||
| import static net.bytebuddy.matcher.ElementMatchers.isConstructor; | ||
| import static net.bytebuddy.matcher.ElementMatchers.isMethod; | ||
| import static net.bytebuddy.matcher.ElementMatchers.takesArgument; | ||
| import static net.bytebuddy.matcher.ElementMatchers.takesArguments; | ||
|
|
||
| import datadog.context.Context; | ||
| import datadog.context.ContextScope; | ||
| import datadog.trace.agent.tooling.Instrumenter; | ||
| import datadog.trace.bootstrap.InstrumentationContext; | ||
| import datadog.trace.bootstrap.instrumentation.api.Java8BytecodeBridge; | ||
| import io.reactivex.rxjava3.core.Observable; | ||
| import io.reactivex.rxjava3.core.Observer; | ||
| import net.bytebuddy.asm.Advice; | ||
|
|
||
| public final class ObservableInstrumentation | ||
| implements Instrumenter.ForSingleType, Instrumenter.HasMethodAdvice { | ||
| @Override | ||
| public String instrumentedType() { | ||
| return "io.reactivex.rxjava3.core.Observable"; | ||
| } | ||
|
|
||
| @Override | ||
| public void methodAdvice(MethodTransformer transformer) { | ||
| transformer.applyAdvice(isConstructor(), getClass().getName() + "$CaptureParentSpanAdvice"); | ||
| transformer.applyAdvice( | ||
| isMethod() | ||
| .and(named("subscribe")) | ||
| .and(takesArguments(1)) | ||
| .and(takesArgument(0, named("io.reactivex.rxjava3.core.Observer"))), | ||
| getClass().getName() + "$PropagateParentSpanAdvice"); | ||
| } | ||
|
|
||
| public static class CaptureParentSpanAdvice { | ||
| @Advice.OnMethodExit(suppress = Throwable.class) | ||
| public static void onConstruct(@Advice.This final Observable<?> observable) { | ||
| Context parentContext = Java8BytecodeBridge.getCurrentContext(); | ||
| if (parentContext != null && parentContext != Java8BytecodeBridge.getRootContext()) { | ||
| InstrumentationContext.get(Observable.class, Context.class).put(observable, parentContext); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| public static class PropagateParentSpanAdvice { | ||
| @Advice.OnMethodEnter(suppress = Throwable.class) | ||
| public static ContextScope onSubscribe( | ||
| @Advice.This final Observable<?> observable, | ||
| @Advice.Argument(value = 0, readOnly = false) Observer<?> observer) { | ||
| if (observer != null) { | ||
| Context parentContext = | ||
| InstrumentationContext.get(Observable.class, Context.class).get(observable); | ||
| if (parentContext != null) { | ||
| // wrap the observer so spans from its events treat the captured span as their parent | ||
| observer = new TracingObserver<>(observer, parentContext); | ||
| // attach the context here in case additional observers are created during subscribe | ||
| return parentContext.attach(); | ||
| } | ||
| } | ||
| return null; | ||
| } | ||
|
|
||
| @Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class) | ||
| public static void closeScope(@Advice.Enter final ContextScope scope) { | ||
| if (scope != null) { | ||
| scope.close(); | ||
| } | ||
| } | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is not enough. previous versions of rxjava has
io.reactivex.rxjava2group hence a specificfailsection is needed here