diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp2StreamResponseTimeout.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp2StreamResponseTimeout.java new file mode 100644 index 0000000000..722f116d7a --- /dev/null +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestHttp2StreamResponseTimeout.java @@ -0,0 +1,144 @@ +/* + * ==================================================================== + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * ==================================================================== + * + * This software consists of voluntary contributions made by many + * individuals on behalf of the Apache Software Foundation. For more + * information on the Apache Software Foundation, please see + * . + * + */ +package org.apache.hc.client5.testing.async; + +import static org.apache.hc.core5.util.ReflectionUtils.determineJRELevel; +import static org.junit.jupiter.api.Assumptions.assumeTrue; + +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.hc.client5.http.async.methods.SimpleHttpRequest; +import org.apache.hc.client5.http.async.methods.SimpleHttpResponse; +import org.apache.hc.client5.http.async.methods.SimpleRequestBuilder; +import org.apache.hc.client5.http.config.ConnectionConfig; +import org.apache.hc.client5.http.config.RequestConfig; +import org.apache.hc.client5.http.config.TlsConfig; +import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager; +import org.apache.hc.client5.testing.extension.async.ClientProtocolLevel; +import org.apache.hc.client5.testing.extension.async.ServerProtocolLevel; +import org.apache.hc.client5.testing.extension.async.TestAsyncClient; +import org.apache.hc.core5.http.HttpHost; +import org.apache.hc.core5.http.URIScheme; +import org.apache.hc.core5.http2.H2StreamTimeoutException; +import org.apache.hc.core5.http2.HttpVersionPolicy; +import org.apache.hc.core5.pool.PoolStats; +import org.apache.hc.core5.util.Timeout; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; + +abstract class AbstractTestHttp2StreamResponseTimeout extends AbstractIntegrationTestBase { + + public AbstractTestHttp2StreamResponseTimeout(final URIScheme scheme) { + this(scheme, false); + } + + public AbstractTestHttp2StreamResponseTimeout(final URIScheme scheme, final boolean useUnixDomainSocket) { + super(scheme, ClientProtocolLevel.STANDARD, ServerProtocolLevel.H2_ONLY, useUnixDomainSocket); + } + + void checkAssumptions() { + } + + @Test + void testResponseTimeout() throws Exception { + checkAssumptions(); + configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new)); + final HttpHost target = startServer(); + + final TestAsyncClient client = startClient(); + final PoolingAsyncClientConnectionManager connManager = client.getConnectionManager(); + connManager.setDefaultConnectionConfig(ConnectionConfig.custom() + .setSocketTimeout(Timeout.ofMinutes(1)) + .build()); + connManager.setDefaultTlsConfig(TlsConfig.custom() + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .build()); + + final SimpleHttpRequest request1 = SimpleRequestBuilder.get() + .setHttpHost(target) + .setPath("/random/10240") + .setRequestConfig(RequestConfig.custom() + .setUnixDomainSocket(getUnixDomainSocket()) + .setResponseTimeout(Timeout.ofMinutes(1)) + .build()) + .build(); + final SimpleHttpRequest request2 = SimpleRequestBuilder.get() + .setHttpHost(target) + .setPath("/random/10240?delay=2000") + .setRequestConfig(RequestConfig.custom() + .setUnixDomainSocket(getUnixDomainSocket()) + .setResponseTimeout(Timeout.ofMilliseconds(100)) + .build()) + .build(); + + final Future future1 = client.execute(request1, null, null); + final Future future2 = client.execute(request2, null, null); + final SimpleHttpResponse response1 = future1.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()); + Assertions.assertNotNull(response1); + final ExecutionException exception = Assertions.assertThrows(ExecutionException.class, () -> + future2.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit())); + Assertions.assertInstanceOf(H2StreamTimeoutException.class, exception.getCause()); + + final PoolStats totalStats = connManager.getTotalStats(); + Assertions.assertTrue(totalStats.getAvailable() > 0); + } + +} + +@Disabled +public class TestHttp2StreamResponseTimeout { + + @Nested + class Http extends AbstractTestHttp2StreamResponseTimeout { + public Http() { + super(URIScheme.HTTP, false); + } + } + + @Nested + class Https extends AbstractTestHttp2StreamResponseTimeout { + public Https() { + super(URIScheme.HTTPS, false); + } + } + + @Nested + class Uds extends AbstractTestHttp2StreamResponseTimeout { + public Uds() { + super(URIScheme.HTTP, true); + } + + @Override + void checkAssumptions() { + assumeTrue(determineJRELevel() >= 16, "Async UDS requires Java 16+"); + } + } + +} diff --git a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestInternalHttpAsyncExecRuntime.java b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestInternalHttpAsyncExecRuntime.java index c29017b41d..8be22927e0 100644 --- a/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestInternalHttpAsyncExecRuntime.java +++ b/httpclient5-testing/src/test/java/org/apache/hc/client5/testing/async/TestInternalHttpAsyncExecRuntime.java @@ -28,6 +28,7 @@ import java.net.InetSocketAddress; import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.function.Consumer; @@ -53,6 +54,7 @@ import org.apache.hc.core5.http.URIScheme; import org.apache.hc.core5.http.nio.support.AsyncClientPipeline; import org.apache.hc.core5.http.support.BasicRequestBuilder; +import org.apache.hc.core5.http2.H2StreamResetException; import org.apache.hc.core5.http2.HttpVersionPolicy; import org.apache.hc.core5.pool.PoolStats; import org.apache.hc.core5.reactor.ConnectionInitiator; @@ -206,4 +208,65 @@ public void completed(final Message result) { } } + @Test + void testExecutionCancellation_http12_connectionAlive() throws Exception { + configureServer(bootstrap -> { + bootstrap.setServerProtocolLevel(ServerProtocolLevel.H2_ONLY); + bootstrap.register("/random/*", AsyncRandomHandler::new); + }); + final HttpHost target = startServer(); + + final TestAsyncClient client = startClient(); + final ConnectionInitiator connectionInitiator = client.getImplementation(); + final PoolingAsyncClientConnectionManager connectionManager = client.getConnectionManager(); + for (int i = 0; i < REQ_NUM; i++) { + final HttpClientContext context = HttpClientContext.create(); + + final InternalTestHttpAsyncExecRuntime testRuntime = new InternalTestHttpAsyncExecRuntime( + connectionManager, + connectionInitiator, + TlsConfig.custom() + .setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_2) + .build()); + final Future connectFuture = testRuntime.leaseAndConnect(target, context); + Assertions.assertTrue(connectFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit())); + + final BasicFuture> resultFuture = new BasicFuture<>(null); + final Cancellable cancellable = testRuntime.execute( + "test-" + i, + AsyncClientPipeline.assemble() + .request(createRequest(target)) + .noContent() + .response().asByteArray() + .result(new FutureContribution>(resultFuture) { + + @Override + public void completed(final Message result) { + resultFuture.completed(result); + } + + }) + .create(), + context); + // sleep a bit + Thread.sleep(i % 10); + cancellable.cancel(); + + // The message exchange is expected to get aborted + try { + resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()); + } catch (final ExecutionException ex) { + Assertions.assertInstanceOf(H2StreamResetException.class, ex.getCause()); + } + Assertions.assertFalse(testRuntime.isAborted()); + // The underlying connection is expected to stay valid + Assertions.assertTrue(testRuntime.isEndpointConnected()); + testRuntime.markConnectionReusable(null, TimeValue.ofMinutes(1)); + testRuntime.releaseEndpoint(); + + final PoolStats totalStats = connectionManager.getTotalStats(); + Assertions.assertTrue(totalStats.getAvailable() > 0); + } + } + } diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java index da3adf532a..998b447b46 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/async/InternalHttpAsyncExecRuntime.java @@ -29,7 +29,6 @@ import java.io.InterruptedIOException; import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -347,25 +346,23 @@ private Cancellable doExecute( if (!isH2 && responseTimeout != null) { endpoint.setSocketTimeout(responseTimeout); } - endpoint.execute(id, exchangeHandler, pushHandlerFactory, context); - if (isH2 || requestConfig.isHardCancellationEnabled()) { - return new Cancellable() { - - private final AtomicBoolean cancelled = new AtomicBoolean(); - - @Override - public boolean cancel() { - if (cancelled.compareAndSet(false, true)) { - exchangeHandler.cancel(); - return true; - } - return false; - } - - }; - } else { - return Operations.nonCancellable(); + final ComplexCancellable complexCancellable = new ComplexCancellable(); + endpoint.execute( + id, + exchangeHandler, + pushHandlerFactory, + context, + isH2 ? streamControl -> { + streamControl.setTimeout(responseTimeout); + complexCancellable.setDependency(streamControl); + } : null); + if (!isH2 && requestConfig.isHardCancellationEnabled()) { + complexCancellable.setDependency(() -> { + exchangeHandler.cancel(); + return true; + }); } + return complexCancellable; } @Override diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java index 1f07c75fe9..58fbdca336 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/impl/nio/PoolingAsyncClientConnectionManager.java @@ -50,7 +50,6 @@ import org.apache.hc.client5.http.nio.AsyncClientConnectionOperator; import org.apache.hc.client5.http.nio.AsyncConnectionEndpoint; import org.apache.hc.client5.http.nio.ManagedAsyncClientConnection; -import org.apache.hc.client5.http.protocol.HttpClientContext; import org.apache.hc.client5.http.ssl.DefaultClientTlsStrategy; import org.apache.hc.core5.annotation.Contract; import org.apache.hc.core5.annotation.Internal; @@ -59,11 +58,13 @@ import org.apache.hc.core5.concurrent.CallbackContribution; import org.apache.hc.core5.concurrent.ComplexFuture; import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.function.Callback; import org.apache.hc.core5.function.Resolver; import org.apache.hc.core5.http.HttpConnection; import org.apache.hc.core5.http.HttpHost; import org.apache.hc.core5.http.HttpVersion; import org.apache.hc.core5.http.ProtocolVersion; +import org.apache.hc.core5.http.StreamControl; import org.apache.hc.core5.http.URIScheme; import org.apache.hc.core5.http.config.Lookup; import org.apache.hc.core5.http.config.RegistryBuilder; @@ -813,7 +814,8 @@ public void execute( final String exchangeId, final AsyncClientExchangeHandler exchangeHandler, final HandlerFactory pushHandlerFactory, - final HttpContext context) { + final HttpContext context, + final Callback initiationCallback) { final ManagedAsyncClientConnection connection = getValidatedPoolEntry().getConnection(); if (LOG.isDebugEnabled()) { LOG.debug("{} executing exchange {} over {}", id, exchangeId, ConnPoolSupport.getId(connection)); @@ -824,14 +826,19 @@ public void execute( exchangeHandler, pushHandlerFactory, context, - streamControl -> { - final HttpClientContext clientContext = HttpClientContext.cast(context); - final Timeout responseTimeout = clientContext.getRequestConfigOrDefault().getResponseTimeout(); - streamControl.setTimeout(responseTimeout); - }), + initiationCallback), Command.Priority.NORMAL); } + @Override + public void execute( + final String id, + final AsyncClientExchangeHandler exchangeHandler, + final HandlerFactory pushHandlerFactory, + final HttpContext context) { + execute(id, exchangeHandler, pushHandlerFactory, context, null); + } + @Override public EndpointInfo getInfo() { final PoolEntry poolEntry = poolEntryRef.get(); diff --git a/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncConnectionEndpoint.java b/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncConnectionEndpoint.java index 8ae25aae2c..0fc8c4f9a4 100644 --- a/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncConnectionEndpoint.java +++ b/httpclient5/src/main/java/org/apache/hc/client5/http/nio/AsyncConnectionEndpoint.java @@ -35,6 +35,8 @@ import org.apache.hc.core5.annotation.ThreadingBehavior; import org.apache.hc.core5.concurrent.BasicFuture; import org.apache.hc.core5.concurrent.FutureCallback; +import org.apache.hc.core5.function.Callback; +import org.apache.hc.core5.http.StreamControl; import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler; import org.apache.hc.core5.http.nio.AsyncPushConsumer; import org.apache.hc.core5.http.nio.AsyncRequestProducer; @@ -69,6 +71,28 @@ public abstract void execute( HandlerFactory pushHandlerFactory, HttpContext context); + /** + * Initiates a message exchange using the given handler. + * + * @param id unique operation ID or {@code null}. + * @param exchangeHandler the message exchange handler. + * @param pushHandlerFactory the push handler factory. + * @param context the execution context. + * @param initiationCallback Optional callback for message exchanges + * executed over a separate stream. The callback + * provides a interface allowing to control + * the process of message exchange execution. + * @since 5.7 + */ + public void execute( + final String id, + final AsyncClientExchangeHandler exchangeHandler, + final HandlerFactory pushHandlerFactory, + final HttpContext context, + final Callback initiationCallback) { + execute(id, exchangeHandler, pushHandlerFactory, context); + } + /** * Determines if the connection to the remote endpoint is still open and valid. */