Skip to content

Commit a8a0551

Browse files
authored
[INLONG-11829][Sort] Optimize MySQL-CDC changelog audit report (#11835)
* [INLONG-11829][Sort] Optimize MySQL-CDC changelog audit report
1 parent b170faf commit a8a0551

5 files changed

Lines changed: 47 additions & 32 deletions

File tree

inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/util/AuditUtils.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,13 @@
1818
package org.apache.inlong.sort.util;
1919

2020
import com.google.common.base.Preconditions;
21+
import com.google.common.base.Splitter;
22+
import org.apache.flink.types.RowKind;
2123

2224
import java.util.Arrays;
2325
import java.util.HashSet;
2426
import java.util.List;
27+
import java.util.Map;
2528
import java.util.regex.Pattern;
2629
import java.util.stream.Collectors;
2730

@@ -52,4 +55,12 @@ public static List<Integer> extractAuditKeys(String auditKeys) {
5255
.collect(Collectors.toList());
5356
}
5457

58+
public static Map<RowKind, Integer> extractChangelogAuditKeyMap(String changelogAuditKeys) {
59+
return Splitter.on("&").withKeyValueSeparator("=").split(changelogAuditKeys)
60+
.entrySet()
61+
.stream()
62+
.collect(Collectors.toMap(entry -> RowKind.valueOf(entry.getKey()),
63+
entry -> Integer.parseInt(entry.getValue())));
64+
}
65+
5566
}

inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/CdcExactlyMetric.java

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,10 @@
2020
import org.apache.inlong.audit.AuditReporterImpl;
2121

2222
import lombok.extern.slf4j.Slf4j;
23-
import org.apache.commons.collections.CollectionUtils;
2423
import org.apache.flink.table.data.RowData;
2524
import org.apache.flink.types.RowKind;
2625

2726
import java.io.Serializable;
28-
import java.util.HashMap;
29-
import java.util.List;
3027
import java.util.Map;
3128

3229
import static org.apache.inlong.audit.consts.ConfigConstants.DEFAULT_AUDIT_TAG;
@@ -51,28 +48,13 @@ public CdcExactlyMetric(MetricOption option) {
5148
this.labels = option.getLabels();
5249
this.groupId = labels.get(GROUP_ID);
5350
this.streamId = labels.get(STREAM_ID);
54-
this.auditKeyMap = new HashMap<>();
5551

5652
if (option.getIpPorts().isPresent()) {
5753
auditReporter = new AuditReporterImpl();
5854
auditReporter.setAutoFlush(false);
5955
auditReporter.setAuditProxy(option.getIpPortSet());
60-
List<Integer> auditKeys = option.getInlongAuditKeys();
61-
62-
if (CollectionUtils.isEmpty(auditKeys)) {
63-
log.warn("inlong audit keys is empty");
64-
} else if (auditKeys.size() == 1) {
65-
auditKeyMap.put(RowKind.INSERT, auditKeys.get(0));
66-
log.warn("only the insert audit key is set, the update and delete audit will be ignored");
67-
} else if (auditKeys.size() == 4) {
68-
auditKeyMap.put(RowKind.INSERT, auditKeys.get(0));
69-
auditKeyMap.put(RowKind.UPDATE_BEFORE, auditKeys.get(1));
70-
auditKeyMap.put(RowKind.UPDATE_AFTER, auditKeys.get(2));
71-
auditKeyMap.put(RowKind.DELETE, auditKeys.get(3));
72-
} else {
73-
throw new IllegalArgumentException("audit key size must be 1 or 4");
74-
}
7556
}
57+
auditKeyMap = option.getInlongChangelogAuditKeys();
7658
log.info("CdcExactlyMetric init, groupId: {}, streamId: {}, audit key: {}", groupId, streamId, auditKeyMap);
7759
}
7860

@@ -82,15 +64,15 @@ public void outputMetricsWithEstimate(Object data, long dataTime) {
8264
if (data instanceof RowData) {
8365
RowData rowData = (RowData) data;
8466
RowKind rowKind = rowData.getRowKind();
85-
int key = auditKeyMap.get(rowKind);
67+
Integer key = auditKeyMap.get(rowKind);
8668
outputMetrics(1, size, dataTime, key);
8769
} else {
8870
outputMetrics(1, size, dataTime, auditKeyMap.get(RowKind.INSERT));
8971
}
9072
}
9173

