Skip to content

Commit 5477e61

Browse files
committed
feat(query): add streaming query with a timeunit
1 parent 738acd5 commit 5477e61

4 files changed

Lines changed: 189 additions & 5 deletions

File tree

src/main/java/org/influxdb/InfluxDB.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -538,6 +538,25 @@ public void write(final String database, final String retentionPolicy,
538538
public void query(Query query, int chunkSize, BiConsumer<Cancellable, QueryResult> onNext, Runnable onComplete,
539539
Consumer<Throwable> onFailure);
540540

541+
/**
542+
* Execute a streaming query against a database.
543+
*
544+
* @param query
545+
* the query to execute.
546+
* @param timeUnit
547+
* the time unit of the results.
548+
* @param chunkSize
549+
* the number of QueryResults to process in one chunk.
550+
* @param onNext
551+
* the consumer to invoke for each received QueryResult; with capability to discontinue a streaming query
552+
* @param onComplete
553+
* the onComplete to invoke for successfully end of stream
554+
* @param onFailure
555+
* the consumer for error handling
556+
*/
557+
public void query(Query query, TimeUnit timeUnit, int chunkSize, BiConsumer<Cancellable, QueryResult> onNext, Runnable onComplete,
558+
Consumer<Throwable> onFailure);
559+
541560
/**
542561
* Execute a query against a database.
543562
*

src/main/java/org/influxdb/impl/InfluxDBImpl.java

Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -713,6 +713,90 @@ public void onFailure(final Call<ResponseBody> call, final Throwable t) {
713713
});
714714
}
715715

716+
/**
717+
* {@inheritDoc}
718+
*/
719+
@Override
720+
public void query(final Query query, final TimeUnit timeUnit, final int chunkSize, final BiConsumer<Cancellable, QueryResult> onNext,
721+
final Runnable onComplete, final Consumer<Throwable> onFailure) {
722+
Call<ResponseBody> call;
723+
if (query.hasBoundParameters()) {
724+
if (query.requiresPost()) {
725+
call = this.influxDBService.postQuery(getDatabase(query), query.getCommandWithUrlEncoded(), TimeUtil.toTimePrecision(timeUnit), chunkSize,
726+
query.getParameterJsonWithUrlEncoded());
727+
} else {
728+
call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded(), TimeUtil.toTimePrecision(timeUnit), chunkSize,
729+
query.getParameterJsonWithUrlEncoded());
730+
}
731+
} else {
732+
if (query.requiresPost()) {
733+
call = this.influxDBService.postQuery(getDatabase(query), query.getCommandWithUrlEncoded(), TimeUtil.toTimePrecision(timeUnit), chunkSize);
734+
} else {
735+
call = this.influxDBService.query(getDatabase(query), query.getCommandWithUrlEncoded(), TimeUtil.toTimePrecision(timeUnit), chunkSize);
736+
}
737+
}
738+
739+
call.enqueue(new Callback<ResponseBody>() {
740+
@Override
741+
public void onResponse(final Call<ResponseBody> call, final Response<ResponseBody> response) {
742+
743+
Cancellable cancellable = new Cancellable() {
744+
@Override
745+
public void cancel() {
746+
call.cancel();
747+
}
748+
749+
@Override
750+
public boolean isCanceled() {
751+
return call.isCanceled();
752+
}
753+
};
754+
755+
try {
756+
if (response.isSuccessful()) {
757+
ResponseBody chunkedBody = response.body();
758+
chunkProccesor.process(chunkedBody, cancellable, onNext, onComplete);
759+
} else {
760+
// REVIEW: must be handled consistently with IOException.
761+
ResponseBody errorBody = response.errorBody();
762+
if (errorBody != null) {
763+
InfluxDBException influxDBException = new InfluxDBException(errorBody.string());
764+
if (onFailure == null) {
765+
throw influxDBException;
766+
} else {
767+
onFailure.accept(influxDBException);
768+
}
769+
}
770+
}
771+
} catch (IOException e) {
772+
QueryResult queryResult = new QueryResult();
773+
queryResult.setError(e.toString());
774+
onNext.accept(cancellable, queryResult);
775+
//passing null onFailure consumer is here for backward compatibility
776+
//where the empty queryResult containing error is propagating into onNext consumer
777+
if (onFailure != null) {
778+
onFailure.accept(e);
779+
}
780+
} catch (Exception e) {
781+
call.cancel();
782+
if (onFailure != null) {
783+
onFailure.accept(e);
784+
}
785+
}
786+
787+
}
788+
789+
@Override
790+
public void onFailure(final Call<ResponseBody> call, final Throwable t) {
791+
if (onFailure == null) {
792+
throw new InfluxDBException(t);
793+
} else {
794+
onFailure.accept(t);
795+
}
796+
}
797+
});
798+
}
799+
716800
/**
717801
* {@inheritDoc}
718802
*/

