11using System . Collections . Concurrent ;
2- using System . Net . WebSockets ;
32using System . Reflection ;
43using System . Text ;
54using HuaJiBot . NET . Bot ;
65using HuaJiBot . NET . Commands ;
76using HuaJiBot . NET . Events ;
7+ using HuaJiBot . NET . MQ ;
88using HuaJiBot . NET . Plugin . MessageBridge . Types ;
99using HuaJiBot . NET . Plugin . MessageBridge . Types . Packet ;
10+ using Microsoft . Extensions . Logging ;
1011using Newtonsoft . Json ;
1112using Newtonsoft . Json . Converters ;
12- using Websocket . Client ;
1313using ClientEventType = HuaJiBot . NET . Plugin . MessageBridge . PluginConfig . GroupConfig . ClientEventType ;
1414
1515namespace HuaJiBot . NET . Plugin . MessageBridge ;
@@ -70,81 +70,61 @@ public partial class PluginMain : PluginBase, IPluginWithConfig<PluginConfig>
7070{
7171 //配置
7272 public PluginConfig Config { get ; } = new ( ) ;
73- private readonly Dictionary < PluginConfig . ClientInfo , WebsocketClient > _clients = new ( ) ;
73+ private readonly Dictionary < PluginConfig . ClientInfo , IServerlessMQ > _clients = new ( ) ;
7474
7575 //初始化
7676 protected override async Task InitializeAsync ( )
7777 {
7878 foreach ( var clientInfo in Config . Clients )
7979 {
80- WebsocketClient client =
81- new (
82- new Uri ( clientInfo . Address ) ,
83- ( ) =>
84- {
85- var cfg = new ClientWebSocket
86- {
87- Options =
88- {
89- KeepAliveInterval = TimeSpan . FromSeconds ( 5 ) ,
90- CollectHttpResponseDetails = true ,
91- } ,
92- } ;
93- if ( ! string . IsNullOrEmpty ( clientInfo . Token ) )
94- {
95- cfg . Options . SetRequestHeader (
96- "Authorization" ,
97- "Bearer " + clientInfo . Token
98- ) ;
99- cfg . Options . SetRequestHeader ( "client-type" , "IM" ) ;
100- cfg . Options . SetRequestHeader ( "client-subtype" , "QQ" ) ;
101- cfg . Options . SetRequestHeader (
102- "client-name" ,
103- BasePacket . DefaultInformation ? . Name
104- ) ;
105- cfg . Options . SetRequestHeader (
106- "client-version" ,
107- BasePacket . DefaultInformation ? . Version
108- ) ;
109- cfg . Options . SetRequestHeader ( "address" , clientInfo . Address ) ;
110- }
111- return cfg ;
112- }
113- )
114- {
115- IsReconnectionEnabled = true ,
116- ReconnectTimeout = null ,
117- MessageEncoding = Encoding . UTF8 ,
118- IsTextMessageConversionEnabled = true ,
119- } ;
120- client . MessageReceived . Subscribe ( msg =>
80+ // 构建自定义请求头
81+ var headers = new Dictionary < string , string >
12182 {
122- if ( msg . MessageType == WebSocketMessageType . Text )
83+ [ "client-type" ] = "IM" ,
84+ [ "client-subtype" ] = "QQ" ,
85+ [ "client-name" ] = BasePacket . DefaultInformation ? . Name ?? "Unknown" ,
86+ [ "client-version" ] = BasePacket . DefaultInformation ? . Version ?? "Unknown" ,
87+ [ "address" ] = clientInfo . Address ,
88+ } ;
89+
90+ // 创建 ILogger
91+ ILogger logger = Service . Logger ;
92+
93+ var client = new ServerlessMQ (
94+ url : clientInfo . Address ,
95+ token : clientInfo . Token ,
96+ logger : logger ,
97+ headers : headers
98+ ) ;
99+
100+ // 订阅连接事件
101+ client . OnConnected += info =>
102+ {
103+ var status = info . IsReconnect ? "重新连接" : "连接" ;
104+ Info ( $ "[{ clientInfo . Address } ] { status } 成功 - { info . Timestamp : HH:mm:ss} ") ;
105+ } ;
106+
107+ client . OnClosed += info =>
108+ {
109+ Info (
110+ $ "[{ clientInfo . Address } ] 连接断开 - 类型: { info . Type } , 原因: { info . Reason ?? "未知" } "
111+ ) ;
112+ } ;
113+
114+ // 订阅消息接收事件
115+ client . OnPacket += async data =>
116+ {
117+ try
123118 {
124- try
125- {
126- _ = ProcessMessageFromClientAsync (
127- msg . Text ?? throw new NullReferenceException ( "msg.Text" ) ,
128- clientInfo
129- ) ;
130- }
131- catch ( Exception e )
132- {
133- Error ( "处理消息时出现异常:" , e ) ;
134- }
119+ var messageRaw = data . ToString ( ) ;
120+ await ProcessMessageFromClientAsync ( messageRaw , clientInfo ) ;
135121 }
136- else
122+ catch ( Exception e )
137123 {
138- Info ( "收到非文本消息!" ) ;
124+ Error ( $ "[ { clientInfo . Address } ] 处理消息时出现异常:" , e ) ;
139125 }
140- } ) ;
141- client . DisconnectionHappened . Subscribe ( info =>
142- Info ( "Disconnection Happened " + info . Type )
143- ) ;
144- client . ReconnectionHappened . Subscribe ( info =>
145- Info ( "Reconnection Happened " + info . Type )
146- ) ;
147- await client . Start ( ) ;
126+ } ;
127+
148128 _clients . Add ( clientInfo , client ) ;
149129 }
150130
@@ -332,16 +312,10 @@ private static bool StringToToggle(string input, out bool result)
332312 {
333313 switch ( input . ToLower ( ) )
334314 {
335- case "开"
336- or "开启"
337- or "on"
338- or "true" :
315+ case "开" or "开启" or "on" or "true" :
339316 result = true ;
340317 return true ;
341- case "关"
342- or "关闭"
343- or "off"
344- or "false" :
318+ case "关" or "关闭" or "off" or "false" :
345319 result = false ;
346320 return true ;
347321 default :
@@ -409,7 +383,8 @@ select EnumToAttributeName(x)
409383 Config
410384 . Clients . SelectMany ( x => x . Groups )
411385 . FirstOrDefault ( x => x is { Enabled : true } )
412- ? . ForwardFromClientDisabledEvent . Contains ( type ) ?? true ;
386+ ? . ForwardFromClientDisabledEvent . Contains ( type )
387+ ?? true ;
413388 e . Reply ( "状态 可选:true、false\n " + $ "当前{ name } 状态:{ ! currentStatusDisabled } ") ;
414389 return ;
415390 }
@@ -432,5 +407,20 @@ select EnumToAttributeName(x)
432407 e . Reply ( $ "未找到群 { e . GroupId } 的配置") ;
433408 }
434409
435- protected override void Unload ( ) { }
410+ protected override void Unload ( )
411+ {
412+ // 释放所有 WebSocket 连接
413+ foreach ( var ( _, client ) in _clients )
414+ {
415+ try
416+ {
417+ client . Dispose ( ) ;
418+ }
419+ catch ( Exception e )
420+ {
421+ Warn ( "释放 WebSocket 连接时出现异常:" , e ) ;
422+ }
423+ }
424+ _clients . Clear ( ) ;
425+ }
436426}
0 commit comments