Skip to content

Commit c0a648f

Browse files
committed
fix
1 parent 5cdbcbc commit c0a648f

1 file changed

Lines changed: 51 additions & 14 deletions

File tree

Cyaim.WebSocketServer/Cluster/Cyaim.WebSocketServer.Cluster.Hybrid.MessageQueue.RabbitMQ/RabbitMQMessageQueueService.cs

Lines changed: 51 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -815,29 +815,66 @@ public async Task ConsumeAsync(string queueName, Func<byte[], MessageProperties,
815815
}
816816
};
817817

818-
_logger.LogWarning($"[RabbitMQMessageQueueService] 准备启动消费者 - QueueName: {queueName}, AutoAck: {autoAck}, ChannelIsOpen: {_channel.IsOpen}, CurrentConsumers: {_consumers.Count}");
818+
_logger.LogWarning($"[RabbitMQMessageQueueService] 准备启动消费者 - QueueName: {queueName}, AutoAck: {autoAck}, ChannelIsOpen: {_channel.IsOpen}, ChannelNumber: {_channel.ChannelNumber}, CurrentConsumers: {_consumers.Count}");
819819

820-
var consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck, consumer);
821-
822-
_logger.LogWarning($"[RabbitMQMessageQueueService] 消费者启动成功 - QueueName: {queueName}, ConsumerTag: {consumerTag}, AutoAck: {autoAck}, TotalConsumers: {_consumers.Count}");
823-
824-
// Verify consumer is actually consuming by checking queue status / 通过检查队列状态验证消费者确实在消费
825-
// Note: QueueDeclarePassiveAsync will throw if queue doesn't exist
826-
// 注意:如果队列不存在,QueueDeclarePassiveAsync 会抛出异常
820+
string consumerTag = null;
827821
try
828822
{
829-
var queueInfo = await _channel.QueueDeclarePassiveAsync(queueName);
830-
_logger.LogWarning($"[RabbitMQMessageQueueService] 队列状态确认 - QueueName: {queueName}, ConsumerCount: {queueInfo.ConsumerCount}, MessageCount: {queueInfo.MessageCount}, QueueExists: true");
823+
// RabbitMQ.Client 7.0+ BasicConsumeAsync signature: BasicConsumeAsync(string queue, bool autoAck, IBasicConsumer consumer)
824+
// RabbitMQ.Client 7.0+ BasicConsumeAsync 签名:BasicConsumeAsync(string queue, bool autoAck, IBasicConsumer consumer)
825+
consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck, consumer);
826+
827+
if (string.IsNullOrEmpty(consumerTag))
828+
{
829+
_logger.LogError($"[RabbitMQMessageQueueService] 消费者启动失败:返回的 ConsumerTag 为空 - QueueName: {queueName}");
830+
throw new InvalidOperationException($"Consumer registration failed: returned empty consumer tag for queue {queueName}");
831+
}
832+
833+
_logger.LogWarning($"[RabbitMQMessageQueueService] 消费者启动成功 - QueueName: {queueName}, ConsumerTag: {consumerTag}, AutoAck: {autoAck}, TotalConsumers: {_consumers.Count}, ChannelIsOpen: {_channel.IsOpen}");
834+
835+
// Wait a bit for consumer to register / 等待消费者注册
836+
await Task.Delay(100);
837+
838+
// Verify consumer is actually consuming by checking queue status / 通过检查队列状态验证消费者确实在消费
839+
// Note: QueueDeclarePassiveAsync will throw if queue doesn't exist
840+
// 注意:如果队列不存在,QueueDeclarePassiveAsync 会抛出异常
841+
try
842+
{
843+
var queueInfo = await _channel.QueueDeclarePassiveAsync(queueName);
844+
_logger.LogWarning($"[RabbitMQMessageQueueService] 队列状态确认 - QueueName: {queueName}, ConsumerCount: {queueInfo.ConsumerCount}, MessageCount: {queueInfo.MessageCount}, QueueExists: true, ConsumerTag: {consumerTag}");
831845

832-
if (queueInfo.ConsumerCount == 0)
846+
if (queueInfo.ConsumerCount == 0)
847+
{
848+
_logger.LogError($"[RabbitMQMessageQueueService] 错误:队列消费者数量为 0,消费者可能未成功注册 - QueueName: {queueName}, ConsumerTag: {consumerTag}, ChannelIsOpen: {_channel.IsOpen}, ChannelNumber: {_channel.ChannelNumber}");
849+
850+
// Try to consume again / 尝试再次消费
851+
_logger.LogWarning($"[RabbitMQMessageQueueService] 尝试重新注册消费者 - QueueName: {queueName}");
852+
_consumers.Remove(queueName); // Remove from dictionary to allow retry / 从字典中移除以允许重试
853+
consumerTag = await _channel.BasicConsumeAsync(queueName, autoAck, consumer);
854+
_logger.LogWarning($"[RabbitMQMessageQueueService] 消费者重新注册完成 - QueueName: {queueName}, ConsumerTag: {consumerTag}");
855+
856+
// Verify again / 再次验证
857+
await Task.Delay(100);
858+
var queueInfo2 = await _channel.QueueDeclarePassiveAsync(queueName);
859+
_logger.LogWarning($"[RabbitMQMessageQueueService] 重新注册后队列状态 - QueueName: {queueName}, ConsumerCount: {queueInfo2.ConsumerCount}, MessageCount: {queueInfo2.MessageCount}");
860+
}
861+
else
862+
{
863+
_logger.LogWarning($"[RabbitMQMessageQueueService] 消费者注册成功确认 - QueueName: {queueName}, ConsumerCount: {queueInfo.ConsumerCount}, ConsumerTag: {consumerTag}");
864+
}
865+
}
866+
catch (Exception ex)
833867
{
834-
_logger.LogWarning($"[RabbitMQMessageQueueService] 警告:队列消费者数量为 0,但消费者已启动 - QueueName: {queueName}, ConsumerTag: {consumerTag}");
868+
_logger.LogError(ex, $"[RabbitMQMessageQueueService] 队列状态确认失败 - QueueName: {queueName}, ConsumerTag: {consumerTag}, Error: {ex.Message}, StackTrace: {ex.StackTrace}");
869+
// Don't throw - consumer might still work / 不抛出异常 - 消费者可能仍然有效
835870
}
836871
}
837872
catch (Exception ex)
838873
{
839-
_logger.LogError(ex, $"[RabbitMQMessageQueueService] 队列状态确认失败 - QueueName: {queueName}, Error: {ex.Message}");
840-
// Don't throw - consumer might still work / 不抛出异常 - 消费者可能仍然有效
874+
_logger.LogError(ex, $"[RabbitMQMessageQueueService] 启动消费者时发生异常 - QueueName: {queueName}, ChannelIsOpen: {_channel?.IsOpen ?? false}, Error: {ex.Message}, StackTrace: {ex.StackTrace}");
875+
// Remove consumer from dictionary on failure / 失败时从字典中移除消费者
876+
_consumers.Remove(queueName);
877+
throw;
841878
}
842879
}
843880

0 commit comments

Comments
 (0)