Skip to content

Commit 34dc2e1

Browse files
committed
Fixing List population race in DocOps.java
1 parent 42217bd commit 34dc2e1

2 files changed

Lines changed: 22 additions & 28 deletions

File tree

src/main/java/couchbase/sdk/DocOps.java

Lines changed: 21 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package couchbase.sdk;
22

33
import java.time.Duration;
4-
import java.util.ArrayList;
54
import java.util.List;
65
import java.util.concurrent.ConcurrentHashMap;
76
import java.util.function.Function;
@@ -36,41 +35,38 @@ public class DocOps {
3635
public List<Result> bulkInsert(Collection collection, List<Tuple2<String, Object>> documents, InsertOptions insertOptions) {
3736
ReactiveCollection reactiveCollection = collection.reactive();
3837

39-
List<Result> out = new ArrayList<Result>();
40-
41-
Flux.fromIterable(documents)
38+
// Emit error Results as part of the stream and collect at the end
39+
// This is thread-safe and avoids synchronization overhead
40+
return Flux.fromIterable(documents)
4241
.flatMap(documentToInsert -> {
43-
4442
String k = documentToInsert.getT1();
4543
Object v = documentToInsert.getT2();
4644

4745
return reactiveCollection.insert(k, v, insertOptions)
48-
.doOnError(error -> out.add(new Result(k, v, error, false)))
49-
.onErrorResume(error -> Mono.empty());
46+
.then(Mono.<Result>empty())
47+
.onErrorResume(error -> Mono.just(new Result(k, v, error, false)));
5048
})
51-
.blockLast();
52-
53-
return out;
49+
.collectList()
50+
.block();
5451
}
5552

5653
public List<Result> bulkUpsert(Collection collection, List<Tuple2<String, Object>> documents,
5754
UpsertOptions upsertOptions) {
5855
ReactiveCollection reactiveCollection = collection.reactive();
59-
List<Result> out = new ArrayList<Result>();
6056

61-
Flux.fromIterable(documents)
57+
// Emit error Results as part of the stream and collect at the end
58+
// This is thread-safe and avoids synchronization overhead
59+
return Flux.fromIterable(documents)
6260
.flatMap(documentToInsert -> {
63-
6461
String k = documentToInsert.getT1();
6562
Object v = documentToInsert.getT2();
6663

6764
return reactiveCollection.upsert(k, v, upsertOptions)
68-
.doOnError(error -> out.add(new Result(k, v, error, false)))
69-
.onErrorResume(error -> Mono.empty());
65+
.then(Mono.<Result>empty())
66+
.onErrorResume(error -> Mono.just(new Result(k, v, error, false)));
7067
})
71-
.blockLast();
72-
73-
return out;
68+
.collectList()
69+
.block();
7470
}
7571

7672
public List<Tuple2<String, Object>> bulkGets(Collection collection, List<Tuple2<String, Object>> documents, GetOptions getOptions) {
@@ -97,18 +93,16 @@ public Mono<Tuple2<String, Object>> apply(Throwable error) {
9793
public List<Result> bulkDelete(Collection collection, List<String> keys, RemoveOptions removeOptions) {
9894
ReactiveCollection reactiveCollection = collection.reactive();
9995

100-
List<Result> out = new ArrayList<Result>();
101-
102-
Flux.fromIterable(keys)
96+
// Emit error Results as part of the stream and collect at the end
97+
// This is thread-safe and avoids synchronization overhead
98+
return Flux.fromIterable(keys)
10399
.flatMap(key -> {
104-
105100
return reactiveCollection.remove(key, removeOptions)
106-
.doOnError(error -> out.add(new Result(key, null, error, false)))
107-
.onErrorResume(error -> Mono.empty());
101+
.then(Mono.<Result>empty())
102+
.onErrorResume(error -> Mono.just(new Result(key, null, error, false)));
108103
})
109-
.blockLast();
110-
111-
return out;
104+
.collectList()
105+
.block();
112106
}
113107

114108
public List<ConcurrentHashMap<String, Object>> bulkReplace(Collection collection, List<Tuple2<String, Object>> documents,

src/main/java/couchbase/transactions/SimpleTransaction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public TransactionConfig createTransactionConfig(int expiryTimeout, int changedu
9393
return config.expirationTime(Duration.of(expiryTimeout, ChronoUnit.SECONDS)).build();
9494
}
9595
public List<Tuple2<String, JsonObject>> ReadTransaction(Transactions transaction, List<Collection> collections, List<String> Readkeys) {
96-
List<Tuple2<String, JsonObject>> res = null;
96+
List<Tuple2<String, JsonObject>> res = new ArrayList<>();
9797
try {
9898
TransactionResult result = transaction.run(ctx -> {
9999
for (String key: Readkeys) {

0 commit comments

Comments
 (0)