src/main/java/org/influxdb/impl/InfluxDBService.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,12 +75,24 @@ public Call<QueryResult> postQuery(@Query(DB) String db, @Query(EPOCH) String ep
7575
public Call<ResponseBody> postQuery(@Query(DB) String db, @Field(value = Q, encoded = true) String query,
7676
@Query(CHUNK_SIZE) int chunkSize);
7777

78+
@Streaming
79+
@POST("query?chunked=true")
80+
@FormUrlEncoded
81+
public Call<ResponseBody> postQuery(@Query(DB) String db, @Field(value = Q, encoded = true) String query, @Query(EPOCH) String epoch,
82+
@Query(CHUNK_SIZE) int chunkSize);
83+
7884
@Streaming
7985
@POST("query?chunked=true")
8086
@FormUrlEncoded
8187
public Call<ResponseBody> postQuery(@Query(DB) String db, @Field(value = Q, encoded = true) String query,
8288
@Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params);
8389

90+
@Streaming
91+
@POST("query?chunked=true")
92+
@FormUrlEncoded
93+
public Call<ResponseBody> postQuery(@Query(DB) String db, @Field(value = Q, encoded = true) String query, @Query(EPOCH) String epoch,
94+
@Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params);
95+
8496
@POST("query")
8597
@FormUrlEncoded
8698
public Call<QueryResult> postQuery(@Field(value = Q, encoded = true) String query);
@@ -90,8 +102,18 @@ public Call<ResponseBody> postQuery(@Query(DB) String db, @Field(value = Q, enco
90102
public Call<ResponseBody> query(@Query(DB) String db, @Query(value = Q, encoded = true) String query,
91103
@Query(CHUNK_SIZE) int chunkSize);
92104

105+
@Streaming
106+
@GET("query?chunked=true")
107+
public Call<ResponseBody> query(@Query(DB) String db, @Query(value = Q, encoded = true) String query, @Query(EPOCH) String epoch,
108+
@Query(CHUNK_SIZE) int chunkSize);
109+
93110
@Streaming
94111
@GET("query?chunked=true")
95112
public Call<ResponseBody> query(@Query(DB) String db, @Query(value = Q, encoded = true) String query,
96113
@Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params);
114+
115+
@Streaming
116+
@GET("query?chunked=true")
117+
public Call<ResponseBody> query(@Query(DB) String db, @Query(value = Q, encoded = true) String query, @Query(EPOCH) String epoch,
118+
@Query(CHUNK_SIZE) int chunkSize, @Query(value = PARAMS, encoded = true) String params);
97119
}

src/test/java/org/influxdb/InfluxDBTest.java

Lines changed: 64 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,11 +24,7 @@
2424
import java.time.Instant;
2525
import java.time.ZoneId;
2626
import java.time.format.DateTimeFormatter;
27-
import java.util.ArrayList;
28-
import java.util.Arrays;
29-
import java.util.Collections;
30-
import java.util.List;
31-
import java.util.Set;
27+
import java.util.*;
3228
import java.util.concurrent.BlockingQueue;
3329
import java.util.concurrent.Callable;
3430
import java.util.concurrent.CountDownLatch;
@@ -37,6 +33,7 @@
3733
import java.util.concurrent.LinkedBlockingQueue;
3834
import java.util.concurrent.ThreadFactory;
3935
import java.util.concurrent.TimeUnit;
36+
import java.util.concurrent.atomic.AtomicReference;
4037
import java.util.concurrent.atomic.LongAdder;
4138
import java.util.function.Consumer;
4239
import java.util.regex.Pattern;
@@ -1158,6 +1155,68 @@ public void testChunkingOnComplete() throws InterruptedException {
11581155
Assertions.assertTrue(await, "The onComplete action did not arrive!");
11591156
}
11601157

1158+
/**
1159+
* Test chunking with TimeUnit
1160+
* @throws InterruptedException
1161+
*/
1162+
@Test
1163+
public void testChunkingWithImeUnit() throws InterruptedException {
1164+
if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) {
1165+
// do not test version 0.13 and 1.0
1166+
return;
1167+
}
1168+
1169+
String dbName = "write_unittest_" + System.currentTimeMillis();
1170+
this.influxDB.query(new Query("CREATE DATABASE " + dbName));
1171+
String rp = TestUtils.defaultRetentionPolicy(this.influxDB.version());
1172+
BatchPoints batchPoints = BatchPoints.database(dbName).retentionPolicy(rp).build();
1173+
Point point1 = Point.measurement("disk").tag("atag", "a").addField("used", 60L).addField("free", 1L).build();
1174+
Point point2 = Point.measurement("disk").tag("atag", "b").addField("used", 70L).addField("free", 2L).build();
1175+
Point point3 = Point.measurement("disk").tag("atag", "c").addField("used", 80L).addField("free", 3L).build();
1176+
batchPoints.point(point1);
1177+
batchPoints.point(point2);
1178+
batchPoints.point(point3);
1179+
this.influxDB.write(batchPoints);
1180+
1181+
CountDownLatch countDownLatch = new CountDownLatch(1);
1182+
1183+
Thread.sleep(2000);
1184+
Query query = new Query("SELECT * FROM disk", dbName);
1185+
this.influxDB.query(query, 2, result -> {}, countDownLatch::countDown);
1186+
List<QueryResult> results = new ArrayList<>();
1187+
AtomicReference<Throwable> errorFound = new AtomicReference<>();
1188+
1189+
// Run and map to points
1190+
this.influxDB.query(
1191+
query,
1192+
TimeUnit.MILLISECONDS,
1193+
5000,
1194+
(cancellable, queryResult) -> results.add(queryResult),
1195+
countDownLatch::countDown,
1196+
throwable -> {
1197+
countDownLatch.countDown();
1198+
errorFound.set(throwable);
1199+
}
1200+
);
1201+
1202+
Thread.sleep(2000);
1203+
this.influxDB.query(new Query("DROP DATABASE " + dbName));
1204+
1205+
boolean await = countDownLatch.await(10, TimeUnit.SECONDS);
1206+
Assertions.assertTrue(await, "The onComplete action did not arrive!");
1207+
Assertions.assertNull(errorFound.get(), "An error occurred : " + errorFound.get());
1208+
1209+
long totalPoints = results.stream()
1210+
.filter(qr -> qr.getResults() != null)
1211+
.flatMap(qr -> qr.getResults().stream())
1212+
.filter(r -> r.getSeries() != null)
1213+
.flatMap(r -> r.getSeries().stream())
1214+
.filter(s -> s.getValues() != null)
1215+
.mapToLong(s -> s.getValues().size())
1216+
.sum();
1217+
Assertions.assertEquals(3, totalPoints);
1218+
}
1219+
11611220
@Test
11621221
public void testChunkingFailOnComplete() throws InterruptedException {
11631222
if (this.influxDB.version().startsWith("0.") || this.influxDB.version().startsWith("1.0")) {

0 commit comments

Comments
 (0)