Skip to content

Commit d5d4012

Browse files
committed
added:heart support for mqtt
1 parent 1f64ecc commit d5d4012

7 files changed

Lines changed: 31 additions & 2 deletions

File tree

XEngine_Release/XEngine_Config/XEngine_Config.json

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,8 @@
1818
"nHeartCheck": 3,
1919
"nTCPTime": 5,
2020
"nWSTime": 5,
21-
"nHTTPTime": 5
21+
"nHTTPTime": 5,
22+
"nMQTTTime": 5
2223
},
2324
"XVerification": {
2425
"nTokenTimeout": 3600,

XEngine_Source/AuthorizeModule_Configure/Config_Define.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ typedef struct
3838
int nHeartCheck; //检测次数
3939
int nTCPTime; //TCP检测时间
4040
int nWSTime; //WEBSOCKET检测时间
41+
int nMQTime; //MQTT检测时间
4142
int nHTTPTime; //HTTP检测时间
4243
}st_XTime;
4344
struct

XEngine_Source/AuthorizeModule_Configure/ModuleConfigure_Json/ModuleConfigure_Json.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ bool CModuleConfigure_Json::ModuleConfigure_Json_File(LPCXSTR lpszConfigFile, XE
9494
pSt_ServerConfig->st_XMax.nHTTPThread = st_JsonXMax["nHTTPThread"].asInt();
9595
pSt_ServerConfig->st_XMax.nMQTTThread = st_JsonXMax["nMQTTThread"].asInt();
9696
//时间配置
97-
if (st_JsonRoot["XTime"].empty() || (4 != st_JsonRoot["XTime"].size()))
97+
if (st_JsonRoot["XTime"].empty() || (5 != st_JsonRoot["XTime"].size()))
9898
{
9999
Config_IsErrorOccur = true;
100100
Config_dwErrorCode = ERROR_AUTHORIZE_MODULE_CONFIGURE_XTIME;
@@ -105,6 +105,7 @@ bool CModuleConfigure_Json::ModuleConfigure_Json_File(LPCXSTR lpszConfigFile, XE
105105
pSt_ServerConfig->st_XTime.nTCPTime = st_JsonXTime["nTCPTime"].asInt();
106106
pSt_ServerConfig->st_XTime.nWSTime = st_JsonXTime["nWSTime"].asInt();
107107
pSt_ServerConfig->st_XTime.nHTTPTime = st_JsonXTime["nHTTPTime"].asInt();
108+
pSt_ServerConfig->st_XTime.nMQTime = st_JsonXTime["nMQTTTime"].asInt();
108109
//验证配置
109110
if (st_JsonRoot["XVerification"].empty() || (8 != st_JsonRoot["XVerification"].size()))
110111
{

XEngine_Source/XEngine_APPService/XEngine_AuthorizeService/Authorize_Hdr.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ extern XHANDLE xhHttpPacket;
7979
extern XHANDLE xhTCPHeart;
8080
extern XHANDLE xhWSHeart;
8181
extern XHANDLE xhHTTPHeart;
82+
extern XHANDLE xhMQTTHeart;
8283

8384
extern XHANDLE xhMemPool;
8485
extern XHANDLE xhTCPPool;

XEngine_Source/XEngine_APPService/XEngine_AuthorizeService/Authorize_Net.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,12 +101,17 @@ void XCALLBACK XEngine_Client_MQTTRecv(LPCXSTR lpszClientAddr, XSOCKET hSocket,
101101
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_ERROR, _X("MQTT客户端:%s,投递MQTT数据包到消息队列失败,错误:%lX"), lpszClientAddr, MQTTProtocol_GetLastError());
102102
return;
103103
}
104+
SocketOpt_HeartBeat_ActiveAddrEx(xhMQTTSocket, lpszClientAddr);
104105
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_DEBUG, _X("MQTT客户端:%s,投递MQTT数据包到消息队列成功,%d"), lpszClientAddr, nMsgLen);
105106
}
106107
void XCALLBACK XEngine_Client_MQTTLeave(LPCXSTR lpszClientAddr, XSOCKET hSocket, XPVOID lParam)
107108
{
108109
XEngine_CloseClient(lpszClientAddr, 0);
109110
}
111+
void XCALLBACK XEngine_Client_MQTTHeart(LPCXSTR lpszClientAddr, XSOCKET hSocket, int nStatus, XPVOID lParam)
112+
{
113+
XEngine_CloseClient(lpszClientAddr, 2);
114+
}
110115
//////////////////////////////////////////////////////////////////////////
111116
bool XEngine_CloseClient(LPCXSTR lpszClientAddr, int nLeaveType)
112117
{
@@ -207,6 +212,7 @@ bool XEngine_Client_TaskSend(LPCXSTR lpszClientAddr, LPCXSTR lpszMsgBuffer, int
207212
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_ERROR, _X("发送数据给MQTT客户端:%s,失败,错误:%lX"), lpszClientAddr, NetCore_GetLastError());
208213
return false;
209214
}
215+
SocketOpt_HeartBeat_ActiveAddrEx(xhMQTTSocket, lpszClientAddr);
210216
}
211217
else
212218
{

XEngine_Source/XEngine_APPService/XEngine_AuthorizeService/Authorize_Net.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ void XCALLBACK XEngine_Client_HttpHeart(LPCXSTR lpszClientAddr, XSOCKET hSocket,
1818
bool XCALLBACK XEngine_Client_MQTTLogin(LPCXSTR lpszClientAddr, XSOCKET hSocket, XPVOID lParam);
1919
void XCALLBACK XEngine_Client_MQTTRecv(LPCXSTR lpszClientAddr, XSOCKET hSocket, LPCXSTR lpszRecvMsg, int nMsgLen, XPVOID lParam);
2020
void XCALLBACK XEngine_Client_MQTTLeave(LPCXSTR lpszClientAddr, XSOCKET hSocket, XPVOID lParam);
21+
void XCALLBACK XEngine_Client_MQTTHeart(LPCXSTR lpszClientAddr, XSOCKET hSocket, int nStatus, XPVOID lParam);
2122

2223
bool XEngine_CloseClient(LPCXSTR lpszClientAddr, int nLeaveType = 0);
2324
bool XEngine_Client_TaskSend(LPCXSTR lpszClientAddr, LPCXSTR lpszMsgBuffer, int nMsgLen, int nNetType);

XEngine_Source/XEngine_APPService/XEngine_AuthorizeService/XEngine_AuthorizeService.cpp

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ XHANDLE xhHttpPacket = NULL;
1616
XHANDLE xhTCPHeart = NULL;
1717
XHANDLE xhWSHeart = NULL;
1818
XHANDLE xhHTTPHeart = NULL;
19+
XHANDLE xhMQTTHeart = NULL;
1920

2021
XHANDLE xhMemPool = NULL;
2122
XHANDLE xhTCPPool = NULL;
@@ -46,6 +47,7 @@ void ServiceApp_Stop(int signo)
4647
SocketOpt_HeartBeat_DestoryEx(xhTCPHeart);
4748
SocketOpt_HeartBeat_DestoryEx(xhWSHeart);
4849
SocketOpt_HeartBeat_DestoryEx(xhHTTPHeart);
50+
SocketOpt_HeartBeat_DestoryEx(xhMQTTHeart);
4951

5052
ManagePool_Thread_NQDestroy(xhTCPPool);
5153
ManagePool_Thread_NQDestroy(xhWSPool);
@@ -398,6 +400,21 @@ int main(int argc, char** argv)
398400
NetCore_TCPXCore_RegisterCallBackEx(xhMQTTSocket, XEngine_Client_MQTTLogin, XEngine_Client_MQTTRecv, XEngine_Client_MQTTLeave);
399401
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X("启动服务中,注册MQTT网络事件成功"));
400402

403+
if (st_AuthConfig.st_XTime.nMQTime > 0)
404+
{
405+
xhMQTTHeart = SocketOpt_HeartBeat_InitEx(st_AuthConfig.st_XTime.nHeartCheck, st_AuthConfig.st_XTime.nMQTime, XEngine_Client_MQTTHeart);
406+
if (NULL == xhMQTTHeart)
407+
{
408+
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_ERROR, _X("启动服务中,初始化MQTT心跳服务失败,错误:%lX"), NetCore_GetLastError());
409+
goto XENGINE_EXITAPP;
410+
}
411+
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X("启动服务中,初始化MQTT心跳服务成功,检测次数:%d,检测时间:%d"), st_AuthConfig.st_XTime.nHeartCheck, st_AuthConfig.st_XTime.nMQTime);
412+
}
413+
else
414+
{
415+
XLOG_PRINT(xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_WARN, _X("启动服务中,检测到MQTT心跳服务没有启用"));
416+
}
417+
401418
BaseLib_Memory_Malloc((XPPPMEM)&ppSt_ListMQTTParam, st_AuthConfig.st_XMax.nMQTTThread, sizeof(THREADPOOL_PARAMENT));
402419
for (int i = 0; i < st_AuthConfig.st_XMax.nMQTTThread; i++)
403420
{
@@ -506,6 +523,7 @@ int main(int argc, char** argv)
506523
SocketOpt_HeartBeat_DestoryEx(xhTCPHeart);
507524
SocketOpt_HeartBeat_DestoryEx(xhWSHeart);
508525
SocketOpt_HeartBeat_DestoryEx(xhHTTPHeart);
526+
SocketOpt_HeartBeat_DestoryEx(xhMQTTHeart);
509527

510528
ManagePool_Thread_NQDestroy(xhTCPPool);
511529
ManagePool_Thread_NQDestroy(xhWSPool);

0 commit comments

Comments
 (0)