2424import static org .junit .Assert .assertEquals ;
2525import static org .junit .Assert .assertNotNull ;
2626
27+ import com .google .api .gax .grpc .GrpcInterceptorProvider ;
2728import com .google .cloud .NoCredentials ;
29+ import com .google .cloud .grpc .GcpManagedChannel ;
2830import com .google .cloud .spanner .MockSpannerServiceImpl .StatementResult ;
2931import com .google .cloud .spanner .Options .RpcPriority ;
3032import com .google .cloud .spanner .ReadContext .QueryAnalyzeMode ;
33+ import com .google .common .collect .ImmutableList ;
3134import com .google .protobuf .ListValue ;
3235import com .google .spanner .v1 .ResultSetMetadata ;
3336import com .google .spanner .v1 .SpannerGrpc ;
3437import com .google .spanner .v1 .StructType ;
3538import com .google .spanner .v1 .StructType .Field ;
3639import com .google .spanner .v1 .TypeCode ;
37- import io .grpc .Context ;
38- import io .grpc .Contexts ;
39- import io .grpc .Metadata ;
40+ import io .grpc .CallOptions ;
41+ import io .grpc .Channel ;
42+ import io .grpc .ClientCall ;
43+ import io .grpc .ClientInterceptor ;
44+ import io .grpc .MethodDescriptor ;
4045import io .grpc .Server ;
41- import io .grpc .ServerCall ;
42- import io .grpc .ServerCallHandler ;
43- import io .grpc .ServerInterceptor ;
4446import io .grpc .netty .shaded .io .grpc .netty .NettyServerBuilder ;
4547import java .net .InetSocketAddress ;
4648import java .util .Set ;
@@ -93,10 +95,11 @@ public class TransactionChannelHintTest {
9395 private static MockSpannerServiceImpl mockSpanner ;
9496 private static Server server ;
9597 private static InetSocketAddress address ;
96- // Track channel hints (from X-Goog-Spanner-Request-Id header) per RPC method
97- private static final Set <Long > executeSqlChannelHints = ConcurrentHashMap .newKeySet ();
98- private static final Set <Long > beginTransactionChannelHints = ConcurrentHashMap .newKeySet ();
99- private static final Set <Long > streamingReadChannelHints = ConcurrentHashMap .newKeySet ();
98+ // Track logical affinity keys (before grpc-gcp routing) per RPC method.
99+ // These are captured by a client interceptor to verify channel affinity consistency.
100+ private static final Set <String > executeSqlAffinityKeys = ConcurrentHashMap .newKeySet ();
101+ private static final Set <String > beginTransactionAffinityKeys = ConcurrentHashMap .newKeySet ();
102+ private static final Set <String > streamingReadAffinityKeys = ConcurrentHashMap .newKeySet ();
100103 private static Level originalLogLevel ;
101104
102105 @ BeforeClass
@@ -109,49 +112,40 @@ public static void startServer() throws Exception {
109112 StatementResult .query (READ_ONE_KEY_VALUE_STATEMENT , READ_ONE_KEY_VALUE_RESULTSET ));
110113
111114 address = new InetSocketAddress ("localhost" , 0 );
112- server =
113- NettyServerBuilder .forAddress (address )
114- .addService (mockSpanner )
115- // Add a server interceptor to extract channel hints from X-Goog-Spanner-Request-Id
116- // header. This verifies that all operations in a transaction use the same channel hint.
117- .intercept (
118- new ServerInterceptor () {
119- @ Override
120- public <ReqT , RespT > ServerCall .Listener <ReqT > interceptCall (
121- ServerCall <ReqT , RespT > call ,
122- Metadata headers ,
123- ServerCallHandler <ReqT , RespT > next ) {
124- // Extract channel hint from X-Goog-Spanner-Request-Id header
125- String requestId = headers .get (XGoogSpannerRequestId .REQUEST_ID_HEADER_KEY );
126- if (requestId != null ) {
127- // Format:
128- // <version>.<randProcessId>.<nthClientId>.<nthChannelId>.<nthRequest>.<attempt>
129- String [] parts = requestId .split ("\\ ." );
130- if (parts .length >= 4 ) {
131- try {
132- long channelHint = Long .parseLong (parts [3 ]);
133- if (call .getMethodDescriptor ()
134- .equals (SpannerGrpc .getExecuteStreamingSqlMethod ())) {
135- executeSqlChannelHints .add (channelHint );
136- }
137- if (call .getMethodDescriptor ()
138- .equals (SpannerGrpc .getStreamingReadMethod ())) {
139- streamingReadChannelHints .add (channelHint );
140- }
141- if (call .getMethodDescriptor ()
142- .equals (SpannerGrpc .getBeginTransactionMethod ())) {
143- beginTransactionChannelHints .add (channelHint );
144- }
145- } catch (NumberFormatException e ) {
146- // Ignore parse errors
147- }
148- }
149- }
150- return Contexts .interceptCall (Context .current (), call , headers , next );
115+ server = NettyServerBuilder .forAddress (address ).addService (mockSpanner ).build ().start ();
116+ }
117+
118+ /**
119+ * Creates a client interceptor that captures the logical affinity key before grpc-gcp routes the
120+ * request. This allows us to verify that all operations within a transaction use the same logical
121+ * channel affinity, even though the physical channel ID may vary.
122+ */
123+ private static GrpcInterceptorProvider createAffinityKeyInterceptorProvider () {
124+ return () ->
125+ ImmutableList .of (
126+ new ClientInterceptor () {
127+ @ Override
128+ public <ReqT , RespT > ClientCall <ReqT , RespT > interceptCall (
129+ MethodDescriptor <ReqT , RespT > method , CallOptions callOptions , Channel next ) {
130+ // Capture the AFFINITY_KEY before grpc-gcp processes it
131+ String affinityKey = callOptions .getOption (GcpManagedChannel .AFFINITY_KEY );
132+ if (affinityKey != null ) {
133+ String methodName = method .getFullMethodName ();
134+ if (methodName .equals (
135+ SpannerGrpc .getExecuteStreamingSqlMethod ().getFullMethodName ())) {
136+ executeSqlAffinityKeys .add (affinityKey );
137+ }
138+ if (methodName .equals (SpannerGrpc .getStreamingReadMethod ().getFullMethodName ())) {
139+ streamingReadAffinityKeys .add (affinityKey );
140+ }
141+ if (methodName .equals (
142+ SpannerGrpc .getBeginTransactionMethod ().getFullMethodName ())) {
143+ beginTransactionAffinityKeys .add (affinityKey );
151144 }
152- })
153- .build ()
154- .start ();
145+ }
146+ return next .newCall (method , callOptions );
147+ }
148+ });
155149 }
156150
157151 @ AfterClass
@@ -176,9 +170,9 @@ public static void resetLogging() {
176170 @ After
177171 public void reset () {
178172 mockSpanner .reset ();
179- executeSqlChannelHints .clear ();
180- streamingReadChannelHints .clear ();
181- beginTransactionChannelHints .clear ();
173+ executeSqlAffinityKeys .clear ();
174+ streamingReadAffinityKeys .clear ();
175+ beginTransactionAffinityKeys .clear ();
182176 }
183177
184178 private SpannerOptions createSpannerOptions () {
@@ -193,6 +187,7 @@ private SpannerOptions createSpannerOptions() {
193187 .setCompressorName ("gzip" )
194188 .setHost ("http://" + endpoint )
195189 .setCredentials (NoCredentials .getInstance ())
190+ .setInterceptorProvider (createAffinityKeyInterceptorProvider ())
196191 .setSessionPoolOption (
197192 SessionPoolOptions .newBuilder ().setSkipVerifyingBeginTransactionForMuxRW (true ).build ())
198193 .build ();
@@ -206,7 +201,8 @@ public void testSingleUseReadOnlyTransaction_usesSingleChannelHint() {
206201 while (resultSet .next ()) {}
207202 }
208203 }
209- assertEquals (1 , executeSqlChannelHints .size ());
204+ // All ExecuteSql calls should use the same logical affinity key
205+ assertEquals (1 , executeSqlAffinityKeys .size ());
210206 }
211207
212208 @ Test
@@ -220,7 +216,8 @@ public void testSingleUseReadOnlyTransaction_withTimestampBound_usesSingleChanne
220216 while (resultSet .next ()) {}
221217 }
222218 }
223- assertEquals (1 , executeSqlChannelHints .size ());
219+ // All ExecuteSql calls should use the same logical affinity key
220+ assertEquals (1 , executeSqlAffinityKeys .size ());
224221 }
225222
226223 @ Test
@@ -236,10 +233,10 @@ public void testReadOnlyTransaction_usesSingleChannelHint() {
236233 }
237234 }
238235 }
239- // All ExecuteSql calls within the transaction should use the same channel hint
240- assertEquals (1 , executeSqlChannelHints .size ());
241- // BeginTransaction should use a single channel hint
242- assertEquals (1 , beginTransactionChannelHints .size ());
236+ // All ExecuteSql calls within the transaction should use the same logical affinity key
237+ assertEquals (1 , executeSqlAffinityKeys .size ());
238+ // BeginTransaction should use a single logical affinity key
239+ assertEquals (1 , beginTransactionAffinityKeys .size ());
243240 }
244241
245242 @ Test
@@ -256,10 +253,10 @@ public void testReadOnlyTransaction_withTimestampBound_usesSingleChannelHint() {
256253 }
257254 }
258255 }
259- // All ExecuteSql calls within the transaction should use the same channel hint
260- assertEquals (1 , executeSqlChannelHints .size ());
261- // BeginTransaction should use a single channel hint
262- assertEquals (1 , beginTransactionChannelHints .size ());
256+ // All ExecuteSql calls within the transaction should use the same logical affinity key
257+ assertEquals (1 , executeSqlAffinityKeys .size ());
258+ // BeginTransaction should use a single logical affinity key
259+ assertEquals (1 , beginTransactionAffinityKeys .size ());
263260 }
264261
265262 @ Test
@@ -288,7 +285,8 @@ public void testTransactionManager_usesSingleChannelHint() {
288285 }
289286 }
290287 }
291- assertEquals (1 , executeSqlChannelHints .size ());
288+ // All ExecuteSql calls within the transaction should use the same logical affinity key
289+ assertEquals (1 , executeSqlAffinityKeys .size ());
292290 }
293291
294292 @ Test
@@ -318,6 +316,7 @@ public void testTransactionRunner_usesSingleChannelHint() {
318316 return null ;
319317 });
320318 }
321- assertEquals (1 , streamingReadChannelHints .size ());
319+ // All StreamingRead calls within the transaction should use the same logical affinity key
320+ assertEquals (1 , streamingReadAffinityKeys .size ());
322321 }
323322}
0 commit comments