@@ -811,7 +811,7 @@ Thread populateArrowBufferedQueue(
811811 long rowsRead = 0 ;
812812 int retryCount = 0 ;
813813 final long retryTimeoutInSecs = this .connection .getRetryTimeoutInSeconds ();
814- long startTime = System .currentTimeMillis ();
814+ long startTime = System .nanoTime ();
815815 try {
816816 // Use the first stream to perform reading.
817817 if (readSession .getStreamsCount () == 0 ) {
@@ -838,7 +838,7 @@ Thread populateArrowBufferedQueue(
838838 ArrowRecordBatch currentBatch = response .getArrowRecordBatch ();
839839 arrowBatchWrapperBlockingQueue .put (BigQueryArrowBatchWrapper .of (currentBatch ));
840840 rowsRead += response .getRowCount ();
841- startTime = System .currentTimeMillis ();
841+ startTime = System .nanoTime ();
842842 }
843843 break ;
844844 } catch (com .google .api .gax .rpc .ApiException e ) {
@@ -847,7 +847,7 @@ Thread populateArrowBufferedQueue(
847847 LOG .warning ("Read session expired or not found: %s" , e .getMessage ());
848848 break ;
849849 }
850- long elapsedSecs = (System .currentTimeMillis () - startTime ) / 1000 ;
850+ long elapsedSecs = (System .nanoTime () - startTime ) / 1_000_000_000L ;
851851 if (elapsedSecs >= retryTimeoutInSecs ) {
852852 LOG .log (
853853 Level .WARNING ,
@@ -861,12 +861,7 @@ Thread populateArrowBufferedQueue(
861861 LOG .info (
862862 "Connection interrupted during arrow stream read, retrying. attempt: %d" ,
863863 retryCount );
864- try {
865- Thread .sleep (this .connection .getRetryInitialDelayInSeconds () * 1000 );
866- } catch (InterruptedException ie ) {
867- Thread .currentThread ().interrupt ();
868- throw new BigQueryJdbcRuntimeException (ie );
869- }
864+ Thread .sleep (this .connection .getRetryInitialDelayInSeconds () * 1000 );
870865 }
871866 }
872867
@@ -875,6 +870,8 @@ Thread populateArrowBufferedQueue(
875870 Level .WARNING ,
876871 "\n " + Thread .currentThread ().getName () + " Interrupted @ arrowStreamProcessor" ,
877872 e );
873+ Thread .currentThread ().interrupt ();
874+ throw new BigQueryJdbcRuntimeException (e );
878875 } finally { // logic needed for graceful shutdown
879876 // marking end of stream
880877 try {
@@ -1033,7 +1030,7 @@ Thread runNextPageTaskAsync(
10331030 // This thread makes the RPC calls and paginates
10341031 Runnable nextPageTask =
10351032 () -> {
1036- long startTimeLoop = System .currentTimeMillis ();
1033+ long startTimeLoop = System .nanoTime ();
10371034 final long retryTimeoutInSecs = this .connection .getRetryTimeoutInSeconds ();
10381035 int retryCount = 0 ;
10391036 String currentPageToken = firstPageToken ;
@@ -1064,7 +1061,7 @@ Thread runNextPageTaskAsync(
10641061 // this will be parsed asynchronously without blocking the current
10651062 // thread
10661063 rpcResponseQueue .put (Tuple .of (currentResults , true ));
1067- startTimeLoop = System .currentTimeMillis ();
1064+ startTimeLoop = System .nanoTime ();
10681065 LOG .fine (
10691066 "Fetched %d results from the server in %d ms." ,
10701067 querySettings .getMaxResultPerPage (),
@@ -1073,30 +1070,33 @@ Thread runNextPageTaskAsync(
10731070 if (ex .getCode () == 404 ) {
10741071 throw ex ;
10751072 }
1076- long elapsedSecs = (System .currentTimeMillis () - startTimeLoop ) / 1000 ;
1073+ long elapsedSecs = (System .nanoTime () - startTimeLoop ) / 1_000_000_000L ;
10771074 if (elapsedSecs >= retryTimeoutInSecs ) {
10781075 throw ex ;
10791076 }
10801077 retryCount ++;
10811078 LOG .info (
10821079 "Connection interrupted during json stream read, retrying. attempt: %d" ,
10831080 retryCount );
1084- try {
1085- Thread .sleep (this .connection .getRetryInitialDelayInSeconds () * 1000 );
1086- } catch (InterruptedException ie ) {
1087- Thread .currentThread ().interrupt ();
1088- throw new BigQueryJdbcRuntimeException (ie );
1089- }
1081+ Thread .sleep (this .connection .getRetryInitialDelayInSeconds () * 1000 );
10901082 } catch (InterruptedException ie ) {
10911083 Thread .currentThread ().interrupt ();
10921084 throw new BigQueryJdbcRuntimeException (ie );
10931085 }
10941086 }
1095- // this will stop the parseDataTask as well when the pagination
1096- // completes
1097- rpcResponseQueue .put (Tuple .of (null , false ));
10981087 } catch (Exception ex ) {
10991088 throw new BigQueryJdbcRuntimeException (ex );
1089+ } finally {
1090+ try {
1091+ // this will stop the parseDataTask as well when the pagination
1092+ // completes
1093+ rpcResponseQueue .put (Tuple .of (null , false ));
1094+ } catch (InterruptedException ie ) {
1095+ LOG .warning (
1096+ "\n %s Interrupted sending end-of-stream sentinel @ runNextPageTaskAsync" ,
1097+ Thread .currentThread ().getName ());
1098+ Thread .currentThread ().interrupt ();
1099+ }
11001100 }
11011101 // We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not
11021102 // have finished processing the records and even that will be interrupted
0 commit comments