Skip to content

Commit 4c9da1b

Browse files
authored
[INLONG-11807][Sort] Support exactly metric report in mysql-cdc case (#11808)
1 parent 1dfb1a2 commit 4c9da1b

2 files changed

Lines changed: 133 additions & 13 deletions

File tree

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.sort.base.metric;
19+
20+
import org.apache.inlong.audit.AuditReporterImpl;
21+
22+
import lombok.extern.slf4j.Slf4j;
23+
import org.apache.commons.collections.CollectionUtils;
24+
import org.apache.flink.table.data.RowData;
25+
import org.apache.flink.types.RowKind;
26+
27+
import java.io.Serializable;
28+
import java.util.HashMap;
29+
import java.util.List;
30+
import java.util.Map;
31+
32+
import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
33+
import static org.apache.inlong.common.constant.Constants.DEFAULT_AUDIT_VERSION;
34+
import static org.apache.inlong.sort.base.Constants.GROUP_ID;
35+
import static org.apache.inlong.sort.base.Constants.STREAM_ID;
36+
import static org.apache.inlong.sort.base.util.CalculateObjectSizeUtils.getDataSize;
37+
38+
@Slf4j
39+
public class CdcExactlyMetric implements Serializable, SourceMetricsReporter {
40+
41+
private final Map<String, String> labels;
42+
private final Map<RowKind, Integer> auditKeyMap;
43+
private final String groupId;
44+
private final String streamId;
45+
46+
private AuditReporterImpl auditReporter;
47+
private Long currentCheckpointId = 0L;
48+
private Long lastCheckpointId = 0L;
49+
50+
public CdcExactlyMetric(MetricOption option) {
51+
this.labels = option.getLabels();
52+
this.groupId = labels.get(GROUP_ID);
53+
this.streamId = labels.get(STREAM_ID);
54+
this.auditKeyMap = new HashMap<>();
55+
56+
if (option.getIpPorts().isPresent()) {
57+
auditReporter = new AuditReporterImpl();
58+
auditReporter.setAutoFlush(false);
59+
auditReporter.setAuditProxy(option.getIpPortSet());
60+
List<Integer> auditKeys = option.getInlongAuditKeys();
61+
62+
if (CollectionUtils.isEmpty(auditKeys)) {
63+
log.warn("inlong audit keys is empty");
64+
} else if (auditKeys.size() == 1) {
65+
auditKeyMap.put(RowKind.INSERT, auditKeys.get(0));
66+
log.warn("only the insert audit key is set, the update and delete audit will be ignored");
67+
} else if (auditKeys.size() == 4) {
68+
auditKeyMap.put(RowKind.INSERT, auditKeys.get(0));
69+
auditKeyMap.put(RowKind.UPDATE_BEFORE, auditKeys.get(1));
70+
auditKeyMap.put(RowKind.UPDATE_AFTER, auditKeys.get(2));
71+
auditKeyMap.put(RowKind.DELETE, auditKeys.get(3));
72+
} else {
73+
throw new IllegalArgumentException("audit key size must be 1 or 4");
74+
}
75+
}
76+
log.info("CdcExactlyMetric init, groupId: {}, streamId: {}, audit key: {}", groupId, streamId, auditKeyMap);
77+
}
78+
79+
@Override
80+
public void outputMetricsWithEstimate(Object data, long dataTime) {
81+
long size = getDataSize(data);
82+
if (data instanceof RowData) {
83+
RowData rowData = (RowData) data;
84+
RowKind rowKind = rowData.getRowKind();
85+
int key = auditKeyMap.get(rowKind);
86+
outputMetrics(1, size, dataTime, key);
87+
} else {
88+
outputMetrics(1, size, dataTime, auditKeyMap.get(RowKind.INSERT));
89+
}
90+
}
91+
92+
public void outputMetrics(long rowCountSize, long rowDataSize, long dataTime, int key) {
93+
if (auditReporter != null) {
94+
auditReporter.add(
95+
this.currentCheckpointId,
96+
key,
97+
DEFAULT_AUDIT_TAG,
98+
groupId,
99+
streamId,
100+
dataTime,
101+
rowCountSize,
102+
rowDataSize,
103+
DEFAULT_AUDIT_VERSION);
104+
}
105+
}
106+
107+
public void updateLastCheckpointId(Long checkpointId) {
108+
lastCheckpointId = checkpointId;
109+
}
110+
111+
public void updateCurrentCheckpointId(Long checkpointId) {
112+
currentCheckpointId = checkpointId;
113+
}
114+
115+
public void flushAudit() {
116+
if (auditReporter != null) {
117+
auditReporter.flush(lastCheckpointId);
118+
}
119+
}
120+
}

inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717

1818
package org.apache.inlong.sort.mysql;
1919

20+
import org.apache.inlong.sort.base.metric.CdcExactlyMetric;
2021
import org.apache.inlong.sort.base.metric.MetricOption;
2122
import org.apache.inlong.sort.base.metric.MetricsCollector;
22-
import org.apache.inlong.sort.base.metric.SourceExactlyMetric;
2323

2424
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
2525
import com.ververica.cdc.debezium.table.AppendMetadataCollector;
@@ -102,7 +102,7 @@ public interface ValueValidator extends Serializable {
102102

103103
/** Changelog Mode to use for encoding changes in Flink internal data structure. */
104104
private final DebeziumChangelogMode changelogMode;
105-
private SourceExactlyMetric sourceExactlyMetric;
105+
private CdcExactlyMetric cdcExactlyMetric;
106106
private final MetricOption metricOption;
107107

108108
/** Returns a builder to build {@link RowDataDebeziumDeserializeSchema}. */
@@ -141,15 +141,15 @@ public void deserialize(SourceRecord record, Collector<RowData> out) throws Exce
141141
GenericRowData insert = extractAfterRow(value, valueSchema);
142142
validator.validate(insert, RowKind.INSERT);
143143
insert.setRowKind(RowKind.INSERT);
144-
if (sourceExactlyMetric != null) {
144+
if (cdcExactlyMetric != null) {
145145
out = createMetricsCollector(record, out);
146146
}
147147
emit(record, insert, out);
148148
} else if (op == Envelope.Operation.DELETE) {
149149
GenericRowData delete = extractBeforeRow(value, valueSchema);
150150
validator.validate(delete, RowKind.DELETE);
151151
delete.setRowKind(RowKind.DELETE);
152-
if (sourceExactlyMetric != null) {
152+
if (cdcExactlyMetric != null) {
153153
out = createMetricsCollector(record, out);
154154
}
155155
emit(record, delete, out);
@@ -164,7 +164,7 @@ public void deserialize(SourceRecord record, Collector<RowData> out) throws Exce
164164
GenericRowData after = extractAfterRow(value, valueSchema);
165165
validator.validate(after, RowKind.UPDATE_AFTER);
166166
after.setRowKind(RowKind.UPDATE_AFTER);
167-
if (sourceExactlyMetric != null) {
167+
if (cdcExactlyMetric != null) {
168168
out = createMetricsCollector(record, out);
169169
}
170170
emit(record, after, out);
@@ -178,7 +178,7 @@ public void deserialize(SourceRecord record, Collector<RowData> out) throws Exce
178178
* @return metrics collector
179179
*/
180180
private Collector<RowData> createMetricsCollector(SourceRecord record, Collector<RowData> out) {
181-
MetricsCollector<RowData> collector = new MetricsCollector<>(out, sourceExactlyMetric);
181+
MetricsCollector<RowData> collector = new MetricsCollector<>(out, cdcExactlyMetric);
182182
collector.resetTimestamp((Long) ((Struct) record.value()).get(FieldName.TIMESTAMP));
183183
return collector;
184184
}
@@ -190,7 +190,7 @@ private Collector<RowData> createMetricsCollector(SourceRecord record, Collector
190190
*/
191191
public void initSourceMetricData() {
192192
if (metricOption != null) {
193-
this.sourceExactlyMetric = new SourceExactlyMetric(metricOption);
193+
this.cdcExactlyMetric = new CdcExactlyMetric(metricOption);
194194
}
195195
}
196196

@@ -222,20 +222,20 @@ public TypeInformation<RowData> getProducedType() {
222222
}
223223

224224
public void flushAudit() {
225-
if (sourceExactlyMetric != null) {
226-
sourceExactlyMetric.flushAudit();
225+
if (cdcExactlyMetric != null) {
226+
cdcExactlyMetric.flushAudit();
227227
}
228228
}
229229

230230
public void updateCurrentCheckpointId(long checkpointId) {
231-
if (sourceExactlyMetric != null) {
232-
sourceExactlyMetric.updateCurrentCheckpointId(checkpointId);
231+
if (cdcExactlyMetric != null) {
232+
cdcExactlyMetric.updateCurrentCheckpointId(checkpointId);
233233
}
234234
}
235235

236236
public void updateLastCheckpointId(long checkpointId) {
237-
if (sourceExactlyMetric != null) {
238-
sourceExactlyMetric.updateLastCheckpointId(checkpointId);
237+
if (cdcExactlyMetric != null) {
238+
cdcExactlyMetric.updateLastCheckpointId(checkpointId);
239239
}
240240
}
241241
// -------------------------------------------------------------------------------------

0 commit comments

Comments
 (0)