Skip to content

Commit dea1bf0

Browse files
committed
Remove Reactive backpressure examples
Motivation ---------- The SDK automatically handles backpressure using `autoRead` settings at the Netty layer, so users don't need to worry about backpressure when using the Reactive Query/Search/Analytics API.
1 parent 1d4fb46 commit dea1bf0

6 files changed

Lines changed: 13 additions & 155 deletions

File tree

modules/devguide/examples/java/Analytics.java

Lines changed: 1 addition & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,17 @@
2525

2626
// tag::imports[]
2727
import com.couchbase.client.core.error.CouchbaseException;
28-
import com.couchbase.client.java.Scope;
2928
import com.couchbase.client.java.Bucket;
3029
import com.couchbase.client.java.Cluster;
30+
import com.couchbase.client.java.Scope;
3131
import com.couchbase.client.java.analytics.AnalyticsResult;
3232
import com.couchbase.client.java.analytics.AnalyticsScanConsistency;
3333
import com.couchbase.client.java.analytics.ReactiveAnalyticsResult;
3434
import com.couchbase.client.java.json.JsonArray;
3535
import com.couchbase.client.java.json.JsonObject;
36-
import org.reactivestreams.Subscription;
37-
import reactor.core.publisher.BaseSubscriber;
3836
import reactor.core.publisher.Mono;
3937

4038
import java.util.UUID;
41-
import java.util.concurrent.atomic.AtomicInteger;
4239

4340
import static com.couchbase.client.java.analytics.AnalyticsOptions.analyticsOptions;
4441
// end::imports[]
@@ -199,41 +196,8 @@ public static void asyncExamples(String... args) {
199196
.subscribe(row -> System.out.println("Found row: " + row));
200197
// end::simplereactive[]
201198
}
202-
203-
System.out.println("backpressure");
204-
// tag::backpressure[]
205-
Mono<ReactiveAnalyticsResult> result = cluster
206-
.reactive()
207-
.analyticsQuery("select * from `huge-dataset`");
208-
209-
result
210-
.flatMapMany(ReactiveAnalyticsResult::rowsAsObject)
211-
.subscribe(new BaseSubscriber<JsonObject>() {
212-
// Number of outstanding requests
213-
final AtomicInteger outstanding = new AtomicInteger(0);
214-
215-
@Override
216-
protected void hookOnSubscribe(Subscription subscription) {
217-
request(10); // initially request to rows
218-
outstanding.set(10);
219-
}
220-
221-
@Override
222-
protected void hookOnNext(JsonObject value) {
223-
process(value);
224-
if (outstanding.decrementAndGet() == 0) {
225-
request(10);
226-
outstanding.set(10);
227-
}
228-
}
229-
});
230-
// end::backpressure[]
231199
}
232200

233201
}
234202

235-
static void process(JsonObject value) {
236-
237-
}
238-
239203
}

modules/devguide/examples/java/Queries.java

Lines changed: 3 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -17,29 +17,19 @@
1717
// NB: This example requires the `travel-sample` bucket to be installed.
1818

1919
// tag::imports[]
20-
import static com.couchbase.client.java.query.QueryOptions.queryOptions;
21-
22-
import java.util.UUID;
23-
import java.util.concurrent.atomic.AtomicInteger;
24-
25-
import com.couchbase.client.core.error.CouchbaseException;
2620
import com.couchbase.client.java.Bucket;
2721
import com.couchbase.client.java.Cluster;
28-
import com.couchbase.client.java.Collection;
2922
import com.couchbase.client.java.Scope;
3023
import com.couchbase.client.java.json.JsonArray;
3124
import com.couchbase.client.java.json.JsonObject;
32-
import com.couchbase.client.java.kv.MutationResult;
33-
import com.couchbase.client.java.kv.MutationState;
34-
import com.couchbase.client.java.query.QueryOptions;
3525
import com.couchbase.client.java.query.QueryResult;
3626
import com.couchbase.client.java.query.QueryScanConsistency;
3727
import com.couchbase.client.java.query.ReactiveQueryResult;
28+
import reactor.core.publisher.Mono;
3829

