5656import com .google .common .annotations .VisibleForTesting ;
5757import com .google .common .collect .ImmutableList ;
5858import com .google .common .collect .Iterators ;
59+ import com .google .common .util .concurrent .Uninterruptibles ;
5960import java .lang .ref .ReferenceQueue ;
6061import java .sql .Connection ;
6162import java .sql .ResultSet ;
@@ -814,9 +815,6 @@ Thread populateArrowBufferedQueue(
814815 int retryCount = 0 ;
815816 try {
816817 // Use the first stream to perform reading.
817- if (readSession .getStreamsCount () == 0 ) {
818- return ;
819- }
820818 String streamName = readSession .getStreams (0 ).getName ();
821819
822820 while (true ) {
@@ -836,20 +834,16 @@ Thread populateArrowBufferedQueue(
836834 }
837835
838836 ArrowRecordBatch currentBatch = response .getArrowRecordBatch ();
839- arrowBatchWrapperBlockingQueue .put (BigQueryArrowBatchWrapper .of (currentBatch ));
837+ Uninterruptibles .putUninterruptibly (
838+ arrowBatchWrapperBlockingQueue , BigQueryArrowBatchWrapper .of (currentBatch ));
840839 rowsRead += response .getRowCount ();
841840 }
842841 break ;
843842 } catch (com .google .api .gax .rpc .ApiException e ) {
844843 if (e .getStatusCode ().getCode ()
845844 == com .google .api .gax .rpc .StatusCode .Code .NOT_FOUND ) {
846845 LOG .warning ("Read session expired or not found: %s" , e .getMessage ());
847- try {
848- arrowBatchWrapperBlockingQueue .put (
849- BigQueryArrowBatchWrapper .ofError (new BigQueryJdbcRuntimeException (e )));
850- } catch (InterruptedException ie ) {
851- Thread .currentThread ().interrupt ();
852- }
846+ enqueueError (arrowBatchWrapperBlockingQueue , e );
853847 break ;
854848 }
855849 if (retryCount >= MAX_RETRY_COUNT ) {
@@ -859,12 +853,7 @@ Thread populateArrowBufferedQueue(
859853 + Thread .currentThread ().getName ()
860854 + " Interrupted @ arrowStreamProcessor, max retries exceeded" ,
861855 e );
862- try {
863- arrowBatchWrapperBlockingQueue .put (
864- BigQueryArrowBatchWrapper .ofError (new BigQueryJdbcRuntimeException (e )));
865- } catch (InterruptedException ie ) {
866- Thread .currentThread ().interrupt ();
867- }
856+ enqueueError (arrowBatchWrapperBlockingQueue , e );
868857 break ;
869858 }
870859 retryCount ++;
@@ -880,35 +869,16 @@ Thread populateArrowBufferedQueue(
880869 Level .WARNING ,
881870 "\n " + Thread .currentThread ().getName () + " Interrupted @ arrowStreamProcessor" ,
882871 e );
883- try {
884- arrowBatchWrapperBlockingQueue .put (
885- BigQueryArrowBatchWrapper .ofError (new BigQueryJdbcRuntimeException (e )));
886- } catch (InterruptedException ie ) {
887- Thread .currentThread ().interrupt ();
888- }
872+ enqueueError (arrowBatchWrapperBlockingQueue , e );
889873 Thread .currentThread ().interrupt ();
890874 } catch (Exception e ) {
891875 LOG .log (
892876 Level .WARNING ,
893877 "\n " + Thread .currentThread ().getName () + " Error @ arrowStreamProcessor" ,
894878 e );
895- try {
896- arrowBatchWrapperBlockingQueue .put (
897- BigQueryArrowBatchWrapper .ofError (new BigQueryJdbcRuntimeException (e )));
898- } catch (InterruptedException ie ) {
899- Thread .currentThread ().interrupt ();
900- }
879+ enqueueError (arrowBatchWrapperBlockingQueue , e );
901880 } finally { // logic needed for graceful shutdown
902- // marking end of stream
903- try {
904- arrowBatchWrapperBlockingQueue .put (
905- BigQueryArrowBatchWrapper .of (null , true )); // mark the end of the stream
906- } catch (InterruptedException e ) {
907- LOG .log (
908- Level .WARNING ,
909- "\n " + Thread .currentThread ().getName () + " Interrupted @ markLast" ,
910- e );
911- }
881+ enqueueEndOfStream (arrowBatchWrapperBlockingQueue );
912882 }
913883 };
914884
@@ -995,7 +965,7 @@ BigQueryJsonResultSet processJsonResultSet(TableResult results) {
995965 rpcResponseQueue .put (Tuple .of (null , false ));
996966 } catch (InterruptedException e ) {
997967 LOG .warning (
998- "\n %s Interrupted @ processJsonQueryResponseResults: %s" ,
968+ "%s Interrupted @ processJsonQueryResponseResults: %s" ,
999969 Thread .currentThread ().getName (), e .getMessage ());
1000970 }
1001971 }
@@ -1034,7 +1004,7 @@ void populateFirstPage(
10341004 rpcResponseQueue .put (Tuple .of (result , true ));
10351005 } catch (InterruptedException e ) {
10361006 LOG .warning (
1037- "\n %s Interrupted @ populateFirstPage: %s" ,
1007+ "%s Interrupted @ populateFirstPage: %s" ,
10381008 Thread .currentThread ().getName (), e .getMessage ());
10391009 }
10401010 }
@@ -1074,7 +1044,7 @@ Thread runNextPageTaskAsync(
10741044 // do not process further pages and shutdown
10751045 if (Thread .currentThread ().isInterrupted () || queryTaskExecutor .isShutdown ()) {
10761046 LOG .warning (
1077- "\n %s Interrupted @ runNextPageTaskAsync" , Thread .currentThread ().getName ());
1047+ "%s Interrupted @ runNextPageTaskAsync" , Thread .currentThread ().getName ());
10781048 break ;
10791049 }
10801050
@@ -1088,30 +1058,20 @@ Thread runNextPageTaskAsync(
10881058 currentPageToken = currentResults .getNextPageToken ();
10891059 // this will be parsed asynchronously without blocking the current
10901060 // thread
1091- rpcResponseQueue . put ( Tuple .of (currentResults , true ));
1061+ Uninterruptibles . putUninterruptibly ( rpcResponseQueue , Tuple .of (currentResults , true ));
10921062 LOG .fine (
10931063 "Fetched %d results from the server in %d ms." ,
10941064 querySettings .getMaxResultPerPage (),
10951065 (int ) ((System .nanoTime () - startTime ) / 1000000 ));
10961066 }
10971067 } catch (Exception ex ) {
1098- try {
1099- bigQueryFieldValueListWrapperBlockingQueue .put (
1100- BigQueryFieldValueListWrapper .ofError (new BigQueryJdbcRuntimeException (ex )));
1101- } catch (InterruptedException ie ) {
1102- Thread .currentThread ().interrupt ();
1103- }
1068+ Uninterruptibles .putUninterruptibly (
1069+ bigQueryFieldValueListWrapperBlockingQueue ,
1070+ BigQueryFieldValueListWrapper .ofError (new BigQueryJdbcRuntimeException (ex )));
11041071 } finally {
1105- try {
1106- // this will stop the parseDataTask as well when the pagination
1107- // completes
1108- rpcResponseQueue .put (Tuple .of (null , false ));
1109- } catch (InterruptedException ie ) {
1110- LOG .warning (
1111- "\n %s Interrupted sending end-of-stream sentinel @ runNextPageTaskAsync" ,
1112- Thread .currentThread ().getName ());
1113- Thread .currentThread ().interrupt ();
1114- }
1072+ // this will stop the parseDataTask as well when the pagination
1073+ // completes
1074+ Uninterruptibles .putUninterruptibly (rpcResponseQueue , Tuple .of (null , false ));
11151075 }
11161076 // We cannot do queryTaskExecutor.shutdownNow() here as populate buffer method may not
11171077 // have finished processing the records and even that will be interrupted
@@ -1166,44 +1126,29 @@ Thread parseAndPopulateRpcDataAsync(
11661126 long startTime = System .nanoTime ();
11671127 long results = 0 ;
11681128 for (FieldValueList fieldValueList : fieldValueLists ) {
1169- try {
1170- if (Thread .currentThread ().isInterrupted () || queryTaskExecutor .isShutdown ()) {
1171- // do not process further pages and shutdown (inner loop)
1172- break ;
1173- }
1174- bigQueryFieldValueListWrapperBlockingQueue .put (
1175- BigQueryFieldValueListWrapper .of (schema .getFields (), fieldValueList ));
1176- results += 1 ;
1177- } catch (InterruptedException ex ) {
1178- throw new BigQueryJdbcRuntimeException (ex );
1129+
1130+ if (Thread .currentThread ().isInterrupted () || queryTaskExecutor .isShutdown ()) {
1131+ // do not process further pages and shutdown (inner loop)
1132+ break ;
11791133 }
1134+ Uninterruptibles .putUninterruptibly (
1135+ bigQueryFieldValueListWrapperBlockingQueue ,
1136+ BigQueryFieldValueListWrapper .of (schema .getFields (), fieldValueList ));
1137+ results += 1 ;
11801138 }
11811139 LOG .fine (
11821140 "Processed %d results in %d ms." ,
11831141 results , (int ) ((System .nanoTime () - startTime ) / 1000000 ));
11841142 }
1143+
11851144 } catch (Exception ex ) {
11861145 LOG .log (
11871146 Level .WARNING ,
11881147 "\n " + Thread .currentThread ().getName () + " Error @ populateBufferAsync" ,
11891148 ex );
1190- try {
1191- bigQueryFieldValueListWrapperBlockingQueue .put (
1192- BigQueryFieldValueListWrapper .ofError (new BigQueryJdbcRuntimeException (ex )));
1193- } catch (InterruptedException ie ) {
1194- Thread .currentThread ().interrupt ();
1195- }
1149+ enqueueBufferError (bigQueryFieldValueListWrapperBlockingQueue , ex );
11961150 } finally {
1197- try {
1198- // All the pages has been processed, put this marker
1199- bigQueryFieldValueListWrapperBlockingQueue .put (
1200- BigQueryFieldValueListWrapper .of (null , null , true ));
1201- } catch (InterruptedException e ) {
1202- LOG .log (
1203- Level .WARNING ,
1204- "\n " + Thread .currentThread ().getName () + " Interrupted @ populateBufferAsync" ,
1205- e );
1206- }
1151+ enqueueBufferEndOfStream (bigQueryFieldValueListWrapperBlockingQueue );
12071152 }
12081153 };
12091154
@@ -1610,4 +1555,22 @@ enum QueryDialectType {
16101555 SQL ,
16111556 BIG_QUERY
16121557 }
1558+
1559+ private void enqueueError (BlockingQueue <BigQueryArrowBatchWrapper > queue , Exception e ) {
1560+ Uninterruptibles .putUninterruptibly (
1561+ queue , BigQueryArrowBatchWrapper .ofError (new BigQueryJdbcRuntimeException (e )));
1562+ }
1563+
1564+ private void enqueueEndOfStream (BlockingQueue <BigQueryArrowBatchWrapper > queue ) {
1565+ Uninterruptibles .putUninterruptibly (queue , BigQueryArrowBatchWrapper .of (null , true ));
1566+ }
1567+
1568+ private void enqueueBufferError (BlockingQueue <BigQueryFieldValueListWrapper > queue , Exception e ) {
1569+ Uninterruptibles .putUninterruptibly (
1570+ queue , BigQueryFieldValueListWrapper .ofError (new BigQueryJdbcRuntimeException (e )));
1571+ }
1572+
1573+ private void enqueueBufferEndOfStream (BlockingQueue <BigQueryFieldValueListWrapper > queue ) {
1574+ Uninterruptibles .putUninterruptibly (queue , BigQueryFieldValueListWrapper .of (null , null , true ));
1575+ }
16131576}
0 commit comments