Skip to content

Commit e101491

Browse files
authored
[INLONG-12065][Sort] Sort Format supports outputting complete row information when errors occur in field parsing (#12066)
1 parent 6ae6636 commit e101491

25 files changed

Lines changed: 243 additions & 164 deletions

File tree

inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java

Lines changed: 48 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -59,8 +59,9 @@
5959
import org.apache.inlong.common.pojo.sort.dataflow.field.format.VarBinaryFormatInfo;
6060
import org.apache.inlong.common.pojo.sort.dataflow.field.format.VarCharFormatInfo;
6161
import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
62+
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
63+
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgHead;
6264

63-
import org.apache.commons.lang3.StringUtils;
6465
import org.apache.flink.api.common.typeinfo.TypeInformation;
6566
import org.apache.flink.api.common.typeinfo.Types;
6667
import org.apache.flink.table.api.DataTypes;
@@ -551,6 +552,22 @@ public static Object deserializeBasicField(
551552
String fieldText,
552553
String nullLiteral,
553554
FailureHandler failureHandler) throws Exception {
555+
return deserializeBasicField(fieldName, fieldFormatInfo, fieldText, nullLiteral,
556+
null, null, null, failureHandler);
557+
}
558+
559+
/**
560+
* Deserializes the basic field.
561+
*/
562+
public static Object deserializeBasicField(
563+
String fieldName,
564+
FormatInfo fieldFormatInfo,
565+
String fieldText,
566+
String nullLiteral,
567+
InLongMsgHead head,
568+
InLongMsgBody inLongMsgBody,
569+
String originBody,
570+
FailureHandler failureHandler) throws Exception {
554571
checkState(fieldFormatInfo instanceof BasicFormatInfo);
555572

556573
if (fieldText == null) {
@@ -574,42 +591,49 @@ public static Object deserializeBasicField(
574591
try {
575592
return ((BasicFormatInfo<?>) fieldFormatInfo).deserialize(fieldText);
576593
} catch (Exception e) {
577-
LOG.warn("Could not properly deserialize the " + "text "
578-
+ fieldText + " for field " + fieldName + ".", e);
579594
if (failureHandler != null) {
580-
failureHandler.onConvertingFieldFailure(fieldName, fieldText, fieldFormatInfo, e);
595+
failureHandler.onConvertingFieldFailure(fieldName, fieldText, fieldFormatInfo,
596+
head, inLongMsgBody, originBody, e);
597+
} else {
598+
LOG.warn("Could not properly deserialize the" + "text: {},for field:{}"
599+
+ ". predefinedFields = {},fields = {}, attr={}, originBody={}",
600+
fieldText, fieldName,
601+
head == null ? "" : head.getPredefinedFields(),
602+
inLongMsgBody == null ? "" : inLongMsgBody.getFields(),
603+
head == null ? "" : head.getAttributes(),
604+
originBody == null ? (inLongMsgBody == null ? "" : new String(inLongMsgBody.getDataBytes()))
605+
: originBody,
606+
e);
581607
}
582608
}
583609
return null;
584610
}
585611

586612
public static long getFormatValueLength(FormatInfo fieldFormatInfo, String fieldText) {
587-
if (fieldFormatInfo instanceof BooleanFormatInfo) {
588-
return 4;
589-
} else if (fieldFormatInfo instanceof ByteFormatInfo) {
590-
return 4;
591-
} else if (fieldFormatInfo instanceof BooleanFormatInfo) {
592-
return 4;
593-
} else if (fieldFormatInfo instanceof ShortFormatInfo) {
594-
return 4;
595-
} else if (fieldFormatInfo instanceof IntFormatInfo) {
596-
return 4;
613+
if (fieldFormatInfo instanceof StringFormatInfo) {
614+
return 42 + 2L * (fieldText == null ? 0 : fieldText.length());
597615
} else if (fieldFormatInfo instanceof LongFormatInfo) {
598-
return 8;
599-
} else if (fieldFormatInfo instanceof FloatFormatInfo) {
600-
return 8;
616+
return 24;
617+
} else if (fieldFormatInfo instanceof IntFormatInfo) {
618+
return 16;
601619
} else if (fieldFormatInfo instanceof DoubleFormatInfo) {
602-
return 8;
603-
} else if (fieldFormatInfo instanceof DecimalFormatInfo) {
604-
return 8;
620+
return 24;
621+
} else if (fieldFormatInfo instanceof FloatFormatInfo) {
622+
return 16;
605623
} else if (fieldFormatInfo instanceof DateFormatInfo
606624
|| fieldFormatInfo instanceof TimeFormatInfo
607625
|| fieldFormatInfo instanceof TimestampFormatInfo) {
608-
return 8;
609-
} else if (StringUtils.isNotEmpty(fieldText)) {
610-
return fieldText.length();
626+
return 24;
627+
} else if (fieldFormatInfo instanceof BooleanFormatInfo) {
628+
return 16;
629+
} else if (fieldFormatInfo instanceof ByteFormatInfo) {
630+
return 16;
631+
} else if (fieldFormatInfo instanceof ShortFormatInfo) {
632+
return 16;
633+
} else if (fieldFormatInfo instanceof DecimalFormatInfo) {
634+
return 24;
611635
}
612-
return 0L;
636+
return 42 + 2L * (fieldText == null ? 0 : fieldText.length());
613637
}
614638

615639
/**

inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,4 +85,34 @@ default void onParsingMsgFailure(Object msg, Exception exception) throws Excepti
8585
void onConvertingFieldFailure(String fieldName, String fieldText, FormatInfo formatInfo,
8686
Exception exception) throws Exception;
8787

88+
/**
89+
* This method is called when there is a failure occurred while converting any field to row.
90+
*
91+
* @param fieldName the filed name
92+
* @param fieldText the filed test
93+
* @param formatInfo the filed target type info
94+
* @param exception the thrown exception
95+
* @param head the predefined fields
96+
* @param inLongMsgBody the fields
97+
* @param originBody the origin body
98+
* @throws Exception the exception
99+
*/
100+
default void onConvertingFieldFailure(String fieldName, String fieldText, FormatInfo formatInfo,
101+
InLongMsgHead head, InLongMsgBody inLongMsgBody, String originBody,
102+
Exception exception) throws Exception {
103+
onConvertingFieldFailure(fieldName, fieldText, formatInfo, exception);
104+
}
105+
106+
/**
107+
* This method is called when there is a failure occurred while field num error.
108+
*
109+
* @param predefinedFields predefined fields
110+
* @param originBodyBytes origin body bytes
111+
* @param originBody origin body
112+
* @param actualNumFields actual number of fields
113+
* @param fieldNameSize expected number of fields
114+
*/
115+
default void onFieldNumError(String predefinedFields, byte[] originBodyBytes, String originBody,
116+
int actualNumFields, int fieldNameSize) {
117+
}
88118
}

inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgBody.java

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package org.apache.inlong.sort.formats.inlongmsg;
1919

20+
import lombok.Data;
21+
import org.apache.commons.lang3.StringUtils;
22+
2023
import java.io.Serializable;
2124
import java.util.Arrays;
2225
import java.util.List;
@@ -25,14 +28,20 @@
2528
/**
2629
* The body deserialized from {@link InLongMsgBody}.
2730
*/
31+
@Data
2832
public class InLongMsgBody implements Serializable {
2933

3034
private static final long serialVersionUID = 1L;
3135

3236
/**
3337
* The body of the record.
3438
*/
35-
private final byte[] data;
39+
private final byte[] dataBytes;
40+
41+
/**
42+
* The body of the record.
43+
*/
44+
private final String data;
3645

3746
/**
3847
* The interface of the record.
@@ -50,32 +59,18 @@ public class InLongMsgBody implements Serializable {
5059
private final Map<String, String> entries;
5160

5261
public InLongMsgBody(
53-
byte[] data,
62+
byte[] dataBytes,
63+
String data,
5464
String streamId,
5565
List<String> fields,
5666
Map<String, String> entries) {
67+
this.dataBytes = dataBytes;
5768
this.data = data;
5869
this.streamId = streamId;
5970
this.fields = fields;
6071
this.entries = entries;
6172
}
6273

63-
public byte[] getData() {
64-
return data;
65-
}
66-
67-
public String getStreamId() {
68-
return streamId;
69-
}
70-
71-
public List<String> getFields() {
72-
return fields;
73-
}
74-
75-
public Map<String, String> getEntries() {
76-
return entries;
77-
}
78-
7974
@Override
8075
public boolean equals(Object o) {
8176
if (this == o) {
@@ -87,17 +82,22 @@ public boolean equals(Object o) {
8782
}
8883

8984
InLongMsgBody inLongMsgBody = (InLongMsgBody) o;
90-
return Arrays.equals(data, inLongMsgBody.data);
85+
return StringUtils.equals(data, inLongMsgBody.data)
86+
&& Arrays.equals(dataBytes, inLongMsgBody.dataBytes);
9187
}
9288

9389
@Override
9490
public int hashCode() {
95-
return Arrays.hashCode(data);
91+
if (dataBytes != null) {
92+
return Arrays.hashCode(dataBytes);
93+
}
94+
return data == null ? super.hashCode() : data.hashCode();
9695
}
9796

9897
@Override
9998
public String toString() {
100-
return "InLongMsgBody{" + "data=" + Arrays.toString(data) + ", streamId='" + streamId + '\''
99+
return "InLongMsgBody{" + "data=" + (data == null ? new String(dataBytes) : data)
100+
+ ", streamId='" + streamId + '\''
101101
+ ", fields=" + fields + ", entries=" + entries + '}';
102102
}
103103
}

inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgWrap.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,27 @@
1717

1818
package org.apache.inlong.sort.formats.inlongmsg;
1919

20+
import lombok.Data;
21+
2022
import java.io.Serializable;
2123
import java.util.List;
2224

2325
/**
2426
* The body deserialized from {@link InLongMsgWrap}.
2527
*/
28+
@Data
2629
public class InLongMsgWrap implements Serializable {
2730

2831
private final InLongMsgHead inLongMsgHead;
2932

3033
private final List<InLongMsgBody> inLongMsgBodyList;
3134

32-
public InLongMsgWrap(InLongMsgHead inLongMsgHead, List<InLongMsgBody> inLongMsgBodyList) {
35+
private final byte[] originBody;
36+
37+
public InLongMsgWrap(InLongMsgHead inLongMsgHead,
38+
List<InLongMsgBody> inLongMsgBodyList, byte[] originBody) {
3339
this.inLongMsgHead = inLongMsgHead;
3440
this.inLongMsgBodyList = inLongMsgBodyList;
35-
}
36-
37-
public InLongMsgHead getInLongMsgHead() {
38-
return inLongMsgHead;
39-
}
40-
41-
public List<InLongMsgBody> getInLongMsgBodyList() {
42-
return inLongMsgBodyList;
41+
this.originBody = originBody;
4342
}
4443
}

inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body) throws E
158158
attributesFieldName,
159159
metadataFieldName,
160160
head.getAttributes(),
161-
body.getData(),
161+
body.getDataBytes(),
162162
includeUpdateBefore);
163163
}
164164

inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,7 @@ public static InLongMsgBody parseBody(byte[] bytes) {
138138
return new InLongMsgBody(
139139
bytes,
140140
null,
141+
null,
141142
Collections.emptyList(),
142143
Collections.emptyMap());
143144
}

inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,8 @@ public static List<InLongMsgBody> parseBodyList(
116116
// Only parsed fields will be used by downstream, so it's safe to leave
117117
// the other parameters empty.
118118
return new InLongMsgBody(
119-
null,
119+
bytes,
120+
bodyStr,
120121
null,
121122
Arrays.asList(line),
122123
Collections.emptyMap());

inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,7 @@ public static List<InLongMsgBody> parseBodyList(
106106
return list.stream().map((line) -> {
107107
return new InLongMsgBody(
108108
bytes,
109+
text,
109110
null,
110111
Collections.emptyList(),
111112
line);

inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public static InLongMsgBody parseBody(
9494
List<String> fields =
9595
Arrays.stream(segments, (isIncludeFirstSegment ? 0 : 1), segments.length).collect(Collectors.toList());
9696

97-
return new InLongMsgBody(bytes, streamId, fields, Collections.emptyMap());
97+
return new InLongMsgBody(bytes, text, streamId, fields, Collections.emptyMap());
9898
}
9999

100100
/**

inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public static InLongMsgBody parseBody(
9494
entries = Collections.emptyMap();
9595
}
9696

97-
return new InLongMsgBody(bytes, streamId, Collections.emptyList(), entries);
97+
return new InLongMsgBody(bytes, text, streamId, Collections.emptyList(), entries);
9898
}
9999

100100
/**

0 commit comments

Comments
 (0)