39-
import org.reactivestreams.Subscription;
30+
import java.util.UUID;
4031

41-
import reactor.core.publisher.BaseSubscriber;
42-
import reactor.core.publisher.Mono;
32+
import static com.couchbase.client.java.query.QueryOptions.queryOptions;
4333
// end::imports[]
4434

4535
public class Queries {
@@ -138,32 +128,6 @@ public static void main(String[] args) throws Exception {
138128
// end::simplereactive[]
139129
}
140130

141-
{
142-
System.out.println("\nExample: [backpressure]");
143-
// tag::backpressure[]
144-
Mono<ReactiveQueryResult> result = cluster.reactive().query("select * from `travel-sample`.inventory.route");
145-
146-
result.flatMapMany(ReactiveQueryResult::rowsAsObject).subscribe(new BaseSubscriber<JsonObject>() {
147-
// Number of outstanding requests
148-
final AtomicInteger oustanding = new AtomicInteger(0);
149-
150-
@Override
151-
protected void hookOnSubscribe(Subscription subscription) {
152-
request(10); // initially request to rows
153-
oustanding.set(10);
154-
}
155-
156-
@Override
157-
protected void hookOnNext(JsonObject value) {
158-
process(value);
159-
if (oustanding.decrementAndGet() == 0) {
160-
request(10);
161-
}
162-
}
163-
});
164-
// end::backpressure[]
165-
}
166-
167131
{
168132
System.out.println("\nExample: [scope-level-query]");
169133
// tag::scope-level-query[]
@@ -178,8 +142,4 @@ protected void hookOnNext(JsonObject value) {
178142

179143
}
180144

181-
static void process(JsonObject value) {
182-
183-
}
184-
185145
}

modules/devguide/examples/java/Search.java

Lines changed: 6 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,6 @@
1414
* limitations under the License.
1515
*/
1616

17-
import static com.couchbase.client.java.search.SearchOptions.searchOptions;
18-
import java.util.HashMap;
19-
import java.util.List;
20-
import java.util.Map;
21-
import java.util.concurrent.atomic.AtomicInteger;
2217
import com.couchbase.client.core.error.CouchbaseException;
2318
import com.couchbase.client.java.Bucket;
2419
import com.couchbase.client.java.Cluster;
@@ -37,10 +32,14 @@
3732
import com.couchbase.client.java.search.sort.SearchSort;
3833
import com.couchbase.client.java.search.vector.VectorQuery;
3934
import com.couchbase.client.java.search.vector.VectorSearch;
40-
import org.reactivestreams.Subscription;
41-
import reactor.core.publisher.BaseSubscriber;
4235
import reactor.core.publisher.Mono;
4336

37+
import java.util.HashMap;
38+
import java.util.List;
39+
import java.util.Map;
40+
41+
import static com.couchbase.client.java.search.SearchOptions.searchOptions;
42+
4443
// This example assumes an index called `travel-sample-index` exists.
4544
// Please refer to file `modules/test/scripts/init-couchbase/init-buckets.sh` (line 66)
4645
// for the relevant curl command to create this.
@@ -149,32 +148,6 @@ public static void main(String... args) {
149148
// end::simplereactive[]
150149
}
151150

152-
{
153-
// tag::backpressure[]
154-
Mono<ReactiveSearchResult> result = cluster.reactive().searchQuery("travel-sample-index",
155-
SearchQuery.queryString("swanky"));
156-
157-
result.flatMapMany(ReactiveSearchResult::rows).subscribe(new BaseSubscriber<SearchRow>() {
158-
// Number of outstanding requests
159-
final AtomicInteger oustanding = new AtomicInteger(0);
160-
161-
@Override
162-
protected void hookOnSubscribe(Subscription subscription) {
163-
request(10); // initially request to rows
164-
oustanding.set(10);
165-
}
166-
167-
@Override
168-
protected void hookOnNext(SearchRow row) {
169-
process(row);
170-
if (oustanding.decrementAndGet() == 0) {
171-
request(10);
172-
}
173-
}
174-
});
175-
// end::backpressure[]
176-
}
177-
178151
// This will come from an external source, such as an embeddings API.
179152
float[] vectorQuery = null;
180153
float[] anotherVectorQuery = null;
@@ -218,8 +191,4 @@ protected void hookOnNext(SearchRow row) {
218191

219192
}
220193

221-
static void process(SearchRow value) {
222-
223-
}
224-
225194
}

