Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 126 additions & 3 deletions java11/src/main/java/feign/http2client/Http2Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.net.http.HttpRequest.Builder;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.net.http.HttpTimeoutException;
import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -52,13 +53,26 @@
import java.util.TreeSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.zip.GZIPInputStream;
import java.util.zip.InflaterInputStream;

public class Http2Client implements Client, AsyncClient<Object> {

private static final ScheduledExecutorService BODY_READ_TIMEOUT_EXECUTOR =
Executors.newSingleThreadScheduledExecutor(
runnable -> {
Thread thread = new Thread(runnable, "feign-http2client-body-timeout");
thread.setDaemon(true);
return thread;
});

private final HttpClient client;

private final Map<Integer, SoftReference<HttpClient>> clients = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -109,7 +123,7 @@ public Response execute(Request request, Options options) throws IOException {
throw new IOException(e);
}

return toFeignResponse(request, httpResponse);
return toFeignResponse(request, httpResponse, options);
}

@Override
Expand All @@ -125,17 +139,22 @@ public CompletableFuture<Response> execute(
HttpClient clientForRequest = getOrCreateClient(options);
CompletableFuture<HttpResponse<InputStream>> future =
clientForRequest.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofInputStream());
return future.thenApply(httpResponse -> toFeignResponse(request, httpResponse));
return future.thenApply(httpResponse -> toFeignResponse(request, httpResponse, options));
}

protected Response toFeignResponse(Request request, HttpResponse<InputStream> httpResponse) {
return toFeignResponse(request, httpResponse, null);
}

private Response toFeignResponse(
Request request, HttpResponse<InputStream> httpResponse, Options options) {
final OptionalLong length = httpResponse.headers().firstValueAsLong("Content-Length");
final Integer contentLength =
length.isPresent() && length.getAsLong() >= 0 && length.getAsLong() <= Integer.MAX_VALUE
? (int) length.getAsLong()
: null;

InputStream body = httpResponse.body();
InputStream body = withReadTimeout(httpResponse.body(), options);

if (httpResponse.headers().allValues(CONTENT_ENCODING).contains(ENCODING_GZIP)) {
try {
Expand All @@ -156,6 +175,110 @@ protected Response toFeignResponse(Request request, HttpResponse<InputStream> ht
.build();
}

private static InputStream withReadTimeout(InputStream body, Options options) {
if (body == null || options == null || options.readTimeout() <= 0) {
return body;
}
return new TimeoutInputStream(body, options.readTimeout(), options.readTimeoutUnit());
}

private static final class TimeoutInputStream extends InputStream {

private final InputStream delegate;
private final long timeout;
private final TimeUnit timeoutUnit;

private TimeoutInputStream(InputStream delegate, long timeout, TimeUnit timeoutUnit) {
this.delegate = delegate;
this.timeout = timeout;
this.timeoutUnit = timeoutUnit;
}

@Override
public int read() throws IOException {
return readWithTimeout(delegate::read);
}

@Override
public int read(byte[] b, int off, int len) throws IOException {
return readWithTimeout(() -> delegate.read(b, off, len));
}

@Override
public int available() throws IOException {
return delegate.available();
}

@Override
public void close() throws IOException {
delegate.close();
}

private int readWithTimeout(BodyRead read) throws IOException {
final AtomicBoolean completed = new AtomicBoolean(false);
final AtomicBoolean timedOut = new AtomicBoolean(false);
final ScheduledFuture<?> timeoutFuture =
BODY_READ_TIMEOUT_EXECUTOR.schedule(
() -> {
if (completed.compareAndSet(false, true)) {
timedOut.set(true);
try {
delegate.close();
} catch (IOException ignored) {
}
}
},
timeout,
timeoutUnit);

try {
final int result = read.read();
if (completed.compareAndSet(false, true)) {
timeoutFuture.cancel(false);
return result;
}
throw timeoutException(null);
} catch (IOException e) {
if (completed.compareAndSet(false, true)) {
timeoutFuture.cancel(false);
}
final HttpTimeoutException timeoutException = findTimeoutException(e);
if (timedOut.get() || timeoutException != null) {
throw timeoutException == null ? timeoutException(e) : timeoutException;
}
throw e;
} catch (RuntimeException e) {
if (completed.compareAndSet(false, true)) {
timeoutFuture.cancel(false);
}
throw e;
}
}

private static HttpTimeoutException timeoutException(IOException cause) {
final HttpTimeoutException exception = new HttpTimeoutException("response timed out");
if (cause != null) {
exception.initCause(cause);
}
return exception;
}

private static HttpTimeoutException findTimeoutException(Throwable throwable) {
Throwable current = throwable;
while (current != null) {
if (current instanceof HttpTimeoutException) {
return (HttpTimeoutException) current;
}
current = current.getCause();
}
return null;
}
}

private interface BodyRead {
int read() throws IOException;
}

private HttpClient getOrCreateClient(Options options) {
if (doesClientConfigurationDiffer(options)) {
// create a new client from the existing one - but with connectTimeout and followRedirect
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.io.IOException;
import java.lang.reflect.Type;
import java.net.URI;
import java.net.http.HttpTimeoutException;
import java.nio.charset.StandardCharsets;
import java.time.Clock;
import java.time.Instant;
Expand Down Expand Up @@ -510,6 +511,23 @@ void doesntRetryAfterResponseIsSent() throws Throwable {
assertThat(exception.getMessage()).contains("timeout reading POST http://");
}

@Test
void timeoutReadingResponseBody() throws Throwable {
server.enqueue(new MockResponse().setBody("foo").setBodyDelay(1, TimeUnit.SECONDS));

final TestInterfaceAsync api =
newAsyncBuilder()
.options(
new Request.Options(500, TimeUnit.MILLISECONDS, 500, TimeUnit.MILLISECONDS, true))
.target("http://localhost:" + server.getPort());

final CompletableFuture<?> cf = api.post();
server.takeRequest();

Throwable exception = assertThrows(FeignException.class, () -> unwrap(cf));
assertThat(exception).hasCauseInstanceOf(HttpTimeoutException.class);
}

@Test
void throwsFeignExceptionIncludingBody() throws Throwable {
server.enqueue(new MockResponse().setBody("success!"));
Expand Down Expand Up @@ -1060,6 +1078,11 @@ TestInterfaceAsyncBuilder dismiss404() {
return this;
}

TestInterfaceAsyncBuilder options(Request.Options options) {
delegate.options(options);
return this;
}

TestInterfaceAsyncBuilder queryMapEndcoder(QueryMapEncoder queryMapEncoder) {
delegate.queryMapEncoder(queryMapEncoder);
return this;
Expand Down
15 changes: 15 additions & 0 deletions java11/src/test/java/feign/http2client/test/Http2ClientTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,21 @@ void timeoutTest() {
assertThat(exception).hasCauseInstanceOf(HttpTimeoutException.class);
}

@Test
void timeoutReadingResponseBody() {
server.enqueue(new MockResponse().setBody("foo").setBodyDelay(1, TimeUnit.SECONDS));

final TestInterface api =
newBuilder()
.retryer(Retryer.NEVER_RETRY)
.options(
new Request.Options(500, TimeUnit.MILLISECONDS, 500, TimeUnit.MILLISECONDS, true))
.target(TestInterface.class, server.url("/").toString());

FeignException exception = assertThrows(FeignException.class, () -> api.timeout());
assertThat(exception).hasCauseInstanceOf(HttpTimeoutException.class);
}

@Test
void getWithRequestBody() throws Exception {
// MockWebServer rejects GET requests carrying a body ("Request must not have a body"),
Expand Down