Skip to content

Commit 8a5f57a

Browse files
authored
JsonToRowTest: fix race condition (#36073)
* fix race condition * fix a few more calls and comments * fix sizelimit * fix spotless * add back increaseDefaultStreamReadConstraints method for backwards capability * revise per gemini recommendations
1 parent ab549a0 commit 8a5f57a

2 files changed

Lines changed: 48 additions & 6 deletions

File tree

sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonUtils.java

Lines changed: 47 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
*/
1818
package org.apache.beam.sdk.util;
1919

20+
import com.fasterxml.jackson.core.JsonFactory;
2021
import com.fasterxml.jackson.core.JsonParseException;
2122
import com.fasterxml.jackson.core.JsonProcessingException;
2223
import com.fasterxml.jackson.databind.JsonMappingException;
@@ -34,9 +35,14 @@
3435
@Internal
3536
public class RowJsonUtils {
3637

38+
// The maximum string length for the JSON parser, set to 100 MB.
39+
public static final int MAX_STRING_LENGTH = 100 * 1024 * 1024;
40+
3741
//
3842
private static int defaultBufferLimit;
3943

44+
private static final boolean STREAM_READ_CONSTRAINTS_AVAILABLE = streamReadConstraintsAvailable();
45+
4046
/**
4147
* Increase the default jackson-databind stream read constraint.
4248
*
@@ -63,14 +69,52 @@ public static void increaseDefaultStreamReadConstraints(int newLimit) {
6369
}
6470

6571
static {
66-
increaseDefaultStreamReadConstraints(100 * 1024 * 1024);
72+
increaseDefaultStreamReadConstraints(MAX_STRING_LENGTH);
73+
}
74+
75+
private static boolean streamReadConstraintsAvailable() {
76+
try {
77+
Class.forName("com.fasterxml.jackson.core.StreamReadConstraints");
78+
return true;
79+
} catch (ClassNotFoundException e) {
80+
return false;
81+
}
82+
}
83+
84+
private static class StreamReadConstraintsHelper {
85+
static void setStreamReadConstraints(JsonFactory jsonFactory, int sizeLimit) {
86+
com.fasterxml.jackson.core.StreamReadConstraints streamReadConstraints =
87+
com.fasterxml.jackson.core.StreamReadConstraints.builder()
88+
.maxStringLength(sizeLimit)
89+
.build();
90+
jsonFactory.setStreamReadConstraints(streamReadConstraints);
91+
}
92+
}
93+
94+
/**
95+
* Creates a thread-safe JsonFactory with custom stream read constraints.
96+
*
97+
* <p>This method encapsulates the logic to increase the default jackson-databind stream read
98+
* constraint to 100MB. This functionality was introduced in Jackson 2.15 causing string > 20MB
99+
* (5MB in <2.15.0) parsing failure. This has caused regressions in its dependencies including
100+
* Beam. Here we create a streamReadConstraints minimum size limit set to 100MB and exposing the
101+
* factory to higher limits. If needed, call this method during pipeline run time, e.g. in
102+
* DoFn.setup. This avoids a data race caused by modifying the global default settings.
103+
*/
104+
public static JsonFactory createJsonFactory(int sizeLimit) {
105+
sizeLimit = Math.max(sizeLimit, MAX_STRING_LENGTH);
106+
JsonFactory jsonFactory = new JsonFactory();
107+
if (STREAM_READ_CONSTRAINTS_AVAILABLE) {
108+
StreamReadConstraintsHelper.setStreamReadConstraints(jsonFactory, sizeLimit);
109+
}
110+
return jsonFactory;
67111
}
68112

69113
public static ObjectMapper newObjectMapperWith(RowJson.RowJsonDeserializer deserializer) {
70114
SimpleModule module = new SimpleModule("rowDeserializationModule");
71115
module.addDeserializer(Row.class, deserializer);
72116

73-
ObjectMapper objectMapper = new ObjectMapper();
117+
ObjectMapper objectMapper = new ObjectMapper(createJsonFactory(MAX_STRING_LENGTH));
74118
objectMapper.registerModule(module);
75119

76120
return objectMapper;
@@ -80,7 +124,7 @@ public static ObjectMapper newObjectMapperWith(RowJson.RowJsonSerializer seriali
80124
SimpleModule module = new SimpleModule("rowSerializationModule");
81125
module.addSerializer(Row.class, serializer);
82126

83-
ObjectMapper objectMapper = new ObjectMapper();
127+
ObjectMapper objectMapper = new ObjectMapper(createJsonFactory(MAX_STRING_LENGTH));
84128
objectMapper.registerModule(module);
85129

86130
return objectMapper;

sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/TableRowJsonCoder.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -75,10 +75,8 @@ public long getEncodedElementByteSize(TableRow value) throws Exception {
7575
private static final TypeDescriptor<TableRow> TYPE_DESCRIPTOR;
7676

7777
static {
78-
RowJsonUtils.increaseDefaultStreamReadConstraints(100 * 1024 * 1024);
79-
8078
MAPPER =
81-
new ObjectMapper()
79+
new ObjectMapper(RowJsonUtils.createJsonFactory(RowJsonUtils.MAX_STRING_LENGTH))
8280
.registerModule(new JavaTimeModule())
8381
.registerModule(new JodaModule())
8482
.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)

0 commit comments

Comments
 (0)