Skip to content

Commit 56585b8

Browse files
nebhaletwoseat
authored andcommitted
Selective JSON Decoding
Signed-off-by: Ben Hale <bhale@pivotal.io> Improve Error Handling on Startup Previously we assumed that any value returned from CF would be formatted as json. This isn't true if we have certain error conditions, for example if a 404 is returned when we try to access the /v2/info endpoint. These scenarios produced unhelpful json-based errors that hid the actual problem. With this commit, while we will still produce an error (because we need the json response to continue), the error should be informative. Signed-off-by: Paul Harris <harrisp@vmware.com>
1 parent e41557e commit 56585b8

7 files changed

Lines changed: 141 additions & 61 deletions

File tree

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2013-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.cloudfoundry.reactor;
18+
19+
import org.immutables.value.Value;
20+
import reactor.netty.Connection;
21+
import reactor.netty.http.client.HttpClientResponse;
22+
23+
@Value.Immutable
24+
public interface _HttpClientResponseWithConnection {
25+
26+
@Value.Parameter
27+
Connection getConnection();
28+
29+
@Value.Parameter
30+
HttpClientResponse getResponse();
31+
32+
}

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/client/CloudFoundryClientCompatibilityChecker.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,7 @@ void check() {
4242
.build())
4343
.map(response -> Version.valueOf(response.getApiVersion()))
4444
.zipWith(Mono.just(Version.valueOf(CloudFoundryClient.SUPPORTED_API_VERSION)))
45-
.doOnNext(consumer((server, supported) -> logCompatibility(server, supported, this.logger)))
46-
.subscribe();
45+
.subscribe(consumer((server, supported) -> logCompatibility(server, supported, this.logger)), t -> this.logger.error("An error occurred while checking version compatibility:", t));
4746
}
4847

