7575import java .util .concurrent .Executors ;
7676import java .util .concurrent .LinkedBlockingDeque ;
7777import java .util .concurrent .ThreadFactory ;
78+ import java .util .logging .Level ;
7879
7980/**
8081 * An implementation of {@link java.sql.Statement} for executing BigQuery SQL statement and
@@ -826,6 +827,7 @@ Thread populateArrowBufferedQueue(
826827 .setOffset (rowsRead )
827828 .build ();
828829
830+ // Process each block of rows as they arrive and decode using our simple row reader.
829831 com .google .api .gax .rpc .ServerStream <ReadRowsResponse > stream =
830832 bqReadClient .readRowsCallable ().call (readRowsRequest );
831833 for (ReadRowsResponse response : stream ) {
@@ -848,47 +850,47 @@ Thread populateArrowBufferedQueue(
848850 }
849851 long elapsedSecs = (System .currentTimeMillis () - startTime ) / 1000 ;
850852 if (elapsedSecs >= retryTimeoutInSecs ) {
851- LOG .warning (
853+ LOG .log (
854+ Level .WARNING ,
852855 "\n "
853856 + Thread .currentThread ().getName ()
854- + " Interrupted @ arrowStreamProcessor, timeout exceeded: %s " ,
855- e . getMessage () );
857+ + " Interrupted @ arrowStreamProcessor, timeout exceeded" ,
858+ e );
856859 break ;
857860 }
858861 retryCount ++;
859862 LOG .info (
860- "Connection interrupted during arrow stream read, retrying. attempt: "
861- + retryCount );
863+ "Connection interrupted during arrow stream read, retrying. attempt: %d" ,
864+ retryCount );
862865 try {
863866 Thread .sleep (this .connection .getRetryInitialDelayInSeconds () * 1000 );
864867 } catch (InterruptedException ie ) {
865- LOG .warning (
868+ LOG .log (
869+ Level .WARNING ,
866870 "\n "
867871 + Thread .currentThread ().getName ()
868- + " Interrupted @ arrowStreamProcessor waiting for retry"
869- + ": %s" ,
870- ie .getMessage ());
872+ + " Interrupted @ arrowStreamProcessor waiting for retry" ,
873+ ie );
871874 break ;
872875 }
873876 }
874877 }
875878
876879 } catch (InterruptedException e ) {
877- LOG .warning (
878- "\n "
879- + Thread .currentThread ().getName ()
880- + " Interrupted @ arrowStreamProcessor"
881- + ": %s" ,
882- e .getMessage ());
880+ LOG .log (
881+ Level .WARNING ,
882+ "\n " + Thread .currentThread ().getName () + " Interrupted @ arrowStreamProcessor" ,
883+ e );
883884 } finally { // logic needed for graceful shutdown
884885 // marking end of stream
885886 try {
886887 arrowBatchWrapperBlockingQueue .put (
887888 BigQueryArrowBatchWrapper .of (null , true )); // mark the end of the stream
888889 } catch (InterruptedException e ) {
889- LOG .warning (
890- "\n " + Thread .currentThread ().getName () + " Interrupted @ markLast" + ": %s" ,
891- e .getMessage ());
890+ LOG .log (
891+ Level .WARNING ,
892+ "\n " + Thread .currentThread ().getName () + " Interrupted @ markLast" ,
893+ e );
892894 }
893895 }
894896 };
@@ -971,10 +973,8 @@ BigQueryJsonResultSet processJsonResultSet(TableResult results) {
971973 rpcResponseQueue .put (Tuple .of (null , false ));
972974 } catch (InterruptedException e ) {
973975 LOG .warning (
974- "\n "
975- + Thread .currentThread ().getName ()
976- + " Interrupted @ processJsonQueryResponseResults: %s" ,
977- e .getMessage ());
976+ "\n %s Interrupted @ processJsonQueryResponseResults: %s" ,
977+ Thread .currentThread ().getName (), e .getMessage ());
978978 }
979979 }
980980
@@ -1011,7 +1011,9 @@ void populateFirstPage(
10111011 // this is the first page which we have received.
10121012 rpcResponseQueue .put (Tuple .of (result , true ));
10131013 } catch (InterruptedException e ) {
1014- LOG .warning ("\n " + Thread .currentThread ().getName () + " Interrupted @ populateFirstPage" );
1014+ LOG .warning (
1015+ "\n %s Interrupted @ populateFirstPage: %s" ,
1016+ Thread .currentThread ().getName (), e .getMessage ());
10151017 }
10161018 }
10171019
@@ -1052,9 +1054,7 @@ Thread runNextPageTaskAsync(
10521054 // do not process further pages and shutdown
10531055 if (Thread .currentThread ().isInterrupted () || queryTaskExecutor .isShutdown ()) {
10541056 LOG .warning (
1055- "\n "
1056- + Thread .currentThread ().getName ()
1057- + " Interrupted @ runNextPageTaskAsync" );
1057+ "\n %s Interrupted @ runNextPageTaskAsync" , Thread .currentThread ().getName ());
10581058 break ;
10591059 }
10601060
@@ -1081,13 +1081,15 @@ Thread runNextPageTaskAsync(
10811081 throw ex ;
10821082 }
10831083 long elapsedSecs = (System .currentTimeMillis () - startTimeLoop ) / 1000 ;
1084- if (elapsedSecs >= retryTimeoutInSecs || ex instanceof InterruptedException ) {
1084+ if (elapsedSecs >= retryTimeoutInSecs
1085+ || ex instanceof InterruptedException
1086+ || ex .getCause () instanceof InterruptedException ) {
10851087 throw ex ;
10861088 }
10871089 retryCount ++;
10881090 LOG .info (
1089- "Connection interrupted during json stream read, retrying. attempt: "
1090- + retryCount );
1091+ "Connection interrupted during json stream read, retrying. attempt: %d" ,
1092+ retryCount );
10911093 try {
10921094 Thread .sleep (this .connection .getRetryInitialDelayInSeconds () * 1000 );
10931095 } catch (InterruptedException ie ) {
@@ -1144,9 +1146,7 @@ Thread parseAndPopulateRpcDataAsync(
11441146 hasRows = nextPageTuple .y ();
11451147
11461148 } catch (InterruptedException e ) {
1147- LOG .warning (
1148- "\n " + Thread .currentThread ().getName () + " Interrupted" + ": %s" ,
1149- e .getMessage ());
1149+ LOG .log (Level .WARNING , "\n " + Thread .currentThread ().getName () + " Interrupted" , e );
11501150 // Thread might get interrupted while calling the Cancel method, which is
11511151 // expected, so logging this instead of throwing the exception back
11521152 break ;
@@ -1183,12 +1183,10 @@ Thread parseAndPopulateRpcDataAsync(
11831183 bigQueryFieldValueListWrapperBlockingQueue .put (
11841184 BigQueryFieldValueListWrapper .of (null , null , true ));
11851185 } catch (InterruptedException e ) {
1186- LOG .warning (
1187- "\n "
1188- + Thread .currentThread ().getName ()
1189- + " Interrupted @ populateBufferAsync"
1190- + ": %s" ,
1191- e .getMessage ());
1186+ LOG .log (
1187+ Level .WARNING ,
1188+ "\n " + Thread .currentThread ().getName () + " Interrupted @ populateBufferAsync" ,
1189+ e );
11921190 }
11931191 };
11941192
0 commit comments