Skip to content

Commit 75f5829

Browse files
committed
* Improved cancellability of HTTP message streams
* Improved response timeout support
1 parent 323251c commit 75f5829

4 files changed

Lines changed: 140 additions & 31 deletions

File tree

httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java

Lines changed: 56 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,10 @@
4444
import org.apache.hc.client5.http.protocol.HttpClientContext;
4545
import org.apache.hc.core5.concurrent.CallbackContribution;
4646
import org.apache.hc.core5.concurrent.Cancellable;
47+
import org.apache.hc.core5.concurrent.ComplexCancellable;
4748
import org.apache.hc.core5.concurrent.FutureCallback;
49+
import org.apache.hc.core5.http.HttpVersion;
50+
import org.apache.hc.core5.http.ProtocolVersion;
4851
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
4952
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
5053
import org.apache.hc.core5.http.nio.HandlerFactory;
@@ -327,6 +330,54 @@ private AsyncClientExchangeHandler guard(final AsyncClientExchangeHandler handle
327330
return new ReleasingAsyncClientExchangeHandler(handler, this::releaseSlot);
328331
}
329332

333+
private Cancellable doExecute(
334+
final String id,
335+
final AsyncConnectionEndpoint endpoint,
336+
final AsyncClientExchangeHandler exchangeHandler,
337+
final HttpClientContext context) {
338+
final RequestConfig requestConfig = context.getRequestConfigOrDefault();
339+
final Timeout responseTimeout = requestConfig.getResponseTimeout();
340+
final EndpointInfo endpointInfo = endpoint.getInfo();
341+
final ProtocolVersion version = endpointInfo != null ? endpointInfo.getProtocol() : null;
342+
final boolean supportsStreams = version != null && version.greaterEquals(HttpVersion.HTTP_2);
343+
344+
if (log.isDebugEnabled()) {
345+
log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
346+
}
347+
348+
if (supportsStreams) {
349+
final ComplexCancellable complexCancellable = new ComplexCancellable();
350+
endpoint.execute(
351+
id,
352+
exchangeHandler,
353+
pushHandlerFactory,
354+
context,
355+
streamControl -> {
356+
streamControl.setTimeout(responseTimeout);
357+
complexCancellable.setDependency(streamControl);
358+
});
359+
return complexCancellable;
360+
} else {
361+
if (responseTimeout != null) {
362+
endpoint.setSocketTimeout(responseTimeout);
363+
}
364+
endpoint.execute(
365+
id,
366+
exchangeHandler,
367+
pushHandlerFactory,
368+
context,
369+
null);
370+
if (requestConfig.isHardCancellationEnabled()) {
371+
return () -> {
372+
exchangeHandler.cancel();
373+
return true;
374+
};
375+
} else {
376+
return Operations.nonCancellable();
377+
}
378+
}
379+
}
380+
330381
@Override
331382
public Cancellable execute(
332383
final String id, final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
@@ -336,46 +387,29 @@ public Cancellable execute(
336387
"Execution pipeline queue limit reached (max=" + maxQueued + ")"));
337388
return Operations.nonCancellable();
338389
}
339-
final AsyncClientExchangeHandler actual = guard(exchangeHandler);
390+
final AsyncClientExchangeHandler handler = guard(exchangeHandler);
340391
if (endpoint.isConnected()) {
341-
if (log.isDebugEnabled()) {
342-
log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
343-
}
344-
final RequestConfig requestConfig = context.getRequestConfigOrDefault();
345-
final Timeout responseTimeout = requestConfig.getResponseTimeout();
346-
if (responseTimeout != null) {
347-
endpoint.setSocketTimeout(responseTimeout);
348-
}
349-
endpoint.execute(id, actual, pushHandlerFactory, context);
350-
if (context.getRequestConfigOrDefault().isHardCancellationEnabled()) {
351-
return () -> {
352-
actual.cancel();
353-
return true;
354-
};
355-
}
392+
doExecute(id, endpoint, handler, context);
356393
} else {
357394
connectEndpoint(context, new FutureCallback<AsyncExecRuntime>() {
358395

359396
@Override
360397
public void completed(final AsyncExecRuntime runtime) {
361-
if (log.isDebugEnabled()) {
362-
log.debug("{} start execution {}", ConnPoolSupport.getId(endpoint), id);
363-
}
364398
try {
365-
endpoint.execute(id, actual, pushHandlerFactory, context);
399+
doExecute(id, endpoint, handler, context);
366400
} catch (final RuntimeException ex) {
367401
failed(ex);
368402
}
369403
}
370404

371405
@Override
372406
public void failed(final Exception ex) {
373-
actual.failed(ex);
407+
handler.failed(ex);
374408
}
375409

376410
@Override
377411
public void cancelled() {
378-
actual.failed(new InterruptedIOException());
412+
handler.failed(new InterruptedIOException());
379413
}
380414

381415
});

httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/MinimalHttpAsyncClient.java

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import java.util.concurrent.atomic.AtomicBoolean;
3636
import java.util.concurrent.atomic.AtomicInteger;
3737

38+
import org.apache.hc.client5.http.EndpointInfo;
3839
import org.apache.hc.client5.http.HttpRoute;
3940
import org.apache.hc.client5.http.SchemePortResolver;
4041
import org.apache.hc.client5.http.config.Configurable;
@@ -61,6 +62,8 @@
6162
import org.apache.hc.core5.http.HttpHost;
6263
import org.apache.hc.core5.http.HttpResponse;
6364
import org.apache.hc.core5.http.HttpStatus;
65+
import org.apache.hc.core5.http.HttpVersion;
66+
import org.apache.hc.core5.http.ProtocolVersion;
6467
import org.apache.hc.core5.http.nio.AsyncClientEndpoint;
6568
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
6669
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
@@ -448,6 +451,40 @@ public boolean isConnected() {
448451
return !isReleased() && connectionEndpoint.isConnected();
449452
}
450453

454+
private void doExecute(
455+
final String id,
456+
final AsyncConnectionEndpoint endpoint,
457+
final AsyncClientExchangeHandler exchangeHandler,
458+
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
459+
final HttpClientContext context) {
460+
final RequestConfig requestConfig = context.getRequestConfigOrDefault();
461+
final Timeout responseTimeout = requestConfig.getResponseTimeout();
462+
final EndpointInfo endpointInfo = endpoint.getInfo();
463+
final ProtocolVersion version = endpointInfo != null ? endpointInfo.getProtocol() : null;
464+
final boolean supportsStreams = version != null && version.greaterEquals(HttpVersion.HTTP_2);
465+
466+
if (supportsStreams) {
467+
endpoint.execute(
468+
id,
469+
exchangeHandler,
470+
pushHandlerFactory,
471+
context,
472+
streamControl -> {
473+
streamControl.setTimeout(responseTimeout);
474+
});
475+
} else {
476+
if (responseTimeout != null) {
477+
endpoint.setSocketTimeout(responseTimeout);
478+
}
479+
endpoint.execute(
480+
id,
481+
exchangeHandler,
482+
pushHandlerFactory,
483+
context,
484+
null);
485+
}
486+
}
487+
451488
@Override
452489
public void execute(
453490
final AsyncClientExchangeHandler exchangeHandler,
@@ -460,13 +497,18 @@ public void execute(
460497
clientContext.setExchangeId(exchangeId);
461498
if (LOG.isDebugEnabled()) {
462499
LOG.debug("{} executing message exchange {}", exchangeId, ConnPoolSupport.getId(connectionEndpoint));
463-
connectionEndpoint.execute(
500+
doExecute(
464501
exchangeId,
502+
connectionEndpoint,
465503
new LoggingAsyncClientExchangeHandler(LOG, exchangeId, exchangeHandler),
466504
pushHandlerFactory,
467505
clientContext);
468506
} else {
469-
connectionEndpoint.execute(exchangeId, exchangeHandler, clientContext);
507+
doExecute(exchangeId,
508+
connectionEndpoint,
509+
exchangeHandler,
510+
pushHandlerFactory,
511+
clientContext);
470512
}
471513
}
472514

httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,6 @@
5050
import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator;
5151
import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint;
5252
import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection;
53-
import org.apache.hc.client5.http.protocol.HttpClientContext;
5453
import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy;
5554
import org.apache.hc.core5.annotation.Contract;
5655
import org.apache.hc.core5.annotation.Internal;
@@ -59,11 +58,13 @@
5958
import org.apache.hc.core5.concurrent.CallbackContribution;
6059
import org.apache.hc.core5.concurrent.ComplexFuture;
6160
import org.apache.hc.core5.concurrent.FutureCallback;
61+
import org.apache.hc.core5.function.Callback;
6262
import org.apache.hc.core5.function.Resolver;
6363
import org.apache.hc.core5.http.HttpConnection;
6464
import org.apache.hc.core5.http.HttpHost;
6565
import org.apache.hc.core5.http.HttpVersion;
6666
import org.apache.hc.core5.http.ProtocolVersion;
67+
import org.apache.hc.core5.http.StreamControl;
6768
import org.apache.hc.core5.http.URIScheme;
6869
import org.apache.hc.core5.http.config.Lookup;
6970
import org.apache.hc.core5.http.config.RegistryBuilder;
@@ -808,12 +809,16 @@ public void setSocketTimeout(final Timeout timeout) {
808809
getValidatedPoolEntry().getConnection().setSocketTimeout(timeout);
809810
}
810811

812+
/**
813+
* @since 5.7
814+
*/
811815
@Override
812816
public void execute(
813817
final String exchangeId,
814818
final AsyncClientExchangeHandler exchangeHandler,
815819
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
816-
final HttpContext context) {
820+
final HttpContext context,
821+
final Callback<StreamControl> initiationCallback) {
817822
final ManagedAsyncClientConnection connection = getValidatedPoolEntry().getConnection();
818823
if (LOG.isDebugEnabled()) {
819824
LOG.debug("{} executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection));
@@ -824,14 +829,19 @@ public void execute(
824829
exchangeHandler,
825830
pushHandlerFactory,
826831
context,
827-
streamControl -> {
828-
final HttpClientContext clientContext = HttpClientContext.cast(context);
829-
final Timeout responseTimeout = clientContext.getRequestConfigOrDefault().getResponseTimeout();
830-
streamControl.setTimeout(responseTimeout);
831-
}),
832+
initiationCallback),
832833
Command.Priority.NORMAL);
833834
}
834835

836+
@Override
837+
public void execute(
838+
final String exchangeId,
839+
final AsyncClientExchangeHandler exchangeHandler,
840+
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
841+
final HttpContext context) {
842+
execute(exchangeId, exchangeHandler, pushHandlerFactory, context, null);
843+
}
844+
835845
@Override
836846
public EndpointInfo getInfo() {
837847
final PoolEntry<HttpRoute, ManagedAsyncClientConnection> poolEntry = poolEntryRef.get();

httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncConnectionEndpoint.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
import org.apache.hc.core5.annotation.ThreadingBehavior;
3636
import org.apache.hc.core5.concurrent.BasicFuture;
3737
import org.apache.hc.core5.concurrent.FutureCallback;
38+
import org.apache.hc.core5.function.Callback;
39+
import org.apache.hc.core5.http.StreamControl;
3840
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
3941
import org.apache.hc.core5.http.nio.AsyncPushConsumer;
4042
import org.apache.hc.core5.http.nio.AsyncRequestProducer;
@@ -69,6 +71,27 @@ public abstract void execute(
6971
HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
7072
HttpContext context);
7173

74+
/**
75+
* Initiates a message exchange using the given handler.
76+
*
77+
* @param id unique operation ID or {@code null}.
78+
* @param exchangeHandler the message exchange handler.
79+
* @param pushHandlerFactory the push handler factory.
80+
* @param context the execution context.
81+
* @param initiationCallback the initialization callback optionally executed
82+
* by connections that support cancellable message
83+
* streams.
84+
* @since 5.7
85+
*/
86+
public void execute(
87+
final String id,
88+
final AsyncClientExchangeHandler exchangeHandler,
89+
final HandlerFactory<AsyncPushConsumer> pushHandlerFactory,
90+
final HttpContext context,
91+
final Callback<StreamControl> initiationCallback) {
92+
execute(id, exchangeHandler, pushHandlerFactory, context);
93+
}
94+
7295
/**
7396
* Determines if the connection to the remote endpoint is still open and valid.
7497
*/

0 commit comments

Comments
 (0)