Skip to content

Commit 564e3a8

Browse files
committed
Merge branch 'selective-json-decoding' into 3.x
2 parents e41557e + 56585b8 commit 564e3a8

7 files changed

Lines changed: 141 additions & 61 deletions

File tree

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
/*
2+
* Copyright 2013-2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.cloudfoundry.reactor;
18+
19+
import org.immutables.value.Value;
20+
import reactor.netty.Connection;
21+
import reactor.netty.http.client.HttpClientResponse;
22+
23+
@Value.Immutable
24+
public interface _HttpClientResponseWithConnection {
25+
26+
@Value.Parameter
27+
Connection getConnection();
28+
29+
@Value.Parameter
30+
HttpClientResponse getResponse();
31+
32+
}

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/client/CloudFoundryClientCompatibilityChecker.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,8 +42,7 @@ void check() {
4242
.build())
4343
.map(response -> Version.valueOf(response.getApiVersion()))
4444
.zipWith(Mono.just(Version.valueOf(CloudFoundryClient.SUPPORTED_API_VERSION)))
45-
.doOnNext(consumer((server, supported) -> logCompatibility(server, supported, this.logger)))
46-
.subscribe();
45+
.subscribe(consumer((server, supported) -> logCompatibility(server, supported, this.logger)), t -> this.logger.error("An error occurred while checking version compatibility:", t));
4746
}
4847

4948
private static void logCompatibility(Version server, Version supported, Logger logger) {

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/ErrorPayloadMapper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,11 @@
1616

1717
package org.cloudfoundry.reactor.util;
1818

19-
import org.cloudfoundry.reactor.HttpClientResponseWithBody;
19+
import org.cloudfoundry.reactor.HttpClientResponseWithConnection;
2020
import reactor.core.publisher.Flux;
2121

2222
import java.util.function.Function;
2323

24-
public interface ErrorPayloadMapper extends Function<Flux<HttpClientResponseWithBody>, Flux<HttpClientResponseWithBody>> {
24+
public interface ErrorPayloadMapper extends Function<Flux<HttpClientResponseWithConnection>, Flux<HttpClientResponseWithConnection>> {
2525

2626
}

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/ErrorPayloadMappers.java

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222
import org.cloudfoundry.client.v2.ClientV2Exception;
2323
import org.cloudfoundry.client.v3.ClientV3Exception;
2424
import org.cloudfoundry.client.v3.Errors;
25-
import org.cloudfoundry.reactor.HttpClientResponseWithBody;
25+
import org.cloudfoundry.reactor.HttpClientResponseWithConnection;
2626
import org.cloudfoundry.uaa.UaaException;
2727
import reactor.core.publisher.Mono;
28+
import reactor.netty.ByteBufFlux;
29+
import reactor.netty.Connection;
2830
import reactor.netty.http.client.HttpClientResponse;
2931

3032
import java.util.Map;
@@ -58,15 +60,21 @@ public static ErrorPayloadMapper clientV3(ObjectMapper objectMapper) {
5860

5961
public static ErrorPayloadMapper fallback() {
6062
return inbound -> inbound
61-
.flatMap(responseWithBody -> {
62-
HttpClientResponse response = responseWithBody.getResponse();
63+
.flatMap(responseWithConnection -> {
64+
HttpClientResponse response = responseWithConnection.getResponse();
6365

6466
if (isError(response)) {
65-
return responseWithBody.getBody().aggregate().asString()
66-
.flatMap(payload -> Mono.error(new UnknownCloudFoundryException(response.status().code(), payload)));
67+
Connection connection = responseWithConnection.getConnection();
68+
ByteBufFlux body = ByteBufFlux.fromInbound(connection.inbound().receive());
69+
70+
return body.aggregate().asString()
71+
.doFinally(signalType -> connection.channel().close())
72+
.flatMap(payload -> {
73+
return Mono.error(new UnknownCloudFoundryException(response.status().code(), payload));
74+
});
6775
}
6876

69-
return Mono.just(responseWithBody);
77+
return Mono.just(responseWithConnection);
7078
});
7179
}
7280

@@ -87,13 +95,17 @@ private static boolean isError(HttpClientResponse response) {
8795
return statusClass == CLIENT_ERROR || statusClass == SERVER_ERROR;
8896
}
8997

90-
private static Function<HttpClientResponseWithBody, Mono<HttpClientResponseWithBody>> mapToError(ExceptionGenerator exceptionGenerator) {
98+
private static Function<HttpClientResponseWithConnection, Mono<HttpClientResponseWithConnection>> mapToError(ExceptionGenerator exceptionGenerator) {
9199
return response -> {
92100
if (!isError(response.getResponse())) {
93101
return Mono.just(response);
94102
}
95103

96-
return response.getBody().aggregate().asString()
104+
Connection connection = response.getConnection();
105+
ByteBufFlux body = ByteBufFlux.fromInbound(connection.inbound().receive()
106+
.doFinally(signalType -> connection.dispose()));
107+
108+
return body.aggregate().asString()
97109
.switchIfEmpty(Mono.error(new UnknownCloudFoundryException(response.getResponse().status().code())))
98110
.flatMap(payload -> {
99111
try {

cloudfoundry-client-reactor/src/main/java/org/cloudfoundry/reactor/util/Operator.java

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,10 +17,13 @@
1717
package org.cloudfoundry.reactor.util;
1818

1919
import io.netty.channel.ChannelHandler;
20+
import io.netty.handler.codec.http.HttpHeaderNames;
21+
import io.netty.handler.codec.http.HttpHeaderValues;
2022
import io.netty.handler.codec.http.HttpHeaders;
2123
import io.netty.handler.codec.http.HttpMethod;
2224
import io.netty.handler.codec.http.HttpResponseStatus;
2325
import org.cloudfoundry.reactor.HttpClientResponseWithBody;
26+
import org.cloudfoundry.reactor.HttpClientResponseWithConnection;
2427
import org.reactivestreams.Publisher;
2528
import org.springframework.web.util.UriComponentsBuilder;
2629
import reactor.core.publisher.Flux;
@@ -152,48 +155,62 @@ public ResponseReceiver addChannelHandler(Function<HttpClientResponse, ChannelHa
152155
}
153156

154157
public Mono<HttpClientResponse> get() {
155-
return this.responseReceiver.response((resp, body) -> Mono.just(HttpClientResponseWithBody.of(body, resp)))
158+
return this.responseReceiver.responseConnection((response, connection) -> Mono.just(HttpClientResponseWithConnection.of(connection, response)))
156159
.transform(this::processResponse)
157-
.map(HttpClientResponseWithBody::getResponse)
160+
.map(HttpClientResponseWithConnection::getResponse)
158161
.singleOrEmpty();
159162
}
160163

161164
public <T> Mono<T> parseBody(Class<T> bodyType) {
162-
addChannelHandler(ignore -> JsonCodec.createDecoder());
165+
addChannelHandler(response -> {
166+
if (HttpHeaderValues.APPLICATION_JSON.contentEquals(response.responseHeaders().get(HttpHeaderNames.CONTENT_TYPE))) {
167+
return JsonCodec.createDecoder();
168+
}
169+
170+
return null;
171+
});
172+
163173
return parseBodyToMono(responseWithBody -> deserialized(responseWithBody.getBody(), bodyType));
164174
}
165175

166176
public <T> Flux<T> parseBodyToFlux(Function<HttpClientResponseWithBody, Publisher<T>> responseTransformer) {
167-
return this.responseReceiver.responseConnection((response, connection) -> {
168-
attachChannelHandlers(response, connection);
169-
ByteBufFlux body = ByteBufFlux.fromInbound(connection.inbound().receive()
170-
.doFinally(signalType -> connection.dispose()));
171-
172-
return Mono.just(HttpClientResponseWithBody.of(body, response));
173-
})
177+
return this.responseReceiver.responseConnection((response, connection) -> Mono.just(HttpClientResponseWithConnection.of(connection, response)))
174178
.transform(this::processResponse)
179+
.flatMap(httpClientResponseWithConnection -> {
180+
Connection connection = httpClientResponseWithConnection.getConnection();
181+
HttpClientResponse response = httpClientResponseWithConnection.getResponse();
182+
183+
attachChannelHandlers(response, connection);
184+
ByteBufFlux body = ByteBufFlux.fromInbound(connection.inbound().receive()
185+
.doFinally(signalType -> connection.dispose()));
186+
187+
return Mono.just(HttpClientResponseWithBody.of(body, response));
188+
})
175189
.flatMap(responseTransformer);
176190
}
177191

178192
public <T> Mono<T> parseBodyToMono(Function<HttpClientResponseWithBody, Publisher<T>> responseTransformer) {
179193
return parseBodyToFlux(responseTransformer).singleOrEmpty();
180194
}
181195

182-
private static boolean isUnauthorized(HttpClientResponseWithBody response) {
196+
private static boolean isUnauthorized(HttpClientResponseWithConnection response) {
183197
return response.getResponse().status() == HttpResponseStatus.UNAUTHORIZED;
184198
}
185199

186200
private void attachChannelHandlers(HttpClientResponse response, Connection connection) {
187201
for (Function<HttpClientResponse, ChannelHandler> handlerBuilder : this.channelHandlerBuilders) {
188-
connection.addHandler(handlerBuilder.apply(response));
202+
ChannelHandler handler = handlerBuilder.apply(response);
203+
if (handler != null) {
204+
connection.addHandler(handler);
205+
}
189206
}
190207
}
191208

192209
private <T> Mono<T> deserialized(ByteBufFlux body, Class<T> bodyType) {
193210
return JsonCodec.decode(this.context.getConnectionContext().getObjectMapper(), body, bodyType);
194211
}
195212

196-
private Flux<HttpClientResponseWithBody> invalidateToken(Flux<HttpClientResponseWithBody> inbound) {
213+
private Flux<HttpClientResponseWithConnection> invalidateToken(Flux<HttpClientResponseWithConnection> inbound) {
197214
return inbound
198215
.doOnNext(response -> {
199216
if (isUnauthorized(response)) {
@@ -203,7 +220,7 @@ private Flux<HttpClientResponseWithBody> invalidateToken(Flux<HttpClientResponse
203220
});
204221
}
205222

206-
private Flux<HttpClientResponseWithBody> processResponse(Flux<HttpClientResponseWithBody> inbound) {
223+
private Flux<HttpClientResponseWithConnection> processResponse(Flux<HttpClientResponseWithConnection> inbound) {
207224
return inbound
208225
.transform(this::invalidateToken)
209226
.retry(this.context.getConnectionContext().getInvalidTokenRetries(),

0 commit comments

Comments
 (0)