Skip to content

Commit 624a009

Browse files
authored
[INLONG-11966][Sort]The deserialization process supports returning the data byte size in one rowdata (#11967)
1 parent c0377bd commit 624a009

20 files changed

Lines changed: 847 additions & 17 deletions

File tree

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.sort.formats.base;
19+
20+
import lombok.Data;
21+
import org.apache.flink.table.data.RowData;
22+
23+
@Data
24+
public class FormatMsg {
25+
26+
private RowData rowData;
27+
private long rowDataLength;
28+
29+
public FormatMsg(RowData rowData, long rowDataLength) {
30+
this.rowData = rowData;
31+
this.rowDataLength = rowDataLength;
32+
}
33+
}

inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/base/TableFormatUtils.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@
6060
import org.apache.inlong.common.pojo.sort.dataflow.field.format.VarCharFormatInfo;
6161
import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
6262

63+
import org.apache.commons.lang3.StringUtils;
6364
import org.apache.flink.api.common.typeinfo.TypeInformation;
6465
import org.apache.flink.api.common.typeinfo.Types;
6566
import org.apache.flink.table.api.DataTypes;
@@ -582,6 +583,35 @@ public static Object deserializeBasicField(
582583
return null;
583584
}
584585

586+
public static long getFormatValueLength(FormatInfo fieldFormatInfo, String fieldText) {
587+
if (fieldFormatInfo instanceof BooleanFormatInfo) {
588+
return 4;
589+
} else if (fieldFormatInfo instanceof ByteFormatInfo) {
590+
return 4;
591+
} else if (fieldFormatInfo instanceof BooleanFormatInfo) {
592+
return 4;
593+
} else if (fieldFormatInfo instanceof ShortFormatInfo) {
594+
return 4;
595+
} else if (fieldFormatInfo instanceof IntFormatInfo) {
596+
return 4;
597+
} else if (fieldFormatInfo instanceof LongFormatInfo) {
598+
return 8;
599+
} else if (fieldFormatInfo instanceof FloatFormatInfo) {
600+
return 8;
601+
} else if (fieldFormatInfo instanceof DoubleFormatInfo) {
602+
return 8;
603+
} else if (fieldFormatInfo instanceof DecimalFormatInfo) {
604+
return 8;
605+
} else if (fieldFormatInfo instanceof DateFormatInfo
606+
|| fieldFormatInfo instanceof TimeFormatInfo
607+
|| fieldFormatInfo instanceof TimestampFormatInfo) {
608+
return 8;
609+
} else if (StringUtils.isNotEmpty(fieldText)) {
610+
return fieldText.length();
611+
}
612+
return 0L;
613+
}
614+
585615
/**
586616
* Serializes the basic field.
587617
*/

inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgDeserializationSchema.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.apache.inlong.sort.formats.inlongmsg;
1919

20+
import org.apache.inlong.sort.formats.base.FormatMsg;
2021
import org.apache.inlong.sort.formats.metrics.FormatMetricGroup;
2122

2223
import org.apache.flink.api.common.serialization.DeserializationSchema;
@@ -78,6 +79,14 @@ public void deserialize(byte[] message, Collector<RowData> out) {
7879
}
7980
}
8081

82+
public void deserializeFormatMsg(byte[] message, Collector<FormatMsg> out) {
83+
try {
84+
formatDeserializer.flatFormatMsgMap(message, out);
85+
} catch (Exception e) {
86+
throw new RuntimeException(e);
87+
}
88+
}
89+
8190
public List<InLongMsgWrap> preParse(byte[] bytes) throws Exception {
8291
return formatDeserializer.preParse(bytes);
8392
}
@@ -88,6 +97,11 @@ public void parse(
8897
formatDeserializer.parse(inLongMsgWrap, collector);
8998
}
9099

