7575import java .util .concurrent .Executors ;
7676import java .util .concurrent .LinkedBlockingDeque ;
7777import java .util .concurrent .ThreadFactory ;
78- import java .util .logging .Level ;
7978
8079/**
8180 * An implementation of {@link java.sql.Statement} for executing BigQuery SQL statement and
@@ -808,40 +807,88 @@ Thread populateArrowBufferedQueue(
808807
809808 Runnable arrowStreamProcessor =
810809 () -> {
810+ long rowsRead = 0 ;
811+ int retryCount = 0 ;
812+ final long retryTimeoutInSecs = this .connection .getRetryTimeoutInSeconds ();
813+ long startTime = System .currentTimeMillis ();
811814 try {
812815 // Use the first stream to perform reading.
816+ if (readSession .getStreamsCount () == 0 ) {
817+ return ;
818+ }
813819 String streamName = readSession .getStreams (0 ).getName ();
814- ReadRowsRequest readRowsRequest =
815- ReadRowsRequest .newBuilder ().setReadStream (streamName ).build ();
816-
817- // Process each block of rows as they arrive and decode using our simple row reader.
818- com .google .api .gax .rpc .ServerStream <ReadRowsResponse > stream =
819- bqReadClient .readRowsCallable ().call (readRowsRequest );
820- for (ReadRowsResponse response : stream ) {
821- if (Thread .currentThread ().isInterrupted ()
822- || queryTaskExecutor .isShutdown ()) { // do not process and shutdown
820+
821+ while (true ) {
822+ try {
823+ ReadRowsRequest readRowsRequest =
824+ ReadRowsRequest .newBuilder ()
825+ .setReadStream (streamName )
826+ .setOffset (rowsRead )
827+ .build ();
828+
829+ com .google .api .gax .rpc .ServerStream <ReadRowsResponse > stream =
830+ bqReadClient .readRowsCallable ().call (readRowsRequest );
831+ for (ReadRowsResponse response : stream ) {
832+ if (Thread .currentThread ().isInterrupted () || queryTaskExecutor .isShutdown ()) {
833+ break ;
834+ }
835+
836+ ArrowRecordBatch currentBatch = response .getArrowRecordBatch ();
837+ arrowBatchWrapperBlockingQueue .put (BigQueryArrowBatchWrapper .of (currentBatch ));
838+ rowsRead += response .getRowCount ();
839+ startTime = System .currentTimeMillis ();
840+ }
823841 break ;
842+ } catch (RuntimeException e ) {
843+ if (e instanceof com .google .api .gax .rpc .ApiException
844+ && ((com .google .api .gax .rpc .ApiException ) e ).getStatusCode ().getCode ()
845+ == com .google .api .gax .rpc .StatusCode .Code .NOT_FOUND ) {
846+ LOG .warning ("Read session expired or not found" + ": %s" , e .getMessage ());
847+ break ;
848+ }
849+ long elapsedSecs = (System .currentTimeMillis () - startTime ) / 1000 ;
850+ if (elapsedSecs >= retryTimeoutInSecs ) {
851+ LOG .warning (
852+ "\n "
853+ + Thread .currentThread ().getName ()
854+ + " Interrupted @ arrowStreamProcessor, timeout exceeded: %s" ,
855+ e .getMessage ());
856+ break ;
857+ }
858+ retryCount ++;
859+ LOG .info (
860+ "Connection interrupted during arrow stream read, retrying. attempt: "
861+ + retryCount );
862+ try {
863+ Thread .sleep (this .connection .getRetryInitialDelayInSeconds () * 1000 );
864+ } catch (InterruptedException ie ) {
865+ LOG .warning (
866+ "\n "
867+ + Thread .currentThread ().getName ()
868+ + " Interrupted @ arrowStreamProcessor waiting for retry"
869+ + ": %s" ,
870+ ie .getMessage ());
871+ break ;
872+ }
824873 }
825-
826- ArrowRecordBatch currentBatch = response .getArrowRecordBatch ();
827- arrowBatchWrapperBlockingQueue .put (BigQueryArrowBatchWrapper .of (currentBatch ));
828874 }
829875
830- } catch (RuntimeException | InterruptedException e ) {
831- LOG .log (
832- Level .WARNING ,
833- "\n " + Thread .currentThread ().getName () + " Interrupted @ arrowStreamProcessor" ,
834- e );
876+ } catch (InterruptedException e ) {
877+ LOG .warning (
878+ "\n "
879+ + Thread .currentThread ().getName ()
880+ + " Interrupted @ arrowStreamProcessor"
881+ + ": %s" ,
882+ e .getMessage ());
835883 } finally { // logic needed for graceful shutdown
836884 // marking end of stream
837885 try {
838886 arrowBatchWrapperBlockingQueue .put (
839887 BigQueryArrowBatchWrapper .of (null , true )); // mark the end of the stream
840888 } catch (InterruptedException e ) {
841- LOG .log (
842- Level .WARNING ,
843- "\n " + Thread .currentThread ().getName () + " Interrupted @ markLast" ,
844- e );
889+ LOG .warning (
890+ "\n " + Thread .currentThread ().getName () + " Interrupted @ markLast" + ": %s" ,
891+ e .getMessage ());
845892 }
846893 }
847894 };
@@ -923,11 +970,11 @@ BigQueryJsonResultSet processJsonResultSet(TableResult results) {
923970 populateFirstPage (results , rpcResponseQueue );
924971 rpcResponseQueue .put (Tuple .of (null , false ));
925972 } catch (InterruptedException e ) {
926- LOG .log (
927- Level .WARNING ,
973+ LOG .warning (
928974 "\n "
929975 + Thread .currentThread ().getName ()
930- + " Interrupted @ processJsonQueryResponseResults" );
976+ + " Interrupted @ processJsonQueryResponseResults: %s" ,
977+ e .getMessage ());
931978 }
932979 }
933980
@@ -964,9 +1011,7 @@ void populateFirstPage(
9641011 // this is the first page which we have received.
9651012 rpcResponseQueue .put (Tuple .of (result , true ));
9661013 } catch (InterruptedException e ) {
967- LOG .log (
968- Level .WARNING ,
969- "\n " + Thread .currentThread ().getName () + " Interrupted @ populateFirstPage" );
1014+ LOG .warning ("\n " + Thread .currentThread ().getName () + " Interrupted @ populateFirstPage" );
9701015 }
9711016 }
9721017
@@ -992,44 +1037,75 @@ Thread runNextPageTaskAsync(
9921037 // This thread makes the RPC calls and paginates
9931038 Runnable nextPageTask =
9941039 () -> {
995- // results.getPageToken();
996- String pageToken = firstPageToken ;
1040+ long startTimeLoop = System .currentTimeMillis ();
1041+ final long retryTimeoutInSecs = this .connection .getRetryTimeoutInSeconds ();
1042+ int retryCount = 0 ;
1043+ String currentPageToken = firstPageToken ;
1044+ TableResult currentResults = result ;
9971045 TableId destinationTable = null ;
9981046 if (firstPageToken != null ) {
9991047 destinationTable = getDestinationTable (jobId );
10001048 }
1049+
10011050 try {
1002- // paginate for non null token
1003- while (pageToken != null ) {
1051+ while (currentPageToken != null ) {
10041052 // do not process further pages and shutdown
10051053 if (Thread .currentThread ().isInterrupted () || queryTaskExecutor .isShutdown ()) {
1006- LOG .log (
1007- Level .WARNING ,
1054+ LOG .warning (
10081055 "\n "
10091056 + Thread .currentThread ().getName ()
10101057 + " Interrupted @ runNextPageTaskAsync" );
10111058 break ;
10121059 }
1013- long startTime = System .nanoTime ();
1014- TableResult results =
1015- this .bigQuery .listTableData (
1016- destinationTable ,
1017- TableDataListOption .pageSize (querySettings .getMaxResultPerPage ()),
1018- TableDataListOption .pageToken (pageToken ));
1019-
1020- pageToken = results .getNextPageToken ();
1021- // this will be parsed asynchronously without blocking the current
1022- // thread
1023- rpcResponseQueue .put (Tuple .of (results , true ));
1024- LOG .fine (
1025- "Fetched %d results from the server in %d ms." ,
1026- querySettings .getMaxResultPerPage (),
1027- (int ) ((System .nanoTime () - startTime ) / 1000000 ));
1060+
1061+ try {
1062+ long startTime = System .nanoTime ();
1063+ currentResults =
1064+ this .bigQuery .listTableData (
1065+ destinationTable ,
1066+ TableDataListOption .pageSize (querySettings .getMaxResultPerPage ()),
1067+ TableDataListOption .pageToken (currentPageToken ));
1068+
1069+ currentPageToken = currentResults .getNextPageToken ();
1070+ // this will be parsed asynchronously without blocking the current
1071+ // thread
1072+ rpcResponseQueue .put (Tuple .of (currentResults , true ));
1073+ startTimeLoop = System .currentTimeMillis ();
1074+ LOG .fine (
1075+ "Fetched %d results from the server in %d ms." ,
1076+ querySettings .getMaxResultPerPage (),
1077+ (int ) ((System .nanoTime () - startTime ) / 1000000 ));
1078+ } catch (Exception ex ) {
1079+ if (ex instanceof com .google .cloud .BaseServiceException
1080+ && ((com .google .cloud .BaseServiceException ) ex ).getCode () == 404 ) {
1081+ throw ex ;
1082+ }
1083+ long elapsedSecs = (System .currentTimeMillis () - startTimeLoop ) / 1000 ;
1084+ if (elapsedSecs >= retryTimeoutInSecs || ex instanceof InterruptedException ) {
1085+ throw ex ;
1086+ }
1087+ retryCount ++;
1088+ LOG .info (
1089+ "Connection interrupted during json stream read, retrying. attempt: "
1090+ + retryCount );
1091+ try {
1092+ Thread .sleep (this .connection .getRetryInitialDelayInSeconds () * 1000 );
1093+ } catch (InterruptedException ie ) {
1094+ Thread .currentThread ().interrupt ();
1095+ throw new BigQueryJdbcRuntimeException (ie );
1096+ }
1097+ }
10281098 }
10291099 // this will stop the parseDataTask as well when the pagination
10301100 // completes
10311101 rpcResponseQueue .put (Tuple .of (null , false ));
10321102 } catch (Exception ex ) {
1103+ LOG .warning (
1104+ "\n "
1105+ + Thread .currentThread ().getName ()
1106+ + " Interrupted @ runNextPageTaskAsync"
1107+ + ": %s" ,
1108+ ex .getMessage ());
10331109 throw new BigQueryJdbcRuntimeException (ex );
10341110 }
10351111 // We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not
@@ -1068,7 +1144,9 @@ Thread parseAndPopulateRpcDataAsync(
10681144 hasRows = nextPageTuple .y ();
10691145
10701146 } catch (InterruptedException e ) {
1071- LOG .log (Level .WARNING , "\n " + Thread .currentThread ().getName () + " Interrupted" , e );
1147+ LOG .warning (
1148+ "\n " + Thread .currentThread ().getName () + " Interrupted" + ": %s" ,
1149+ e .getMessage ());
10721150 // Thread might get interrupted while calling the Cancel method, which is
10731151 // expected, so logging this instead of throwing the exception back
10741152 break ;
@@ -1105,10 +1183,12 @@ Thread parseAndPopulateRpcDataAsync(
11051183 bigQueryFieldValueListWrapperBlockingQueue .put (
11061184 BigQueryFieldValueListWrapper .of (null , null , true ));
11071185 } catch (InterruptedException e ) {
1108- LOG .log (
1109- Level .WARNING ,
1110- "\n " + Thread .currentThread ().getName () + " Interrupted @ populateBufferAsync" ,
1111- e );
1186+ LOG .warning (
1187+ "\n "
1188+ + Thread .currentThread ().getName ()
1189+ + " Interrupted @ populateBufferAsync"
1190+ + ": %s" ,
1191+ e .getMessage ());
11121192 }
11131193 };
11141194
0 commit comments