Skip to content

Commit b962b28

Browse files
committed
Expose terminal error as a CompletionStage that completes with the failure cause or null on success.
1 parent 0fac29b commit b962b28

2 files changed

Lines changed: 40 additions & 2 deletions

File tree

httpcore5-reactive/src/main/java/org/apache/hc/core5/reactive/ReactiveResponseConsumer.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import java.util.ArrayList;
3232
import java.util.Collections;
3333
import java.util.List;
34+
import java.util.concurrent.CancellationException;
3435
import java.util.concurrent.CompletableFuture;
3536
import java.util.concurrent.CompletionStage;
3637
import java.util.concurrent.Future;
@@ -67,6 +68,12 @@ public final class ReactiveResponseConsumer implements AsyncResponseConsumer<Voi
6768
private final CompletableFuture<Message<HttpResponse, Publisher<ByteBuffer>>> responseCompletableFuture;
6869
private final CompletableFuture<Void> responseCompletionFuture;
6970

71+
/**
72+
* Completes with {@code null} on success and with the terminal {@link Throwable} on failure.
73+
* This future never completes exceptionally.
74+
*/
75+
private final CompletableFuture<Throwable> failureFuture;
76+
7077
private volatile BasicFuture<Void> responseCompletion;
7178
private volatile HttpResponse informationResponse;
7279
private volatile EntityDetails entityDetails;
@@ -78,6 +85,7 @@ public ReactiveResponseConsumer() {
7885
this.responseFuture = new BasicFuture<>(null);
7986
this.responseCompletableFuture = new CompletableFuture<>();
8087
this.responseCompletionFuture = new CompletableFuture<>();
88+
this.failureFuture = new CompletableFuture<>();
8189
}
8290

8391
/**
@@ -90,6 +98,7 @@ public ReactiveResponseConsumer(final FutureCallback<Message<HttpResponse, Publi
9098
this.responseFuture = new BasicFuture<>(Args.notNull(responseCallback, "responseCallback"));
9199
this.responseCompletableFuture = new CompletableFuture<>();
92100
this.responseCompletionFuture = new CompletableFuture<>();
101+
this.failureFuture = new CompletableFuture<>();
93102
}
94103

95104
public Future<Message<HttpResponse, Publisher<ByteBuffer>>> getResponseFuture() {
@@ -136,6 +145,16 @@ public CompletionStage<Void> getResponseCompletionStage() {
136145
return responseCompletionFuture;
137146
}
138147

148+
/**
149+
* Completes with {@code null} on success and with the terminal {@link Throwable} on failure.
150+
* This stage never completes exceptionally.
151+
*
152+
* @since 5.5
153+
*/
154+
public CompletionStage<Throwable> getFailureStage() {
155+
return failureFuture;
156+
}
157+
139158
/**
140159
* Returns the intermediate (1xx) HTTP response if one was received.
141160
*
@@ -196,6 +215,9 @@ public void failed(final Exception cause) {
196215
responseCompletableFuture.completeExceptionally(cause);
197216
responseCompletionFuture.completeExceptionally(cause);
198217

218+
// Record failure as a normal completion value.
219+
failureFuture.complete(cause);
220+
199221
final BasicFuture<Void> completion = responseCompletion;
200222
if (completion != null) {
201223
completion.failed(cause);
@@ -222,6 +244,9 @@ public void streamEnd(final List<? extends Header> trailers) {
222244
// Complete CF before BasicFuture.completed(...) (it may trigger releaseResources()).
223245
responseCompletionFuture.complete(null);
224246

247+
// Success => no failure.
248+
failureFuture.complete(null);
249+
225250
final BasicFuture<Void> completion = responseCompletion;
226251
if (completion != null) {
227252
completion.completed(null);
@@ -241,6 +266,10 @@ public void releaseResources() {
241266
responseCompletionFuture.cancel(true);
242267
}
243268

269+
if (!failureFuture.isDone()) {
270+
failureFuture.complete(new CancellationException());
271+
}
272+
244273
final BasicFuture<Void> completion = responseCompletion;
245274
if (completion != null) {
246275
completion.cancel();

httpcore5-reactive/src/test/java/org/apache/hc/core5/reactive/examples/ReactiveClientCompletionStageExample.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,6 @@
3939
import java.util.concurrent.TimeUnit;
4040
import java.util.concurrent.TimeoutException;
4141

42-
import io.reactivex.rxjava3.core.Flowable;
43-
import io.reactivex.rxjava3.core.Observable;
4442
import org.apache.hc.core5.http.ContentType;
4543
import org.apache.hc.core5.http.Header;
4644
import org.apache.hc.core5.http.HttpConnection;
@@ -60,6 +58,9 @@
6058
import org.apache.hc.core5.util.Timeout;
6159
import org.reactivestreams.Publisher;
6260

61+
import io.reactivex.rxjava3.core.Flowable;
62+
import io.reactivex.rxjava3.core.Observable;
63+
6364
/**
6465
* Client demo using CompletionStage accessors on ReactiveResponseConsumer (Java 8).
6566
*/
@@ -190,6 +191,14 @@ public void onExchangeComplete(final HttpConnection connection, final boolean ke
190191
final ReactiveResponseConsumer consumer = new ReactiveResponseConsumer();
191192
requester.execute(requestProducer, consumer, Timeout.ofSeconds(30), null);
192193

194+
consumer.getFailureStage().whenComplete((t, ex) -> {
195+
if (ex != null) {
196+
ex.printStackTrace();
197+
} else if (t != null) {
198+
System.out.println("Request failed: " + t);
199+
}
200+
});
201+
193202
final CompletableFuture<Void> printedAndDrained = new CompletableFuture<>();
194203

195204
consumer.getResponseStage().whenComplete((msg, ex) -> {

0 commit comments

Comments
 (0)