Skip to content

Commit e42602a

Browse files
committed
Bug-fix: corrects message exchange cancellation logic in InternalHttpAsyncExecRuntime
* Fixes the problem with message exchanges over an existing persistent connection being non-cancellable * Hard-cancellation request parameter should have no bearing on H2 connections * Response timeout not be applied to H2 endpoints * Test coverage
1 parent 1aac376 commit e42602a

4 files changed

Lines changed: 417 additions & 25 deletions

File tree

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
package org.apache.hc.client5.http.impl.async;
28+
29+
import java.io.IOException;
30+
import java.nio.ByteBuffer;
31+
import java.util.List;
32+
import java.util.concurrent.Future;
33+
import java.util.concurrent.atomic.AtomicBoolean;
34+
import java.util.concurrent.atomic.AtomicInteger;
35+
36+
import org.apache.hc.client5.http.HttpRoute;
37+
import org.apache.hc.client5.http.async.AsyncExecRuntime;
38+
import org.apache.hc.client5.http.config.TlsConfig;
39+
import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
40+
import org.apache.hc.client5.http.protocol.HttpClientContext;
41+
import org.apache.hc.core5.concurrent.BasicFuture;
42+
import org.apache.hc.core5.concurrent.Cancellable;
43+
import org.apache.hc.core5.concurrent.FutureContribution;
44+
import org.apache.hc.core5.http.EntityDetails;
45+
import org.apache.hc.core5.http.Header;
46+
import org.apache.hc.core5.http.HttpException;
47+
import org.apache.hc.core5.http.HttpHost;
48+
import org.apache.hc.core5.http.HttpResponse;
49+
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
50+
import org.apache.hc.core5.http.nio.CapacityChannel;
51+
import org.apache.hc.core5.http.nio.DataStreamChannel;
52+
import org.apache.hc.core5.http.nio.RequestChannel;
53+
import org.apache.hc.core5.http.protocol.HttpContext;
54+
import org.apache.hc.core5.reactor.ConnectionInitiator;
55+
import org.slf4j.Logger;
56+
import org.slf4j.LoggerFactory;
57+
58+
public final class InternalTestHttpAsyncExecRuntime extends InternalHttpAsyncExecRuntime {
59+
60+
private static final Logger LOG = LoggerFactory.getLogger(InternalTestHttpAsyncExecRuntime.class);
61+
62+
private final AtomicBoolean cancelled;
63+
64+
public InternalTestHttpAsyncExecRuntime(final AsyncClientConnectionManager manager,
65+
final ConnectionInitiator connectionInitiator,
66+
final TlsConfig tlsConfig) {
67+
super(LOG, manager, connectionInitiator, null, tlsConfig, -1, new AtomicInteger());
68+
this.cancelled = new AtomicBoolean();
69+
}
70+
71+
public Future<Boolean> leaseAndConnect(final HttpHost target, final HttpClientContext context) {
72+
final BasicFuture<Boolean> resultFuture = new BasicFuture<>(null);
73+
acquireEndpoint("test", new HttpRoute(target), null, context, new FutureContribution<AsyncExecRuntime>(resultFuture) {
74+
75+
@Override
76+
public void completed(final AsyncExecRuntime runtime) {
77+
if (!runtime.isEndpointConnected()) {
78+
runtime.connectEndpoint(context, new FutureContribution<AsyncExecRuntime>(resultFuture) {
79+
80+
@Override
81+
public void completed(final AsyncExecRuntime runtime) {
82+
resultFuture.completed(true);
83+
}
84+
85+
});
86+
} else {
87+
resultFuture.completed(true);
88+
}
89+
}
90+
91+
});
92+
return resultFuture;
93+
}
94+
95+
@Override
96+
public Cancellable execute(final String id, final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
97+
return super.execute(id, new AsyncClientExchangeHandler() {
98+
99+
public void cancel() {
100+
if (cancelled.compareAndSet(false, true)) {
101+
exchangeHandler.cancel();
102+
}
103+
}
104+
105+
public void failed(final Exception cause) {
106+
exchangeHandler.failed(cause);
107+
}
108+
109+
public void produceRequest(final RequestChannel channel, final HttpContext context) throws HttpException, IOException {
110+
exchangeHandler.produceRequest(channel, context);
111+
}
112+
113+
public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails, final HttpContext context) throws HttpException, IOException {
114+
exchangeHandler.consumeResponse(response, entityDetails, context);
115+
}
116+
117+
public void consumeInformation(final HttpResponse response, final HttpContext context) throws HttpException, IOException {
118+
exchangeHandler.consumeInformation(response, context);
119+
}
120+
121+
public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
122+
exchangeHandler.updateCapacity(capacityChannel);
123+
}
124+
125+
public void consume(final ByteBuffer src) throws IOException {
126+
exchangeHandler.consume(src);
127+
}
128+
129+
public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
130+
exchangeHandler.streamEnd(trailers);
131+
}
132+
133+
public void releaseResources() {
134+
exchangeHandler.releaseResources();
135+
}
136+
137+
public int available() {
138+
return exchangeHandler.available();
139+
}
140+
141+
public void produce(final DataStreamChannel channel) throws IOException {
142+
exchangeHandler.produce(channel);
143+
}
144+
145+
}, context);
146+
}
147+
148+
public boolean isAborted() {
149+
return cancelled.get();
150+
}
151+
152+
}
Lines changed: 209 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,209 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
package org.apache.hc.client5.testing.async;
28+
29+
import java.net.InetSocketAddress;
30+
import java.util.concurrent.CancellationException;
31+
import java.util.concurrent.Future;
32+
import java.util.function.Consumer;
33+
34+
import org.apache.hc.client5.http.config.RequestConfig;
35+
import org.apache.hc.client5.http.config.TlsConfig;
36+
import org.apache.hc.client5.http.impl.async.InternalTestHttpAsyncExecRuntime;
37+
import org.apache.hc.client5.http.impl.nio.PoolingAsyncClientConnectionManager;
38+
import org.apache.hc.client5.http.protocol.HttpClientContext;
39+
import org.apache.hc.client5.testing.extension.async.ClientProtocolLevel;
40+
import org.apache.hc.client5.testing.extension.async.ServerProtocolLevel;
41+
import org.apache.hc.client5.testing.extension.async.TestAsyncClient;
42+
import org.apache.hc.client5.testing.extension.async.TestAsyncResources;
43+
import org.apache.hc.client5.testing.extension.async.TestAsyncServer;
44+
import org.apache.hc.client5.testing.extension.async.TestAsyncServerBootstrap;
45+
import org.apache.hc.core5.concurrent.BasicFuture;
46+
import org.apache.hc.core5.concurrent.Cancellable;
47+
import org.apache.hc.core5.concurrent.FutureContribution;
48+
import org.apache.hc.core5.http.HttpHeaders;
49+
import org.apache.hc.core5.http.HttpHost;
50+
import org.apache.hc.core5.http.HttpRequest;
51+
import org.apache.hc.core5.http.HttpResponse;
52+
import org.apache.hc.core5.http.Message;
53+
import org.apache.hc.core5.http.URIScheme;
54+
import org.apache.hc.core5.http.nio.support.AsyncClientPipeline;
55+
import org.apache.hc.core5.http.support.BasicRequestBuilder;
56+
import org.apache.hc.core5.http2.HttpVersionPolicy;
57+
import org.apache.hc.core5.pool.PoolStats;
58+
import org.apache.hc.core5.reactor.ConnectionInitiator;
59+
import org.apache.hc.core5.util.TimeValue;
60+
import org.apache.hc.core5.util.Timeout;
61+
import org.junit.jupiter.api.Assertions;
62+
import org.junit.jupiter.api.Test;
63+
import org.junit.jupiter.api.extension.RegisterExtension;
64+
65+
public class TestInternalHttpAsyncExecRuntime {
66+
67+
public static final Timeout TIMEOUT = Timeout.ofMinutes(1);
68+
69+
@RegisterExtension
70+
private final TestAsyncResources testResources;
71+
72+
public TestInternalHttpAsyncExecRuntime() {
73+
this.testResources = new TestAsyncResources(URIScheme.HTTP, ClientProtocolLevel.STANDARD, ServerProtocolLevel.STANDARD, TIMEOUT);
74+
}
75+
76+
public void configureServer(final Consumer<TestAsyncServerBootstrap> serverCustomizer) {
77+
testResources.configureServer(serverCustomizer);
78+
}
79+
80+
public HttpHost startServer() throws Exception {
81+
final TestAsyncServer server = testResources.server();
82+
final InetSocketAddress inetSocketAddress = server.start();
83+
return new HttpHost(testResources.scheme().id, "localhost", inetSocketAddress.getPort());
84+
}
85+
86+
public TestAsyncClient startClient() throws Exception {
87+
final TestAsyncClient client = testResources.client();
88+
client.start();
89+
return client;
90+
}
91+
92+
static final int REQ_NUM = 5;
93+
94+
HttpRequest createRequest(final HttpHost target) {
95+
return BasicRequestBuilder.get()
96+
.setHttpHost(target)
97+
.setPath("/random/20000")
98+
.addHeader(HttpHeaders.HOST, target.toHostString())
99+
.build();
100+
}
101+
102+
@Test
103+
void testExecutionCancellation_http11HardCancellation_connectionMarkedNonReusable() throws Exception {
104+
configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new));
105+
final HttpHost target = startServer();
106+
107+
final TestAsyncClient client = startClient();
108+
final ConnectionInitiator connectionInitiator = client.getImplementation();
109+
final PoolingAsyncClientConnectionManager connectionManager = client.getConnectionManager();
110+
for (int i = 0; i < REQ_NUM; i++) {
111+
final HttpClientContext context = HttpClientContext.create();
112+
113+
final InternalTestHttpAsyncExecRuntime testRuntime = new InternalTestHttpAsyncExecRuntime(
114+
connectionManager,
115+
connectionInitiator,
116+
TlsConfig.custom()
117+
.setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_1)
118+
.build());
119+
final Future<Boolean> connectFuture = testRuntime.leaseAndConnect(target, context);
120+
Assertions.assertTrue(connectFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
121+
122+
final BasicFuture<Message<HttpResponse, byte[]>> resultFuture = new BasicFuture<>(null);
123+
final Cancellable cancellable = testRuntime.execute(
124+
"test-" + i,
125+
AsyncClientPipeline.assemble()
126+
.request(createRequest(target)).noContent()
127+
.response().asByteArray()
128+
.result(new FutureContribution<Message<HttpResponse, byte[]>>(resultFuture) {
129+
130+
@Override
131+
public void completed(final Message<HttpResponse, byte[]> result) {
132+
resultFuture.completed(result);
133+
}
134+
135+
})
136+
.create(),
137+
context);
138+
// sleep a bit
139+
Thread.sleep(i % 10);
140+
cancellable.cancel();
141+
142+
// The message exchange is expected to get aborted
143+
try {
144+
resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
145+
} catch (final CancellationException expected) {
146+
}
147+
Assertions.assertTrue(testRuntime.isAborted());
148+
testRuntime.discardEndpoint();
149+
}
150+
}
151+
152+
@Test
153+
void testExecutionCancellation_http11NoHardCancellation_connectionAlive() throws Exception {
154+
configureServer(bootstrap -> bootstrap.register("/random/*", AsyncRandomHandler::new));
155+
final HttpHost target = startServer();
156+
157+
final TestAsyncClient client = startClient();
158+
final ConnectionInitiator connectionInitiator = client.getImplementation();
159+
final PoolingAsyncClientConnectionManager connectionManager = client.getConnectionManager();
160+
for (int i = 0; i < REQ_NUM; i++) {
161+
final HttpClientContext context = HttpClientContext.create();
162+
context.setRequestConfig(RequestConfig.custom()
163+
.setHardCancellationEnabled(false)
164+
.build());
165+
166+
final InternalTestHttpAsyncExecRuntime testRuntime = new InternalTestHttpAsyncExecRuntime(
167+
connectionManager,
168+
connectionInitiator,
169+
TlsConfig.custom()
170+
.setVersionPolicy(HttpVersionPolicy.FORCE_HTTP_1)
171+
.build());
172+
final Future<Boolean> connectFuture = testRuntime.leaseAndConnect(target, context);
173+
Assertions.assertTrue(connectFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit()));
174+
175+
final BasicFuture<Message<HttpResponse, byte[]>> resultFuture = new BasicFuture<>(null);
176+
final Cancellable cancellable = testRuntime.execute(
177+
"test-" + i,
178+
AsyncClientPipeline.assemble()
179+
.request(createRequest(target)).noContent()
180+
.response().asByteArray()
181+
.result(new FutureContribution<Message<HttpResponse, byte[]>>(resultFuture) {
182+
183+
@Override
184+
public void completed(final Message<HttpResponse, byte[]> result) {
185+
resultFuture.completed(result);
186+
}
187+
188+
})
189+
.create(),
190+
context);
191+
// sleep a bit
192+
Thread.sleep(i % 10);
193+
cancellable.cancel();
194+
195+
// The message exchange should not get aborted and is expected to successfully complete
196+
final Message<HttpResponse, byte[]> message = resultFuture.get(TIMEOUT.getDuration(), TIMEOUT.getTimeUnit());
197+
Assertions.assertNotNull(message);
198+
Assertions.assertFalse(testRuntime.isAborted());
199+
// The underlying connection is expected to stay valid
200+
Assertions.assertTrue(testRuntime.isEndpointConnected());
201+
testRuntime.markConnectionReusable(null, TimeValue.ofMinutes(1));
202+
testRuntime.releaseEndpoint();
203+
204+
final PoolStats totalStats = connectionManager.getTotalStats();
205+
Assertions.assertTrue(totalStats.getAvailable() > 0);
206+
}
207+
}
208+
209+
}

httpclient5-testing/src/test/java/org/apache/hc/client5/testing/extension/async/TestAsyncServerBootstrap.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public HandlerEntry(final String uriPattern, final T handler) {
6666
}
6767

6868
private final URIScheme scheme;
69-
private final ServerProtocolLevel serverProtocolLevel;
69+
private ServerProtocolLevel serverProtocolLevel;
7070

7171
private final List<HandlerEntry<Supplier<AsyncServerExchangeHandler>>> handlerList;
7272
private Timeout timeout;
@@ -79,6 +79,10 @@ public TestAsyncServerBootstrap(final URIScheme scheme, final ServerProtocolLeve
7979
this.handlerList = new ArrayList<>();
8080
}
8181

82+
public void setServerProtocolLevel(final ServerProtocolLevel serverProtocolLevel) {
83+
this.serverProtocolLevel = serverProtocolLevel;
84+
}
85+
8286
public ServerProtocolLevel getProtocolLevel() {
8387
return serverProtocolLevel;
8488
}

0 commit comments

Comments
 (0)