Skip to content

Commit 9f8471a

Browse files
committed
Fix H2 stream timeout handling on inbound frames
1 parent b74914a commit 9f8471a

2 files changed

Lines changed: 239 additions & 18 deletions

File tree

httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java

Lines changed: 35 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -782,12 +782,17 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
782782
if (frameType == null) {
783783
return;
784784
}
785+
final long nowNanos = System.nanoTime();
785786
switch (frameType) {
786787
case DATA: {
787788
if (streamId == 0) {
788789
throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId);
789790
}
790791
final H2Stream stream = streams.lookupValid(streamId);
792+
if (resetIfExpired(stream, nowNanos)) {
793+
requestSessionOutput();
794+
break;
795+
}
791796
try {
792797
consumeDataFrame(frame, stream);
793798
} catch (final H2StreamResetException ex) {
@@ -861,6 +866,10 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
861866
}
862867

863868
final H2Stream stream = streams.lookupValid(streamId);
869+
if (resetIfExpired(stream, nowNanos)) {
870+
requestSessionOutput();
871+
break;
872+
}
864873
try {
865874
consumeContinuationFrame(frame, stream);
866875
} catch (final H2StreamResetException ex) {
@@ -1007,6 +1016,10 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio
10071016
}
10081017

10091018
final H2Stream stream = streams.lookupValid(streamId);
1019+
if (resetIfExpired(stream, nowNanos)) {
1020+
requestSessionOutput();
1021+
break;
1022+
}
10101023
if (stream.isRemoteClosed()) {
10111024
stream.localReset(new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream closed"));
10121025
break;
@@ -1705,27 +1718,31 @@ public String toString() {
17051718

17061719
}
17071720

1721+
boolean resetIfExpired(final H2Stream stream, final long nowNanos) throws IOException {
1722+
if (!stream.isActive()) {
1723+
return false;
1724+
}
1725+
final Timeout idleTimeout = stream.getIdleTimeout();
1726+
if (idleTimeout == null || !idleTimeout.isEnabled()) {
1727+
return false;
1728+
}
1729+
final long last = stream.getLastActivityNanos();
1730+
final long idleNanos = idleTimeout.toNanoseconds();
1731+
if (idleNanos > 0 && nowNanos - last > idleNanos) {
1732+
stream.localReset(new H2StreamTimeoutException(
1733+
"HTTP/2 stream idle timeout (" + idleTimeout + ")",
1734+
stream.getId(),
1735+
idleTimeout), H2Error.CANCEL);
1736+
return true;
1737+
}
1738+
return false;
1739+
}
1740+
17081741
private void checkStreamTimeouts(final long nowNanos) throws IOException {
17091742
for (final Iterator<H2Stream> it = streams.iterator(); it.hasNext(); ) {
17101743
final H2Stream stream = it.next();
1711-
if (!stream.isActive()) {
1712-
continue;
1713-
}
1714-
1715-
final Timeout idleTimeout = stream.getIdleTimeout();
1716-
if (idleTimeout == null || !idleTimeout.isEnabled()) {
1717-
continue;
1718-
}
1719-
1720-
final long last = stream.getLastActivityNanos();
1721-
final long idleNanos = idleTimeout.toNanoseconds();
1722-
if (idleNanos > 0 && nowNanos - last > idleNanos) {
1723-
final int streamId = stream.getId();
1724-
final H2StreamTimeoutException ex = new H2StreamTimeoutException(
1725-
"HTTP/2 stream idle timeout (" + idleTimeout + ")",
1726-
streamId,
1727-
idleTimeout);
1728-
stream.localReset(ex, H2Error.CANCEL);
1744+
if (resetIfExpired(stream, nowNanos)) {
1745+
requestSessionOutput();
17291746
}
17301747
}
17311748
}

httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java

Lines changed: 204 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1165,6 +1165,210 @@ void testStreamIdleTimeoutTriggersH2StreamTimeoutException() throws Exception {
11651165

11661166
}
11671167

1168+
@Test
1169+
void testResetIfExpiredResetsStreamPastDeadline() throws Exception {
1170+
final H2Config h2Config = H2Config.custom().build();
1171+
final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl(
1172+
protocolIOSession,
1173+
FRAME_FACTORY,
1174+
StreamIdGenerator.ODD,
1175+
httpProcessor,
1176+
CharCodingConfig.DEFAULT,
1177+
h2Config,
1178+
h2StreamListener,
1179+
() -> streamHandler);
1180+
1181+
final H2StreamChannel channel = mux.createChannel(1);
1182+
final H2Stream stream = mux.createStream(channel, streamHandler);
1183+
1184+
stream.setTimeout(Timeout.ofMilliseconds(50));
1185+
stream.activate();
1186+
1187+
// Push last activity into the past so the timeout is definitely expired
1188+
final Field lastActivityField = H2Stream.class.getDeclaredField("lastActivityNanos");
1189+
lastActivityField.setAccessible(true);
1190+
lastActivityField.set(stream, System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(100));
1191+
1192+
Assertions.assertTrue(mux.resetIfExpired(stream, System.nanoTime()));
1193+
1194+
Mockito.verify(streamHandler).failed(exceptionCaptor.capture());
1195+
Assertions.assertInstanceOf(H2StreamTimeoutException.class, exceptionCaptor.getValue());
1196+
Assertions.assertTrue(stream.isLocalClosed());
1197+
Assertions.assertTrue(stream.isClosed());
1198+
}
1199+
1200+
@Test
1201+
void testResetIfExpiredIgnoresStreamBeforeDeadline() throws Exception {
1202+
final H2Config h2Config = H2Config.custom().build();
1203+
final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl(
1204+
protocolIOSession,
1205+
FRAME_FACTORY,
1206+
StreamIdGenerator.ODD,
1207+
httpProcessor,
1208+
CharCodingConfig.DEFAULT,
1209+
h2Config,
1210+
h2StreamListener,
1211+
() -> streamHandler);
1212+
1213+
final H2StreamChannel channel = mux.createChannel(1);
1214+
final H2Stream stream = mux.createStream(channel, streamHandler);
1215+
1216+
stream.setTimeout(Timeout.ofMinutes(1));
1217+
stream.activate();
1218+
1219+
Assertions.assertFalse(mux.resetIfExpired(stream, System.nanoTime()));
1220+
Assertions.assertFalse(stream.isLocalClosed());
1221+
1222+
Mockito.verify(streamHandler, Mockito.never()).failed(ArgumentMatchers.any());
1223+
}
1224+
1225+
@Test
1226+
void testExpiredStreamResetOnInboundData() throws Exception {
1227+
Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class)))
1228+
.thenAnswer(invocation -> {
1229+
final ByteBuffer buffer = invocation.getArgument(0, ByteBuffer.class);
1230+
final int remaining = buffer.remaining();
1231+
buffer.position(buffer.limit());
1232+
return remaining;
1233+
});
1234+
Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt());
1235+
Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt());
1236+
1237+
final H2Config h2Config = H2Config.custom().build();
1238+
final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl(
1239+
protocolIOSession,
1240+
FRAME_FACTORY,
1241+
StreamIdGenerator.EVEN,
1242+
httpProcessor,
1243+
CharCodingConfig.DEFAULT,
1244+
h2Config,
1245+
h2StreamListener,
1246+
() -> streamHandler);
1247+
1248+
// Encode request headers
1249+
final ByteArrayBuffer headerBuf = new ByteArrayBuffer(200);
1250+
final HPackEncoder encoder = new HPackEncoder(h2Config.getHeaderTableSize(),
1251+
CharCodingSupport.createEncoder(CharCodingConfig.DEFAULT));
1252+
final List<Header> headers = Arrays.asList(
1253+
new BasicHeader(":method", "GET"),
1254+
new BasicHeader(":scheme", "http"),
1255+
new BasicHeader(":path", "/"),
1256+
new BasicHeader(":authority", "www.example.com"));
1257+
encoder.encodeHeaders(headerBuf, headers, h2Config.isCompressionEnabled());
1258+
1259+
final WritableByteChannelMock writableChannel = new WritableByteChannelMock(1024);
1260+
final FrameOutputBuffer outBuffer = new FrameOutputBuffer(16 * 1024);
1261+
1262+
// Send HEADERS (endHeaders=true, endStream=false) to create stream 1
1263+
final RawFrame headerFrame = FRAME_FACTORY.createHeaders(1,
1264+
ByteBuffer.wrap(headerBuf.array(), 0, headerBuf.length()), true, false);
1265+
outBuffer.write(headerFrame, writableChannel);
1266+
mux.onInput(ByteBuffer.wrap(writableChannel.toByteArray()));
1267+
1268+
Mockito.verify(streamHandler).consumeHeader(headersCaptor.capture(), ArgumentMatchers.eq(false));
1269+
Assertions.assertFalse(headersCaptor.getValue().isEmpty());
1270+
1271+
// Retrieve the stream and set a short timeout
1272+
final Field streamsField = AbstractH2StreamMultiplexer.class.getDeclaredField("streams");
1273+
streamsField.setAccessible(true);
1274+
final H2Streams h2Streams = (H2Streams) streamsField.get(mux);
1275+
final H2Stream stream = h2Streams.lookupValid(1);
1276+
stream.setTimeout(Timeout.ofMilliseconds(50));
1277+
1278+
// Push last activity into the past so the timeout is expired
1279+
final Field lastActivityField = H2Stream.class.getDeclaredField("lastActivityNanos");
1280+
lastActivityField.setAccessible(true);
1281+
lastActivityField.set(stream, System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(100));
1282+
1283+
// Send DATA frame for the expired stream
1284+
writableChannel.reset();
1285+
final RawFrame dataFrame = FRAME_FACTORY.createData(1,
1286+
ByteBuffer.wrap("hello".getBytes(StandardCharsets.US_ASCII)), true);
1287+
outBuffer.write(dataFrame, writableChannel);
1288+
mux.onInput(ByteBuffer.wrap(writableChannel.toByteArray()));
1289+
1290+
// The handler must receive a timeout failure, not data
1291+
Mockito.verify(streamHandler).failed(exceptionCaptor.capture());
1292+
Assertions.assertInstanceOf(H2StreamTimeoutException.class, exceptionCaptor.getValue());
1293+
Mockito.verify(streamHandler, Mockito.never()).consumeData(
1294+
ArgumentMatchers.any(ByteBuffer.class), ArgumentMatchers.anyBoolean());
1295+
}
1296+
1297+
@Test
1298+
void testExpiredStreamResetOnInboundContinuation() throws Exception {
1299+
Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class)))
1300+
.thenAnswer(invocation -> {
1301+
final ByteBuffer buffer = invocation.getArgument(0, ByteBuffer.class);
1302+
final int remaining = buffer.remaining();
1303+
buffer.position(buffer.limit());
1304+
return remaining;
1305+
});
1306+
Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt());
1307+
Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt());
1308+
1309+
final H2Config h2Config = H2Config.custom().build();
1310+
final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl(
1311+
protocolIOSession,
1312+
FRAME_FACTORY,
1313+
StreamIdGenerator.EVEN,
1314+
httpProcessor,
1315+
CharCodingConfig.DEFAULT,
1316+
h2Config,
1317+
h2StreamListener,
1318+
() -> streamHandler);
1319+
1320+
// Encode request headers
1321+
final ByteArrayBuffer headerBuf = new ByteArrayBuffer(200);
1322+
final HPackEncoder encoder = new HPackEncoder(h2Config.getHeaderTableSize(),
1323+
CharCodingSupport.createEncoder(CharCodingConfig.DEFAULT));
1324+
final List<Header> headers = Arrays.asList(
1325+
new BasicHeader(":method", "GET"),
1326+
new BasicHeader(":scheme", "http"),
1327+
new BasicHeader(":path", "/"),
1328+
new BasicHeader(":authority", "www.example.com"));
1329+
encoder.encodeHeaders(headerBuf, headers, h2Config.isCompressionEnabled());
1330+
1331+
// Split encoded headers: first part in HEADERS, remainder in CONTINUATION
1332+
final int split = headerBuf.length() / 2;
1333+
final WritableByteChannelMock writableChannel = new WritableByteChannelMock(1024);
1334+
final FrameOutputBuffer outBuffer = new FrameOutputBuffer(16 * 1024);
1335+
1336+
// Send HEADERS (endHeaders=false, endStream=false) to create stream 1
1337+
final RawFrame headerFrame = FRAME_FACTORY.createHeaders(1,
1338+
ByteBuffer.wrap(headerBuf.array(), 0, split), false, false);
1339+
outBuffer.write(headerFrame, writableChannel);
1340+
mux.onInput(ByteBuffer.wrap(writableChannel.toByteArray()));
1341+
1342+
// Stream created but consumeHeader not yet called (waiting for CONTINUATION)
1343+
Mockito.verify(streamHandler, Mockito.never()).consumeHeader(
1344+
ArgumentMatchers.anyList(), ArgumentMatchers.anyBoolean());
1345+
1346+
// Retrieve the stream and set a short timeout
1347+
final Field streamsField = AbstractH2StreamMultiplexer.class.getDeclaredField("streams");
1348+
streamsField.setAccessible(true);
1349+
final H2Streams h2Streams = (H2Streams) streamsField.get(mux);
1350+
final H2Stream stream = h2Streams.lookupValid(1);
1351+
stream.setTimeout(Timeout.ofMilliseconds(50));
1352+
1353+
// Push last activity into the past so the timeout is expired
1354+
final Field lastActivityField = H2Stream.class.getDeclaredField("lastActivityNanos");
1355+
lastActivityField.setAccessible(true);
1356+
lastActivityField.set(stream, System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(100));
1357+
1358+
// Send CONTINUATION (endHeaders=true) for the expired stream
1359+
writableChannel.reset();
1360+
final RawFrame continuationFrame = FRAME_FACTORY.createContinuation(1,
1361+
ByteBuffer.wrap(headerBuf.array(), split, headerBuf.length() - split), true);
1362+
outBuffer.write(continuationFrame, writableChannel);
1363+
mux.onInput(ByteBuffer.wrap(writableChannel.toByteArray()));
1364+
1365+
// The handler must receive a timeout failure, not header consumption
1366+
Mockito.verify(streamHandler).failed(exceptionCaptor.capture());
1367+
Assertions.assertInstanceOf(H2StreamTimeoutException.class, exceptionCaptor.getValue());
1368+
Mockito.verify(streamHandler, Mockito.never()).consumeHeader(
1369+
ArgumentMatchers.anyList(), ArgumentMatchers.anyBoolean());
1370+
}
1371+
11681372
@Test
11691373
void testOutboundTrailersWithPseudoHeaderRejected() throws Exception {
11701374
final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl(

0 commit comments

Comments
 (0)