2424import static org .junit .Assert .assertTrue ;
2525import static org .junit .Assume .assumeFalse ;
2626
27+ import com .google .api .gax .grpc .GrpcInterceptorProvider ;
2728import com .google .cloud .NoCredentials ;
29+ import com .google .cloud .grpc .GcpManagedChannel ;
2830import com .google .cloud .spanner .MockSpannerServiceImpl .SimulatedExecutionTime ;
2931import com .google .cloud .spanner .connection .AbstractMockServerTest ;
32+ import com .google .common .collect .ImmutableList ;
3033import com .google .spanner .v1 .BatchCreateSessionsRequest ;
3134import com .google .spanner .v1 .BeginTransactionRequest ;
3235import com .google .spanner .v1 .ExecuteSqlRequest ;
3336import io .grpc .Attributes ;
37+ import io .grpc .CallOptions ;
38+ import io .grpc .Channel ;
39+ import io .grpc .ClientCall ;
40+ import io .grpc .ClientInterceptor ;
3441import io .grpc .Context ;
3542import io .grpc .Deadline ;
3643import io .grpc .ManagedChannelBuilder ;
3744import io .grpc .Metadata ;
45+ import io .grpc .MethodDescriptor ;
3846import io .grpc .ServerCall ;
3947import io .grpc .ServerCall .Listener ;
4048import io .grpc .ServerCallHandler ;
6371@ RunWith (JUnit4 .class )
6472public class RetryOnDifferentGrpcChannelMockServerTest extends AbstractMockServerTest {
6573 private static final Map <String , Set <InetSocketAddress >> SERVER_ADDRESSES = new HashMap <>();
74+
75+ /** Tracks the physical channel IDs from request ID headers (set by grpc-gcp). */
6676 private static final Map <String , Set <Long >> CHANNEL_HINTS = new HashMap <>();
6777
78+ /** Tracks the logical affinity keys before grpc-gcp routes the request. */
79+ private static final Map <String , Set <String >> LOGICAL_AFFINITY_KEYS = new HashMap <>();
80+
6881 @ BeforeClass
6982 public static void startStaticServer () throws IOException {
7083 System .setProperty ("spanner.retry_deadline_exceeded_on_different_channel" , "true" );
@@ -80,10 +93,38 @@ public static void removeSystemProperty() {
8093 public void clearRequests () {
8194 SERVER_ADDRESSES .clear ();
8295 CHANNEL_HINTS .clear ();
96+ LOGICAL_AFFINITY_KEYS .clear ();
8397 mockSpanner .clearRequests ();
8498 mockSpanner .removeAllExecutionTimes ();
8599 }
86100
101+ /**
102+ * Creates a client interceptor that captures the logical affinity key before grpc-gcp routes the
103+ * request. This allows us to verify that retry logic uses distinct logical channel hints, even
104+ * when DCP maps them to fewer physical channels.
105+ */
106+ static GrpcInterceptorProvider createAffinityKeyInterceptorProvider () {
107+ return () ->
108+ ImmutableList .of (
109+ new ClientInterceptor () {
110+ @ Override
111+ public <ReqT , RespT > ClientCall <ReqT , RespT > interceptCall (
112+ MethodDescriptor <ReqT , RespT > method , CallOptions callOptions , Channel next ) {
113+ // Capture the AFFINITY_KEY before grpc-gcp processes it
114+ String affinityKey = callOptions .getOption (GcpManagedChannel .AFFINITY_KEY );
115+ if (affinityKey != null ) {
116+ String methodName = method .getFullMethodName ();
117+ synchronized (LOGICAL_AFFINITY_KEYS ) {
118+ Set <String > keys =
119+ LOGICAL_AFFINITY_KEYS .computeIfAbsent (methodName , k -> new HashSet <>());
120+ keys .add (affinityKey );
121+ }
122+ }
123+ return next .newCall (method , callOptions );
124+ }
125+ });
126+ }
127+
87128 static ServerInterceptor createServerInterceptor () {
88129 return new ServerInterceptor () {
89130 @ Override
@@ -136,7 +177,8 @@ SpannerOptions.Builder createSpannerOptionsBuilder() {
136177 .setProjectId ("my-project" )
137178 .setHost (String .format ("http://localhost:%d" , getPort ()))
138179 .setChannelConfigurator (ManagedChannelBuilder ::usePlaintext )
139- .setCredentials (NoCredentials .getInstance ());
180+ .setCredentials (NoCredentials .getInstance ())
181+ .setInterceptorProvider (createAffinityKeyInterceptorProvider ());
140182 }
141183
142184 @ Test
@@ -172,9 +214,10 @@ public void testReadWriteTransaction_retriesOnNewChannel() {
172214 List <BeginTransactionRequest > requests =
173215 mockSpanner .getRequestsOfType (BeginTransactionRequest .class );
174216 assertNotEquals (requests .get (0 ).getSession (), requests .get (1 ).getSession ());
217+ // Verify that the retry used 2 distinct logical affinity keys (before grpc-gcp routing).
175218 assertEquals (
176219 2 ,
177- CHANNEL_HINTS
220+ LOGICAL_AFFINITY_KEYS
178221 .getOrDefault ("google.spanner.v1.Spanner/BeginTransaction" , new HashSet <>())
179222 .size ());
180223 }
@@ -216,9 +259,11 @@ public void testReadWriteTransaction_stopsRetrying() {
216259 Set <String > sessions =
217260 requests .stream ().map (BeginTransactionRequest ::getSession ).collect (Collectors .toSet ());
218261 assertEquals (numChannels , sessions .size ());
262+ // Verify that the retry logic used distinct logical affinity keys (before grpc-gcp routing).
263+ // This confirms each retry attempt targeted a different logical channel.
219264 assertEquals (
220265 numChannels ,
221- CHANNEL_HINTS
266+ LOGICAL_AFFINITY_KEYS
222267 .getOrDefault ("google.spanner.v1.Spanner/BeginTransaction" , new HashSet <>())
223268 .size ());
224269 }
@@ -290,9 +335,11 @@ public void testDenyListedChannelIsCleared() {
290335 // of the first transaction. That fails, the session is deny-listed, the transaction is
291336 // retried on yet another session and succeeds.
292337 assertEquals (numChannels + 1 , sessions .size ());
338+ // Verify that the retry logic used distinct logical affinity keys (before grpc-gcp routing).
339+ // This confirms each retry attempt targeted a different logical channel.
293340 assertEquals (
294341 numChannels ,
295- CHANNEL_HINTS
342+ LOGICAL_AFFINITY_KEYS
296343 .getOrDefault ("google.spanner.v1.Spanner/BeginTransaction" , new HashSet <>())
297344 .size ());
298345 assertEquals (numChannels , mockSpanner .countRequestsOfType (BatchCreateSessionsRequest .class ));
@@ -320,10 +367,10 @@ public void testSingleUseQuery_retriesOnNewChannel() {
320367 List <ExecuteSqlRequest > requests = mockSpanner .getRequestsOfType (ExecuteSqlRequest .class );
321368 // The requests use the same multiplexed session.
322369 assertEquals (requests .get (0 ).getSession (), requests .get (1 ).getSession ());
323- // The requests use two different channel hints (which may map to same physical channel ).
370+ // Verify that the retry used 2 distinct logical affinity keys (before grpc-gcp routing ).
324371 assertEquals (
325372 2 ,
326- CHANNEL_HINTS
373+ LOGICAL_AFFINITY_KEYS
327374 .getOrDefault ("google.spanner.v1.Spanner/ExecuteStreamingSql" , new HashSet <>())
328375 .size ());
329376 }
@@ -350,13 +397,15 @@ public void testSingleUseQuery_stopsRetrying() {
350397 for (ExecuteSqlRequest request : requests ) {
351398 assertEquals (session , request .getSession ());
352399 }
353- // Each attempt, including retries, must use a distinct channel hint .
400+ // Verify that the retry mechanism is working (made numChannels requests) .
354401 int totalRequests = mockSpanner .countRequestsOfType (ExecuteSqlRequest .class );
355- int distinctHints =
356- CHANNEL_HINTS
402+ assertEquals (numChannels , totalRequests );
403+ // Verify each attempt used a distinct logical affinity key (before grpc-gcp routing).
404+ int distinctLogicalKeys =
405+ LOGICAL_AFFINITY_KEYS
357406 .getOrDefault ("google.spanner.v1.Spanner/ExecuteStreamingSql" , new HashSet <>())
358407 .size ();
359- assertEquals (totalRequests , distinctHints );
408+ assertEquals (totalRequests , distinctLogicalKeys );
360409 }
361410 }
362411
0 commit comments