@@ -23,6 +23,7 @@ bool MessageQueue_HttpTask_Post(LPCXSTR lpszClientAddr, LPCXSTR lpszFuncName, LP
2323 LPCXSTR lpszAPICreateTopic = _X (" createtopic" );
2424 LPCXSTR lpszAPIDelTopic = _X (" deletetopic" );
2525 LPCXSTR lpszAPIDelUser = _X (" deleteuser" );
26+ LPCXSTR lpszAPIDelMsg = _X (" deletemsg" );
2627 // 判断请求
2728 if (0 == _tcsxncmp (lpszAPIRegister, lpszFuncName, _tcsxlen (lpszAPIRegister)))
2829 {
@@ -36,7 +37,7 @@ bool MessageQueue_HttpTask_Post(LPCXSTR lpszClientAddr, LPCXSTR lpszFuncName, LP
3637 }
3738 if (DBModule_MQUser_UserQuery (&st_UserInfo))
3839 {
39- ProtocolModule_Packet_Http (tszSDBuffer, &nSDLen, ERROR_XENGINE_MESSAGE_HTTP_EXISTED , _X (" user is existed" ));
40+ ProtocolModule_Packet_Http (tszSDBuffer, &nSDLen, ERROR_XENGINE_MESSAGE_HTTP_EXIST , _X (" user is existed" ));
4041 XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_HTTP);
4142 XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_ERROR, _X (" HTTP客户端:%s,请求用户注册失败,用户已经存在,错误:%lX" ), lpszClientAddr, SessionModule_GetLastError ());
4243 return false ;
@@ -114,10 +115,11 @@ bool MessageQueue_HttpTask_Post(LPCXSTR lpszClientAddr, LPCXSTR lpszFuncName, LP
114115 {
115116 // 主题 http://127.0.0.1:5202/api?function=gettopic
116117 int nDBCount = 0 ;
117- XCHAR tszTopicName[XPATH_MAX] = {};
118- ProtocolModule_Parse_Name (lpszMsgBuffer, nMsgLen, tszTopicName);
119- DBModule_MQData_GetLeftCount (tszTopicName, 0 , &nDBCount);
120- ProtocolModule_Packet_TopicName (tszSDBuffer, &nSDLen, tszTopicName, nDBCount);
118+ XENGINE_PROTOCOL_XMQ st_MQProtocol = {};
119+
120+ ProtocolModule_Parse_XMQ (lpszMsgBuffer, nMsgLen, &st_MQProtocol);
121+ DBModule_MQData_GetLeftCount (st_MQProtocol.tszMQKey , 0 , &nDBCount);
122+ ProtocolModule_Packet_TopicName (tszSDBuffer, &nSDLen, st_MQProtocol.tszMQKey , nDBCount);
121123 XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_HTTP);
122124 }
123125 else if (0 == _tcsxncmp (lpszAPIGetList, lpszFuncName, _tcsxlen (lpszAPIGetList)))
@@ -134,33 +136,126 @@ bool MessageQueue_HttpTask_Post(LPCXSTR lpszClientAddr, LPCXSTR lpszFuncName, LP
134136 else if (0 == _tcsxncmp (lpszAPICreateTopic, lpszFuncName, _tcsxlen (lpszAPICreateTopic)))
135137 {
136138 // http://127.0.0.1:5202/api?function=createtopic
137-
138139 XENGINE_PROTOCOL_XMQ st_MQProtocol = {};
139- XENGINE_PROTOCOLHDR st_ProtocolHdr = {};
140140
141- ProtocolModule_Parse_Name (lpszMsgBuffer, nMsgLen, st_MQProtocol.tszMQKey );
142- APIHelp_MQHelp_JsonToHex (&st_ProtocolHdr);
141+ if (!ProtocolModule_Parse_XMQ (lpszMsgBuffer, nMsgLen, &st_MQProtocol))
142+ {
143+ ProtocolModule_Packet_Http (tszSDBuffer, &nSDLen, ERROR_XENGINE_MESSAGE_HTTP_PARSE, _X (" request json parse failure" ));
144+ XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_HTTP);
145+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_ERROR, _X (" HTTP消息端:%s,请求的创建主题失败,数据不正确:%s" ), lpszClientAddr, lpszMsgBuffer);
146+ return false ;
147+ }
148+ int nListCount = 0 ;
149+ XCHAR** ppszTableName;
150+ // 检查表是否存在
151+ DBModule_MQData_ShowTable (&ppszTableName, &nListCount);
152+ for (int i = 0 ; i < nListCount; i++)
153+ {
154+ if (0 == _tcsxnicmp (ppszTableName[i], st_MQProtocol.tszMQKey , _tcsxlen (ppszTableName[i])))
155+ {
156+ ProtocolModule_Packet_Http (tszSDBuffer, &nSDLen, ERROR_XENGINE_MESSAGE_HTTP_EXIST, _X (" topic name is exist" ));
157+ XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_HTTP);
158+ BaseLib_Memory_Free ((XPPPMEM)&ppszTableName, nListCount);
159+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_ERROR, _X (" HTTP消息端:%s,创建主题失败,主题名称:%s,主题存在,无法继续" ), lpszClientAddr, st_MQProtocol.tszMQKey );
160+ return false ;
161+ }
162+ }
163+ BaseLib_Memory_Free ((XPPPMEM)&ppszTableName, nListCount);
164+ // 创建表
165+ if (!DBModule_MQData_CreateTable (st_MQProtocol.tszMQKey ))
166+ {
167+ ProtocolModule_Packet_Http (tszSDBuffer, &nSDLen, ERROR_XENGINE_MESSAGE_HTTP_SERVICE, _X (" create topic is failure" ));
168+ XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_HTTP);
169+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_ERROR, _X (" HTTP消息端:%s,创建主题失败,创建表失败,主题名称:%s,无法继续,错误:%lX" ), lpszClientAddr, st_MQProtocol.tszMQKey , DBModule_GetLastError ());
170+ return false ;
171+ }
172+ // 插入所有者
173+ XENGINE_DBTOPICOWNER st_DBOwner;
174+ memset (&st_DBOwner, ' \0 ' , sizeof (XENGINE_DBTOPICOWNER));
175+
176+ _tcsxcpy (st_DBOwner.tszUserName , st_MQProtocol.tszMQUsr );
177+ _tcsxcpy (st_DBOwner.tszQueueName , st_MQProtocol.tszMQKey );
143178
144- st_ProtocolHdr.xhToken = xhToken;
145- st_ProtocolHdr.unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQTOPICCREATE;
146- MessageQueue_TCP_Handle (&st_ProtocolHdr, lpszClientAddr, (LPCXSTR)&st_MQProtocol, sizeof (XENGINE_PROTOCOL_XMQ), XENGINE_MQAPP_NETTYPE_HTTP);
147- XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X (" HTTP客户端:%s,请求主题删除成功,主题名:%s" ), lpszClientAddr, st_MQProtocol.tszMQKey );
179+ if (!DBModule_MQUser_OwnerInsert (&st_DBOwner))
180+ {
181+ ProtocolModule_Packet_Http (tszSDBuffer, &nSDLen, ERROR_XENGINE_MESSAGE_HTTP_SERVICE, _X (" create topic bind with user is failure" ));
182+ XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_HTTP);
183+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_ERROR, _X (" HTTP消息端:%s,创建主题失败,插入所有者失败,主题名称:%s,无法继续,错误:%lX" ), lpszClientAddr, st_MQProtocol.tszMQKey , DBModule_GetLastError ());
184+ return false ;
185+ }
186+ // 回复
187+ ProtocolModule_Packet_Http (tszSDBuffer, &nSDLen);
188+ XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_HTTP);
189+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X (" HTTP消息端:%s,主题:%s,创建主题成功" ), lpszClientAddr, st_MQProtocol.tszMQKey );
148190 }
149191 else if (0 == _tcsxncmp (lpszAPIDelTopic, lpszFuncName, _tcsxlen (lpszAPIDelTopic)))
150192 {
151193 // http://127.0.0.1:5202/api?function=deletetopic
152-
194+ XENGINE_DBTOPICOWNER st_DBOwner = {};
195+ XENGINE_DBUSERKEY st_UserKey = {};
196+ XENGINE_DBTIMERELEASE st_DBInfo = {};
153197 XENGINE_PROTOCOL_XMQ st_MQProtocol = {};
154- XENGINE_PROTOCOLHDR st_ProtocolHdr = {};
155-
156- ProtocolModule_Parse_Name (lpszMsgBuffer, nMsgLen, st_MQProtocol.tszMQKey );
157- APIHelp_MQHelp_JsonToHex (&st_ProtocolHdr);
158-
159- st_ProtocolHdr.xhToken = xhToken;
160- st_ProtocolHdr.unOperatorCode = XENGINE_COMMUNICATION_PROTOCOL_OPERATOR_CODE_MQ_REQTOPICDELETE;
161- MessageQueue_TCP_Handle (&st_ProtocolHdr, lpszClientAddr, (LPCXSTR)&st_MQProtocol, sizeof (XENGINE_PROTOCOL_XMQ), XENGINE_MQAPP_NETTYPE_HTTP);
198+
199+ ProtocolModule_Parse_XMQ (lpszMsgBuffer, nMsgLen, &st_MQProtocol);
200+
201+ _tcsxcpy (st_DBOwner.tszUserName , st_MQProtocol.tszMQUsr );
202+ _tcsxcpy (st_DBOwner.tszQueueName , st_MQProtocol.tszMQKey );
203+ _tcsxcpy (st_UserKey.tszKeyName , st_MQProtocol.tszMQKey );
204+ _tcsxcpy (st_DBInfo.tszQueueName , st_MQProtocol.tszMQKey );
205+
206+ if (_tcsxlen (st_DBOwner.tszUserName ) <= 0 )
207+ {
208+ ProtocolModule_Packet_Http (tszSDBuffer, &nSDLen, ERROR_XENGINE_MESSAGE_HTTP_MISS, _X (" user name missing" ));
209+ XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_HTTP);
210+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_ERROR, _X (" HTTP消息端:%s,删除主题失败,删除所有者失败,主题名称:%s,用户名为空" ), lpszClientAddr, st_MQProtocol.tszMQKey , st_DBOwner.tszUserName );
211+ return false ;
212+ }
213+ if (!DBModule_MQUser_OwnerDelete (&st_DBOwner))
214+ {
215+ ProtocolModule_Packet_Http (tszSDBuffer, &nSDLen, ERROR_XENGINE_MESSAGE_HTTP_SERVICE, _X (" delete owner db failure" ));
216+ XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_HTTP);
217+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_ERROR, _X (" HTTP消息端:%s,删除主题失败,删除所有者失败,主题名称:%s,无法继续,错误:%lX" ), lpszClientAddr, st_MQProtocol.tszMQKey , DBModule_GetLastError ());
218+ return false ;
219+ }
220+ // 清楚数据库
221+ APIHelp_Counter_SerialDel (st_MQProtocol.tszMQKey );
222+ DBModule_MQData_DeleteTable (st_MQProtocol.tszMQKey );
223+ DBModule_MQUser_KeyDelete (&st_UserKey);
224+ DBModule_MQUser_TimeDelete (&st_DBInfo);
225+
226+ ProtocolModule_Packet_Http (tszSDBuffer, &nSDLen);
227+ XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_HTTP);
162228 XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X (" HTTP客户端:%s,请求主题删除成功,主题名:%s" ), lpszClientAddr, st_MQProtocol.tszMQKey );
163229 }
230+ else if (0 == _tcsxncmp (lpszAPIDelMsg, lpszFuncName, _tcsxlen (lpszAPIDelMsg)))
231+ {
232+ // http://127.0.0.1:5202/api?function=deletemsg
233+ XENGINE_PROTOCOL_XMQ st_MQProtocol = {};
234+ XENGINE_DBMESSAGEQUEUE st_MessageQueue = {};
235+
236+ ProtocolModule_Parse_XMQ (lpszMsgBuffer, nMsgLen, &st_MQProtocol);
237+
238+ if (st_MQProtocol.nSerial <= 0 )
239+ {
240+ ProtocolModule_Packet_Http (tszSDBuffer, &nSDLen, ERROR_XENGINE_MESSAGE_HTTP_MISS, _X (" message serial not set" ));
241+ XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_HTTP);
242+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_ERROR, _X (" HTTP消息端:%s,主题:%s,删除消息数据失败,删除指定消息序列:%lld 失败" ), lpszClientAddr, st_MQProtocol.tszMQKey , st_MQProtocol.nSerial );
243+ return false ;
244+ }
245+ st_MessageQueue.nQueueSerial = st_MQProtocol.nSerial ;
246+ _tcsxcpy (st_MessageQueue.tszQueueName , st_MQProtocol.tszMQKey );
247+ if (!DBModule_MQData_Delete (&st_MessageQueue))
248+ {
249+ ProtocolModule_Packet_Http (tszSDBuffer, &nSDLen, ERROR_XENGINE_MESSAGE_HTTP_SERVICE, _X (" message delete failure" ));
250+ XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_HTTP);
251+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_ERROR, _X (" HTTP消息端:%s,主题:%s,删除消息数据失败,删除指定消息序列:%lld 失败,错误码:%lX" ), lpszClientAddr, st_MQProtocol.tszMQKey , st_MQProtocol.nSerial , DBModule_GetLastError ());
252+ return false ;
253+ }
254+
255+ ProtocolModule_Packet_Http (tszSDBuffer, &nSDLen);
256+ XEngine_MQXService_Send (lpszClientAddr, tszSDBuffer, nSDLen, XENGINE_MQAPP_NETTYPE_HTTP);
257+ XLOG_PRINT (xhLog, XENGINE_HELPCOMPONENTS_XLOG_IN_LOGLEVEL_INFO, _X (" HTTP客户端:%s,删除消息数据成功主题:%s,序列:%lld," ), lpszClientAddr, st_MQProtocol.tszMQKey , st_MessageQueue.nQueueSerial );
258+ }
164259 else if (0 == _tcsxncmp (lpszAPIDelUser, lpszFuncName, _tcsxlen (lpszAPIDelUser)))
165260 {
166261 XENGINE_PROTOCOL_USERINFO st_UserInfo = {};
0 commit comments