diff --git a/java11/src/main/java/feign/http2client/Http2Client.java b/java11/src/main/java/feign/http2client/Http2Client.java index 9eb718df9..8148edccc 100644 --- a/java11/src/main/java/feign/http2client/Http2Client.java +++ b/java11/src/main/java/feign/http2client/Http2Client.java @@ -38,6 +38,7 @@ import java.net.http.HttpRequest.Builder; import java.net.http.HttpResponse; import java.net.http.HttpResponse.BodyHandlers; +import java.net.http.HttpTimeoutException; import java.time.Duration; import java.util.Arrays; import java.util.Collection; @@ -52,6 +53,11 @@ import java.util.TreeSet; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.Function; import java.util.stream.Collectors; import java.util.zip.GZIPInputStream; @@ -59,6 +65,14 @@ public class Http2Client implements Client, AsyncClient { + private static final ScheduledExecutorService BODY_READ_TIMEOUT_EXECUTOR = + Executors.newSingleThreadScheduledExecutor( + runnable -> { + Thread thread = new Thread(runnable, "feign-http2client-body-timeout"); + thread.setDaemon(true); + return thread; + }); + private final HttpClient client; private final Map> clients = new ConcurrentHashMap<>(); @@ -109,7 +123,7 @@ public Response execute(Request request, Options options) throws IOException { throw new IOException(e); } - return toFeignResponse(request, httpResponse); + return toFeignResponse(request, httpResponse, options); } @Override @@ -125,17 +139,22 @@ public CompletableFuture execute( HttpClient clientForRequest = getOrCreateClient(options); CompletableFuture> future = clientForRequest.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofInputStream()); - return future.thenApply(httpResponse -> toFeignResponse(request, httpResponse)); + return future.thenApply(httpResponse -> toFeignResponse(request, httpResponse, options)); } protected Response toFeignResponse(Request request, HttpResponse httpResponse) { + return toFeignResponse(request, httpResponse, null); + } + + private Response toFeignResponse( + Request request, HttpResponse httpResponse, Options options) { final OptionalLong length = httpResponse.headers().firstValueAsLong("Content-Length"); final Integer contentLength = length.isPresent() && length.getAsLong() >= 0 && length.getAsLong() <= Integer.MAX_VALUE ? (int) length.getAsLong() : null; - InputStream body = httpResponse.body(); + InputStream body = withReadTimeout(httpResponse.body(), options); if (httpResponse.headers().allValues(CONTENT_ENCODING).contains(ENCODING_GZIP)) { try { @@ -156,6 +175,110 @@ protected Response toFeignResponse(Request request, HttpResponse ht .build(); } + private static InputStream withReadTimeout(InputStream body, Options options) { + if (body == null || options == null || options.readTimeout() <= 0) { + return body; + } + return new TimeoutInputStream(body, options.readTimeout(), options.readTimeoutUnit()); + } + + private static final class TimeoutInputStream extends InputStream { + + private final InputStream delegate; + private final long timeout; + private final TimeUnit timeoutUnit; + + private TimeoutInputStream(InputStream delegate, long timeout, TimeUnit timeoutUnit) { + this.delegate = delegate; + this.timeout = timeout; + this.timeoutUnit = timeoutUnit; + } + + @Override + public int read() throws IOException { + return readWithTimeout(delegate::read); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return readWithTimeout(() -> delegate.read(b, off, len)); + } + + @Override + public int available() throws IOException { + return delegate.available(); + } + + @Override + public void close() throws IOException { + delegate.close(); + } + + private int readWithTimeout(BodyRead read) throws IOException { + final AtomicBoolean completed = new AtomicBoolean(false); + final AtomicBoolean timedOut = new AtomicBoolean(false); + final ScheduledFuture timeoutFuture = + BODY_READ_TIMEOUT_EXECUTOR.schedule( + () -> { + if (completed.compareAndSet(false, true)) { + timedOut.set(true); + try { + delegate.close(); + } catch (IOException ignored) { + } + } + }, + timeout, + timeoutUnit); + + try { + final int result = read.read(); + if (completed.compareAndSet(false, true)) { + timeoutFuture.cancel(false); + return result; + } + throw timeoutException(null); + } catch (IOException e) { + if (completed.compareAndSet(false, true)) { + timeoutFuture.cancel(false); + } + final HttpTimeoutException timeoutException = findTimeoutException(e); + if (timedOut.get() || timeoutException != null) { + throw timeoutException == null ? timeoutException(e) : timeoutException; + } + throw e; + } catch (RuntimeException e) { + if (completed.compareAndSet(false, true)) { + timeoutFuture.cancel(false); + } + throw e; + } + } + + private static HttpTimeoutException timeoutException(IOException cause) { + final HttpTimeoutException exception = new HttpTimeoutException("response timed out"); + if (cause != null) { + exception.initCause(cause); + } + return exception; + } + + private static HttpTimeoutException findTimeoutException(Throwable throwable) { + Throwable current = throwable; + while (current != null) { + if (current instanceof HttpTimeoutException) { + return (HttpTimeoutException) current; + } + current = current.getCause(); + } + return null; + } + } + + private interface BodyRead { + int read() throws IOException; + } + private HttpClient getOrCreateClient(Options options) { if (doesClientConfigurationDiffer(options)) { // create a new client from the existing one - but with connectTimeout and followRedirect diff --git a/java11/src/test/java/feign/http2client/test/Http2ClientAsyncTest.java b/java11/src/test/java/feign/http2client/test/Http2ClientAsyncTest.java index c5a29c43e..4147d2f35 100644 --- a/java11/src/test/java/feign/http2client/test/Http2ClientAsyncTest.java +++ b/java11/src/test/java/feign/http2client/test/Http2ClientAsyncTest.java @@ -63,6 +63,7 @@ import java.io.IOException; import java.lang.reflect.Type; import java.net.URI; +import java.net.http.HttpTimeoutException; import java.nio.charset.StandardCharsets; import java.time.Clock; import java.time.Instant; @@ -510,6 +511,23 @@ void doesntRetryAfterResponseIsSent() throws Throwable { assertThat(exception.getMessage()).contains("timeout reading POST http://"); } + @Test + void timeoutReadingResponseBody() throws Throwable { + server.enqueue(new MockResponse().setBody("foo").setBodyDelay(1, TimeUnit.SECONDS)); + + final TestInterfaceAsync api = + newAsyncBuilder() + .options( + new Request.Options(500, TimeUnit.MILLISECONDS, 500, TimeUnit.MILLISECONDS, true)) + .target("http://localhost:" + server.getPort()); + + final CompletableFuture cf = api.post(); + server.takeRequest(); + + Throwable exception = assertThrows(FeignException.class, () -> unwrap(cf)); + assertThat(exception).hasCauseInstanceOf(HttpTimeoutException.class); + } + @Test void throwsFeignExceptionIncludingBody() throws Throwable { server.enqueue(new MockResponse().setBody("success!")); @@ -1060,6 +1078,11 @@ TestInterfaceAsyncBuilder dismiss404() { return this; } + TestInterfaceAsyncBuilder options(Request.Options options) { + delegate.options(options); + return this; + } + TestInterfaceAsyncBuilder queryMapEndcoder(QueryMapEncoder queryMapEncoder) { delegate.queryMapEncoder(queryMapEncoder); return this; diff --git a/java11/src/test/java/feign/http2client/test/Http2ClientTest.java b/java11/src/test/java/feign/http2client/test/Http2ClientTest.java index 4f3602322..af8996e14 100644 --- a/java11/src/test/java/feign/http2client/test/Http2ClientTest.java +++ b/java11/src/test/java/feign/http2client/test/Http2ClientTest.java @@ -175,6 +175,21 @@ void timeoutTest() { assertThat(exception).hasCauseInstanceOf(HttpTimeoutException.class); } + @Test + void timeoutReadingResponseBody() { + server.enqueue(new MockResponse().setBody("foo").setBodyDelay(1, TimeUnit.SECONDS)); + + final TestInterface api = + newBuilder() + .retryer(Retryer.NEVER_RETRY) + .options( + new Request.Options(500, TimeUnit.MILLISECONDS, 500, TimeUnit.MILLISECONDS, true)) + .target(TestInterface.class, server.url("/").toString()); + + FeignException exception = assertThrows(FeignException.class, () -> api.timeout()); + assertThat(exception).hasCauseInstanceOf(HttpTimeoutException.class); + } + @Test void getWithRequestBody() throws Exception { // MockWebServer rejects GET requests carrying a body ("Request must not have a body"),