Skip to content

Commit e0832fc

Browse files
authored
[INLONG-11825][Manager] Support CDC audit ID configuration (#11828)
1 parent 829a4d0 commit e0832fc

3 files changed

Lines changed: 54 additions & 0 deletions

File tree

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/AuditService.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,6 @@ public interface AuditService {
7171
*/
7272
List<AuditProxy> getAuditProxy(String component) throws Exception;
7373

74+
List<AuditInformation> getCdcAuditInfoList(String type, IndicatorType indicatorType);
75+
7476
}

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/core/impl/AuditServiceImpl.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import java.util.concurrent.Executors;
6868
import java.util.concurrent.ScheduledExecutorService;
6969
import java.util.concurrent.TimeUnit;
70+
import java.util.stream.Collectors;
7071

7172
/**
7273
* Audit service layer implementation
@@ -116,6 +117,10 @@ public Boolean refreshBaseItemCache() {
116117
auditIndicatorMap.clear();
117118
List<AuditInformation> auditInformationList = AuditOperator.getInstance().getAllAuditInformation();
118119
List<AuditInformation> metricInformationList = AuditOperator.getInstance().getAllMetricInformation();
120+
List<AuditInformation> cdcMetricInformationList = AuditOperator.getInstance().getAllCdcIdInformation();
121+
cdcMetricInformationList.forEach(v -> {
122+
auditItemMap.put(String.valueOf(v.getAuditId()), v.getNameInChinese());
123+
});
119124
auditInformationList.forEach(v -> {
120125
auditItemMap.put(String.valueOf(v.getAuditId()), v.getNameInChinese());
121126
});
@@ -193,6 +198,15 @@ public List<AuditVO> listByCondition(AuditRequest request) throws Exception {
193198
// properly overwrite audit ids by role and stream config
194199
if (InlongConstants.DATASYNC_REALTIME_MODE.equals(groupEntity.getInlongGroupMode())
195200
|| InlongConstants.DATASYNC_OFFLINE_MODE.equals(groupEntity.getInlongGroupMode())) {
201+
List<AuditInformation> cdcAuditInfoList =
202+
getCdcAuditInfoList(sourceNodeType, IndicatorType.RECEIVED_SUCCESS);
203+
List<String> cdcAuditIdList =
204+
cdcAuditInfoList.stream().map(v -> String.valueOf(v.getAuditId()))
205+
.collect(Collectors.toList());
206+
if (CollectionUtils.isNotEmpty(cdcAuditIdList)) {
207+
String tempSourceNodeType = sourceNodeType;
208+
cdcAuditIdList.forEach(v -> auditIdMap.put(v, tempSourceNodeType));
209+
}
196210
auditIdMap.put(getAuditId(sourceNodeType, IndicatorType.RECEIVED_SUCCESS), sourceNodeType);
197211
request.setAuditIds(getAuditIds(groupId, streamId, sourceNodeType, sinkNodeType));
198212
} else {
@@ -250,6 +264,14 @@ private List<String> getAuditIds(String groupId, String streamId, String sourceN
250264
InlongGroupEntity inlongGroup = inlongGroupMapper.selectByGroupId(groupId);
251265
if (InlongConstants.DATASYNC_REALTIME_MODE.equals(inlongGroup.getInlongGroupMode())
252266
|| InlongConstants.DATASYNC_OFFLINE_MODE.equals(inlongGroup.getInlongGroupMode())) {
267+
List<AuditInformation> cdcAuditInfoList =
268+
getCdcAuditInfoList(sourceNodeType, IndicatorType.RECEIVED_SUCCESS);
269+
if (CollectionUtils.isNotEmpty(cdcAuditInfoList)) {
270+
List<String> cdcAuditIdList =
271+
cdcAuditInfoList.stream().map(v -> String.valueOf(v.getAuditId()))
272+
.collect(Collectors.toList());
273+
auditSet.addAll(cdcAuditIdList);
274+
}
253275
auditSet.add(getAuditId(sourceNodeType, IndicatorType.RECEIVED_SUCCESS));
254276
} else {
255277
auditSet.add(getAuditId(sinkNodeType, IndicatorType.RECEIVED_SUCCESS));
@@ -294,4 +316,16 @@ public List<AuditProxy> getAuditProxy(String component) throws Exception {
294316
}
295317
}
296318

319+
@Override
320+
public List<AuditInformation> getCdcAuditInfoList(String type, IndicatorType indicatorType) {
321+
if (StringUtils.isBlank(type)) {
322+
return null;
323+
}
324+
325+
FlowType flowType = indicatorType.getCode() % 2 == 0 ? FlowType.INPUT : FlowType.OUTPUT;
326+
List<AuditInformation> cdcAuditInfo = AuditOperator.getInstance().getAllCdcIdInformation(type, flowType);
327+
Preconditions.expectNotNull(cdcAuditInfo, ErrorCodeEnum.AUDIT_ID_TYPE_NOT_SUPPORTED,
328+
String.format(ErrorCodeEnum.AUDIT_ID_TYPE_NOT_SUPPORTED.getMessage(), type));
329+
return cdcAuditInfo;
330+
}
297331
}

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortFlinkConfigOperator.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,16 @@
1717

1818
package org.apache.inlong.manager.service.resource.sort;
1919

20+
import org.apache.inlong.audit.CdcIdEnum;
2021
import org.apache.inlong.audit.entity.AuditComponent;
22+
import org.apache.inlong.audit.entity.AuditInformation;
2123
import org.apache.inlong.audit.entity.AuditProxy;
24+
import org.apache.inlong.audit.entity.CdcType;
25+
import org.apache.inlong.audit.entity.FlowType;
2226
import org.apache.inlong.common.enums.IndicatorType;
2327
import org.apache.inlong.manager.common.consts.InlongConstants;
2428
import org.apache.inlong.manager.common.consts.SinkType;
29+
import org.apache.inlong.manager.common.exceptions.BusinessException;
2530
import org.apache.inlong.manager.common.util.CommonBeanUtils;
2631
import org.apache.inlong.manager.pojo.group.InlongGroupInfo;
2732
import org.apache.inlong.manager.pojo.sink.StreamSink;
@@ -302,6 +307,19 @@ private void addAuditId(Map<String, Object> properties, String type, IndicatorTy
302307
properties.putIfAbsent("metrics.audit.key", auditId);
303308
properties.putIfAbsent("metrics.audit.proxy.hosts",
304309
Joiner.on(InlongConstants.AMPERSAND).join(auditProxyList));
310+
List<AuditInformation> cdcAuditInfoList = auditService.getCdcAuditInfoList(type, indicatorType);
311+
FlowType flowType = indicatorType.getCode() % 2 == 0 ? FlowType.INPUT : FlowType.OUTPUT;
312+
List<String> cdcAuditIdList =
313+
cdcAuditInfoList.stream().map(v -> {
314+
for (CdcType cdcType : CdcType.values()) {
315+
if (Objects.equals(CdcIdEnum.getCdcId(type, flowType, cdcType), v.getAuditId())) {
316+
return cdcType.name() + InlongConstants.EQUAL + v.getAuditId();
317+
}
318+
}
319+
throw new BusinessException("current audit id can not find cdc audit information");
320+
}).collect(Collectors.toList());
321+
properties.putIfAbsent("metrics.changelog.audit.key",
322+
Joiner.on(InlongConstants.AMPERSAND).join(cdcAuditIdList));
305323
} catch (Exception e) {
306324
LOGGER.error("Current type ={} is not set auditId", type);
307325
}

0 commit comments

Comments
 (0)