@@ -136,6 +136,30 @@ public async Task StartAsync()
136136
137137 // Connect to message queue / 连接到消息队列
138138 await _messageQueueService . ConnectAsync ( ) ;
139+
140+ // Verify connection is ready before proceeding / 验证连接已就绪再继续
141+ // This ensures channel is fully established before RaftNode starts / 这确保在 RaftNode 启动前 channel 已完全建立
142+ int verifyAttempts = 0 ;
143+ while ( verifyAttempts < 10 ) // Try up to 10 times / 最多尝试 10 次
144+ {
145+ try
146+ {
147+ // Try to ensure connection is ready / 尝试确保连接已就绪
148+ await _messageQueueService . VerifyConnectionAsync ( ) ;
149+ break ; // Connection verified / 连接已验证
150+ }
151+ catch ( Exception ex )
152+ {
153+ verifyAttempts ++ ;
154+ if ( verifyAttempts >= 10 )
155+ {
156+ _logger . LogError ( ex , $ "[HybridClusterTransport] 验证 RabbitMQ 连接失败,已达到最大重试次数 - NodeId: { _nodeId } ") ;
157+ throw ;
158+ }
159+ _logger . LogWarning ( $ "[HybridClusterTransport] 验证 RabbitMQ 连接失败,重试中 ({ verifyAttempts } /10) - NodeId: { _nodeId } , Error: { ex . Message } ") ;
160+ await Task . Delay ( 200 ) ; // Wait 200ms before retry / 重试前等待 200ms
161+ }
162+ }
139163
140164 // Declare exchange / 声明交换机
141165 await _messageQueueService . DeclareExchangeAsync ( ExchangeName , "topic" , durable : true ) ;
@@ -154,7 +178,8 @@ public async Task StartAsync()
154178 await _messageQueueService . BindQueueAsync ( _queueName , ExchangeName , BroadcastRoutingKey ) ;
155179
156180 // Start consuming messages / 开始消费消息
157- await _messageQueueService . ConsumeAsync ( _queueName , HandleMessageAsync , autoAck : false ) ;
181+ // Pass currentNodeId to enable early filtering of self-messages / 传递 currentNodeId 以启用早期过滤自己的消息
182+ await _messageQueueService . ConsumeAsync ( _queueName , HandleMessageAsync , autoAck : false , currentNodeId : _nodeId ) ;
158183
159184 // Wait a bit for initial node discovery / 等待初始节点发现
160185 _logger . LogWarning ( $ "[HybridClusterTransport] 等待初始节点发现... - NodeId: { _nodeId } ") ;
@@ -253,7 +278,7 @@ public async Task SendAsync(string nodeId, ClusterMessage message)
253278
254279 await _messageQueueService . PublishAsync ( ExchangeName , routingKey , messageBytes , properties ) ;
255280
256- _logger . LogWarning ( $ "[HybridClusterTransport] 消息发送成功 - TargetNodeId: { nodeId } , MessageId: { message . MessageId } , MessageType: { message . Type } , CurrentNodeId: { _nodeId } , MessageSize: { messageBytes . Length } bytes") ;
281+ _logger . LogTrace ( $ "[HybridClusterTransport] 消息发送成功 - TargetNodeId: { nodeId } , MessageId: { message . MessageId } , MessageType: { message . Type } , CurrentNodeId: { _nodeId } , MessageSize: { messageBytes . Length } bytes") ;
257282 }
258283 catch ( Exception ex )
259284 {
@@ -298,12 +323,26 @@ public async Task BroadcastAsync(ClusterMessage message)
298323 {
299324 MessageId = message . MessageId ,
300325 CorrelationId = message . MessageId ,
301- Timestamp = message . Timestamp
326+ Timestamp = message . Timestamp ,
327+ Headers = new Dictionary < string , object >
328+ {
329+ { "FromNodeId" , _nodeId } // Add FromNodeId header to filter self-messages early / 添加 FromNodeId header 以便早期过滤自己的消息
330+ }
302331 } ;
303332
304- await _messageQueueService . PublishAsync ( ExchangeName , BroadcastRoutingKey , messageBytes , properties ) ;
305-
306- _logger . LogWarning ( $ "[HybridClusterTransport] 广播消息成功 - MessageId: { message . MessageId } , MessageType: { message . Type } , CurrentNodeId: { _nodeId } , MessageSize: { messageBytes . Length } bytes, 已知节点数: { _knownNodes . Count } ") ;
333+ // Only broadcast if there are other nodes (excluding self) / 只有在有其他节点时才广播(排除自己)
334+ // Note: _knownNodes does not include self, so _knownNodes.Count is the count of other nodes
335+ // 注意:_knownNodes 不包含自己,所以 _knownNodes.Count 就是其他节点的数量
336+ var otherNodesCount = _knownNodes . Count ;
337+ if ( otherNodesCount > 0 )
338+ {
339+ await _messageQueueService . PublishAsync ( ExchangeName , BroadcastRoutingKey , messageBytes , properties ) ;
340+ _logger . LogTrace ( $ "[HybridClusterTransport] 广播消息成功 - MessageId: { message . MessageId } , MessageType: { message . Type } , CurrentNodeId: { _nodeId } , MessageSize: { messageBytes . Length } bytes, 已知节点数: { _knownNodes . Count } , 其他节点数: { otherNodesCount } ") ;
341+ }
342+ else
343+ {
344+ _logger . LogTrace ( $ "[HybridClusterTransport] 跳过广播消息(没有其他节点)- MessageId: { message . MessageId } , MessageType: { message . Type } , CurrentNodeId: { _nodeId } , 已知节点数: { _knownNodes . Count } , 其他节点数: { otherNodesCount } ") ;
345+ }
307346 }
308347 catch ( Exception ex )
309348 {
@@ -620,9 +659,9 @@ private async Task<bool> HandleMessageAsync(byte[] body, MessageProperties prope
620659 return true ; // Ack to remove from queue / 确认以从队列中移除
621660 }
622661
623- _logger . LogWarning ( $ "[HybridClusterTransport] 收到集群消息 - MessageType: { message . Type } , FromNodeId: { message . FromNodeId } , ToNodeId: { message . ToNodeId } , MessageId: { message . MessageId } , CurrentNodeId: { _nodeId } , MessageSize: { body . Length } bytes") ;
662+ _logger . LogTrace ( $ "[HybridClusterTransport] 收到集群消息 - MessageType: { message . Type } , FromNodeId: { message . FromNodeId } , ToNodeId: { message . ToNodeId } , MessageId: { message . MessageId } , CurrentNodeId: { _nodeId } , MessageSize: { body . Length } bytes") ;
624663
625- // Ignore messages from self / 忽略来自自己的消息
664+ // Ignore messages from self FIRST to avoid processing own messages / 首先忽略来自自己的消息,避免处理自己的消息
626665 if ( message . FromNodeId == _nodeId )
627666 {
628667 _logger . LogWarning ( $ "[HybridClusterTransport] 忽略来自自己的消息 - MessageType: { message . Type } , MessageId: { message . MessageId } , FromNodeId: { message . FromNodeId } , CurrentNodeId: { _nodeId } ") ;
@@ -690,29 +729,25 @@ private async Task<bool> HandleMessageAsync(byte[] body, MessageProperties prope
690729 {
691730 var logData = new
692731 {
693- location = "HybridClusterTransport.cs:516" ,
694- message = "Before invoking MessageReceived event" ,
695- data = new
696- {
697- messageType = message . Type . ToString ( ) ,
698- fromNodeId = message . FromNodeId ,
699- toNodeId = message . ToNodeId ?? "null" ,
700- messageId = message . MessageId ,
701- currentNodeId = _nodeId ,
702- hasSubscribers = MessageReceived != null ,
703- subscriberCount = MessageReceived ? . GetInvocationList ( ) ? . Length ?? 0
704- } ,
705- timestamp = DateTimeOffset . UtcNow . ToUnixTimeMilliseconds ( ) ,
706- sessionId = "debug-session" ,
707- runId = "run1" ,
708- hypothesisId = "A"
709- } ;
710- var logJson = JsonSerializer . Serialize ( logData ) ;
711- System . IO . File . AppendAllText ( @"e:\OneDrive\Work\WorkSpaces\.cursor\debug.log" , logJson + Environment . NewLine ) ;
732+ // Targeted message: same message can be sent to different nodes
733+ // 定向消息:相同消息可以发送到不同节点
734+ deduplicationKey = $ "{ message . MessageId } :{ message . FromNodeId } :{ message . ToNodeId } ";
735+ }
736+
737+ // Check if we've already processed this message / 检查是否已处理过此消息
738+ if ( _processedMessageIds . TryGetValue ( deduplicationKey , out var processedTime ) )
739+ {
740+ // Message already processed, ignore it / 消息已处理,忽略它
741+ _logger . LogDebug ( $ "Ignoring duplicate message { message . MessageId } (key: { deduplicationKey } , processed at { processedTime : yyyy-MM-dd HH:mm:ss} )") ;
742+ return true ; // Ack to remove from queue / 确认以从队列中移除
743+ }
744+
745+ // Mark message as processed / 标记消息为已处理
746+ _processedMessageIds . TryAdd ( deduplicationKey , DateTime . UtcNow ) ;
712747 }
713- catch { }
714- // #endregion
715-
748+
749+ // Trigger message received event / 触发消息接收事件
750+ _logger . LogTrace ( $ "[HybridClusterTransport] 触发消息接收事件 - MessageType: { message . Type } , FromNodeId: { message . FromNodeId } , ToNodeId: { message . ToNodeId } , MessageId: { message . MessageId } , CurrentNodeId: { _nodeId } " ) ;
716751 MessageReceived ? . Invoke ( this , new ClusterMessageEventArgs
717752 {
718753 FromNodeId = message . FromNodeId ,
0 commit comments