92-
public void outputMetrics(long rowCountSize, long rowDataSize, long dataTime, int key) {
93-
if (auditReporter != null) {
74+
public void outputMetrics(long rowCountSize, long rowDataSize, long dataTime, Integer key) {
75+
if (auditReporter != null && key != null) {
9476
auditReporter.add(
9577
this.currentCheckpointId,
9678
key,

inlong-sort/sort-flink/base/src/main/java/org/apache/inlong/sort/base/metric/MetricOption.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import org.apache.inlong.sort.util.AuditUtils;
2121

22+
import org.apache.flink.types.RowKind;
2223
import org.apache.flink.util.Preconditions;
2324
import org.apache.flink.util.StringUtils;
2425
import org.slf4j.Logger;
@@ -56,6 +57,7 @@ public class MetricOption implements Serializable {
5657
private long initDirtyBytes;
5758
private long readPhase;
5859
private List<Integer> inlongAuditKeys;
60+
private Map<RowKind, Integer> inlongChangelogAuditKeys;
5961

6062
private MetricOption(
6163
Map<String, String> labels,
@@ -67,6 +69,7 @@ private MetricOption(
6769
Long initDirtyBytes,
6870
Long readPhase,
6971
List<Integer> inlongAuditKeys,
72+
Map<RowKind, Integer> inlongChangelogAuditKeys,
7073
Set<String> ipPortSet) {
7174
this.initRecords = initRecords;
7275
this.initBytes = initBytes;
@@ -78,6 +81,7 @@ private MetricOption(
7881
this.inlongAuditKeys = inlongAuditKeys;
7982
this.ipPortSet = ipPortSet;
8083
this.registeredMetric = registeredMetric;
84+
this.inlongChangelogAuditKeys = inlongChangelogAuditKeys;
8185
}
8286

8387
public Map<String, String> getLabels() {
@@ -124,6 +128,10 @@ public List<Integer> getInlongAuditKeys() {
124128
return inlongAuditKeys;
125129
}
126130

131+
/**
 * Returns the per-row-kind audit keys that were handed to this option's constructor.
 *
 * <p>NOTE(review): the builder only populates this map inside its {@code inlongAudit != null} branch,
 * so the returned value appears to be {@code null} when no audit address is configured; callers
 * should confirm and guard accordingly.
 *
 * @return the map from {@link RowKind} to audit key, as stored in this option
 */
public Map<RowKind, Integer> getInlongChangelogAuditKeys() {
132+
return inlongChangelogAuditKeys;
133+
}
134+
127135
public long getInitDirtyBytes() {
128136
return initDirtyBytes;
129137
}
@@ -155,6 +163,7 @@ public static class Builder {
155163
private String inlongLabels;
156164
private String inlongAudit;
157165
private String inlongAuditKeys;
166+
private String inlongChangelogAuditKeys;
158167
private RegisteredMetric registeredMetric = RegisteredMetric.ALL;
159168
private long initRecords = 0L;
160169
private long initBytes = 0L;
@@ -180,6 +189,11 @@ public MetricOption.Builder withAuditKeys(String inlongAuditIds) {
180189
return this;
181190
}
182191

192+
/**
 * Sets the raw changelog audit key string on this builder.
 *
 * <p>The value is stored as-is; {@code build()} passes it to
 * {@code AuditUtils.extractChangelogAuditKeyMap}, which splits it on {@code &} into
 * {@code key=value} entries.
 *
 * @param inlongChangelogAuditKeys the unparsed changelog audit key configuration
 * @return this builder, for call chaining
 */
public MetricOption.Builder withChangelogAuditKeys(String inlongChangelogAuditKeys) {
193+
this.inlongChangelogAuditKeys = inlongChangelogAuditKeys;
194+
return this;
195+
}
196+
183197
public MetricOption.Builder withRegisterMetric(RegisteredMetric registeredMetric) {
184198
this.registeredMetric = registeredMetric;
185199
return this;
@@ -231,6 +245,7 @@ public MetricOption build() {
231245

232246
List<Integer> inlongAuditKeysList = null;
233247
Set<String> ipPortSet = null;
248+
Map<RowKind, Integer> inlongChangelogAuditKeysMap = null;
234249

235250
if (inlongAudit != null) {
236251
Preconditions.checkArgument(labels.containsKey(GROUP_ID) && labels.containsKey(STREAM_ID),
@@ -244,10 +259,13 @@ public MetricOption build() {
244259

245260
inlongAuditKeysList = AuditUtils.extractAuditKeys(inlongAuditKeys);
246261
ipPortSet = AuditUtils.extractAuditIpPorts(inlongAudit);
262+
inlongChangelogAuditKeysMap = AuditUtils.extractChangelogAuditKeyMap(inlongChangelogAuditKeys);
263+
247264
}
248265

249266
return new MetricOption(labels, inlongAudit, registeredMetric, initRecords, initBytes,
250-
initDirtyRecords, initDirtyBytes, initReadPhase, inlongAuditKeysList, ipPortSet);
267+
initDirtyRecords, initDirtyBytes, initReadPhase, inlongAuditKeysList, inlongChangelogAuditKeysMap,
268+
ipPortSet);
251269
}
252270
}
253271
}

inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/MysqlTableFactory.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,9 +104,11 @@ public DynamicTableSource createDynamicTableSource(Context context) {
104104
String inlongMetric = config.getOptional(INLONG_METRIC).orElse(null);
105105
String auditHostAndPorts = config.get(INLONG_AUDIT);
106106
String auditKeys = config.get(AUDIT_KEYS);
107+
String changelogKeys = config.get(CHANGELOG_AUDIT_KEYS);
107108
MetricOption metricOption = MetricOption.builder()
108109
.withInlongLabels(inlongMetric)
109110
.withAuditAddress(auditHostAndPorts)
111+
.withChangelogAuditKeys(changelogKeys)
110112
.withAuditKeys(auditKeys)
111113
.build();
112114

@@ -177,6 +179,7 @@ public Set<ConfigOption<?>> optionalOptions() {
177179
options.add(INLONG_AUDIT);
178180
options.add(ROW_KINDS_FILTERED);
179181
options.add(AUDIT_KEYS);
182+
options.add(CHANGELOG_AUDIT_KEYS);
180183
options.add(GH_OST_DDL_CHANGE);
181184
options.add(GH_OST_TABLE_REGEX);
182185
options.add(CHUNK_KEY_COLUMN);
@@ -548,4 +551,10 @@ private String validateAndGetServerId(ReadableConfig configuration) {
548551
.defaultValue("")
549552
.withDescription("Audit keys for metrics collecting");
550553

554+
/**
 * Optional string option {@code metrics.changelog.audit.key} carrying the audit keys used for
 * changelog metrics. Defaults to the empty string.
 *
 * <p>The value is read in {@code createDynamicTableSource} and forwarded to
 * {@code MetricOption.Builder#withChangelogAuditKeys}.
 */
public static final ConfigOption<String> CHANGELOG_AUDIT_KEYS =
555+
ConfigOptions.key("metrics.changelog.audit.key")
556+
.stringType()
557+
.defaultValue("")
558+
.withDescription("Audit keys for changelog metrics collecting");
559+
551560
}

inlong-sort/sort-flink/sort-flink-v1.15/sort-connectors/mysql-cdc/src/main/java/org/apache/inlong/sort/mysql/RowDataDebeziumDeserializeSchema.java

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -137,21 +137,19 @@ public void deserialize(SourceRecord record, Collector<RowData> out) throws Exce
137137
Envelope.Operation op = Envelope.operationFor(record);
138138
Struct value = (Struct) record.value();
139139
Schema valueSchema = record.valueSchema();
140+
if (cdcExactlyMetric != null) {
141+
out = createMetricsCollector(record, out);
142+
}
143+
140144
if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
141145
GenericRowData insert = extractAfterRow(value, valueSchema);
142146
validator.validate(insert, RowKind.INSERT);
143147
insert.setRowKind(RowKind.INSERT);
144-
if (cdcExactlyMetric != null) {
145-
out = createMetricsCollector(record, out);
146-
}
147148
emit(record, insert, out);
148149
} else if (op == Envelope.Operation.DELETE) {
149150
GenericRowData delete = extractBeforeRow(value, valueSchema);
150151
validator.validate(delete, RowKind.DELETE);
151152
delete.setRowKind(RowKind.DELETE);
152-
if (cdcExactlyMetric != null) {
153-
out = createMetricsCollector(record, out);
154-
}
155153
emit(record, delete, out);
156154
} else {
157155
if (changelogMode == DebeziumChangelogMode.ALL) {
@@ -164,9 +162,6 @@ public void deserialize(SourceRecord record, Collector<RowData> out) throws Exce
164162
GenericRowData after = extractAfterRow(value, valueSchema);
165163
validator.validate(after, RowKind.UPDATE_AFTER);
166164
after.setRowKind(RowKind.UPDATE_AFTER);
167-
if (cdcExactlyMetric != null) {
168-
out = createMetricsCollector(record, out);
169-
}
170165
emit(record, after, out);
171166
}
172167
}

0 commit comments

Comments
 (0)