@@ -114,7 +114,8 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(TimestampBound bound) {
114114 sessionClient .getSpanner ().getOptions ().getDirectedReadOptions ())
115115 .setSpan (sessionClient .getSpanner ().getTracer ().getCurrentSpan ())
116116 .setTracer (sessionClient .getSpanner ().getTracer ()),
117- checkNotNull (bound ));
117+ checkNotNull (bound ),
118+ sessionClient );
118119 }
119120
120121 @ Override
@@ -137,7 +138,8 @@ public BatchReadOnlyTransaction batchReadOnlyTransaction(BatchTransactionId batc
137138 sessionClient .getSpanner ().getOptions ().getDirectedReadOptions ())
138139 .setSpan (sessionClient .getSpanner ().getTracer ().getCurrentSpan ())
139140 .setTracer (sessionClient .getSpanner ().getTracer ()),
140- batchTransactionId );
141+ batchTransactionId ,
142+ sessionClient );
141143 }
142144
143145 private boolean canUseMultiplexedSession () {
@@ -160,20 +162,28 @@ private SessionImpl getMultiplexedSession() {
160162
161163 private static class BatchReadOnlyTransactionImpl extends MultiUseReadOnlyTransaction
162164 implements BatchReadOnlyTransaction {
163- private final String sessionName ;
165+ private String sessionName ;
164166 private final Map <SpannerRpc .Option , ?> options ;
167+ private final SessionClient sessionClient ;
168+ private final AtomicBoolean fallbackInitiated = new AtomicBoolean (false );
165169
166170 BatchReadOnlyTransactionImpl (
167- MultiUseReadOnlyTransaction .Builder builder , TimestampBound bound ) {
171+ MultiUseReadOnlyTransaction .Builder builder ,
172+ TimestampBound bound ,
173+ SessionClient sessionClient ) {
168174 super (builder .setTimestampBound (bound ));
175+ this .sessionClient = sessionClient ;
169176 this .sessionName = session .getName ();
170177 this .options = session .getOptions ();
171178 initTransaction ();
172179 }
173180
174181 BatchReadOnlyTransactionImpl (
175- MultiUseReadOnlyTransaction .Builder builder , BatchTransactionId batchTransactionId ) {
182+ MultiUseReadOnlyTransaction .Builder builder ,
183+ BatchTransactionId batchTransactionId ,
184+ SessionClient sessionClient ) {
176185 super (builder .setTransactionId (batchTransactionId .getTransactionId ()));
186+ this .sessionClient = sessionClient ;
177187 this .sessionName = session .getName ();
178188 this .options = session .getOptions ();
179189 }
@@ -204,6 +214,18 @@ public List<Partition> partitionReadUsingIndex(
204214 Iterable <String > columns ,
205215 ReadOption ... option )
206216 throws SpannerException {
217+ return partitionReadUsingIndex (partitionOptions , table , index , keys , columns , false , option );
218+ }
219+
220+ private List <Partition > partitionReadUsingIndex (
221+ PartitionOptions partitionOptions ,
222+ String table ,
223+ String index ,
224+ KeySet keys ,
225+ Iterable <String > columns ,
226+ boolean isFallback ,
227+ ReadOption ... option )
228+ throws SpannerException {
207229 Options readOptions = Options .fromReadOptions (option );
208230 Preconditions .checkArgument (
209231 !readOptions .hasLimit (),
@@ -246,7 +268,10 @@ public List<Partition> partitionReadUsingIndex(
246268 }
247269 return partitions .build ();
248270 } catch (SpannerException e ) {
249- maybeMarkUnimplementedForPartitionedOps (e );
271+ if (!isFallback && maybeMarkUnimplementedForPartitionedOps (e )) {
272+ return partitionReadUsingIndex (
273+ partitionOptions , table , index , keys , columns , true , option );
274+ }
250275 throw e ;
251276 }
252277 }
@@ -255,6 +280,15 @@ public List<Partition> partitionReadUsingIndex(
255280 public List <Partition > partitionQuery (
256281 PartitionOptions partitionOptions , Statement statement , QueryOption ... option )
257282 throws SpannerException {
283+ return partitionQuery (partitionOptions , statement , false , option );
284+ }
285+
286+ private List <Partition > partitionQuery (
287+ PartitionOptions partitionOptions ,
288+ Statement statement ,
289+ boolean isFallback ,
290+ QueryOption ... option )
291+ throws SpannerException {
258292 Options queryOptions = Options .fromQueryOptions (option );
259293 final PartitionQueryRequest .Builder builder =
260294 PartitionQueryRequest .newBuilder ().setSession (sessionName ).setSql (statement .getSql ());
@@ -291,16 +325,29 @@ public List<Partition> partitionQuery(
291325 }
292326 return partitions .build ();
293327 } catch (SpannerException e ) {
294- maybeMarkUnimplementedForPartitionedOps (e );
328+ if (!isFallback && maybeMarkUnimplementedForPartitionedOps (e )) {
329+ return partitionQuery (partitionOptions , statement , true , option );
330+ }
295331 throw e ;
296332 }
297333 }
298334
299- void maybeMarkUnimplementedForPartitionedOps (SpannerException spannerException ) {
335+ boolean maybeMarkUnimplementedForPartitionedOps (SpannerException spannerException ) {
300336 if (MultiplexedSessionDatabaseClient .verifyErrorMessage (
301337 spannerException , "Partitioned operations are not supported with multiplexed sessions" )) {
302- unimplementedForPartitionedOps .set (true );
338+ synchronized (fallbackInitiated ) {
339+ if (!fallbackInitiated .get ()) {
340+ session .setFallbackSessionReference (
341+ sessionClient .createSession ().getSessionReference ());
342+ sessionName = session .getName ();
343+ initFallbackTransaction ();
344+ unimplementedForPartitionedOps .set (true );
345+ fallbackInitiated .set (true );
346+ }
347+ return true ;
348+ }
303349 }
350+ return false ;
304351 }
305352
306353 @ Override
0 commit comments