modules/howtos/pages/analytics-using-sdk.adoc

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -190,18 +190,7 @@ A simple reactive query is similar to the blocking one:
190190
include::devguide:example$java/Analytics.java[tag=simplereactive,indent=0]
191191
----
192192

193-
This query will stream all rows as they become available from the server. If you want to manually control the data flow (which is important if you are streaming a lot of rows which could cause a potential out of memory situation) you can do this by using explicit `request()` calls.
194-
195-
[source,java]
196-
----
197-
include::devguide:example$java/Analytics.java[tag=backpressure,indent=0]
198-
----
199-
200-
In this example we initially request a batch size of 10 rows (so streaming can begin).
201-
Then as each row gets streamed it is written to a `process()` method which does whatever it needs to do to process.
202-
Then a counter is decremented, and once all of the 10 outstanding rows are processed another batch is loaded.
203-
Please note that with reactive code, if your `process()` method equivalent is blocking, you *must* move it onto another scheduler so that the I/O threads are not stalled.
204-
We always recommend not blocking in the first place in reactive code.
193+
This query will stream all rows as they become available from the server, automatically applying backpressure as necessary.
205194

206195
== Scoped Queries on Named Collections
207196

modules/howtos/pages/full-text-searching-with-sdk.adoc

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -303,16 +303,4 @@ A simple reactive query is similar to the blocking one:
303303
include::devguide:example$java/Search.java[tag=simplereactive,indent=0]
304304
----
305305

306-
This Search query will stream all rows as they become available form the server.
307-
If you want to manually control the data flow (which is important if you are streaming a lot of rows which could cause a potential out of memory situation) you can do this by using explicit `request()` calls.
308-
309-
[source,java]
310-
----
311-
include::devguide:example$java/Search.java[tag=backpressure,indent=0]
312-
----
313-
314-
In this example we initially request a batch size of 10 rows (so streaming can begin).
315-
Then as each row gets streamed it is written to a `process()` method which does whatever it needs to do to process.
316-
Then a counter is decremented, and once all of the 10 outstanding rows are processed another batch is loaded.
317-
Please note that with reactive code, if your `process()` method equivalent is blocking, you *must* move it onto another scheduler so that the I/O threads are not stalled.
318-
We always recommend not blocking in the first place in reactive code.
306+
This Search query will stream all rows as they become available from the server, automatically applying backpressure as necessary.

modules/howtos/pages/sqlpp-queries-with-sdk.adoc

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -238,19 +238,7 @@ A simple reactive query is similar to the blocking one:
238238
include::devguide:example$java/Queries.java[tag=simplereactive,indent=0]
239239
----
240240
241-
This query will stream all rows as they become available form the server.
242-
If you want to manually control the data flow (which is important if you are streaming a lot of rows which could cause a potential out of memory situation) you can do this by using explicit `request()` calls.
243-
244-
[source,java]
245-
----
246-
include::devguide:example$java/Queries.java[tag=backpressure,indent=0]
247-
----
248-
249-
In this example we initially request a batch size of 10 rows (so streaming can begin).
250-
Then as each row gets streamed it is written to a `process()` method which does whatever it needs to do to process.
251-
Then a counter is decremented and once all of the 10 outstanding rows are processed another batch is loaded.
252-
Please note that if your `process()` method equivalent is blocking, like always with reactive code, you *must* move it onto another scheduler so that the I/O threads are not stalled.
253-
As always we recommend not blocking in the first place in reactive code.
241+
This query will stream all rows as they become available from the server, automatically applying backpressure as necessary.
254242
255243
256244
== Querying at Scope Level

0 commit comments

Comments
 (0)