Skip to content

Commit 051f04b

Browse files
authored
[INLONG-11838][Manager] Add the judgment of whether the cdc audit ID is exist when passing metrics.changelog.audit.key (#11839)
* [INLONG-11838][Manager] Add the judgment of whether the cdc audit ID is exist when passing metrics.changelog.audit.key
1 parent a8a0551 commit 051f04b

1 file changed

Lines changed: 5 additions & 2 deletions

File tree

inlong-manager/manager-service/src/main/java/org/apache/inlong/manager/service/resource/sort/SortFlinkConfigOperator.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ public class SortFlinkConfigOperator implements SortConfigOperator {
7676
private static final Logger LOGGER = LoggerFactory.getLogger(SortFlinkConfigOperator.class);
7777
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
7878

79+
private static final String CDC_AUDIT_KEY = "metrics.changelog.audit.key";
80+
7981
@Autowired
8082
private StreamSourceService sourceService;
8183
@Autowired
@@ -318,8 +320,9 @@ private void addAuditId(Map<String, Object> properties, String type, IndicatorTy
318320
}
319321
throw new BusinessException("current audit id can not find cdc audit information");
320322
}).collect(Collectors.toList());
321-
properties.putIfAbsent("metrics.changelog.audit.key",
322-
Joiner.on(InlongConstants.AMPERSAND).join(cdcAuditIdList));
323+
if (CollectionUtils.isNotEmpty(cdcAuditIdList)) {
324+
properties.putIfAbsent(CDC_AUDIT_KEY, Joiner.on(InlongConstants.AMPERSAND).join(cdcAuditIdList));
325+
}
323326
} catch (Exception e) {
324327
LOGGER.error("Current type ={} is not set auditId", type);
325328
}

0 commit comments

Comments
 (0)