11using System ;
22using System . Collections . Generic ;
3+ using System . Threading ;
34using System . Threading . Tasks ;
45using Cyaim . WebSocketServer . Cluster . Hybrid . Abstractions ;
56using Microsoft . Extensions . Logging ;
@@ -20,6 +21,7 @@ public class RabbitMQMessageQueueService : IMessageQueueService
2021 private IModel _channel ;
2122 private readonly Dictionary < string , EventingBasicConsumer > _consumers ;
2223 private bool _disposed = false ;
24+ private readonly SemaphoreSlim _channelLock = new SemaphoreSlim ( 1 , 1 ) ;
2325
2426 /// <summary>
2527 /// Constructor / 构造函数
@@ -38,22 +40,49 @@ public RabbitMQMessageQueueService(ILogger<RabbitMQMessageQueueService> logger,
3840 /// </summary>
3941 public async Task ConnectAsync ( )
4042 {
41- if ( _connection != null && _connection . IsOpen )
42- {
43- return ;
44- }
45-
43+ // 避免并发重连 / Avoid concurrent reconnects
44+ await _channelLock . WaitAsync ( ) . ConfigureAwait ( false ) ;
4645 try
4746 {
48- var factory = new ConnectionFactory { Uri = new Uri ( _connectionString ) } ;
49- _connection = factory . CreateConnection ( ) ;
50- _channel = _connection . CreateModel ( ) ;
51- _logger . LogInformation ( "Connected to RabbitMQ" ) ;
47+ if ( _connection != null && _connection . IsOpen && _channel != null && _channel . IsOpen )
48+ {
49+ return ;
50+ }
51+
52+ // 清理旧连接 / Cleanup old connection
53+ try
54+ {
55+ _channel ? . Close ( ) ;
56+ _channel ? . Dispose ( ) ;
57+ }
58+ catch { }
59+
60+ try
61+ {
62+ _connection ? . Close ( ) ;
63+ _connection ? . Dispose ( ) ;
64+ }
65+ catch { }
66+
67+ _connection = null ;
68+ _channel = null ;
69+
70+ try
71+ {
72+ var factory = new ConnectionFactory { Uri = new Uri ( _connectionString ) } ;
73+ _connection = factory . CreateConnection ( ) ;
74+ _channel = _connection . CreateModel ( ) ;
75+ _logger . LogInformation ( "Connected to RabbitMQ" ) ;
76+ }
77+ catch ( Exception ex )
78+ {
79+ _logger . LogError ( ex , "Failed to connect to RabbitMQ" ) ;
80+ throw ;
81+ }
5282 }
53- catch ( Exception ex )
83+ finally
5484 {
55- _logger . LogError ( ex , "Failed to connect to RabbitMQ" ) ;
56- throw ;
85+ _channelLock . Release ( ) ;
5786 }
5887 }
5988
@@ -85,10 +114,7 @@ public async Task DisconnectAsync()
85114 /// </summary>
86115 public async Task DeclareExchangeAsync ( string exchangeName , string exchangeType , bool durable = true )
87116 {
88- if ( _channel == null )
89- {
90- throw new InvalidOperationException ( "RabbitMQ is not connected" ) ;
91- }
117+ await EnsureChannelAsync ( ) . ConfigureAwait ( false ) ;
92118
93119 _channel . ExchangeDeclare ( exchangeName , exchangeType , durable : durable , autoDelete : false ) ;
94120 await Task . CompletedTask ;
@@ -99,10 +125,7 @@ public async Task DeclareExchangeAsync(string exchangeName, string exchangeType,
99125 /// </summary>
100126 public async Task < string > DeclareQueueAsync ( string queueName , bool durable = false , bool exclusive = false , bool autoDelete = true )
101127 {
102- if ( _channel == null )
103- {
104- throw new InvalidOperationException ( "RabbitMQ is not connected" ) ;
105- }
128+ await EnsureChannelAsync ( ) . ConfigureAwait ( false ) ;
106129
107130 var result = _channel . QueueDeclare (
108131 queue : queueName ,
@@ -119,10 +142,7 @@ public async Task<string> DeclareQueueAsync(string queueName, bool durable = fal
119142 /// </summary>
120143 public async Task BindQueueAsync ( string queueName , string exchangeName , string routingKey )
121144 {
122- if ( _channel == null )
123- {
124- throw new InvalidOperationException ( "RabbitMQ is not connected" ) ;
125- }
145+ await EnsureChannelAsync ( ) . ConfigureAwait ( false ) ;
126146
127147 _channel . QueueBind ( queueName , exchangeName , routingKey ) ;
128148 await Task . CompletedTask ;
@@ -133,9 +153,11 @@ public async Task BindQueueAsync(string queueName, string exchangeName, string r
133153 /// </summary>
134154 public async Task PublishAsync ( string exchangeName , string routingKey , byte [ ] message , MessageProperties properties = null )
135155 {
136- if ( _channel == null )
156+ await EnsureChannelAsync ( ) . ConfigureAwait ( false ) ;
157+
158+ if ( _channel == null || ! _channel . IsOpen )
137159 {
138- throw new InvalidOperationException ( "RabbitMQ is not connected " ) ;
160+ throw new InvalidOperationException ( "RabbitMQ channel is not open " ) ;
139161 }
140162
141163 var basicProperties = _channel . CreateBasicProperties ( ) ;
@@ -171,10 +193,7 @@ public async Task PublishAsync(string exchangeName, string routingKey, byte[] me
171193 /// </summary>
172194 public async Task ConsumeAsync ( string queueName , Func < byte [ ] , MessageProperties , Task < bool > > handler , bool autoAck = false )
173195 {
174- if ( _channel == null )
175- {
176- throw new InvalidOperationException ( "RabbitMQ is not connected" ) ;
177- }
196+ await EnsureChannelAsync ( ) . ConfigureAwait ( false ) ;
178197
179198 if ( _consumers . ContainsKey ( queueName ) )
180199 {
@@ -244,10 +263,7 @@ public async Task ConsumeAsync(string queueName, Func<byte[], MessageProperties,
244263 /// </summary>
245264 public async Task AckAsync ( ulong deliveryTag )
246265 {
247- if ( _channel == null )
248- {
249- throw new InvalidOperationException ( "RabbitMQ is not connected" ) ;
250- }
266+ await EnsureChannelAsync ( ) . ConfigureAwait ( false ) ;
251267
252268 _channel . BasicAck ( deliveryTag , false ) ;
253269 await Task . CompletedTask ;
@@ -258,15 +274,25 @@ public async Task AckAsync(ulong deliveryTag)
258274 /// </summary>
259275 public async Task RejectAsync ( ulong deliveryTag , bool requeue = false )
260276 {
261- if ( _channel == null )
262- {
263- throw new InvalidOperationException ( "RabbitMQ is not connected" ) ;
264- }
277+ await EnsureChannelAsync ( ) . ConfigureAwait ( false ) ;
265278
266279 _channel . BasicNack ( deliveryTag , false , requeue ) ;
267280 await Task . CompletedTask ;
268281 }
269282
283+ /// <summary>
284+ /// Ensure connection and channel are open / 确保连接和通道已打开
285+ /// </summary>
286+ private async Task EnsureChannelAsync ( )
287+ {
288+ if ( _channel != null && _channel . IsOpen && _connection != null && _connection . IsOpen )
289+ {
290+ return ;
291+ }
292+
293+ await ConnectAsync ( ) . ConfigureAwait ( false ) ;
294+ }
295+
270296 /// <summary>
271297 /// Dispose / 释放资源
272298 /// </summary>
0 commit comments