Skip to content

Commit b668d48

Browse files
authored
[INLONG-11855][Audit] Audit supports Audit reconciliation at the granularity of inlong group (#11856)
1 parent 0cad8fa commit b668d48

6 files changed

Lines changed: 316 additions & 81 deletions

File tree

inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/DayCache.java

Lines changed: 52 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import org.apache.inlong.audit.service.config.Configuration;
2222
import org.apache.inlong.audit.service.entities.JdbcConfig;
2323
import org.apache.inlong.audit.service.entities.StatData;
24+
import org.apache.inlong.audit.service.utils.AuditUtils;
2425
import org.apache.inlong.audit.service.utils.JdbcUtils;
2526

2627
import com.zaxxer.hikari.HikariConfig;
@@ -34,11 +35,21 @@
3435
import java.sql.PreparedStatement;
3536
import java.sql.ResultSet;
3637
import java.sql.SQLException;
38+
import java.util.ArrayList;
3739
import java.util.LinkedList;
3840
import java.util.List;
3941

42+
import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_AUDIT_ID;
43+
import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_AUDIT_TAG;
44+
import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_CNT;
45+
import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_DELAY;
46+
import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_GROUP_ID;
47+
import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_LOG_TS;
48+
import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_SIZE;
49+
import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_STREAM_ID;
4050
import static org.apache.inlong.audit.service.config.SqlConstants.DEFAULT_MYSQL_SOURCE_QUERY_DAY_SQL;
4151
import static org.apache.inlong.audit.service.config.SqlConstants.KEY_MYSQL_SOURCE_QUERY_DAY_SQL;
52+
import static org.apache.inlong.audit.service.config.SqlConstants.WILDCARD_STREAM_ID;
4253

4354
/**
4455
* Cache Of day ,for day openapi
@@ -49,16 +60,13 @@ public class DayCache implements AutoCloseable {
4960
private static volatile DayCache dayCache = null;
5061
private DataSource dataSource;
5162

52-
private final String querySql;
53-
5463
private DayCache() {
5564
createDataSource();
56-
querySql = Configuration.getInstance().get(KEY_MYSQL_SOURCE_QUERY_DAY_SQL,
57-
DEFAULT_MYSQL_SOURCE_QUERY_DAY_SQL);
5865
}
5966

6067
/**
6168
* Get instance
69+
*
6270
* @return
6371
*/
6472
public static DayCache getInstance() {
@@ -74,6 +82,7 @@ public static DayCache getInstance() {
7482

7583
/**
7684
* Get data
85+
*
7786
* @param startTime
7887
* @param endTime
7988
* @param inlongGroupId
@@ -84,27 +93,48 @@ public static DayCache getInstance() {
8493
public List<StatData> getData(String startTime, String endTime, String inlongGroupId,
8594
String inlongStreamId, String auditId) {
8695
List<StatData> result = new LinkedList<>();
96+
String querySQL = Configuration.getInstance().get(KEY_MYSQL_SOURCE_QUERY_DAY_SQL,
97+
DEFAULT_MYSQL_SOURCE_QUERY_DAY_SQL);
98+
List<String> paramList = new ArrayList<>();
99+
if (WILDCARD_STREAM_ID.equals(inlongStreamId)) {
100+
querySQL = AuditUtils.removeStreamIdCondition(querySQL);
101+
querySQL = AuditUtils.removeStreamIdColumn(querySQL);
102+
paramList.add(startTime);
103+
paramList.add(endTime);
104+
paramList.add(inlongGroupId);
105+
paramList.add(auditId);
106+
} else {
107+
paramList.add(startTime);
108+
paramList.add(endTime);
109+
paramList.add(inlongGroupId);
110+
paramList.add(inlongStreamId);
111+
paramList.add(auditId);
112+
}
87113
try (Connection connection = dataSource.getConnection();
88-
PreparedStatement pstat = connection.prepareStatement(querySql)) {
114+
PreparedStatement pstat = connection.prepareStatement(querySQL)) {
89115
if (connection.isClosed()) {
90116
createDataSource();
91117
}
92-
pstat.setString(1, startTime);
93-
pstat.setString(2, endTime);
94-
pstat.setString(3, inlongGroupId);
95-
pstat.setString(4, inlongStreamId);
96-
pstat.setString(5, auditId);
118+
for (int i = 0; i < paramList.size(); i++) {
119+
pstat.setString(i + 1, paramList.get(i));
120+
}
97121
try (ResultSet resultSet = pstat.executeQuery()) {
98122
while (resultSet.next()) {
99123
StatData data = new StatData();
100-
data.setLogTs(resultSet.getString(1));
101-
data.setInlongGroupId(resultSet.getString(2));
102-
data.setInlongStreamId(resultSet.getString(3));
103-
data.setAuditId(resultSet.getString(4));
104-
data.setAuditTag(resultSet.getString(5));
105-
data.setCount(resultSet.getLong(6));
106-
data.setSize(resultSet.getLong(7));
107-
data.setDelay(resultSet.getLong(8));
124+
data.setLogTs(resultSet.getString(COLUMN_LOG_TS));
125+
data.setInlongGroupId(resultSet.getString(COLUMN_GROUP_ID));
126+
data.setAuditId(resultSet.getString(COLUMN_AUDIT_ID));
127+
data.setAuditTag(resultSet.getString(COLUMN_AUDIT_TAG));
128+
data.setCount(resultSet.getLong(COLUMN_CNT));
129+
data.setSize(resultSet.getLong(COLUMN_SIZE));
130+
data.setDelay(resultSet.getLong(COLUMN_DELAY));
131+
132+
if (WILDCARD_STREAM_ID.equals(inlongStreamId)) {
133+
data.setInlongStreamId(WILDCARD_STREAM_ID);
134+
} else {
135+
data.setInlongStreamId(resultSet.getString(COLUMN_STREAM_ID));
136+
}
137+
108138
result.add(data);
109139
}
110140
} catch (SQLException sqlException) {
@@ -113,7 +143,10 @@ public List<StatData> getData(String startTime, String endTime, String inlongGro
113143
} catch (Exception exception) {
114144
LOGGER.error("Query has exception! ", exception);
115145
}
116-
return result;
146+
147+
return WILDCARD_STREAM_ID.equals(inlongStreamId)
148+
? AuditUtils.aggregateStatData(result, inlongStreamId)
149+
: result;
117150
}
118151

119152
/**

inlong-audit/audit-service/src/main/java/org/apache/inlong/audit/service/cache/RealTimeQuery.java

Lines changed: 125 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,16 @@
4747

4848
import static org.apache.inlong.audit.consts.OpenApiConstants.DEFAULT_API_THREAD_POOL_SIZE;
4949
import static org.apache.inlong.audit.consts.OpenApiConstants.KEY_API_THREAD_POOL_SIZE;
50+
import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_AUDIT_ID;
51+
import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_AUDIT_TAG;
52+
import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_AUDIT_VERSION;
53+
import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_CNT;
54+
import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_DELAY;
55+
import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_GROUP_ID;
56+
import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_IP;
57+
import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_LOG_TS;
58+
import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_SIZE;
59+
import static org.apache.inlong.audit.service.config.AuditColumn.COLUMN_STREAM_ID;
5060
import static org.apache.inlong.audit.service.config.SqlConstants.DEFAULT_RECONCILIATION_DISTINCT_SQL;
5161
import static org.apache.inlong.audit.service.config.SqlConstants.DEFAULT_RECONCILIATION_SQL;
5262
import static org.apache.inlong.audit.service.config.SqlConstants.DEFAULT_SOURCE_QUERY_IDS_SQL;
@@ -57,6 +67,7 @@
5767
import static org.apache.inlong.audit.service.config.SqlConstants.KEY_SOURCE_QUERY_IDS_SQL;
5868
import static org.apache.inlong.audit.service.config.SqlConstants.KEY_SOURCE_QUERY_IPS_SQL;
5969
import static org.apache.inlong.audit.service.config.SqlConstants.KEY_SOURCE_QUERY_MINUTE_SQL;
70+
import static org.apache.inlong.audit.service.config.SqlConstants.WILDCARD_STREAM_ID;
6071

6172
/**
6273
* Real time query data from audit source.
@@ -67,10 +78,6 @@ public class RealTimeQuery {
6778
private static volatile RealTimeQuery realTimeQuery = null;
6879

6980
private final List<BasicDataSource> dataSourceList = new LinkedList<>();
70-
71-
private final String queryLogTsSql;
72-
private final String queryIdsByIpSql;
73-
private final String queryReportIpsSql;
7481
private final ExecutorService executor =
7582
Executors.newFixedThreadPool(
7683
Configuration.getInstance().get(KEY_API_THREAD_POOL_SIZE, DEFAULT_API_THREAD_POOL_SIZE));
@@ -103,13 +110,6 @@ private RealTimeQuery() {
103110

104111
dataSourceList.add(dataSource);
105112
}
106-
107-
queryLogTsSql = Configuration.getInstance().get(KEY_SOURCE_QUERY_MINUTE_SQL,
108-
DEFAULT_SOURCE_QUERY_MINUTE_SQL);
109-
queryIdsByIpSql = Configuration.getInstance().get(KEY_SOURCE_QUERY_IDS_SQL,
110-
DEFAULT_SOURCE_QUERY_IDS_SQL);
111-
queryReportIpsSql = Configuration.getInstance().get(KEY_SOURCE_QUERY_IPS_SQL,
112-
DEFAULT_SOURCE_QUERY_IPS_SQL);
113113
}
114114

115115
public static RealTimeQuery getInstance() {
@@ -152,7 +152,14 @@ public List<StatData> queryLogTs(String startTime, String endTime, String inlong
152152
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
153153
LOGGER.info("Query log ts by params: {} {} {} {} {}, total cost {} ms", startTime, endTime, inlongGroupId,
154154
inlongStreamId, auditId, System.currentTimeMillis() - currentTime);
155-
return filterMaxAuditVersion(statDataList);
155+
156+
List<StatData> maxAuditVersion = filterMaxAuditVersion(statDataList);
157+
// If querying for wildcard stream ID, aggregate the data
158+
if (WILDCARD_STREAM_ID.equals(inlongStreamId)) {
159+
return AuditUtils.aggregateStatData(maxAuditVersion, WILDCARD_STREAM_ID);
160+
}
161+
// Otherwise return the filtered data directly
162+
return maxAuditVersion;
156163
}
157164

158165
/**
@@ -202,26 +209,51 @@ private List<StatData> doQueryLogTs(DataSource dataSource, String startTime, Str
202209
String inlongStreamId, String auditId) {
203210
long currentTime = System.currentTimeMillis();
204211
List<StatData> result = new LinkedList<>();
212+
213+
String querySQL = Configuration.getInstance().get(KEY_SOURCE_QUERY_MINUTE_SQL,
214+
DEFAULT_SOURCE_QUERY_MINUTE_SQL);
215+
List<String> paramList = new ArrayList<>();
216+
217+
if (WILDCARD_STREAM_ID.equals(inlongStreamId)) {
218+
querySQL = AuditUtils.removeStreamIdCondition(querySQL);
219+
querySQL = AuditUtils.removeStreamIdColumn(querySQL);
220+
paramList.add(startTime);
221+
paramList.add(endTime);
222+
paramList.add(inlongGroupId);
223+
paramList.add(auditId);
224+
} else {
225+
paramList.add(startTime);
226+
paramList.add(endTime);
227+
paramList.add(inlongGroupId);
228+
paramList.add(inlongStreamId);
229+
paramList.add(auditId);
230+
}
231+
205232
try (Connection connection = dataSource.getConnection();
206-
PreparedStatement pstat = connection.prepareStatement(queryLogTsSql)) {
207-
pstat.setString(1, startTime);
208-
pstat.setString(2, endTime);
209-
pstat.setString(3, inlongGroupId);
210-
pstat.setString(4, inlongStreamId);
211-
pstat.setString(5, auditId);
233+
PreparedStatement pstat = connection.prepareStatement(querySQL)) {
234+
for (int i = 0; i < paramList.size(); i++) {
235+
pstat.setString(i + 1, paramList.get(i));
236+
}
237+
212238
try (ResultSet resultSet = pstat.executeQuery()) {
213239
while (resultSet.next()) {
214240
StatData data = new StatData();
215-
data.setLogTs(resultSet.getString(1));
216-
data.setInlongGroupId(resultSet.getString(2));
217-
data.setInlongStreamId(resultSet.getString(3));
218-
data.setAuditId(resultSet.getString(4));
219-
data.setAuditTag(resultSet.getString(5));
220-
long count = resultSet.getLong(6);
241+
data.setLogTs(resultSet.getString(COLUMN_LOG_TS));
242+
data.setInlongGroupId(resultSet.getString(COLUMN_GROUP_ID));
243+
data.setAuditId(resultSet.getString(COLUMN_AUDIT_ID));
244+
data.setAuditTag(resultSet.getString(COLUMN_AUDIT_TAG));
245+
long count = resultSet.getLong(COLUMN_CNT);
221246
data.setCount(count);
222-
data.setSize(resultSet.getLong(7));
223-
data.setDelay(CacheUtils.calculateAverageDelay(count, resultSet.getLong(8)));
224-
data.setAuditVersion(resultSet.getLong(9));
247+
data.setDelay(CacheUtils.calculateAverageDelay(count, resultSet.getLong(COLUMN_DELAY)));
248+
data.setSize(resultSet.getLong(COLUMN_SIZE));
249+
data.setAuditVersion(resultSet.getLong(COLUMN_AUDIT_VERSION));
250+
251+
if (WILDCARD_STREAM_ID.equals(inlongStreamId)) {
252+
data.setInlongStreamId(WILDCARD_STREAM_ID);
253+
} else {
254+
data.setInlongStreamId(resultSet.getString(COLUMN_STREAM_ID));
255+
}
256+
225257
result.add(data);
226258
}
227259
} catch (SQLException sqlException) {
@@ -270,23 +302,30 @@ public List<StatData> queryIdsByIp(String startTime, String endTime, String ip,
270302
private List<StatData> doQueryIdsByIp(DataSource dataSource, String startTime, String endTime, String ip,
271303
String auditId) {
272304
List<StatData> result = new LinkedList<>();
305+
String querySQL = Configuration.getInstance().get(KEY_SOURCE_QUERY_IDS_SQL,
306+
DEFAULT_SOURCE_QUERY_IDS_SQL);
307+
List<String> paramList = new ArrayList<>();
308+
paramList.add(startTime);
309+
paramList.add(endTime);
310+
paramList.add(auditId);
311+
paramList.add(ip);
312+
273313
try (Connection connection = dataSource.getConnection();
274-
PreparedStatement pstat = connection.prepareStatement(queryIdsByIpSql)) {
275-
pstat.setString(1, startTime);
276-
pstat.setString(2, endTime);
277-
pstat.setString(3, auditId);
278-
pstat.setString(4, ip);
314+
PreparedStatement pstat = connection.prepareStatement(querySQL)) {
315+
for (int i = 0; i < paramList.size(); i++) {
316+
pstat.setString(i + 1, paramList.get(i));
317+
}
279318
try (ResultSet resultSet = pstat.executeQuery()) {
280319
while (resultSet.next()) {
281320
StatData data = new StatData();
282-
data.setInlongGroupId(resultSet.getString(1));
283-
data.setInlongStreamId(resultSet.getString(2));
284-
data.setAuditId(resultSet.getString(3));
285-
data.setAuditTag(resultSet.getString(4));
286-
long count = resultSet.getLong(5);
321+
data.setInlongGroupId(resultSet.getString(COLUMN_GROUP_ID));
322+
data.setInlongStreamId(resultSet.getString(COLUMN_STREAM_ID));
323+
data.setAuditId(resultSet.getString(COLUMN_AUDIT_ID));
324+
data.setAuditTag(resultSet.getString(COLUMN_AUDIT_TAG));
325+
long count = resultSet.getLong(COLUMN_CNT);
287326
data.setCount(count);
288-
data.setSize(resultSet.getLong(6));
289-
data.setDelay(CacheUtils.calculateAverageDelay(count, resultSet.getLong(7)));
327+
data.setSize(resultSet.getLong(COLUMN_SIZE));
328+
data.setDelay(CacheUtils.calculateAverageDelay(count, resultSet.getLong(COLUMN_DELAY)));
290329
result.add(data);
291330
}
292331
} catch (SQLException sqlException) {
@@ -337,20 +376,38 @@ private List<StatData> doQueryIpsById(DataSource dataSource, String startTime, S
337376
String inlongGroupId,
338377
String inlongStreamId, String auditId) {
339378
List<StatData> result = new LinkedList<>();
379+
String querySQL = Configuration.getInstance().get(KEY_SOURCE_QUERY_IPS_SQL,
380+
DEFAULT_SOURCE_QUERY_IPS_SQL);
381+
List<String> paramList = new ArrayList<>();
382+
if (WILDCARD_STREAM_ID.equals(inlongStreamId)) {
383+
querySQL = AuditUtils.removeStreamIdCondition(querySQL);
384+
paramList.add(startTime);
385+
paramList.add(endTime);
386+
paramList.add(inlongGroupId);
387+
paramList.add(auditId);
388+
} else {
389+
paramList.add(startTime);
390+
paramList.add(endTime);
391+
paramList.add(inlongGroupId);
392+
paramList.add(inlongStreamId);
393+
paramList.add(auditId);
394+
}
340395
try (Connection connection = dataSource.getConnection();
341-
PreparedStatement pstat = connection.prepareStatement(queryReportIpsSql)) {
342-
pstat.setString(1, startTime);
343-
pstat.setString(2, endTime);
344-
pstat.setString(3, inlongGroupId);
345-
pstat.setString(4, inlongStreamId);
346-
pstat.setString(5, auditId);
396+
PreparedStatement pstat = connection.prepareStatement(querySQL)) {
397+
for (int i = 0; i < paramList.size(); i++) {
398+
pstat.setString(i + 1, paramList.get(i));
399+
}
347400
try (ResultSet resultSet = pstat.executeQuery()) {
348401
while (resultSet.next()) {
349402
StatData data = new StatData();
350-
data.setIp(resultSet.getString(1));
351-
long count = resultSet.getLong(2);
403+
data.setIp(resultSet.getString(COLUMN_IP));
404+
long count = resultSet.getLong(COLUMN_CNT);
405+
data.setSize(resultSet.getLong(COLUMN_SIZE));
406+
data.setLogTs(startTime);
407+
data.setInlongGroupId(inlongGroupId);
408+
data.setInlongStreamId(inlongStreamId);
409+
data.setAuditId(auditId);
352410
data.setCount(count);
353-
data.setSize(resultSet.getLong(3));
354411
data.setDelay(CacheUtils.calculateAverageDelay(count, resultSet.getLong(4)));
355412
result.add(data);
356413
}
@@ -397,21 +454,33 @@ public StatData doQueryAuditData(DataSource dataSource, String startTime, String
397454
String querySQL = distinct
398455
? Configuration.getInstance().get(KEY_RECONCILIATION_DISTINCT_SQL, DEFAULT_RECONCILIATION_DISTINCT_SQL)
399456
: Configuration.getInstance().get(KEY_RECONCILIATION_SQL, DEFAULT_RECONCILIATION_SQL);
457+
List<String> paramList = new ArrayList<>();
458+
if (WILDCARD_STREAM_ID.equals(inlongStreamId)) {
459+
querySQL = AuditUtils.removeStreamIdCondition(querySQL);
460+
paramList.add(startTime);
461+
paramList.add(endTime);
462+
paramList.add(auditId);
463+
paramList.add(inlongGroupId);
464+
} else {
465+
paramList.add(startTime);
466+
paramList.add(endTime);
467+
paramList.add(auditId);
468+
paramList.add(inlongGroupId);
469+
paramList.add(inlongStreamId);
470+
}
471+
paramList.add(auditTag);
400472

401473
try (Connection connection = dataSource.getConnection();
402474
PreparedStatement pstat = connection.prepareStatement(querySQL)) {
475+
for (int i = 0; i < paramList.size(); i++) {
476+
pstat.setString(i + 1, paramList.get(i));
477+
}
403478

404-
pstat.setString(1, startTime);
405-
pstat.setString(2, endTime);
406-
pstat.setString(3, auditId);
407-
pstat.setString(4, inlongGroupId);
408-
pstat.setString(5, inlongStreamId);
409-
pstat.setString(6, auditTag);
410479
try (ResultSet resultSet = pstat.executeQuery()) {
411480
while (resultSet.next()) {
412481
StatData data = new StatData();
413-
data.setAuditVersion(resultSet.getLong(1));
414-
data.setCount(resultSet.getLong(2));
482+
data.setAuditVersion(resultSet.getLong(COLUMN_AUDIT_VERSION));
483+
data.setCount(resultSet.getLong(COLUMN_CNT));
415484
data.setLogTs(startTime);
416485
data.setInlongGroupId(inlongGroupId);
417486
data.setInlongStreamId(inlongStreamId);

0 commit comments

Comments
 (0)