Skip to content

Commit 482289c

Browse files
committed
ATK-4339&ATK-4341 :support slow log
1 parent ae3cad7 commit 482289c

20 files changed

Lines changed: 648 additions & 33 deletions

src/main/java/com/actiontech/dble/config/model/SystemConfig.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,17 @@ private SystemConfig() {
180180
//unit: ms
181181
private long releaseTimeout = 10L;
182182

183+
private int appendTraceId = 1;
184+
185+
186+
public int getAppendTraceId() {
187+
return appendTraceId;
188+
}
189+
190+
public void setAppendTraceId(int appendTraceId) {
191+
this.appendTraceId = appendTraceId;
192+
}
193+
183194
public int getEnableAsyncRelease() {
184195
return enableAsyncRelease;
185196
}

src/main/java/com/actiontech/dble/log/slow/SlowQueryLogEntry.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,21 +7,21 @@
77

88
import com.actiontech.dble.config.model.user.UserName;
99
import com.actiontech.dble.route.util.RouterUtil;
10-
import com.actiontech.dble.server.trace.TraceResult;
10+
import com.actiontech.dble.server.trace.ITraceResult;
1111

1212
import java.text.SimpleDateFormat;
1313
import java.util.Date;
1414
import java.util.List;
1515

1616
public class SlowQueryLogEntry {
17-
private TraceResult trace;
17+
private ITraceResult trace;
1818
private long timeStamp;
1919
private String sql;
2020
private UserName user;
2121
private String clientIp;
2222
private long connID;
2323

24-
SlowQueryLogEntry(String sql, TraceResult traceResult, UserName user, String clientIp, long connID) {
24+
SlowQueryLogEntry(String sql, ITraceResult traceResult, UserName user, String clientIp, long connID) {
2525
this.timeStamp = System.currentTimeMillis();
2626
this.sql = RouterUtil.getFixedSql(sql);
2727
this.trace = traceResult;

src/main/java/com/actiontech/dble/log/slow/SlowQueryLogProcessor.java

Lines changed: 16 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
import com.actiontech.dble.config.model.SystemConfig;
1010
import com.actiontech.dble.log.DailyRotateLogStore;
1111
import com.actiontech.dble.server.status.SlowQueryLog;
12-
import com.actiontech.dble.server.trace.TraceResult;
13-
import com.actiontech.dble.services.mysqlsharding.ShardingService;
12+
import com.actiontech.dble.server.trace.ITraceResult;
13+
import com.actiontech.dble.services.BusinessService;
1414
import com.google.common.util.concurrent.ThreadFactoryBuilder;
1515
import org.slf4j.Logger;
1616
import org.slf4j.LoggerFactory;
@@ -136,9 +136,9 @@ public void run() {
136136
};
137137
}
138138

139-
public void putSlowQueryLog(ShardingService service, TraceResult log) {
140-
if (log.isCompleted() && log.getOverAllMilliSecond() > SlowQueryLog.getInstance().getSlowTime()) {
141-
SlowQueryLogEntry logEntry = new SlowQueryLogEntry(service.getExecuteSql(), log, service.getUser(), service.getConnection().getHost(), service.getConnection().getId());
139+
public void putSlowQueryLog(BusinessService service, ITraceResult log, String executeSql) {
140+
if (log.isCompleted() && log.getOverAllMilliSecond() >= SlowQueryLog.getInstance().getSlowTime()) {
141+
SlowQueryLogEntry logEntry = new SlowQueryLogEntry(executeSql, log, service.getUser(), service.getConnection().getHost(), service.getConnection().getId());
142142
final boolean enQueue = queue.offer(logEntry);
143143
if (!enQueue) {
144144
//abort
@@ -147,4 +147,15 @@ public void putSlowQueryLog(ShardingService service, TraceResult log) {
147147
}
148148
}
149149
}
150+
151+
152+
public void putSlowQueryLogForce(BusinessService service, ITraceResult log, String executeSql) {
153+
SlowQueryLogEntry logEntry = new SlowQueryLogEntry(executeSql, log, service.getUser(), service.getConnection().getHost(), service.getConnection().getId());
154+
final boolean enQueue = queue.offer(logEntry);
155+
if (!enQueue) {
156+
//abort
157+
String errorMsg = "since there are too many slow query logs to be written, some slow query logs will be discarded so as not to affect business requirements. Discard log entry: {" + logEntry.toString() + "}";
158+
LOGGER.warn(errorMsg);
159+
}
160+
}
150161
}

src/main/java/com/actiontech/dble/route/parser/ManagerParseOnOff.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,9 @@ private ManagerParseOnOff() {
1919
public static final int CUSTOM_MYSQL_HA = 3;
2020
public static final int CAP_CLIENT_FOUND_ROWS = 4;
2121

22+
23+
public static final int APPEND_TRACE_ID = 5;
24+
2225
public static int parse(String stmt, int offset) {
2326
int i = offset;
2427
for (; i < stmt.length(); i++) {
@@ -64,6 +67,9 @@ private static int aCheck(String stmt, int offset) {
6467
if (prefix.startsWith("ALERT") && (stmt.length() == offset + 5 || ParseUtil.isEOF(stmt, offset + 5))) {
6568
return ALERT;
6669
}
70+
if (prefix.startsWith("APPENDTRACEID") && (stmt.length() == offset + 13 || ParseUtil.isEOF(stmt, offset + 13))) {
71+
return APPEND_TRACE_ID;
72+
}
6773
}
6874
return OTHER;
6975
}

src/main/java/com/actiontech/dble/rwsplit/RWSplitNonBlockingSession.java

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,15 @@
33
import com.actiontech.dble.backend.datasource.PhysicalDbGroup;
44
import com.actiontech.dble.backend.datasource.PhysicalDbInstance;
55
import com.actiontech.dble.backend.mysql.ByteUtil;
6+
import com.actiontech.dble.backend.mysql.nio.handler.ResponseHandler;
67
import com.actiontech.dble.config.ErrorCode;
78
import com.actiontech.dble.net.connection.BackendConnection;
89
import com.actiontech.dble.net.mysql.MySQLPacket;
10+
import com.actiontech.dble.server.SessionStage;
11+
import com.actiontech.dble.server.status.SlowQueryLog;
12+
import com.actiontech.dble.server.trace.RwTraceResult;
13+
import com.actiontech.dble.server.trace.TraceRecord;
14+
import com.actiontech.dble.services.mysqlsharding.MySQLResponseService;
915
import com.actiontech.dble.services.rwsplit.*;
1016
import com.actiontech.dble.singleton.RouteService;
1117
import com.actiontech.dble.util.StringUtil;
@@ -16,6 +22,8 @@
1622
import java.io.IOException;
1723
import java.sql.SQLException;
1824
import java.sql.SQLSyntaxErrorException;
25+
import java.util.Map;
26+
import java.util.concurrent.ConcurrentHashMap;
1927

2028
public class RWSplitNonBlockingSession {
2129

@@ -24,6 +32,9 @@ public class RWSplitNonBlockingSession {
2432
private volatile BackendConnection conn;
2533
private final RWSplitService rwSplitService;
2634
private PhysicalDbGroup rwGroup;
35+
private volatile RwTraceResult traceResult = new RwTraceResult();
36+
37+
private volatile SessionStage sessionStage = SessionStage.Init;
2738

2839
public RWSplitNonBlockingSession(RWSplitService service) {
2940
this.rwSplitService = service;
@@ -62,6 +73,10 @@ public void execute(Boolean master, byte[] originPacket, Callback callback, bool
6273
if (handler == null) return;
6374
PhysicalDbInstance instance = rwGroup.rwSelect(canRunOnMaster(master), isWriteStatistical(writeStatistical), localRead);
6475
checkDest(!instance.isReadInstance());
76+
endRoute();
77+
setPreExecuteEnd(RwTraceResult.SqlTraceType.RWSPLIT_QUERY);
78+
setTraceSimpleHandler((ResponseHandler) handler);
79+
traceResult.setDBInstance(instance);
6580
instance.getConnection(rwSplitService.getSchema(), handler, null, false);
6681
} catch (IOException e) {
6782
LOGGER.warn("select conn error", e);
@@ -78,6 +93,10 @@ private RWSplitHandler getRwSplitHandler(byte[] originPacket, Callback callback)
7893
if (LOGGER.isDebugEnabled()) {
7994
LOGGER.debug("select bind conn[id={}]", conn.getId());
8095
}
96+
endRoute();
97+
setPreExecuteEnd(RwTraceResult.SqlTraceType.RWSPLIT_QUERY);
98+
setTraceSimpleHandler(handler);
99+
traceResult.setDBInstance((PhysicalDbInstance) conn.getInstance());
81100
// for ps needs to send master
82101
if ((originPacket != null && originPacket.length > 4 && originPacket[4] == MySQLPacket.COM_STMT_EXECUTE)) {
83102
long statementId = ByteUtil.readUB4(originPacket, 5);
@@ -204,4 +223,105 @@ public RWSplitService getService() {
204223
public BackendConnection getConn() {
205224
return conn;
206225
}
226+
227+
228+
public void setRequestTime() {
229+
sessionStage = SessionStage.Read_SQL;
230+
long requestTime = 0;
231+
232+
if (SlowQueryLog.getInstance().isEnableSlowLog()) {
233+
requestTime = System.nanoTime();
234+
traceResult.setVeryStartPrepare(requestTime);
235+
}
236+
237+
}
238+
239+
public void startProcess() {
240+
sessionStage = SessionStage.Parse_SQL;
241+
if (SlowQueryLog.getInstance().isEnableSlowLog()) {
242+
traceResult.setParseStartPrepare(new TraceRecord(System.nanoTime()));
243+
}
244+
}
245+
246+
public void endParse() {
247+
sessionStage = SessionStage.Route_Calculation;
248+
if (SlowQueryLog.getInstance().isEnableSlowLog()) {
249+
traceResult.ready();
250+
// traceResult.setRouteStart(new TraceRecord(System.nanoTime()));
251+
}
252+
}
253+
254+
255+
public void endRoute() {
256+
sessionStage = SessionStage.Prepare_to_Push;
257+
}
258+
259+
260+
public void setPreExecuteEnd(RwTraceResult.SqlTraceType type) {
261+
sessionStage = SessionStage.Execute_SQL;
262+
if (SlowQueryLog.getInstance().isEnableSlowLog()) {
263+
traceResult.setType(type);
264+
traceResult.setPreExecuteEnd(new TraceRecord(System.nanoTime()));
265+
traceResult.clearConnReceivedMap();
266+
traceResult.clearConnFlagMap();
267+
}
268+
}
269+
270+
public void setTraceSimpleHandler(ResponseHandler simpleHandler) {
271+
if (SlowQueryLog.getInstance().isEnableSlowLog()) {
272+
traceResult.setSimpleHandler(simpleHandler);
273+
}
274+
}
275+
276+
277+
public void setResponseTime(boolean isSuccess) {
278+
sessionStage = SessionStage.Finished;
279+
long responseTime = 0;
280+
if (SlowQueryLog.getInstance().isEnableSlowLog()) {
281+
responseTime = System.nanoTime();
282+
traceResult.setVeryEnd(responseTime);
283+
if (isSuccess && SlowQueryLog.getInstance().isEnableSlowLog()) {
284+
SlowQueryLog.getInstance().putSlowQueryLog(this.rwSplitService, (RwTraceResult) traceResult);
285+
traceResult = new RwTraceResult();
286+
}
287+
}
288+
}
289+
290+
public void setBackendResponseEndTime(MySQLResponseService service) {
291+
sessionStage = SessionStage.First_Node_Fetched_Result;
292+
if (SlowQueryLog.getInstance().isEnableSlowLog()) {
293+
ResponseHandler responseHandler = service.getResponseHandler();
294+
if (responseHandler != null) {
295+
TraceRecord record = new TraceRecord(System.nanoTime());
296+
Map<String, TraceRecord> connMap = new ConcurrentHashMap<>();
297+
String key = String.valueOf(service.getConnection().getId());
298+
connMap.put(key, record);
299+
traceResult.addToConnFinishedMap(responseHandler, connMap);
300+
}
301+
}
302+
303+
}
304+
305+
306+
307+
public void setBackendResponseTime(MySQLResponseService service) {
308+
sessionStage = SessionStage.Fetching_Result;
309+
long responseTime = 0;
310+
if (SlowQueryLog.getInstance().isEnableSlowLog()) {
311+
ResponseHandler responseHandler = service.getResponseHandler();
312+
String key = String.valueOf(service.getConnection().getId());
313+
if (responseHandler != null && traceResult.addToConnFlagMap(key) == null) {
314+
responseTime = System.nanoTime();
315+
TraceRecord record = new TraceRecord(responseTime);
316+
Map<String, TraceRecord> connMap = new ConcurrentHashMap<>();
317+
connMap.put(key, record);
318+
traceResult.addToConnReceivedMap(responseHandler, connMap);
319+
}
320+
}
321+
322+
323+
}
324+
325+
326+
207327
}

src/main/java/com/actiontech/dble/server/status/SlowQueryLog.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77

88
import com.actiontech.dble.config.model.SystemConfig;
99
import com.actiontech.dble.log.slow.SlowQueryLogProcessor;
10-
import com.actiontech.dble.server.trace.TraceResult;
11-
import com.actiontech.dble.services.mysqlsharding.ShardingService;
10+
import com.actiontech.dble.server.trace.ITraceResult;
11+
import com.actiontech.dble.services.BusinessService;
1212
import org.slf4j.Logger;
1313
import org.slf4j.LoggerFactory;
1414

@@ -84,7 +84,11 @@ public void setFlushSize(int flushSize) {
8484
this.flushSize = flushSize;
8585
}
8686

87-
public void putSlowQueryLog(ShardingService service, TraceResult log) {
88-
processor.putSlowQueryLog(service, log);
87+
public void putSlowQueryLog(BusinessService service, ITraceResult log) {
88+
processor.putSlowQueryLog(service, log, service.getExecuteSql());
89+
}
90+
91+
public void putSlowQueryLogForce(BusinessService service, ITraceResult log, String sql) {
92+
processor.putSlowQueryLogForce(service, log, sql);
8993
}
9094
}
Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
/*
2+
* Copyright (C) 2016-2023 ActionTech.
3+
* based on code by MyCATCopyrightHolder Copyright (c) 2013, OpenCloudDB/MyCAT.
4+
* License: http://www.gnu.org/licenses/gpl.html GPL version 2 or higher.
5+
*/
6+
7+
package com.actiontech.dble.server.trace;
8+
9+
import java.util.List;
10+
11+
/**
12+
* @author dcy
13+
* Create Date: 2025-04-17
14+
*/
15+
public interface ITraceResult {
16+
public enum SqlTraceType {
17+
SINGLE_NODE_QUERY, MULTI_NODE_QUERY, MULTI_NODE_GROUP, COMPLEX_QUERY, RWSPLIT_QUERY;
18+
}
19+
20+
boolean isCompleted();
21+
22+
RwTraceResult.SqlTraceType getType();
23+
24+
List<String[]> genLogResult();
25+
26+
double getOverAllMilliSecond();
27+
28+
String getOverAllSecond();
29+
}

0 commit comments

Comments
 (0)