Skip to content

Commit f2b6af6

Browse files
committed
GH-3167 Fix deserialization of ConatinerProperties
Nullify ObservationRegistry Make ExpressionSerializer top level class since jackson 3 has issues with inner classes Resolves #3167
1 parent 2960df4 commit f2b6af6

4 files changed

Lines changed: 87 additions & 0 deletions

File tree

binders/kafka-binder/spring-cloud-stream-binder-kafka/src/main/java/org/springframework/cloud/stream/binder/kafka/KafkaMessageChannelBinder.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,10 @@
151151
import org.springframework.util.backoff.ExponentialBackOff;
152152
import org.springframework.util.backoff.FixedBackOff;
153153

154+
import io.micrometer.observation.Observation;
155+
import io.micrometer.observation.Observation.Scope;
156+
import io.micrometer.observation.ObservationRegistry;
157+
154158
/**
155159
* A {@link org.springframework.cloud.stream.binder.Binder} that uses Kafka as the
156160
* underlying middleware.
@@ -861,6 +865,31 @@ private void propagateContainerProperties(ContainerProperties containerPropertie
861865
containerProperties.setMissingTopicsFatal(listener.isMissingTopicsFatal());
862866
containerProperties.setStopImmediate(listener.isImmediateStop());
863867
}
868+
869+
public static class NullObservationRegistry implements ObservationRegistry {
870+
871+
@Override
872+
public @org.jspecify.annotations.Nullable Observation getCurrentObservation() {
873+
return null;
874+
}
875+
876+
@Override
877+
public @org.jspecify.annotations.Nullable Scope getCurrentObservationScope() {
878+
return null;
879+
}
880+
881+
@Override
882+
public void setCurrentObservationScope(@org.jspecify.annotations.Nullable Scope current) {
883+
884+
}
885+
886+
@Override
887+
public ObservationConfig observationConfig() {
888+
return null;
889+
}
890+
891+
892+
}
864893

865894
/**
866895
* Returns an unmodifiable copy of {@link ContainerProperties} associated with the destination name
@@ -872,6 +901,8 @@ private void propagateContainerProperties(ContainerProperties containerPropertie
872901
@Override
873902
protected Map<String, Object> doGetAdditionalConfigurationProperties(String destinationName) {
874903
ContainerProperties kafkaContainerProperties = this.kafkaMessageListenerContainers.iterator().next().getContainerProperties();
904+
// see 3167 we need to nullify ObservationRegistry to avoid jackson deserialization error
905+
kafkaContainerProperties.setObservationRegistry(new NullObservationRegistry());
875906
Map mapOfContainerProperties = this.objectMapper.convertValue(kafkaContainerProperties, Map.class);
876907
Map<String, Object> additionalConfigurationProperties = new HashMap<>();
877908
additionalConfigurationProperties.put("containerProperties", mapOfContainerProperties);
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Copyright 2016-present the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
18+
package org.springframework.cloud.stream.binder;
19+
20+
import org.springframework.expression.Expression;
21+
22+
import tools.jackson.core.JacksonException;
23+
import tools.jackson.core.JsonGenerator;
24+
import tools.jackson.databind.SerializationContext;
25+
import tools.jackson.databind.ser.std.StdSerializer;
26+
27+
/**
28+
* since 5.0.2
29+
* author Oleg Zhurakousky
30+
*/
31+
public class ExpressionSerializer extends StdSerializer<Expression> {
32+
33+
public ExpressionSerializer() {
34+
super(Expression.class);
35+
}
36+
public ExpressionSerializer(Class<Expression> t) {
37+
super(t);
38+
}
39+
@Override
40+
public void serialize(Expression expression, JsonGenerator jsonGenerator, SerializationContext provider) throws JacksonException {
41+
if (expression != null) {
42+
jsonGenerator.writeString(expression.getExpressionString());
43+
}
44+
}
45+
}

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/ExtendedProducerProperties.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ public class ExtendedProducerProperties<T> extends ProducerProperties {
2424

2525
private T extension;
2626

27+
public ExtendedProducerProperties() {
28+
29+
}
30+
2731
public ExtendedProducerProperties(T extension) {
2832
this.extension = extension;
2933
}
@@ -32,4 +36,7 @@ public T getExtension() {
3236
return this.extension;
3337
}
3438

39+
public void setExtension(T extension) {
40+
this.extension = extension;
41+
}
3542
}

core/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/binder/ProducerProperties.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,10 @@ public void setDynamicPartitionUpdatesEnabled(boolean enabled) {
215215
this.dynamicPartitionUpdatesEnabled = enabled;
216216
}
217217

218+
/**
219+
* @deprecated since 5.0.2 in favor of top level class in the same package
220+
*/
221+
@Deprecated
218222
public static class ExpressionSerializer extends StdSerializer<Expression> {
219223
public ExpressionSerializer() {
220224
super(Expression.class);

0 commit comments

Comments
 (0)