4948
private static void logCompatibility(Version server, Version supported, Logger logger) {

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/ErrorPayloadMapper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616

1717
package org.cloudfoundry.reactor.util;
1818

19-
import org.cloudfoundry.reactor.HttpClientResponseWithBody;
19+
import org.cloudfoundry.reactor.HttpClientResponseWithConnection;
2020
import reactor.core.publisher.Flux;
2121

2222
import java.util.function.Function;
2323

24-
public interface ErrorPayloadMapper extends Function<Flux<HttpClientResponseWithBody>, Flux<HttpClientResponseWithBody>> {
24+
public interface ErrorPayloadMapper extends Function<Flux<HttpClientResponseWithConnection>, Flux<HttpClientResponseWithConnection>> {
2525

2626
}

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/ErrorPayloadMappers.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222
import org.cloudfoundry.client.v2.ClientV2Exception;
2323
import org.cloudfoundry.client.v3.ClientV3Exception;
2424
import org.cloudfoundry.client.v3.Errors;
25-
import org.cloudfoundry.reactor.HttpClientResponseWithBody;
25+
import org.cloudfoundry.reactor.HttpClientResponseWithConnection;
2626
import org.cloudfoundry.uaa.UaaException;
2727
import reactor.core.publisher.Mono;
28+
import reactor.netty.ByteBufFlux;
29+
import reactor.netty.Connection;
2830
import reactor.netty.http.client.HttpClientResponse;
2931

3032
import java.util.Map;
@@ -58,15 +60,21 @@ public static ErrorPayloadMapper clientV3(ObjectMapper objectMapper) {
5860

5961
public static ErrorPayloadMapper fallback() {
6062
return inbound -> inbound
61-
.flatMap(responseWithBody -> {
62-
HttpClientResponse response = responseWithBody.getResponse();
63+
.flatMap(responseWithConnection -> {
64+
HttpClientResponse response = responseWithConnection.getResponse();
6365

6466
if (isError(response)) {
65-
return responseWithBody.getBody().aggregate().asString()
66-
.flatMap(payload -> Mono.error(new UnknownCloudFoundryException(response.status().code(), payload)));
67+
Connection connection = responseWithConnection.getConnection();
68+
ByteBufFlux body = ByteBufFlux.fromInbound(connection.inbound().receive());
69+
70+
return body.aggregate().asString()
71+
.doFinally(signalType -> connection.channel().close())
72+
.flatMap(payload -> {
73+
return Mono.error(new UnknownCloudFoundryException(response.status().code(), payload));
74+
});
6775
}
6876

69-
return Mono.just(responseWithBody);
77+
return Mono.just(responseWithConnection);
7078
});
7179
}
7280

@@ -87,13 +95,17 @@ private static boolean isError(HttpClientResponse response) {
8795
return statusClass == CLIENT_ERROR || statusClass == SERVER_ERROR;
8896
}
8997

90-
private static Function<HttpClientResponseWithBody, Mono<HttpClientResponseWithBody>> mapToError(ExceptionGenerator exceptionGenerator) {
98+
private static Function<HttpClientResponseWithConnection, Mono<HttpClientResponseWithConnection>> mapToError(ExceptionGenerator exceptionGenerator) {
9199
return response -> {
92100
if (!isError(response.getResponse())) {
93101
return Mono.just(response);
94102
}
95103

96-
return response.getBody().aggregate().asString()
104+
Connection connection = response.getConnection();
105+
ByteBufFlux body = ByteBufFlux.fromInbound(connection.inbound().receive()
106+
.doFinally(signalType -> connection.dispose()));
107+
108+
return body.aggregate().asString()
97109
.switchIfEmpty(Mono.error(new UnknownCloudFoundryException(response.getResponse().status().code())))
98110
.flatMap(payload -> {
99111
try {

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/Operator.java

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@
1717
package org.cloudfoundry.reactor.util;
1818

1919
import io.netty.channel.ChannelHandler;
20+
import io.netty.handler.codec.http.HttpHeaderNames;
21+
import io.netty.handler.codec.http.HttpHeaderValues;
2022
import io.netty.handler.codec.http.HttpHeaders;
2123
import io.netty.handler.codec.http.HttpMethod;
2224
import io.netty.handler.codec.http.HttpResponseStatus;
2325
import org.cloudfoundry.reactor.HttpClientResponseWithBody;
26+
import org.cloudfoundry.reactor.HttpClientResponseWithConnection;
2427
import org.reactivestreams.Publisher;
2528
import org.springframework.web.util.UriComponentsBuilder;
2629
import reactor.core.publisher.Flux;
@@ -152,48 +155,62 @@ public ResponseReceiver addChannelHandler(Function<HttpClientResponse, ChannelHa
152155
}
153156

154157
public Mono<HttpClientResponse> get() {
155-
return this.responseReceiver.response((resp, body) -> Mono.just(HttpClientResponseWithBody.of(body, resp)))
158+
return this.responseReceiver.responseConnection((response, connection) -> Mono.just(HttpClientResponseWithConnection.of(connection, response)))
156159
.transform(this::processResponse)
157-
.map(HttpClientResponseWithBody::getResponse)
160+
.map(HttpClientResponseWithConnection::getResponse)
158161
.singleOrEmpty();
159162
}
160163

161164
public <T> Mono<T> parseBody(Class<T> bodyType) {
162-
addChannelHandler(ignore -> JsonCodec.createDecoder());
165+
addChannelHandler(response -> {
166+
if (HttpHeaderValues.APPLICATION_JSON.contentEquals(response.responseHeaders().get(HttpHeaderNames.CONTENT_TYPE))) {
167+
return JsonCodec.createDecoder();
168+
}
169+
170+
return null;
171+
});
172+
163173
return parseBodyToMono(responseWithBody -> deserialized(responseWithBody.getBody(), bodyType));
164174
}
165175

166176
public <T> Flux<T> parseBodyToFlux(Function<HttpClientResponseWithBody, Publisher<T>> responseTransformer) {
167-
return this.responseReceiver.responseConnection((response, connection) -> {
168-
attachChannelHandlers(response, connection);
169-
ByteBufFlux body = ByteBufFlux.fromInbound(connection.inbound().receive()
170-
.doFinally(signalType -> connection.dispose()));
171-
172-
return Mono.just(HttpClientResponseWithBody.of(body, response));
173-
})
177+
return this.responseReceiver.responseConnection((response, connection) -> Mono.just(HttpClientResponseWithConnection.of(connection, response)))
174178
.transform(this::processResponse)
179+
.flatMap(httpClientResponseWithConnection -> {
180+
Connection connection = httpClientResponseWithConnection.getConnection();
181+
HttpClientResponse response = httpClientResponseWithConnection.getResponse();
182+
183+
attachChannelHandlers(response, connection);
184+
ByteBufFlux body = ByteBufFlux.fromInbound(connection.inbound().receive()
185+
.doFinally(signalType -> connection.dispose()));
186+
187+
return Mono.just(HttpClientResponseWithBody.of(body, response));
188+
})
175189
.flatMap(responseTransformer);
176190
}
177191

178192
public <T> Mono<T> parseBodyToMono(Function<HttpClientResponseWithBody, Publisher<T>> responseTransformer) {
179193
return parseBodyToFlux(responseTransformer).singleOrEmpty();
180194
}
181195

182-
private static boolean isUnauthorized(HttpClientResponseWithBody response) {
196+
private static boolean isUnauthorized(HttpClientResponseWithConnection response) {
183197
return response.getResponse().status() == HttpResponseStatus.UNAUTHORIZED;
184198
}
185199

186200
private void attachChannelHandlers(HttpClientResponse response, Connection connection) {
187201
for (Function<HttpClientResponse, ChannelHandler> handlerBuilder : this.channelHandlerBuilders) {
188-
connection.addHandler(handlerBuilder.apply(response));
202+
ChannelHandler handler = handlerBuilder.apply(response);
203+
if (handler != null) {
204+
connection.addHandler(handler);
205+
}
189206
}
190207
}
191208

192209
private <T> Mono<T> deserialized(ByteBufFlux body, Class<T> bodyType) {
193210
return JsonCodec.decode(this.context.getConnectionContext().getObjectMapper(), body, bodyType);
194211
}
195212

196-
private Flux<HttpClientResponseWithBody> invalidateToken(Flux<HttpClientResponseWithBody> inbound) {
213+
private Flux<HttpClientResponseWithConnection> invalidateToken(Flux<HttpClientResponseWithConnection> inbound) {
197214
return inbound
198215
.doOnNext(response -> {
199216
if (isUnauthorized(response)) {
@@ -203,7 +220,7 @@ private Flux<HttpClientResponseWithBody> invalidateToken(Flux<HttpClientResponse
203220
});
204221
}
205222

206-
private Flux<HttpClientResponseWithBody> processResponse(Flux<HttpClientResponseWithBody> inbound) {
223+
private Flux<HttpClientResponseWithConnection> processResponse(Flux<HttpClientResponseWithConnection> inbound) {
207224
return inbound
208225
.transform(this::invalidateToken)
209226
.retry(this.context.getConnectionContext().getInvalidTokenRetries(),

0 commit comments

Comments
 (0)