Skip to content

Commit b21d069

Browse files
committed
modify:unread message and bind and unbind protocol to http reply
delete:SUBSCRIBE and UNSUBSCRIBE for mqtt protocol
1 parent 1fe6ec5 commit b21d069

9 files changed

Lines changed: 141 additions & 360 deletions

File tree

XEngine_Source/MQCore_ProtocolModule/MQCore_ProtocolModule.def

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,7 @@ EXPORTS
1313
ProtocolModule_Packet_TopicList
1414
ProtocolModule_Packet_OnlineList
1515
ProtocolModule_Packet_TopicName
16-
ProtocolModule_Packet_UNReadCreate
17-
ProtocolModule_Packet_UNReadInsert
18-
ProtocolModule_Packet_UNReadDelete
16+
ProtocolModule_Packet_UNReadMsg
1917

2018
ProtocolModule_Parse_Websocket
2119
ProtocolModule_Parse_Register

XEngine_Source/MQCore_ProtocolModule/ProtocolModule_Packet/ProtocolModule_Packet.cpp

Lines changed: 33 additions & 143 deletions
Original file line numberDiff line numberDiff line change
@@ -576,159 +576,66 @@ bool CProtocolModule_Packet::ProtocolModule_Packet_TopicName(XCHAR* ptszMsgBuffe
576576
return true;
577577
}
578578
/********************************************************************
579-
函数名称:ProtocolModule_Packet_UNReadCreate
580-
函数功能:未读消息打包创建函数
581-
参数.一:pSt_ProtocolHdr
582-
In/Out:In
583-
类型:数据结构指针
584-
可空:N
585-
意思:输入要打包的协议头
586-
参数.二:enPayType
587-
In/Out:In
588-
类型:枚举型
579+
函数名称:ProtocolModule_Packet_UNReadMsg
580+
函数功能:获取未读消息打包函数
581+
参数.一:ptszMsgBuffer
582+
In/Out:Out
583+
类型:字符指针
589584
可空:N
590-
意思:输入打包的负载类型
591-
返回值
592-
类型:逻辑型
593-
意思:是否成功
594-
备注:
595-
*********************************************************************/
596-
XHANDLE CProtocolModule_Packet::ProtocolModule_Packet_UNReadCreate(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE enPayType)
597-
{
598-
Protocol_IsErrorOccur = false;
599-
600-
if (NULL == pSt_ProtocolHdr)
601-
{
602-
Protocol_IsErrorOccur = true;
603-
Protocol_dwErrorCode = ERROR_MQ_MODULE_PROTOCOL_PARAMENT;
604-
return NULL;
605-
}
606-
//申请内存
607-
PROTOCOL_PACKETUNREAD* pSt_UNRead = new PROTOCOL_PACKETUNREAD;
608-
if (NULL == pSt_UNRead)
609-
{
610-
Protocol_IsErrorOccur = true;
611-
Protocol_dwErrorCode = ERROR_MQ_MODULE_PROTOCOL_MALLOC;
612-
return NULL;
613-
}
614-
memset(&pSt_UNRead->st_ProtocolHdr, '\0', sizeof(XENGINE_PROTOCOLHDR));
615-
616-
pSt_UNRead->nType = enPayType;
617-
if (pSt_UNRead->nType == ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_BIN)
618-
{
619-
pSt_UNRead->st_ProtocolHdr.wHeader = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_HEADER;
620-
pSt_UNRead->st_ProtocolHdr.wTail = XENGIEN_COMMUNICATION_PACKET_PROTOCOL_TAIL;
621-
pSt_UNRead->st_ProtocolHdr.unOperatorType = ENUM_XENGINE_COMMUNICATION_PROTOCOL_TYPE_XMQ;
622-
pSt_UNRead->st_ProtocolHdr.unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REPUNREAD;
623-
pSt_UNRead->st_ProtocolHdr.byVersion = ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_JSON;
624-
}
625-
else
626-
{
627-
pSt_UNRead->st_JsonRoot["wHeader"] = pSt_ProtocolHdr->wHeader;
628-
pSt_UNRead->st_JsonRoot["wTail"] = pSt_ProtocolHdr->wTail;
629-
pSt_UNRead->st_JsonRoot["unOperatorType"] = pSt_ProtocolHdr->unOperatorType;
630-
pSt_UNRead->st_JsonRoot["unOperatorCode"] = pSt_ProtocolHdr->unOperatorCode;
631-
pSt_UNRead->st_JsonRoot["byVersion"] = ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_JSON;
632-
pSt_UNRead->st_JsonRoot["wReserve"] = 0;
633-
}
634-
return pSt_UNRead;
635-
}
636-
/********************************************************************
637-
函数名称:ProtocolModule_Packet_UNReadInsert
638-
函数功能:消息打包数据插入
639-
参数.一:xhToken
640-
In/Out:In
641-
类型:句柄
585+
意思:输出打包的内容
586+
参数.二:pInt_MsgLen
587+
In/Out:Out
588+
类型:整数型指针
642589
可空:N
643-
意思:输入要操作的句柄
644-
参数.二:lpszKeyName
590+
意思:输出打包大小
591+
参数.三:pppSt_UserKey
645592
In/Out:In
646593
类型:三级指针
647594
可空:N
648-
意思:输入队列名称
649-
参数.:nListCount
595+
意思:输入要打包的数据
596+
参数.:nListCount
650597
In/Out:In
651598
类型:整数型
652599
可空:N
653-
意思:输入队列个数
654-
返回值
655-
类型:逻辑型
656-
意思:是否成功
657-
备注:
658-
*********************************************************************/
659-
bool CProtocolModule_Packet::ProtocolModule_Packet_UNReadInsert(XHANDLE xhToken, LPCXSTR lpszKeyName, int nListCount)
660-
{
661-
Protocol_IsErrorOccur = false;
662-
663-
PROTOCOL_PACKETUNREAD* pSt_UNRead = (PROTOCOL_PACKETUNREAD*)xhToken;
664-
if (NULL == pSt_UNRead)
665-
{
666-
Protocol_IsErrorOccur = true;
667-
Protocol_dwErrorCode = ERROR_MQ_MODULE_PROTOCOL_NOTFOUND;
668-
return false;
669-
}
670-
Json::Value st_JsonSubArray;
671-
672-
st_JsonSubArray["Name"] = lpszKeyName;
673-
st_JsonSubArray["Count"] = nListCount;
674-
pSt_UNRead->st_JsonArray.append(st_JsonSubArray);
675-
return true;
676-
}
677-
/********************************************************************
678-
函数名称:ProtocolModule_Packet_UNReadDelete
679-
函数功能:删除数据并且导出
680-
参数.一:xhToken
681-
In/Out:In
682-
类型:句柄
683-
可空:N
684-
意思:输入要操作的句柄
685-
参数.二:ptszMsgBuffer
686-
In/Out:Out
687-
类型:字符指针
688-
可空:N
689-
意思:输出打好包的数据
690-
参数.三:pInt_MsgLen
691-
In/Out:Out
692-
类型:整数型指针
693-
可空:N
694-
意思:输出数据大小
600+
意思:输入要打包的数据的个数
695601
返回值
696602
类型:逻辑型
697603
意思:是否成功
698604
备注:
699605
*********************************************************************/
700-
bool CProtocolModule_Packet::ProtocolModule_Packet_UNReadDelete(XHANDLE xhToken, XCHAR* ptszMsgBuffer, int* pInt_MsgLen)
606+
bool CProtocolModule_Packet::ProtocolModule_Packet_UNReadMsg(XCHAR* ptszMsgBuffer, int* pInt_MsgLen, XENGINE_DBUSERKEY*** pppSt_UserKey, int nListCount)
701607
{
702608
Protocol_IsErrorOccur = false;
703609

704-
PROTOCOL_PACKETUNREAD* pSt_UNRead = (PROTOCOL_PACKETUNREAD*)xhToken;
705-
if (NULL == pSt_UNRead)
610+
if (NULL == ptszMsgBuffer || NULL == pInt_MsgLen)
706611
{
707612
Protocol_IsErrorOccur = true;
708613
Protocol_dwErrorCode = ERROR_MQ_MODULE_PROTOCOL_NOTFOUND;
709614
return false;
710615
}
616+
Json::Value st_JsonRoot;
617+
Json::Value st_JsonArray;
711618
Json::StreamWriterBuilder st_JsonBuilder;
712-
st_JsonBuilder["emitUTF8"] = true;
713619

714-
pSt_UNRead->st_JsonRoot["Array"] = pSt_UNRead->st_JsonArray;
715-
pSt_UNRead->st_JsonRoot["Count"] = pSt_UNRead->st_JsonArray.size();
620+
st_JsonRoot["code"] = 0;
621+
st_JsonRoot["msg"] = "success";
716622

717-
if (pSt_UNRead->nType == ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE_BIN)
718-
{
719-
pSt_UNRead->st_ProtocolHdr.unPacketSize = Json::writeString(st_JsonBuilder, pSt_UNRead->st_JsonRoot).length();
720-
memcpy(ptszMsgBuffer, &pSt_UNRead->st_ProtocolHdr, sizeof(XENGINE_PROTOCOLHDR));
721-
memcpy(ptszMsgBuffer + sizeof(XENGINE_PROTOCOLHDR), Json::writeString(st_JsonBuilder, pSt_UNRead->st_JsonRoot).c_str(), pSt_UNRead->st_ProtocolHdr.unPacketSize);
722-
*pInt_MsgLen = sizeof(XENGINE_PROTOCOLHDR) + pSt_UNRead->st_ProtocolHdr.unPacketSize;
723-
}
724-
else
623+
for (int i = 0; i < nListCount; i++)
725624
{
726-
*pInt_MsgLen = Json::writeString(st_JsonBuilder, pSt_UNRead->st_JsonRoot).length();
727-
memcpy(ptszMsgBuffer, Json::writeString(st_JsonBuilder, pSt_UNRead->st_JsonRoot).c_str(), *pInt_MsgLen);
625+
Json::Value st_JsonObject;
626+
st_JsonObject["tszUserName"] = (*pppSt_UserKey)[i]->tszUserName;
627+
st_JsonObject["nKeySerial"] = (*pppSt_UserKey)[i]->nKeySerial;
628+
st_JsonObject["tszCreateTime"] = (*pppSt_UserKey)[i]->tszCreateTime;
629+
st_JsonObject["tszKeyName"] = (*pppSt_UserKey)[i]->tszKeyName;
630+
st_JsonObject["tszUPTime"] = (*pppSt_UserKey)[i]->tszUPTime;
631+
st_JsonArray.append(st_JsonObject);
728632
}
633+
st_JsonRoot["Array"] = st_JsonArray;
634+
st_JsonRoot["Count"] = nListCount;
729635

730-
delete pSt_UNRead;
731-
pSt_UNRead = NULL;
636+
st_JsonBuilder["emitUTF8"] = true;
637+
*pInt_MsgLen = Json::writeString(st_JsonBuilder, st_JsonRoot).length();
638+
memcpy(ptszMsgBuffer, Json::writeString(st_JsonBuilder, st_JsonRoot).c_str(), *pInt_MsgLen);
732639
return true;
733640
}
734641
///////////////////////////////////////////////////////////////////////////////
@@ -965,23 +872,6 @@ bool CProtocolModule_Packet::ProtocolModule_Packet_MQTTCommon(XENGINE_PROTOCOLHD
965872

966873
}
967874
}
968-
else if (XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REPTOPICBIND == pSt_ProtocolHdr->unOperatorCode)
969-
{
970-
if (0 == pSt_ProtocolHdr->wReserve)
971-
{
972-
MQTTProtocol_Packet_REPComm(tszRVBuffer, &nRVLen, pSt_ProtocolHdr->wPacketSerial, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_REASON_SUCCESS);
973-
}
974-
else
975-
{
976-
MQTTProtocol_Packet_REPComm(tszRVBuffer, &nRVLen, pSt_ProtocolHdr->wPacketSerial, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_REASON_TOPICNAME);
977-
}
978-
MQTTProtocol_Packet_Header(ptszMsgBuffer, pInt_MsgLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_SUBACK, tszRVBuffer, nRVLen);
979-
}
980-
else if (XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REPTOPICUNBIND == pSt_ProtocolHdr->unOperatorCode)
981-
{
982-
MQTTProtocol_Packet_REPComm(tszRVBuffer, &nRVLen, pSt_ProtocolHdr->wPacketSerial, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_REASON_SUCCESS);
983-
MQTTProtocol_Packet_Header(ptszMsgBuffer, pInt_MsgLen, XENGINE_RFCCOMPONENTS_MQTT_PROTOCOL_TYPE_UNSUBACK, tszRVBuffer, nRVLen);
984-
}
985875
else if (XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_MSGNOTIFY == pSt_ProtocolHdr->unOperatorCode)
986876
{
987877
MQTTProtocol_Packet_REQPublish(tszRVBuffer, &nRVLen, pSt_MQProtocol->tszMQKey, lpszMsgBuffer, nMsgLen);

XEngine_Source/MQCore_ProtocolModule/ProtocolModule_Packet/ProtocolModule_Packet.h

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,7 @@ class CProtocolModule_Packet
3535
bool ProtocolModule_Packet_TopicList(XCHAR* ptszMsgBuffer, int* pInt_MsgLen, XCHAR*** pppszTableName, int nListCount);
3636
bool ProtocolModule_Packet_OnlineList(XCHAR* ptszMsgBuffer, int* pInt_MsgLen, XCHAR*** ppptszListUser, int nListCount);
3737
bool ProtocolModule_Packet_TopicName(XCHAR* ptszMsgBuffer, int* pInt_MsgLen, LPCXSTR lpszTopicName, int nTopicCount);
38-
public:
39-
XHANDLE ProtocolModule_Packet_UNReadCreate(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE enPayType);
40-
bool ProtocolModule_Packet_UNReadInsert(XHANDLE xhToken, LPCXSTR lpszKeyName, int nListCount);
41-
bool ProtocolModule_Packet_UNReadDelete(XHANDLE xhToken, XCHAR* ptszMsgBuffer, int* pInt_MsgLen);
38+
bool ProtocolModule_Packet_UNReadMsg(XCHAR* ptszMsgBuffer, int* pInt_MsgLen, XENGINE_DBUSERKEY*** pppSt_UserKey, int nListCount);
4239
protected:
4340
bool ProtocolModule_Packet_TCPCommon(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, XENGINE_PROTOCOL_XMQ* pSt_MQProtocol, XCHAR* ptszMsgBuffer, int* pInt_MsgLen, LPCXSTR lpszMsgBuffer = NULL, int nMsgLen = 0);
4441
bool ProtocolModule_Packet_WSCommon(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, XENGINE_PROTOCOL_XMQ* pSt_MQProtocol, XCHAR* ptszMsgBuffer, int* pInt_MsgLen, LPCXSTR lpszMsgBuffer = NULL, int nMsgLen = 0);

XEngine_Source/MQCore_ProtocolModule/Protocol_Define.h

Lines changed: 15 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -323,72 +323,34 @@ extern "C" bool ProtocolModule_Packet_OnlineList(XCHAR* ptszMsgBuffer, int* pInt
323323
*********************************************************************/
324324
extern "C" bool ProtocolModule_Packet_TopicName(XCHAR* ptszMsgBuffer, int* pInt_MsgLen, LPCXSTR lpszTopicName, int nTopicCount);
325325
/********************************************************************
326-
函数名称:ProtocolModule_Packet_UNReadCreate
327-
函数功能:未读消息打包创建函数
328-
参数.一:pSt_ProtocolHdr
329-
In/Out:In
330-
类型:数据结构指针
331-
可空:N
332-
意思:输入要打包的协议头
333-
参数.二:enPayType
334-
In/Out:In
335-
类型:枚举型
326+
函数名称:ProtocolModule_Packet_UNReadMsg
327+
函数功能:获取未读消息打包函数
328+
参数.一:ptszMsgBuffer
329+
In/Out:Out
330+
类型:字符指针
336331
可空:N
337-
意思:输入打包的负载类型
338-
返回值
339-
类型:逻辑型
340-
意思:是否成功
341-
备注:
342-
*********************************************************************/
343-
extern "C" XHANDLE ProtocolModule_Packet_UNReadCreate(XENGINE_PROTOCOLHDR* pSt_ProtocolHdr, ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE enPayType);
344-
/********************************************************************
345-
函数名称:ProtocolModule_Packet_UNReadInsert
346-
函数功能:消息打包数据插入
347-
参数.一:xhToken
348-
In/Out:In
349-
类型:句柄
332+
意思:输出打包的内容
333+
参数.二:pInt_MsgLen
334+
In/Out:Out
335+
类型:整数型指针
350336
可空:N
351-
意思:输入要操作的句柄
352-
参数.二:lpszKeyName
337+
意思:输出打包大小
338+
参数.三:pppSt_UserKey
353339
In/Out:In
354340
类型:三级指针
355341
可空:N
356-
意思:输入队列名称
357-
参数.:nListCount
342+
意思:输入要打包的数据
343+
参数.:nListCount
358344
In/Out:In
359345
类型:整数型
360346
可空:N
361-
意思:输入队列个数
362-
返回值
363-
类型:逻辑型
364-
意思:是否成功
365-
备注:
366-
*********************************************************************/
367-
extern "C" bool ProtocolModule_Packet_UNReadInsert(XHANDLE xhToken, LPCXSTR lpszKeyName, int nListCount);
368-
/********************************************************************
369-
函数名称:ProtocolModule_Packet_UNReadDelete
370-
函数功能:删除数据并且导出
371-
参数.一:xhToken
372-
In/Out:In
373-
类型:句柄
374-
可空:N
375-
意思:输入要操作的句柄
376-
参数.二:ptszMsgBuffer
377-
In/Out:Out
378-
类型:字符指针
379-
可空:N
380-
意思:输出打好包的数据
381-
参数.三:pInt_MsgLen
382-
In/Out:Out
383-
类型:整数型指针
384-
可空:N
385-
意思:输出数据大小
347+
意思:输入要打包的数据的个数
386348
返回值
387349
类型:逻辑型
388350
意思:是否成功
389351
备注:
390352
*********************************************************************/
391-
extern "C" bool ProtocolModule_Packet_UNReadDelete(XHANDLE xhToken, XCHAR* ptszMsgBuffer, int* pInt_MsgLen);
353+
extern "C" bool ProtocolModule_Packet_UNReadMsg(XCHAR* ptszMsgBuffer, int* pInt_MsgLen, XENGINE_DBUSERKEY*** pppSt_UserKey, int nListCount);
392354
/************************************************************************/
393355
/* 解析类函数 */
394356
/************************************************************************/

XEngine_Source/MQCore_ProtocolModule/pch.cpp

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -71,17 +71,9 @@ extern "C" bool ProtocolModule_Packet_TopicName(XCHAR * ptszMsgBuffer, int* pInt
7171
{
7272
return m_ProtocolPacket.ProtocolModule_Packet_TopicName(ptszMsgBuffer, pInt_MsgLen, lpszTopicName, nTopicCount);
7373
}
74-
extern "C" XHANDLE ProtocolModule_Packet_UNReadCreate(XENGINE_PROTOCOLHDR * pSt_ProtocolHdr, ENUM_XENGINE_PROTOCOLHDR_PAYLOAD_TYPE enPayType)
74+
extern "C" bool ProtocolModule_Packet_UNReadMsg(XCHAR* ptszMsgBuffer, int* pInt_MsgLen, XENGINE_DBUSERKEY*** pppSt_UserKey, int nListCount)
7575
{
76-
return m_ProtocolPacket.ProtocolModule_Packet_UNReadCreate(pSt_ProtocolHdr, enPayType);
77-
}
78-
extern "C" bool ProtocolModule_Packet_UNReadInsert(XHANDLE xhToken, LPCXSTR lpszKeyName, int nListCount)
79-
{
80-
return m_ProtocolPacket.ProtocolModule_Packet_UNReadInsert(xhToken, lpszKeyName, nListCount);
81-
}
82-
extern "C" bool ProtocolModule_Packet_UNReadDelete(XHANDLE xhToken, XCHAR * ptszMsgBuffer, int* pInt_MsgLen)
83-
{
84-
return m_ProtocolPacket.ProtocolModule_Packet_UNReadDelete(xhToken, ptszMsgBuffer, pInt_MsgLen);
76+
return m_ProtocolPacket.ProtocolModule_Packet_UNReadMsg(ptszMsgBuffer, pInt_MsgLen, pppSt_UserKey, nListCount);
8577
}
8678
/************************************************************************/
8779
/* 解析类函数 */

0 commit comments

Comments
 (0)