Skip to content

Commit 23041ea

Browse files
committed
Bug-fix: corrects message exchange cancellation logic in InternalHttpAsyncExecRuntime
* fixes the problem with message exchanges over an existing persistent connection being non-cancellable * Hard-cancellation request parameter should have no bearing on H2 connections * Response timeout not be applied to H2 endpoints * Test coverage
1 parent 1aac376 commit 23041ea

4 files changed

Lines changed: 474 additions & 25 deletions

File tree

Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/*
2+
* ====================================================================
3+
* Licensed to the Apache Software Foundation (ASF) under one
4+
* or more contributor license agreements. See the NOTICE file
5+
* distributed with this work for additional information
6+
* regarding copyright ownership. The ASF licenses this file
7+
* to you under the Apache License, Version 2.0 (the
8+
* "License"); you may not use this file except in compliance
9+
* with the License. You may obtain a copy of the License at
10+
*
11+
* http://www.apache.org/licenses/LICENSE-2.0
12+
*
13+
* Unless required by applicable law or agreed to in writing,
14+
* software distributed under the License is distributed on an
15+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
* KIND, either express or implied. See the License for the
17+
* specific language governing permissions and limitations
18+
* under the License.
19+
* ====================================================================
20+
*
21+
* This software consists of voluntary contributions made by many
22+
* individuals on behalf of the Apache Software Foundation. For more
23+
* information on the Apache Software Foundation, please see
24+
* <http://www.apache.org/>.
25+
*
26+
*/
27+
package org.apache.hc.client5.http.impl.async;
28+
29+
import java.io.IOException;
30+
import java.nio.ByteBuffer;
31+
import java.util.List;
32+
import java.util.concurrent.Future;
33+
import java.util.concurrent.atomic.AtomicBoolean;
34+
import java.util.concurrent.atomic.AtomicInteger;
35+
36+
import org.apache.hc.client5.http.HttpRoute;
37+
import org.apache.hc.client5.http.async.AsyncExecRuntime;
38+
import org.apache.hc.client5.http.config.TlsConfig;
39+
import org.apache.hc.client5.http.nio.AsyncClientConnectionManager;
40+
import org.apache.hc.client5.http.protocol.HttpClientContext;
41+
import org.apache.hc.core5.concurrent.BasicFuture;
42+
import org.apache.hc.core5.concurrent.Cancellable;
43+
import org.apache.hc.core5.concurrent.FutureContribution;
44+
import org.apache.hc.core5.http.EntityDetails;
45+
import org.apache.hc.core5.http.Header;
46+
import org.apache.hc.core5.http.HttpException;
47+
import org.apache.hc.core5.http.HttpHost;
48+
import org.apache.hc.core5.http.HttpResponse;
49+
import org.apache.hc.core5.http.nio.AsyncClientExchangeHandler;
50+
import org.apache.hc.core5.http.nio.CapacityChannel;
51+
import org.apache.hc.core5.http.nio.DataStreamChannel;
52+
import org.apache.hc.core5.http.nio.RequestChannel;
53+
import org.apache.hc.core5.http.protocol.HttpContext;
54+
import org.apache.hc.core5.reactor.ConnectionInitiator;
55+
import org.slf4j.Logger;
56+
import org.slf4j.LoggerFactory;
57+
58+
public final class InternalTestHttpAsyncExecRuntime extends InternalHttpAsyncExecRuntime {
59+
60+
private static final Logger LOG = LoggerFactory.getLogger(InternalTestHttpAsyncExecRuntime.class);
61+
62+
private final AtomicBoolean cancelled;
63+
64+
public InternalTestHttpAsyncExecRuntime(final AsyncClientConnectionManager manager,
65+
final ConnectionInitiator connectionInitiator,
66+
final TlsConfig tlsConfig) {
67+
super(LOG, manager, connectionInitiator, null, tlsConfig, -1, new AtomicInteger());
68+
this.cancelled = new AtomicBoolean();
69+
}
70+
71+
public Future<Boolean> leaseAndConnect(final HttpHost target, final HttpClientContext context) {
72+
final BasicFuture<Boolean> resultFuture = new BasicFuture<>(null);
73+
acquireEndpoint("test", new HttpRoute(target), null, context, new FutureContribution<AsyncExecRuntime>(resultFuture) {
74+
75+
@Override
76+
public void completed(final AsyncExecRuntime runtime) {
77+
if (!runtime.isEndpointConnected()) {
78+
runtime.connectEndpoint(context, new FutureContribution<AsyncExecRuntime>(resultFuture) {
79+
80+
@Override
81+
public void completed(final AsyncExecRuntime runtime) {
82+
resultFuture.completed(true);
83+
}
84+
85+
});
86+
} else {
87+
resultFuture.completed(true);
88+
}
89+
}
90+
91+
});
92+
return resultFuture;
93+
}
94+
95+
@Override
96+
public Cancellable execute(final String id, final AsyncClientExchangeHandler exchangeHandler, final HttpClientContext context) {
97+
return super.execute(id, new AsyncClientExchangeHandler() {
98+
99+
public void cancel() {
100+
if (cancelled.compareAndSet(false, true)) {
101+
exchangeHandler.cancel();
102+
}
103+
}
104+
105+
public void failed(final Exception cause) {
106+
exchangeHandler.failed(cause);
107+
}
108+
109+
public void produceRequest(final RequestChannel channel, final HttpContext context) throws HttpException, IOException {
110+
exchangeHandler.produceRequest(channel, context);
111+
}
112+
113+
public void consumeResponse(final HttpResponse response, final EntityDetails entityDetails, final HttpContext context) throws HttpException, IOException {
114+
exchangeHandler.consumeResponse(response, entityDetails, context);
115+
}
116+
117+
public void consumeInformation(final HttpResponse response, final HttpContext context) throws HttpException, IOException {
118+
exchangeHandler.consumeInformation(response, context);
119+
}
120+
121+
public void updateCapacity(final CapacityChannel capacityChannel) throws IOException {
122+
exchangeHandler.updateCapacity(capacityChannel);
123+
}
124+
125+
public void consume(final ByteBuffer src) throws IOException {
126+
exchangeHandler.consume(src);
127+
}
128+
129+
public void streamEnd(final List<? extends Header> trailers) throws HttpException, IOException {
130+
exchangeHandler.streamEnd(trailers);
131+
}
132+
133+
public void releaseResources() {
134+
exchangeHandler.releaseResources();
135+
}
136+
137+
public int available() {
138+
return exchangeHandler.available();
139+
}
140+
141+
public void produce(final DataStreamChannel channel) throws IOException {
142+
exchangeHandler.produce(channel);
143+
}
144+
145+
}, context);
146+
}
147+
148+
public boolean isAborted() {
149+
return cancelled.get();
150+
}
151+
152+
}

0 commit comments

Comments
 (0)