@@ -203,9 +203,47 @@ public async Task StartAsync()
203203
204204 // Start consuming messages / 开始消费消息
205205 // Pass currentNodeId to enable early filtering of self-messages / 传递 currentNodeId 以启用早期过滤自己的消息
206- await _messageQueueService . ConsumeAsync ( _queueName , HandleMessageAsync , autoAck : false , currentNodeId : _nodeId ) ;
206+ try
207+ {
208+ _logger . LogWarning ( $ "[HybridClusterTransport] 开始启动消费者 - QueueName: { _queueName } , CurrentNodeId: { _nodeId } ") ;
209+ await _messageQueueService . ConsumeAsync ( _queueName , HandleMessageAsync , autoAck : false , currentNodeId : _nodeId ) ;
210+ _logger . LogWarning ( $ "[HybridClusterTransport] 消费者启动完成 - QueueName: { _queueName } , CurrentNodeId: { _nodeId } ") ;
207211
208- _logger . LogWarning ( $ "[HybridClusterTransport] 消费者启动完成 - QueueName: { _queueName } , CurrentNodeId: { _nodeId } ") ;
212+ // Verify consumer was actually created / 验证消费者是否真的已创建
213+ await Task . Delay ( 1000 ) ; // Wait 1 second for consumer to register / 等待 1 秒让消费者注册
214+ try
215+ {
216+ var consumerCount = await _messageQueueService . GetQueueConsumerCountAsync ( _queueName ) ;
217+ _logger . LogWarning ( $ "[HybridClusterTransport] 消费者启动后验证 - QueueName: { _queueName } , ConsumerCount: { consumerCount } , CurrentNodeId: { _nodeId } ") ;
218+
219+ if ( consumerCount == 0 )
220+ {
221+ _logger . LogError ( $ "[HybridClusterTransport] 错误:消费者启动后数量为 0,消费者可能未成功注册 - QueueName: { _queueName } , CurrentNodeId: { _nodeId } ") ;
222+ // Try to recreate consumer / 尝试重新创建消费者
223+ _logger . LogWarning ( $ "[HybridClusterTransport] 尝试重新创建消费者 - QueueName: { _queueName } , CurrentNodeId: { _nodeId } ") ;
224+ await _messageQueueService . ConsumeAsync ( _queueName , HandleMessageAsync , autoAck : false , currentNodeId : _nodeId ) ;
225+ await Task . Delay ( 1000 ) ;
226+ var newConsumerCount = await _messageQueueService . GetQueueConsumerCountAsync ( _queueName ) ;
227+ _logger . LogWarning ( $ "[HybridClusterTransport] 消费者重新创建后验证 - QueueName: { _queueName } , ConsumerCount: { newConsumerCount } , CurrentNodeId: { _nodeId } ") ;
228+
229+ if ( newConsumerCount == 0 )
230+ {
231+ _logger . LogError ( $ "[HybridClusterTransport] 严重错误:消费者重新创建后数量仍为 0 - QueueName: { _queueName } , CurrentNodeId: { _nodeId } ") ;
232+ throw new InvalidOperationException ( $ "Failed to create consumer for queue { _queueName } after retry: consumer count is still 0") ;
233+ }
234+ }
235+ }
236+ catch ( Exception verifyEx )
237+ {
238+ _logger . LogError ( verifyEx , $ "[HybridClusterTransport] 验证消费者数量失败 - QueueName: { _queueName } , CurrentNodeId: { _nodeId } , Error: { verifyEx . Message } , StackTrace: { verifyEx . StackTrace } ") ;
239+ // Don't throw - consumer might still work / 不抛出异常 - 消费者可能仍然有效
240+ }
241+ }
242+ catch ( Exception consumeEx )
243+ {
244+ _logger . LogError ( consumeEx , $ "[HybridClusterTransport] 启动消费者失败 - QueueName: { _queueName } , CurrentNodeId: { _nodeId } , Error: { consumeEx . Message } , StackTrace: { consumeEx . StackTrace } ") ;
245+ throw ; // Re-throw to fail startup / 重新抛出以失败启动
246+ }
209247
210248 // Start periodic binding health check / 启动定期绑定健康检查
211249 _bindingHealthCheckTimer = new Timer ( async _ => await CheckAndRepairBindingsAsync ( ) , null , TimeSpan . FromMinutes ( 1 ) , TimeSpan . FromMinutes ( 5 ) ) ;
0 commit comments