100+
public void parseFormatMsg(InLongMsgWrap inLongMsgWrap,
101+
Collector<FormatMsg> collector) throws Exception {
102+
formatDeserializer.parseFormatMsg(inLongMsgWrap, collector);
103+
}
104+
91105
@Override
92106
public boolean isEndOfStream(RowData rowData) {
93107
return false;

inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgFormatDeserializer.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.inlong.sort.formats.inlongmsg;
1919

2020
import org.apache.inlong.common.msg.InLongMsg;
21+
import org.apache.inlong.sort.formats.base.FormatMsg;
2122
import org.apache.inlong.sort.formats.metrics.FormatMetricGroup;
2223

2324
import org.apache.commons.lang3.StringUtils;
@@ -32,6 +33,7 @@
3233

3334
import java.io.IOException;
3435
import java.io.Serializable;
36+
import java.time.Instant;
3537
import java.util.ArrayList;
3638
import java.util.Iterator;
3739
import java.util.List;
@@ -45,6 +47,10 @@ public abstract class AbstractInLongMsgFormatDeserializer implements ResultTypeQ
4547

4648
private static final Logger LOG = LoggerFactory.getLogger(AbstractInLongMsgFormatDeserializer.class);
4749

50+
protected long lastPrintTimestamp = 0L;
51+
protected long PRINT_TIMESTAMP_INTERVAL = 60 * 1000L;
52+
protected int fieldNameSize = 0;
53+
4854
protected FailureHandler failureHandler;
4955

5056
/**
@@ -76,6 +82,17 @@ public AbstractInLongMsgFormatDeserializer(@Nonnull FailureHandler failureHandle
7682
*/
7783
protected abstract List<RowData> convertRowDataList(InLongMsgHead head, InLongMsgBody body) throws Exception;
7884

85+
/**
 * Converts the InLongMsg head and body into a list of {@link FormatMsg} records.
 *
 * @param head The head of the InLongMsg.
 * @param body The body of the InLongMsg.
 * @return The converted records; {@code parseFormatMsg} skips the body when this is {@code null}.
 * @throws Exception If the conversion fails.
 */
protected abstract List<FormatMsg> convertFormatMsgList(InLongMsgHead head, InLongMsgBody body) throws Exception;
86+
87+
protected boolean needPrint() {
88+
long now = Instant.now().toEpochMilli();
89+
if (now - lastPrintTimestamp > PRINT_TIMESTAMP_INTERVAL) {
90+
lastPrintTimestamp = now;
91+
return true;
92+
}
93+
return false;
94+
}
95+
7996
public void flatMap(
8097
byte[] bytes,
8198
Collector<RowData> collector) throws Exception {
@@ -84,6 +101,14 @@ public void flatMap(
84101
}
85102
}
86103

104+
public void flatFormatMsgMap(
105+
byte[] bytes,
106+
Collector<FormatMsg> collector) throws Exception {
107+
for (InLongMsgWrap inLongMsgWrap : preParse(bytes)) {
108+
parseFormatMsg(inLongMsgWrap, collector);
109+
}
110+
}
111+
87112
public List<InLongMsgWrap> preParse(byte[] bytes) throws Exception {
88113
final List<InLongMsgWrap> result = new ArrayList<>();
89114

@@ -160,6 +185,26 @@ public void parse(InLongMsgWrap inLongMsgWrap, Collector<RowData> collector) thr
160185
}
161186
}
162187

188+
public void parseFormatMsg(InLongMsgWrap inLongMsgWrap, Collector<FormatMsg> collector) throws Exception {
189+
InLongMsgHead inLongMsgHead = inLongMsgWrap.getInLongMsgHead();
190+
191+
for (InLongMsgBody inLongMsgBody : inLongMsgWrap.getInLongMsgBodyList()) {
192+
List<FormatMsg> formatMsgList;
193+
try {
194+
formatMsgList = convertFormatMsgList(inLongMsgHead, inLongMsgBody);
195+
} catch (Exception e) {
196+
reportDeSerializeErrorMetrics();
197+
failureHandler.onConvertingRowFailure(inLongMsgHead, inLongMsgBody, e);
198+
continue;
199+
}
200+
if (formatMsgList != null) {
201+
for (FormatMsg formatMsg : formatMsgList) {
202+
collector.collect(formatMsg);
203+
}
204+
}
205+
}
206+
}
207+
163208
@Override
164209
public boolean equals(Object o) {
165210
if (this == o) {

inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
2121
import org.apache.inlong.sort.formats.base.FieldToRowDataConverters;
22+
import org.apache.inlong.sort.formats.base.FormatMsg;
2223
import org.apache.inlong.sort.formats.base.TableFormatUtils;
2324

2425
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -400,6 +401,17 @@ public static GenericRowData decorateRowWithMetaData(
400401
return producedRow;
401402
}
402403

404+
public static FormatMsg decorateFormatMsgWithNeededHeadFields(
405+
@Nullable String timeFieldName,
406+
@Nullable String attributesFieldName,
407+
Timestamp time,
408+
Map<String, String> attributes,
409+
FormatMsg formatMsg) {
410+
formatMsg.setRowData(decorateRowDataWithNeededHeadFields(timeFieldName, attributesFieldName, time, attributes,
411+
(GenericRowData) formatMsg.getRowData()));
412+
return formatMsg;
413+
}
414+
403415
public static GenericRowData decorateRowDataWithNeededHeadFields(
404416
@Nullable String timeFieldName,
405417
@Nullable String attributesFieldName,

inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatDeserializer.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.inlong.sort.formats.inlongmsgbinlog;
1919

2020
import org.apache.inlong.common.pojo.sort.dataflow.field.format.RowFormatInfo;
21+
import org.apache.inlong.sort.formats.base.FormatMsg;
2122
import org.apache.inlong.sort.formats.inlongmsg.AbstractInLongMsgFormatDeserializer;
2223
import org.apache.inlong.sort.formats.inlongmsg.FailureHandler;
2324
import org.apache.inlong.sort.formats.inlongmsg.InLongMsgBody;
@@ -154,6 +155,19 @@ protected List<RowData> convertRowDataList(InLongMsgHead head, InLongMsgBody bod
154155
failureHandler);
155156
}
156157

158+
@Override
159+
protected List<FormatMsg> convertFormatMsgList(InLongMsgHead head, InLongMsgBody body) throws Exception {
160+
return InLongMsgBinlogUtils.getFormatMsgData(
161+
rowFormatInfo,
162+
timeFieldName,
163+
attributesFieldName,
164+
metadataFieldName,
165+
head.getAttributes(),
166+
body.getData(),
167+
includeUpdateBefore,
168+
failureHandler);
169+
}
170+
157171
@Override
158172
public TypeInformation<RowData> getProducedType() {
159173
return InLongMsgBinlogUtils.getRowType(

0 commit comments

Comments
 (0)