Skip to content

Commit 1741a27

Browse files
committed
Bug fix: Fixed a race condition in async message stream handlers that can occur when a message with no content body is being submitted asynchronously
1 parent 22abdc0 commit 1741a27

5 files changed

Lines changed: 150 additions & 8 deletions

File tree

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ClientH2StreamHandler.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -142,12 +142,13 @@ private void commitRequest(final HttpRequest request, final EntityDetails entity
142142
httpProcessor.process(request, entityDetails, context);
143143

144144
final List<Header> headers = DefaultH2RequestConverter.INSTANCE.convert(request);
145-
outputChannel.submit(headers, entityDetails == null);
146-
connMetrics.incrementRequestCount();
147-
148145
if (entityDetails == null) {
149146
requestState.set(MessageState.COMPLETE);
147+
outputChannel.submit(headers, true);
148+
connMetrics.incrementRequestCount();
150149
} else {
150+
outputChannel.submit(headers, false);
151+
connMetrics.incrementRequestCount();
151152
final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
152153
final boolean expectContinue = h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue());
153154
if (expectContinue) {

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/ServerH2StreamHandler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -178,11 +178,13 @@ private void commitResponse(
178178

179179
final boolean endStream = responseEntityDetails == null ||
180180
receivedRequest != null && Method.HEAD.isSame(receivedRequest.getMethod());
181-
outputChannel.submit(responseHeaders, endStream);
182-
connMetrics.incrementResponseCount();
183181
if (endStream) {
184182
responseState.set(MessageState.COMPLETE);
183+
outputChannel.submit(responseHeaders, endStream);
184+
connMetrics.incrementResponseCount();
185185
} else {
186+
outputChannel.submit(responseHeaders, endStream);
187+
connMetrics.incrementResponseCount();
186188
exchangeHandler.produce(outputChannel);
187189
if (responseState.compareAndSet(MessageState.IDLE, MessageState.BODY)) {
188190
outputChannel.requestOutput();

httpcore5-testing/src/test/java/org/apache/hc/core5/testing/nio/HttpIntegrationTest.java

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,7 @@
8181
import org.apache.hc.core5.http.nio.entity.StringAsyncEntityConsumer;
8282
import org.apache.hc.core5.http.nio.entity.StringAsyncEntityProducer;
8383
import org.apache.hc.core5.http.nio.support.AbstractServerExchangeHandler;
84+
import org.apache.hc.core5.http.nio.support.AsyncResponseBuilder;
8485
import org.apache.hc.core5.http.nio.support.BasicAsyncServerExpectationDecorator;
8586
import org.apache.hc.core5.http.nio.support.BasicRequestConsumer;
8687
import org.apache.hc.core5.http.nio.support.BasicRequestProducer;
@@ -97,6 +98,7 @@
9798
import org.apache.hc.core5.util.Timeout;
9899
import org.hamcrest.CoreMatchers;
99100
import org.hamcrest.MatcherAssert;
101+
import org.hamcrest.Matchers;
100102
import org.junit.jupiter.api.Assertions;
101103
import org.junit.jupiter.api.Test;
102104
import org.junit.jupiter.api.extension.RegisterExtension;
@@ -1309,6 +1311,142 @@ public void releaseResources() {
13091311
}
13101312
}
13111313

1314+
@Test
1315+
void testDelayedResponseSubmission() throws Exception {
1316+
final HttpTestServer server = server();
1317+
final HttpTestClient client = client();
1318+
1319+
server.register("/hello", () -> new AbstractServerExchangeHandler<Message<HttpRequest, String>>() {
1320+
1321+
private final Random random = new Random(System.currentTimeMillis());
1322+
1323+
@Override
1324+
protected AsyncRequestConsumer<Message<HttpRequest, String>> supplyConsumer(
1325+
final HttpRequest request,
1326+
final EntityDetails entityDetails,
1327+
final HttpContext context) throws HttpException {
1328+
return new BasicRequestConsumer<>(entityDetails != null ? new StringAsyncEntityConsumer() : null);
1329+
}
1330+
1331+
@Override
1332+
protected void handle(
1333+
final Message<HttpRequest, String> requestMessage,
1334+
final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1335+
final HttpContext context) throws HttpException, IOException {
1336+
executorResource.getExecutorService().execute(() -> {
1337+
try {
1338+
Thread.sleep(random.nextInt(200));
1339+
responseTrigger.submitResponse(AsyncResponseBuilder.create(HttpStatus.SC_OK)
1340+
.setEntity(new MultiLineEntityProducer("All is well", 100))
1341+
.build(),
1342+
context);
1343+
Thread.sleep(random.nextInt(200));
1344+
} catch (final Exception ignore) {
1345+
// ignore
1346+
}
1347+
});
1348+
1349+
}
1350+
1351+
});
1352+
final InetSocketAddress serverEndpoint = server.start();
1353+
1354+
final HttpHost target = target(serverEndpoint);
1355+
1356+
client.start();
1357+
1358+
final Future<ClientSessionEndpoint> connectFuture = client.connect(
1359+
"localhost", serverEndpoint.getPort(), TIMEOUT);
1360+
final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1361+
1362+
final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
1363+
for (int i = 0; i < REQ_NUM; i++) {
1364+
final BasicHttpRequest request = BasicRequestBuilder.get()
1365+
.setHttpHost(target)
1366+
.setPath("/hello")
1367+
.build();
1368+
queue.add(streamEndpoint.execute(
1369+
new BasicRequestProducer(request, null),
1370+
new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
1371+
}
1372+
while (!queue.isEmpty()) {
1373+
final Future<Message<HttpResponse, String>> future = queue.remove();
1374+
final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1375+
Assertions.assertNotNull(result);
1376+
final HttpResponse response = result.getHead();
1377+
Assertions.assertNotNull(response);
1378+
Assertions.assertEquals(200, response.getCode());
1379+
MatcherAssert.assertThat(result.getBody(), Matchers.startsWith("All is well"));
1380+
}
1381+
}
1382+
1383+
@Test
1384+
void testDelayedResponseSubmissionNoResponseBody() throws Exception {
1385+
final HttpTestServer server = server();
1386+
final HttpTestClient client = client();
1387+
1388+
server.register("/hello", () -> new AbstractServerExchangeHandler<Message<HttpRequest, String>>() {
1389+
1390+
private final Random random = new Random(System.currentTimeMillis());
1391+
1392+
@Override
1393+
protected AsyncRequestConsumer<Message<HttpRequest, String>> supplyConsumer(
1394+
final HttpRequest request,
1395+
final EntityDetails entityDetails,
1396+
final HttpContext context) throws HttpException {
1397+
return new BasicRequestConsumer<>(entityDetails != null ? new StringAsyncEntityConsumer() : null);
1398+
}
1399+
1400+
@Override
1401+
protected void handle(
1402+
final Message<HttpRequest, String> requestMessage,
1403+
final AsyncServerRequestHandler.ResponseTrigger responseTrigger,
1404+
final HttpContext context) throws HttpException, IOException {
1405+
executorResource.getExecutorService().execute(() -> {
1406+
try {
1407+
Thread.sleep(random.nextInt(200));
1408+
responseTrigger.submitResponse(AsyncResponseBuilder.create(200)
1409+
.build(),
1410+
context);
1411+
Thread.sleep(random.nextInt(200));
1412+
} catch (final Exception ignore) {
1413+
// ignore
1414+
}
1415+
});
1416+
1417+
}
1418+
1419+
});
1420+
final InetSocketAddress serverEndpoint = server.start();
1421+
1422+
final HttpHost target = target(serverEndpoint);
1423+
1424+
client.start();
1425+
1426+
final Future<ClientSessionEndpoint> connectFuture = client.connect(
1427+
"localhost", serverEndpoint.getPort(), TIMEOUT);
1428+
final ClientSessionEndpoint streamEndpoint = connectFuture.get();
1429+
1430+
final Queue<Future<Message<HttpResponse, String>>> queue = new LinkedList<>();
1431+
for (int i = 0; i < REQ_NUM; i++) {
1432+
final BasicHttpRequest request = BasicRequestBuilder.get()
1433+
.setHttpHost(target)
1434+
.setPath("/hello")
1435+
.build();
1436+
queue.add(streamEndpoint.execute(
1437+
new BasicRequestProducer(request, null),
1438+
new BasicResponseConsumer<>(new StringAsyncEntityConsumer()), null));
1439+
}
1440+
while (!queue.isEmpty()) {
1441+
final Future<Message<HttpResponse, String>> future = queue.remove();
1442+
final Message<HttpResponse, String> result = future.get(LONG_TIMEOUT.getDuration(), LONG_TIMEOUT.getTimeUnit());
1443+
Assertions.assertNotNull(result);
1444+
final HttpResponse response = result.getHead();
1445+
Assertions.assertNotNull(response);
1446+
Assertions.assertEquals(200, response.getCode());
1447+
}
1448+
}
1449+
13121450
void testHeaderTooLarge(final String method) throws Exception {
13131451
final HttpTestServer server = server();
13141452
final HttpTestClient client = client();

httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ClientHttp1StreamHandler.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,9 +166,9 @@ private void commitRequest(final HttpRequest request, final EntityDetails entity
166166

167167
final boolean endStream = entityDetails == null;
168168
if (endStream) {
169-
outputChannel.submit(request, true, FlushMode.IMMEDIATE);
170169
committedRequest = request;
171170
requestState.set(MessageState.COMPLETE);
171+
outputChannel.submit(request, true, FlushMode.IMMEDIATE);
172172
} else {
173173
final Header h = request.getFirstHeader(HttpHeaders.EXPECT);
174174
final boolean expectContinue = h != null && HeaderElements.CONTINUE.equalsIgnoreCase(h.getValue());

httpcore5/src/main/java/org/apache/hc/core5/http/impl/nio/ServerHttp1StreamHandler.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,13 +189,14 @@ private void commitResponse(
189189
keepAlive = false;
190190
}
191191

192-
outputChannel.submit(response, endStream, endStream ? FlushMode.IMMEDIATE : FlushMode.BUFFER);
193192
if (endStream) {
193+
responseState.set(MessageState.COMPLETE);
194+
outputChannel.submit(response, true, FlushMode.IMMEDIATE);
194195
if (!keepAlive) {
195196
outputChannel.close();
196197
}
197-
responseState.set(MessageState.COMPLETE);
198198
} else {
199+
outputChannel.submit(response, false, FlushMode.BUFFER);
199200
exchangeHandler.produce(internalDataChannel);
200201
if (responseState.compareAndSet(MessageState.IDLE, MessageState.BODY)) {
201202
outputChannel.requestOutput();

0 commit comments

Comments
 (0)