diff --git a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java index 8e9171b57..28b39c892 100644 --- a/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/main/java/org/apache/hc/core5/http2/impl/nio/AbstractH2StreamMultiplexer.java @@ -782,12 +782,17 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio if (frameType == null) { return; } + final long nowNanos = System.nanoTime(); switch (frameType) { case DATA: { if (streamId == 0) { throw new H2ConnectionException(H2Error.PROTOCOL_ERROR, "Illegal stream id: " + streamId); } final H2Stream stream = streams.lookupValid(streamId); + if (resetIfExpired(stream, nowNanos)) { + requestSessionOutput(); + break; + } try { consumeDataFrame(frame, stream); } catch (final H2StreamResetException ex) { @@ -861,6 +866,10 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio } final H2Stream stream = streams.lookupValid(streamId); + if (resetIfExpired(stream, nowNanos)) { + requestSessionOutput(); + break; + } try { consumeContinuationFrame(frame, stream); } catch (final H2StreamResetException ex) { @@ -1007,6 +1016,10 @@ private void consumeFrame(final RawFrame frame) throws HttpException, IOExceptio } final H2Stream stream = streams.lookupValid(streamId); + if (resetIfExpired(stream, nowNanos)) { + requestSessionOutput(); + break; + } if (stream.isRemoteClosed()) { stream.localReset(new H2StreamResetException(H2Error.STREAM_CLOSED, "Stream closed")); break; @@ -1705,27 +1718,31 @@ public String toString() { } + boolean resetIfExpired(final H2Stream stream, final long nowNanos) throws IOException { + if (!stream.isActive()) { + return false; + } + final Timeout idleTimeout = stream.getIdleTimeout(); + if (idleTimeout == null || !idleTimeout.isEnabled()) { + return false; + } + final long last = stream.getLastActivityNanos(); + final long idleNanos = idleTimeout.toNanoseconds(); + if (idleNanos > 0 && nowNanos - last > idleNanos) { + stream.localReset(new H2StreamTimeoutException( + "HTTP/2 stream idle timeout (" + idleTimeout + ")", + stream.getId(), + idleTimeout), H2Error.CANCEL); + return true; + } + return false; + } + private void checkStreamTimeouts(final long nowNanos) throws IOException { for (final Iterator it = streams.iterator(); it.hasNext(); ) { final H2Stream stream = it.next(); - if (!stream.isActive()) { - continue; - } - - final Timeout idleTimeout = stream.getIdleTimeout(); - if (idleTimeout == null || !idleTimeout.isEnabled()) { - continue; - } - - final long last = stream.getLastActivityNanos(); - final long idleNanos = idleTimeout.toNanoseconds(); - if (idleNanos > 0 && nowNanos - last > idleNanos) { - final int streamId = stream.getId(); - final H2StreamTimeoutException ex = new H2StreamTimeoutException( - "HTTP/2 stream idle timeout (" + idleTimeout + ")", - streamId, - idleTimeout); - stream.localReset(ex, H2Error.CANCEL); + if (resetIfExpired(stream, nowNanos)) { + requestSessionOutput(); } } } diff --git a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java index 6c9e88e4f..8a17b8124 100644 --- a/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java +++ b/httpcore5-h2/src/test/java/org/apache/hc/core5/http2/impl/nio/TestAbstractH2StreamMultiplexer.java @@ -1165,6 +1165,210 @@ void testStreamIdleTimeoutTriggersH2StreamTimeoutException() throws Exception { } + @Test + void testResetIfExpiredResetsStreamPastDeadline() throws Exception { + final H2Config h2Config = H2Config.custom().build(); + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, + FRAME_FACTORY, + StreamIdGenerator.ODD, + httpProcessor, + CharCodingConfig.DEFAULT, + h2Config, + h2StreamListener, + () -> streamHandler); + + final H2StreamChannel channel = mux.createChannel(1); + final H2Stream stream = mux.createStream(channel, streamHandler); + + stream.setTimeout(Timeout.ofMilliseconds(50)); + stream.activate(); + + // Push last activity into the past so the timeout is definitely expired + final Field lastActivityField = H2Stream.class.getDeclaredField("lastActivityNanos"); + lastActivityField.setAccessible(true); + lastActivityField.set(stream, System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(100)); + + Assertions.assertTrue(mux.resetIfExpired(stream, System.nanoTime())); + + Mockito.verify(streamHandler).failed(exceptionCaptor.capture()); + Assertions.assertInstanceOf(H2StreamTimeoutException.class, exceptionCaptor.getValue()); + Assertions.assertTrue(stream.isLocalClosed()); + Assertions.assertTrue(stream.isClosed()); + } + + @Test + void testResetIfExpiredIgnoresStreamBeforeDeadline() throws Exception { + final H2Config h2Config = H2Config.custom().build(); + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, + FRAME_FACTORY, + StreamIdGenerator.ODD, + httpProcessor, + CharCodingConfig.DEFAULT, + h2Config, + h2StreamListener, + () -> streamHandler); + + final H2StreamChannel channel = mux.createChannel(1); + final H2Stream stream = mux.createStream(channel, streamHandler); + + stream.setTimeout(Timeout.ofMinutes(1)); + stream.activate(); + + Assertions.assertFalse(mux.resetIfExpired(stream, System.nanoTime())); + Assertions.assertFalse(stream.isLocalClosed()); + + Mockito.verify(streamHandler, Mockito.never()).failed(ArgumentMatchers.any()); + } + + @Test + void testExpiredStreamResetOnInboundData() throws Exception { + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(invocation -> { + final ByteBuffer buffer = invocation.getArgument(0, ByteBuffer.class); + final int remaining = buffer.remaining(); + buffer.position(buffer.limit()); + return remaining; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final H2Config h2Config = H2Config.custom().build(); + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, + FRAME_FACTORY, + StreamIdGenerator.EVEN, + httpProcessor, + CharCodingConfig.DEFAULT, + h2Config, + h2StreamListener, + () -> streamHandler); + + // Encode request headers + final ByteArrayBuffer headerBuf = new ByteArrayBuffer(200); + final HPackEncoder encoder = new HPackEncoder(h2Config.getHeaderTableSize(), + CharCodingSupport.createEncoder(CharCodingConfig.DEFAULT)); + final List
headers = Arrays.asList( + new BasicHeader(":method", "GET"), + new BasicHeader(":scheme", "http"), + new BasicHeader(":path", "/"), + new BasicHeader(":authority", "www.example.com")); + encoder.encodeHeaders(headerBuf, headers, h2Config.isCompressionEnabled()); + + final WritableByteChannelMock writableChannel = new WritableByteChannelMock(1024); + final FrameOutputBuffer outBuffer = new FrameOutputBuffer(16 * 1024); + + // Send HEADERS (endHeaders=true, endStream=false) to create stream 1 + final RawFrame headerFrame = FRAME_FACTORY.createHeaders(1, + ByteBuffer.wrap(headerBuf.array(), 0, headerBuf.length()), true, false); + outBuffer.write(headerFrame, writableChannel); + mux.onInput(ByteBuffer.wrap(writableChannel.toByteArray())); + + Mockito.verify(streamHandler).consumeHeader(headersCaptor.capture(), ArgumentMatchers.eq(false)); + Assertions.assertFalse(headersCaptor.getValue().isEmpty()); + + // Retrieve the stream and set a short timeout + final Field streamsField = AbstractH2StreamMultiplexer.class.getDeclaredField("streams"); + streamsField.setAccessible(true); + final H2Streams h2Streams = (H2Streams) streamsField.get(mux); + final H2Stream stream = h2Streams.lookupValid(1); + stream.setTimeout(Timeout.ofMilliseconds(50)); + + // Push last activity into the past so the timeout is expired + final Field lastActivityField = H2Stream.class.getDeclaredField("lastActivityNanos"); + lastActivityField.setAccessible(true); + lastActivityField.set(stream, System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(100)); + + // Send DATA frame for the expired stream + writableChannel.reset(); + final RawFrame dataFrame = FRAME_FACTORY.createData(1, + ByteBuffer.wrap("hello".getBytes(StandardCharsets.US_ASCII)), true); + outBuffer.write(dataFrame, writableChannel); + mux.onInput(ByteBuffer.wrap(writableChannel.toByteArray())); + + // The handler must receive a timeout failure, not data + Mockito.verify(streamHandler).failed(exceptionCaptor.capture()); + Assertions.assertInstanceOf(H2StreamTimeoutException.class, exceptionCaptor.getValue()); + Mockito.verify(streamHandler, Mockito.never()).consumeData( + ArgumentMatchers.any(ByteBuffer.class), ArgumentMatchers.anyBoolean()); + } + + @Test + void testExpiredStreamResetOnInboundContinuation() throws Exception { + Mockito.when(protocolIOSession.write(ArgumentMatchers.any(ByteBuffer.class))) + .thenAnswer(invocation -> { + final ByteBuffer buffer = invocation.getArgument(0, ByteBuffer.class); + final int remaining = buffer.remaining(); + buffer.position(buffer.limit()); + return remaining; + }); + Mockito.doNothing().when(protocolIOSession).setEvent(ArgumentMatchers.anyInt()); + Mockito.doNothing().when(protocolIOSession).clearEvent(ArgumentMatchers.anyInt()); + + final H2Config h2Config = H2Config.custom().build(); + final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl( + protocolIOSession, + FRAME_FACTORY, + StreamIdGenerator.EVEN, + httpProcessor, + CharCodingConfig.DEFAULT, + h2Config, + h2StreamListener, + () -> streamHandler); + + // Encode request headers + final ByteArrayBuffer headerBuf = new ByteArrayBuffer(200); + final HPackEncoder encoder = new HPackEncoder(h2Config.getHeaderTableSize(), + CharCodingSupport.createEncoder(CharCodingConfig.DEFAULT)); + final List
headers = Arrays.asList( + new BasicHeader(":method", "GET"), + new BasicHeader(":scheme", "http"), + new BasicHeader(":path", "/"), + new BasicHeader(":authority", "www.example.com")); + encoder.encodeHeaders(headerBuf, headers, h2Config.isCompressionEnabled()); + + // Split encoded headers: first part in HEADERS, remainder in CONTINUATION + final int split = headerBuf.length() / 2; + final WritableByteChannelMock writableChannel = new WritableByteChannelMock(1024); + final FrameOutputBuffer outBuffer = new FrameOutputBuffer(16 * 1024); + + // Send HEADERS (endHeaders=false, endStream=false) to create stream 1 + final RawFrame headerFrame = FRAME_FACTORY.createHeaders(1, + ByteBuffer.wrap(headerBuf.array(), 0, split), false, false); + outBuffer.write(headerFrame, writableChannel); + mux.onInput(ByteBuffer.wrap(writableChannel.toByteArray())); + + // Stream created but consumeHeader not yet called (waiting for CONTINUATION) + Mockito.verify(streamHandler, Mockito.never()).consumeHeader( + ArgumentMatchers.anyList(), ArgumentMatchers.anyBoolean()); + + // Retrieve the stream and set a short timeout + final Field streamsField = AbstractH2StreamMultiplexer.class.getDeclaredField("streams"); + streamsField.setAccessible(true); + final H2Streams h2Streams = (H2Streams) streamsField.get(mux); + final H2Stream stream = h2Streams.lookupValid(1); + stream.setTimeout(Timeout.ofMilliseconds(50)); + + // Push last activity into the past so the timeout is expired + final Field lastActivityField = H2Stream.class.getDeclaredField("lastActivityNanos"); + lastActivityField.setAccessible(true); + lastActivityField.set(stream, System.nanoTime() - TimeUnit.MILLISECONDS.toNanos(100)); + + // Send CONTINUATION (endHeaders=true) for the expired stream + writableChannel.reset(); + final RawFrame continuationFrame = FRAME_FACTORY.createContinuation(1, + ByteBuffer.wrap(headerBuf.array(), split, headerBuf.length() - split), true); + outBuffer.write(continuationFrame, writableChannel); + mux.onInput(ByteBuffer.wrap(writableChannel.toByteArray())); + + // The handler must receive a timeout failure, not header consumption + Mockito.verify(streamHandler).failed(exceptionCaptor.capture()); + Assertions.assertInstanceOf(H2StreamTimeoutException.class, exceptionCaptor.getValue()); + Mockito.verify(streamHandler, Mockito.never()).consumeHeader( + ArgumentMatchers.anyList(), ArgumentMatchers.anyBoolean()); + } + @Test void testOutboundTrailersWithPseudoHeaderRejected() throws Exception { final AbstractH2StreamMultiplexer mux = new H2StreamMultiplexerImpl(