4545import scouter .lang .step .ParameterizedMessageStep ;
4646import scouter .util .StringUtil ;
4747
48+ import java .lang .reflect .Method ;
4849import java .util .function .BiFunction ;
4950import java .util .function .Consumer ;
5051import java .util .function .Function ;
5152
5253public class ReactiveSupportWithCoroutine implements IReactiveSupport {
5354
5455 static Configure configure = Configure .getInstance ();
56+ private Method subscriberContextMethod ;
57+ private static Method isCheckpoint ;
58+ private static boolean isReactor34 ;
59+
60+ public ReactiveSupportWithCoroutine () {
61+ isReactor34 = ReactiveSupportUtils .isSupportReactor34 ();
62+ try {
63+ if (isReactor34 ) {
64+ subscriberContextMethod = Mono .class .getMethod ("contextWrite" , Function .class );
65+ Class <?> assemblySnapshotClass = Class .forName ("reactor.core.publisher.FluxOnAssembly$AssemblySnapshot" );
66+ isCheckpoint = assemblySnapshotClass .getDeclaredMethod ("isCheckpoint" );
67+ isCheckpoint .setAccessible (true );
68+ } else {
69+ subscriberContextMethod = Mono .class .getMethod ("subscriberContext" , Function .class );
70+ }
71+ } catch (Exception e ) {
72+ throw new RuntimeException (e );
73+ }
74+ }
5575
5676 @ Override
5777 public Object subscriptOnContext (Object mono0 , final TraceContext traceContext ) {
@@ -61,12 +81,16 @@ public Object subscriptOnContext(Object mono0, final TraceContext traceContext)
6181 }
6282 Mono <?> mono = (Mono <?>) mono0 ;
6383 traceContext .isReactiveTxidMarked = true ;
64- return mono .subscriberContext (new Function <Context , Context >() {
84+ Mono <?> monoChain ;
85+ Function <Context , Context > func = new Function <Context , Context >() {
6586 @ Override
6687 public Context apply (Context context ) {
6788 return context .put (TraceContext .class , traceContext );
6889 }
69- }).doOnSuccess (new Consumer <Object >() {
90+ };
91+
92+ monoChain = (Mono <?>) subscriberContextMethod .invoke (mono , func );
93+ return monoChain .doOnSuccess (new Consumer <Object >() {
7094 @ Override
7195 public void accept (Object o ) {
7296 TraceMain .endHttpService (new TraceMain .Stat (traceContext ), null );
@@ -161,6 +185,7 @@ public static class TxidLifter<T> implements SpanSubscription<T>, Scannable {
161185 private final String checkpointDesc ;
162186 private final Integer depth ;
163187 private Subscription orgSubs ;
188+ private boolean isReactor34 ;
164189
165190 private enum ReactorCheckPointType {
166191 ON_SUBSCRIBE ,
@@ -176,9 +201,10 @@ public TxidLifter(CoreSubscriber<T> coreSubscriber, Scannable scannable, Publish
176201 this .scannable = scannable ;
177202 this .publisher = publisher ;
178203 this .traceContext = traceContext ;
204+ this .isReactor34 = isReactor34 ;
179205
180206 Tuple .StringLongPair checkpointPair = ScouterOptimizableOperatorProxy
181- .nameOnCheckpoint (scannable , configure .profile_reactor_checkpoint_search_depth );
207+ .nameOnCheckpoint (scannable , configure .profile_reactor_checkpoint_search_depth , isReactor34 , isCheckpoint );
182208 checkpointDesc = checkpointPair .aString ;
183209
184210 Integer parentDepth = context .getOrDefault (SubscribeDepth .class , 0 );
@@ -335,4 +361,9 @@ public String dumpScannable(TraceContext traceContext, TraceContext.TimedScannab
335361 ScouterOptimizableOperatorProxy .appendSources4Dump (scannable , builder , configure .profile_reactor_checkpoint_search_depth );
336362 return builder .toString ();
337363 }
364+
365+ @ Override
366+ public boolean isReactor34 () {
367+ return isReactor34 ;
368+ }
338369}
0 commit comments