Skip to content

Commit ec3b69e

Browse files
authored
[INLONG-11819][Sort] KV/CSV format support keep escape, using line delimiter and call back when parse field has exception (#11820)
1 parent 9c13aeb commit ec3b69e

44 files changed

Lines changed: 748 additions & 192 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@
5858
import org.apache.inlong.common.pojo.sort.dataflow.field.format.TypeInfo;
5959
import org.apache.inlong.common.pojo.sort.dataflow.field.format.VarBinaryFormatInfo;
6060
import org.apache.inlong.common.pojo.sort.dataflow.field.format.VarCharFormatInfo;
61+
import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
6162

6263
import org.apache.flink.api.common.typeinfo.TypeInformation;
6364
import org.apache.flink.api.common.typeinfo.Types;
@@ -547,7 +548,8 @@ public static Object deserializeBasicField(
547548
String fieldName,
548549
FormatInfo fieldFormatInfo,
549550
String fieldText,
550-
String nullLiteral) {
551+
String nullLiteral,
552+
FailureHandler failureHandler) throws Exception {
551553
checkState(fieldFormatInfo instanceof BasicFormatInfo);
552554

553555
if (fieldText == null) {
@@ -573,6 +575,9 @@ public static Object deserializeBasicField(
573575
} catch (Exception e) {
574576
LOG.warn("Could not properly deserialize the " + "text "
575577
+ fieldText + " for field " + fieldName + ".", e);
578+
if (failureHandler != null) {
579+
failureHandler.onConvertingFieldFailure(fieldName, fieldText, fieldFormatInfo, e);
580+
}
576581
}
577582
return null;
578583
}

inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TextFormatBuilder.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.inlong.sort.formats.base;
1919

2020
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
21+
import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
2122

2223
import org.apache.flink.table.descriptors.DescriptorProperties;
2324

@@ -44,6 +45,7 @@ public abstract class TextFormatBuilder<T extends TextFormatBuilder> {
4445
protected Character quoteChar = DEFAULT_QUOTE_CHARACTER;
4546
protected String nullLiteral = DEFAULT_NULL_LITERAL;
4647
protected boolean ignoreErrors = DEFAULT_IGNORE_ERRORS;
48+
protected FailureHandler failureHandler;
4749

4850
protected TextFormatBuilder(RowFormatInfo rowFormatInfo) {
4951
this.rowFormatInfo = rowFormatInfo;
@@ -79,6 +81,11 @@ public T setIgnoreErrors(boolean ignoreErrors) {
7981
return (T) this;
8082
}
8183

84+
public T setFailureHandler(FailureHandler failureHandler) {
85+
this.failureHandler = failureHandler;
86+
return (T) this;
87+
}
88+
8289
@SuppressWarnings("unchecked")
8390
public T configure(DescriptorProperties descriptorProperties) {
8491

inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/FailureHandler.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.inlong.sort.formats.inlongmsg;
1919

20+
import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
21+
2022
import java.io.Serializable;
2123

2224
/**
@@ -71,4 +73,16 @@ default void onParsingMsgFailure(Object msg, Exception exception) throws Excepti
7173
*/
7274
void onConvertingRowFailure(InLongMsgHead head, InLongMsgBody body, Exception exception) throws Exception;
7375

76+
/**
77+
* This method is called when a failure occurs while converting a field of a row.
78+
*
79+
* @param fieldName the field name
80+
* @param fieldText the field text
81+
* @param formatInfo the field target type info
82+
* @param exception the thrown exception
83+
* @throws Exception the exception
84+
*/
85+
void onConvertingFieldFailure(String fieldName, String fieldText, FormatInfo formatInfo,
86+
Exception exception) throws Exception;
87+
7488
}

inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/IgnoreFailureHandler.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.inlong.sort.formats.inlongmsg;
1919

20+
import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
21+
2022
import org.slf4j.Logger;
2123
import org.slf4j.LoggerFactory;
2224

@@ -47,6 +49,13 @@ public void onConvertingRowFailure(InLongMsgHead head, InLongMsgBody body, Excep
4749
LOG.warn("Cannot properly convert the InLongMsg ({}, {})", head, body, exception);
4850
}
4951

52+
@Override
53+
public void onConvertingFieldFailure(String fieldName, String fieldText, FormatInfo formatInfo,
54+
Exception exception) throws Exception {
55+
LOG.warn("Cannot convert the InLongMsg Filed (fieldName = {}, formatInfo = {}, fieldText = {}),",
56+
fieldName, formatInfo, fieldText, exception);
57+
}
58+
5059
@Override
5160
public boolean isIgnoreFailure() {
5261
return true;

inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/NoOpFailureHandler.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717

1818
package org.apache.inlong.sort.formats.inlongmsg;
1919

20+
import org.apache.inlong.common.pojo.sort.dataflow.field.format.FormatInfo;
21+
2022
import org.slf4j.Logger;
2123
import org.slf4j.LoggerFactory;
2224

@@ -51,6 +53,13 @@ public void onConvertingRowFailure(InLongMsgHead head, InLongMsgBody body, Excep
5153
throw exception;
5254
}
5355

56+
@Override
57+
public void onConvertingFieldFailure(String fieldName, String fieldText, FormatInfo formatInfo,
58+
Exception exception) throws Exception {
59+
LOG.warn("Cannot convert the InLongMsg Filed (fieldName = {}, formatInfo = {}, fieldText = {}),",
60+
fieldName, formatInfo, fieldText, exception);
61+
}
62+
5463
@Override
5564
public boolean isIgnoreFailure() {
5665
return false;

inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/util/StringUtils.java

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public class StringUtils {
3939
private static final int STATE_QUOTING = 16;
4040

4141
/**
42-
* @see StringUtils#splitKv(String, Character, Character, Character,Character, Character)
42+
* @see StringUtils#splitKv(String, Character, Character, Character, Character, Character, boolean)
4343
*/
4444
public static Map<String, String> splitKv(
4545
@Nonnull String text,
@@ -48,7 +48,8 @@ public static Map<String, String> splitKv(
4848
@Nullable Character escapeChar,
4949
@Nullable Character quoteChar) {
5050
List<Map<String, String>> lines =
51-
splitKv(text, entryDelimiter, kvDelimiter, escapeChar, quoteChar, null);
51+
splitKv(text, entryDelimiter, kvDelimiter, escapeChar, quoteChar, null,
52+
true);
5253
if (lines.size() == 0) {
5354
return new HashMap<>();
5455
}
@@ -77,7 +78,8 @@ public static List<Map<String, String>> splitKv(
7778
@Nonnull Character kvDelimiter,
7879
@Nullable Character escapeChar,
7980
@Nullable Character quoteChar,
80-
@Nullable Character lineDelimiter) {
81+
@Nullable Character lineDelimiter,
82+
@Nullable boolean isDeleteEscapeChar) {
8183
Map<String, String> fields = new HashMap<>();
8284
List<Map<String, String>> lines = new ArrayList<>();
8385

@@ -158,6 +160,9 @@ public static List<Map<String, String>> splitKv(
158160
case STATE_VALUE:
159161
kvState = state;
160162
state = STATE_ESCAPING;
163+
if (!isDeleteEscapeChar) {
164+
stringBuilder.append(ch);
165+
}
161166
break;
162167
case STATE_ESCAPING:
163168
stringBuilder.append(ch);
@@ -369,7 +374,7 @@ private static void encodeText(
369374
/**
370375
* Splits a single line of csv text.
371376
*
372-
* @see StringUtils#splitCsv(String, Character, Character, Character, Character, boolean)
377+
* @see StringUtils#splitCsv(String, Character, Character, Character, Character)
373378
*/
374379
public static String[] splitCsv(
375380
@Nonnull String text,
@@ -384,28 +389,31 @@ public static String[] splitCsv(
384389
}
385390

386391
/**
387-
* @see StringUtils#splitCsv(String, Character, Character, Character, Character, boolean)
392+
* @see StringUtils#splitCsv(String, Character, Character, Character, Character, boolean, boolean)
388393
*/
389394
public static String[][] splitCsv(
390395
@Nonnull String text,
391396
@Nonnull Character delimiter,
392397
@Nullable Character escapeChar,
393398
@Nullable Character quoteChar,
394399
@Nullable Character lineDelimiter) {
395-
return splitCsv(text, delimiter, escapeChar, quoteChar, lineDelimiter, false);
400+
return splitCsv(text, delimiter, escapeChar, quoteChar, lineDelimiter,
401+
false, true);
396402
}
397403

398404
/**
399-
* @see StringUtils#splitCsv(String, Character, Character, Character, Character, boolean, Integer)
405+
* @see StringUtils#splitCsv(String, Character, Character, Character, Character, boolean, boolean, Integer)
400406
*/
401407
public static String[][] splitCsv(
402408
@Nonnull String text,
403409
@Nonnull Character delimiter,
404410
@Nullable Character escapeChar,
405411
@Nullable Character quoteChar,
406412
@Nullable Character lineDelimiter,
407-
boolean deleteHeadDelimiter) {
408-
return splitCsv(text, delimiter, escapeChar, quoteChar, lineDelimiter, deleteHeadDelimiter, null);
413+
boolean deleteHeadDelimiter,
414+
boolean isDeleteEscapeChar) {
415+
return splitCsv(text, delimiter, escapeChar, quoteChar, lineDelimiter,
416+
deleteHeadDelimiter, isDeleteEscapeChar, null);
409417
}
410418

411419
/**
@@ -434,6 +442,7 @@ public static String[][] splitCsv(
434442
@Nullable Character quoteChar,
435443
@Nullable Character lineDelimiter,
436444
boolean deleteHeadDelimiter,
445+
boolean isDeleteEscapeChar,
437446
@Nullable Integer maxFieldSize) {
438447
if (maxFieldSize != null && maxFieldSize <= 0) {
439448
return new String[0][];
@@ -481,6 +490,9 @@ public static String[][] splitCsv(
481490
switch (state) {
482491
case STATE_NORMAL:
483492
state = STATE_ESCAPING;
493+
if (!isDeleteEscapeChar) {
494+
stringBuilder.append(ch);
495+
}
484496
break;
485497
case STATE_ESCAPING:
486498
stringBuilder.append(ch);

inlong-sort/sort-formats/format-common/src/test/java/org/apache/inlong/sort/formats/common/StringUtilsTest.java

Lines changed: 56 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -34,35 +34,35 @@ public class StringUtilsTest {
3434
public void testSplitKvString() {
3535

3636
String kvString1 = "name=n&age=10";
37-
Map<String, String> map1 = StringUtils.splitKv(kvString1, '&',
38-
'=', '\\', '\'');
39-
assertEquals("n", map1.get("name"));
40-
assertEquals("10", map1.get("age"));
37+
List<Map<String, String>> listMap1 = StringUtils.splitKv(kvString1, '&',
38+
'=', '\\', '\'', '\n', true);
39+
assertEquals("n", listMap1.get(0).get("name"));
40+
assertEquals("10", listMap1.get(0).get("age"));
4141

4242
String kvString2 = "name=&age=20&";
43-
Map<String, String> map2 = StringUtils.splitKv(kvString2, '&',
44-
'=', '\\', '\'');
45-
assertEquals("", map2.get("name"));
46-
assertEquals("20&", map2.get("age"));
43+
List<Map<String, String>> listMap2 = StringUtils.splitKv(kvString2, '&',
44+
'=', '\\', '\'', '\n', true);
45+
assertEquals("", listMap2.get(0).get("name"));
46+
assertEquals("20&", listMap2.get(0).get("age"));
4747

4848
String kvString3 = "name==&age=20&&&value=aaa&dddd&";
49-
Map<String, String> map3 = StringUtils.splitKv(kvString3, '&',
50-
'=', '\\', '\'');
51-
assertEquals("=", map3.get("name"));
52-
assertEquals("20&&", map3.get("age"));
53-
assertEquals("aaa&dddd&", map3.get("value"));
49+
List<Map<String, String>> listMap3 = StringUtils.splitKv(kvString3, '&',
50+
'=', '\\', '\'', '\n', true);
51+
assertEquals("=", listMap3.get(0).get("name"));
52+
assertEquals("20&&", listMap3.get(0).get("age"));
53+
assertEquals("aaa&dddd&", listMap3.get(0).get("value"));
5454

5555
String kvString4 = "name==&age=20&&\nname1==&age1=20&&";
5656
List<Map<String, String>> map4 = StringUtils.splitKv(kvString4, '&',
57-
'=', '\\', '\'', '\n');
57+
'=', '\\', '\'', '\n', true);
5858
assertEquals("=", map4.get(0).get("name"));
5959
assertEquals("20&&", map4.get(0).get("age"));
6060
assertEquals("=", map4.get(1).get("name1"));
6161
assertEquals("20&&", map4.get(1).get("age1"));
6262

6363
String kvString5 = "name==&age=20&&\nname1==&age1=20&&&value=aaa&dddd&";
6464
List<Map<String, String>> map5 = StringUtils.splitKv(kvString5, '&',
65-
'=', '\\', '\'', '\n');
65+
'=', '\\', '\'', '\n', true);
6666
assertEquals("=", map5.get(0).get("name"));
6767
assertEquals("20&&", map5.get(0).get("age"));
6868
assertEquals("=", map5.get(1).get("name1"));
@@ -71,25 +71,25 @@ public void testSplitKvString() {
7171

7272
String kvString6 = "name==&age=20&&\\";
7373
List<Map<String, String>> map6 = StringUtils.splitKv(kvString6, '&',
74-
'=', '\\', '\'', '\n');
74+
'=', '\\', '\'', '\n', true);
7575
assertEquals("=", map6.get(0).get("name"));
7676
assertEquals("20&&", map6.get(0).get("age"));
7777

7878
String kvString7 = "name==&age=20&&'";
7979
List<Map<String, String>> map7 = StringUtils.splitKv(kvString7, '&',
80-
'=', '\\', '\'', '\n');
80+
'=', '\\', '\'', '\n', true);
8181
assertEquals("=", map7.get(0).get("name"));
8282
assertEquals("20&&", map7.get(0).get("age"));
8383

8484
String kvString8 = "name=\\=&age=20&a&'";
8585
List<Map<String, String>> map8 = StringUtils.splitKv(kvString8, '&',
86-
'=', '\\', '\'', '\n');
86+
'=', '\\', '\'', '\n', true);
8787
assertEquals("=", map8.get(0).get("name"));
8888
assertEquals("20&a&", map8.get(0).get("age"));
8989

9090
String kvString9 = "name=\\=&age=20&a\\&'";
9191
List<Map<String, String>> map9 = StringUtils.splitKv(kvString9, '&',
92-
'=', '\\', '\'', '\n');
92+
'=', '\\', '\'', '\n', true);
9393
assertEquals("=", map8.get(0).get("name"));
9494
assertEquals("20&a&", map8.get(0).get("age"));
9595
}
@@ -115,30 +115,55 @@ public void testSplitCsvString() {
115115
assertEquals("home", csv1Array2[2][2]);
116116
}
117117

118+
@Test
119+
public void testSplitCsvStringWhiteEscape() {
120+
String csvString1 = "name|age=20\\||&'";
121+
String[][] csv1Array1 = StringUtils.splitCsv(csvString1, '|',
122+
'\\', '\'', '\n', false, false);
123+
124+
assertEquals("age=20\\|", csv1Array1[0][1]);
125+
assertEquals("&", csv1Array1[0][2]);
126+
127+
String csvString2 = "name|age=20\\||&'\n\name|age=20\\||&'\n\n|home|\\home\\";
128+
String[][] csv1Array2 = StringUtils.splitCsv(csvString2, '|',
129+
'\\', '\'', '\n', false, false);
130+
131+
assertEquals("name", csv1Array2[0][0]);
132+
assertEquals("age=20\\|", csv1Array2[0][1]);
133+
assertEquals("&\n\name|age=20\\||&", csv1Array2[0][2]);
134+
assertEquals("", csv1Array2[2][0]);
135+
assertEquals("home", csv1Array2[2][1]);
136+
assertEquals("\\home\\", csv1Array2[2][2]);
137+
}
138+
118139
@Test
119140
public void testSplitCsvStringWithMaxFields() {
120141

121142
String csvString = "name|age=20\\||&'\n\name|age=20\\||&'\n\n|home|\\home\\";
122143
String[][] csv1Array0 = StringUtils.splitCsv(csvString, '|',
123-
'\\', '\'', '\n', false, 0);
144+
'\\', '\'', '\n', false, true,
145+
0);
124146
assertEquals(0, csv1Array0.length);
125147

126148
String[][] csv1Array1 = StringUtils.splitCsv(csvString, '|',
127-
'\\', '\'', '\n', false, 1);
149+
'\\', '\'', '\n', false, true,
150+
1);
128151
assertEquals("name|age=20\\||&'\n\name|age=20\\||&'", csv1Array1[0][0]);
129152
assertEquals("", csv1Array1[1][0]);
130153
assertEquals("|home|\\home\\", csv1Array1[2][0]);
131154

132155
String[][] csv1Array2 = StringUtils.splitCsv(csvString, '|',
133-
'\\', '\'', '\n', false, 2);
156+
'\\', '\'', '\n', false, true,
157+
2);
134158
assertEquals("name", csv1Array2[0][0]);
135159
assertEquals("age=20\\||&'\n\name|age=20\\||&'", csv1Array2[0][1]);
136160
assertEquals("", csv1Array2[1][0]);
137161
assertEquals("", csv1Array2[2][0]);
138162
assertEquals("home|\\home\\", csv1Array2[2][1]);
139163

140164
String[][] csv1Array3 = StringUtils.splitCsv(csvString, '|',
141-
'\\', '\'', '\n', false, 3);
165+
'\\', '\'', '\n', false, true,
166+
3);
142167
assertEquals("name", csv1Array3[0][0]);
143168
assertEquals("age=20|", csv1Array3[0][1]);
144169
assertEquals("&\n\name|age=20\\||&", csv1Array3[0][2]);
@@ -147,7 +172,8 @@ public void testSplitCsvStringWithMaxFields() {
147172
assertEquals("home", csv1Array3[2][2]);
148173

149174
String[][] csv1Array4 = StringUtils.splitCsv(csvString, '|',
150-
'\\', '\'', '\n', false, 4);
175+
'\\', '\'', '\n', false, true,
176+
4);
151177
assertEquals("name", csv1Array4[0][0]);
152178
assertEquals("age=20|", csv1Array4[0][1]);
153179
assertEquals("&\n\name|age=20\\||&", csv1Array4[0][2]);
@@ -159,9 +185,11 @@ public void testSplitCsvStringWithMaxFields() {
159185
@Test
160186
public void testKvScapeCharSplit() {
161187
String text = "k1=v1&\nk\\2=v2\\&&k3=v3";
162-
Map<String, String> kvMap = splitKv(text, '&', '=', '\\', null);
163-
Assert.assertTrue(kvMap != null && kvMap.size() == 3);
164-
Assert.assertTrue(kvMap.get("k3") != null);
165-
Assert.assertTrue(kvMap.get("\nk2") != null);
188+
List<Map<String, String>> kvMapList = splitKv(text, '&', '=', '\\',
189+
null, '\n', false);
190+
Assert.assertTrue(kvMapList != null && kvMapList.size() == 2);
191+
Assert.assertTrue(kvMapList.get(0).get("k3") == null);
192+
Assert.assertTrue(kvMapList.get(1).get("\nk2") == null);
193+
Assert.assertTrue(kvMapList.get(1).get("k\\2") != null);
166194
}
167195
}

0 commit comments

Comments
 (0)