Skip to content

Commit c8a61e4

Browse files
lsq888lsqlsq
andauthored
[INLONG-12000][Audit]Implement API for obtaining audit data in Audit Tool (#12001)
* [Feature][Audit]Implement API for obtaining audit data in Audit Tool * [Feature][Audit]Implement API for obtaining audit data in Audit Tool * [INLONG-12000][Audit]Implement API for obtaining audit data in Audit Tool * [INLONG-12000][Audit]Implement API for obtaining audit data in Audit Tool * [INLONG-12000][Audit]Implement API for obtaining audit data in Audit Tool * [INLONG-12000][Audit] Remove DTO directory * [INLONG-12000][Audit]Implement API for obtaining audit data in Audit Tool --------- Co-authored-by: lsq <15297823178@163.com>
1 parent c5b84ec commit c8a61e4

12 files changed

Lines changed: 431 additions & 86 deletions

File tree

inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/DTO/AuditAlertCondition.java renamed to inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/dto/AuditAlertCondition.java

File renamed without changes.

inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/DTO/AuditAlertRule.java renamed to inlong-audit/audit-tool/src/main/java/org/apache/inlong/audit/tool/dto/AuditAlertRule.java

File renamed without changes.
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.audit.tool.entity;
19+
20+
import lombok.Data;
21+
22+
@Data
23+
public class AuditMetric {
24+
25+
private String inlongGroupId;
26+
private String inlongStreamId;
27+
private long count;
28+
29+
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.audit.tool.mapper;
19+
20+
import org.apache.inlong.audit.tool.entity.AuditMetric;
21+
22+
import org.apache.ibatis.annotations.Mapper;
23+
import org.apache.ibatis.annotations.Param;
24+
import org.apache.ibatis.annotations.Select;
25+
26+
import java.util.List;
27+
28+
@Mapper
29+
public interface AuditMapper {
30+
31+
@Select("select inlong_group_id as inlongGroupId,inlong_stream_id as inlongStreamId, sum(count) count" +
32+
" from audit_data where audit_id = #{audit_id}" +
33+
" and log_ts between #{startLogTs} and #{endLogTs} group by inlong_group_id,inlong_stream_id")
34+
List<AuditMetric> getAuditMetrics(@Param("startLogTs") String startLogTs, @Param("endLogTs") String endLogTs,
35+
@Param("audit_id") String auditId);
36+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.audit.tool.metric;
19+
20+
import io.prometheus.client.CollectorRegistry;
21+
import io.prometheus.client.Gauge;
22+
23+
public class AuditMetric {
24+
25+
private final Gauge sourceAndSinkAuditDiffMetric;
26+
27+
public AuditMetric(CollectorRegistry registry) {
28+
this.sourceAndSinkAuditDiffMetric = Gauge.build()
29+
.name("inlong_source_sink_diff")
30+
.help("The difference in count between inflow and outflow")
31+
.register(registry);
32+
}
33+
34+
public void updateSourceAndSinkAuditDiffMetric(double diff) {
35+
sourceAndSinkAuditDiffMetric.set(diff);
36+
}
37+
}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.audit.tool.reporter;
19+
20+
import java.util.Map;
21+
22+
public interface MetricReporter {
23+
24+
void init(Map<String, Object> config);
25+
26+
}
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.audit.tool.reporter;
19+
20+
import org.apache.inlong.audit.tool.metric.AuditMetric;
21+
22+
import io.prometheus.client.CollectorRegistry;
23+
import io.prometheus.client.Gauge;
24+
import io.prometheus.client.exporter.HTTPServer;
25+
import lombok.Getter;
26+
import org.slf4j.Logger;
27+
import org.slf4j.LoggerFactory;
28+
29+
import java.io.IOException;
30+
import java.util.Map;
31+
32+
import static org.apache.inlong.audit.tool.config.ConfigConstants.AUDIT_TOOL_ALERTS_TOTAL;
33+
import static org.apache.inlong.audit.tool.config.ConfigConstants.AUDIT_TOOL_DATA_LOSS_RATE;
34+
import static org.apache.inlong.audit.tool.config.ConfigConstants.DEFAULT_PROMETHEUS_PORT;
35+
import static org.apache.inlong.audit.tool.config.ConfigConstants.DESC_AUDIT_TOOL_ALERTS_TOTAL;
36+
import static org.apache.inlong.audit.tool.config.ConfigConstants.DESC_AUDIT_TOOL_DATA_LOSS_RATE;
37+
import static org.apache.inlong.audit.tool.config.ConfigConstants.KEY_ALERT_TYPE;
38+
import static org.apache.inlong.audit.tool.config.ConfigConstants.KEY_GROUP_ID;
39+
import static org.apache.inlong.audit.tool.config.ConfigConstants.KEY_PROMETHEUS_PORT;
40+
import static org.apache.inlong.audit.tool.config.ConfigConstants.KEY_STREAM_ID;
41+
42+
/**
43+
* PrometheusReporter implements the MetricReporter interface to expose audit tool metrics
44+
* to a Prometheus monitoring system.
45+
*/
46+
public class PrometheusReporter implements MetricReporter {
47+
48+
private static final Logger LOGGER = LoggerFactory.getLogger(PrometheusReporter.class);
49+
50+
private final CollectorRegistry registry;
51+
@Getter
52+
private final AuditMetric auditMetric;
53+
54+
/**
55+
* Constructor for PrometheusReporter.
56+
* Initializes a new CollectorRegistry for DTO isolation.
57+
*/
58+
public PrometheusReporter() {
59+
this.registry = new CollectorRegistry();
60+
auditMetric = new AuditMetric(registry);
61+
}
62+
63+
/**
64+
* Initializes the Prometheus reporter.
65+
* Starts an HTTP server to expose metrics and registers the gauges.
66+
*
67+
* @param config A map containing configuration for the reporter, e.g., port.
68+
*/
69+
@Override
70+
public void init(Map<String, Object> config) {
71+
// It's better to get configuration from the passed map than a global singleton.
72+
int port = (int) config.getOrDefault(KEY_PROMETHEUS_PORT, DEFAULT_PROMETHEUS_PORT);
73+
try {
74+
// Start the Prometheus HTTP server on the configured port with our registry.
75+
HTTPServer server = new HTTPServer.Builder()
76+
.withPort(port)
77+
.withRegistry(registry)
78+
.build();
79+
LOGGER.info("Prometheus server started on port {}", port);
80+
81+
// Define and register the 'alerts total' gauge.
82+
Gauge alertGauge = Gauge.build()
83+
.name(AUDIT_TOOL_ALERTS_TOTAL)
84+
.help(DESC_AUDIT_TOOL_ALERTS_TOTAL)
85+
.labelNames(KEY_GROUP_ID, KEY_STREAM_ID, KEY_ALERT_TYPE)
86+
.register(registry); // Register with our specific registry instance
87+
88+
// Define and register the 'data loss rate' gauge.
89+
Gauge dataLossRateGauge = Gauge.build()
90+
.name(AUDIT_TOOL_DATA_LOSS_RATE)
91+
.help(DESC_AUDIT_TOOL_DATA_LOSS_RATE)
92+
.labelNames(KEY_GROUP_ID, KEY_STREAM_ID)
93+
.register(registry); // Register with our specific registry instance
94+
95+
} catch (IOException e) {
96+
LOGGER.error("Failed to start Prometheus server on port {}", port, e);
97+
// Throwing a runtime exception is appropriate if the reporter cannot start.
98+
throw new RuntimeException("Failed to start Prometheus server", e);
99+
}
100+
}
101+
102+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.audit.tool.service;
19+
20+
import org.apache.inlong.audit.tool.entity.AuditMetric;
21+
import org.apache.inlong.audit.tool.mapper.AuditMapper;
22+
import org.apache.inlong.audit.tool.util.AuditSQLUtil;
23+
24+
import org.apache.ibatis.session.SqlSession;
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
28+
import java.util.Collections;
29+
import java.util.List;
30+
31+
public class AuditMetricService {
32+
33+
private static final Logger LOGGER = LoggerFactory.getLogger(AuditMetricService.class);
34+
35+
public List<AuditMetric> getStorageAuditMetrics(String auditId, String startLogTs, String endLogTs) {
36+
try (SqlSession sqlSession = AuditSQLUtil.getSqlSession()) {
37+
AuditMapper auditMapper = sqlSession.getMapper(AuditMapper.class);
38+
return auditMapper.getAuditMetrics(startLogTs, endLogTs, auditId);
39+
} catch (Exception e) {
40+
LOGGER.error("Exception occurred during database query: ", e);
41+
return Collections.emptyList();
42+
}
43+
}
44+
45+
}
Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.inlong.audit.tool.util;
19+
20+
import org.apache.inlong.audit.tool.mapper.AuditMapper;
21+
22+
import org.apache.ibatis.datasource.pooled.PooledDataSource;
23+
import org.apache.ibatis.session.SqlSession;
24+
import org.apache.ibatis.session.SqlSessionFactory;
25+
import org.apache.ibatis.session.SqlSessionFactoryBuilder;
26+
27+
import javax.sql.DataSource;
28+
29+
import java.util.Properties;
30+
31+
public class AuditSQLUtil {
32+
33+
private static SqlSessionFactory sqlSessionFactory;
34+
private static Properties appProperties;
35+
36+
public static void initialize(Properties properties) {
37+
appProperties = properties;
38+
try {
39+
// Create a data source
40+
DataSource dataSource = createDataSourceFromProperties();
41+
42+
// Create a SqlSessionFactory
43+
org.apache.ibatis.session.Configuration configuration = new org.apache.ibatis.session.Configuration();
44+
configuration.setEnvironment(new org.apache.ibatis.mapping.Environment(
45+
"development",
46+
new org.apache.ibatis.transaction.jdbc.JdbcTransactionFactory(),
47+
dataSource));
48+
49+
configuration.addMapper(AuditMapper.class);
50+
51+
sqlSessionFactory = new SqlSessionFactoryBuilder().build(configuration);
52+
53+
} catch (Exception e) {
54+
throw new RuntimeException("Error initializing MyBatis with application properties", e);
55+
}
56+
}
57+
58+
private static DataSource createDataSourceFromProperties() {
59+
String url = appProperties.getProperty("audit.data.source.url");
60+
String username = appProperties.getProperty("audit.data.source.username");
61+
String password = appProperties.getProperty("audit.data.source.password");
62+
63+
PooledDataSource dataSource = new PooledDataSource();
64+
dataSource.setDriver("com.mysql.cj.jdbc.Driver");
65+
dataSource.setUrl(url);
66+
dataSource.setUsername(username);
67+
dataSource.setPassword(password);
68+
69+
dataSource.setPoolMaximumActiveConnections(10);
70+
dataSource.setPoolMaximumIdleConnections(5);
71+
72+
return dataSource;
73+
}
74+
75+
public static SqlSession getSqlSession() {
76+
if (sqlSessionFactory == null) {
77+
throw new IllegalStateException("MyBatisUtil not initialized. Call initialize() first.");
78+
}
79+
return sqlSessionFactory.openSession();
80+
}
81+
}

inlong-audit/audit-tool/src/main/resources/application.properties

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,13 @@ logging.file.path=/var/log/inlong/audit-tool.log
4141

4242
# Audit Database settings
4343
audit.data.source=apache_inlong_audit
44-
audit.data.source.url=jdbc:mysql://192.168.10.3:3306/apache_inlong_audit
44+
audit.data.source.url=jdbc:mysql://127.0.0.1:3306/apache_inlong_audit
4545
audit.data.source.username=root
4646
audit.data.source.password=inlong
4747
audit.data.time.interval.minute=1
4848
audit.data.time.delay.minute=1
4949
audit.id.source=5
5050

5151
# Audit SecretId SecretKey
52-
audit.secretId = admin
53-
audit.secretKey = inlong
52+
audit.secretId=xxx
53+
audit.secretKey=xxx

0 commit comments

Comments
 (0)