Skip to content

Commit afb8c33

Browse files
committed
chore: 更新rabbitmq库版本到7.2;release version
1 parent 7c9a497 commit afb8c33

3 files changed

Lines changed: 72 additions & 51 deletions

File tree

Cyaim.WebSocketServer/Cluster/Cyaim.WebSocketServer.Cluster.Hybrid.MessageQueue.RabbitMQ/RabbitMQMessageQueueService.cs

Lines changed: 27 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,11 @@ public class RabbitMQMessageQueueService : IMessageQueueService
1919
private readonly ILogger<RabbitMQMessageQueueService> _logger;
2020
private readonly string _connectionString;
2121
private IConnection _connection;
22-
private IModel _channel;
22+
// RabbitMQ.Client 7.0+ uses IChannel instead of IModel
23+
// RabbitMQ.Client 7.0+ 使用 IChannel 替代 IModel
24+
private IChannel _channel;
25+
// RabbitMQ.Client 7.0+ uses AsyncEventingBasicConsumer
26+
// RabbitMQ.Client 7.0+ 使用 AsyncEventingBasicConsumer
2327
private readonly Dictionary<string, AsyncEventingBasicConsumer> _consumers;
2428
private bool _disposed = false;
2529

@@ -48,8 +52,8 @@ public async Task ConnectAsync()
4852
try
4953
{
5054
var factory = new ConnectionFactory { Uri = new Uri(_connectionString) };
51-
_connection = factory.CreateConnection();
52-
_channel = _connection.CreateModel();
55+
_connection = await factory.CreateConnectionAsync();
56+
_channel = await _connection.CreateChannelAsync();
5357
_logger.LogInformation("Connected to RabbitMQ");
5458
}
5559
catch (Exception ex)
@@ -69,14 +73,14 @@ public async Task DisconnectAsync()
6973

7074
if (_channel != null)
7175
{
72-
_channel.Close();
76+
await _channel.CloseAsync();
7377
_channel.Dispose();
7478
_channel = null;
7579
}
7680

7781
if (_connection != null)
7882
{
79-
_connection.Close();
83+
await _connection.CloseAsync();
8084
_connection.Dispose();
8185
_connection = null;
8286
_logger.LogInformation("Disconnected from RabbitMQ");
@@ -95,8 +99,7 @@ public async Task DeclareExchangeAsync(string exchangeName, string exchangeType,
9599
throw new InvalidOperationException("RabbitMQ is not connected");
96100
}
97101

98-
_channel.ExchangeDeclare(exchangeName, exchangeType, durable: durable, autoDelete: false);
99-
await Task.CompletedTask;
102+
await _channel.ExchangeDeclareAsync(exchangeName, exchangeType, durable: durable, autoDelete: false);
100103
}
101104

102105
/// <summary>
@@ -109,14 +112,14 @@ public async Task<string> DeclareQueueAsync(string queueName, bool durable = fal
109112
throw new InvalidOperationException("RabbitMQ is not connected");
110113
}
111114

112-
var result = _channel.QueueDeclare(
115+
var result = await _channel.QueueDeclareAsync(
113116
queue: queueName,
114117
durable: durable,
115118
exclusive: exclusive,
116119
autoDelete: autoDelete,
117120
arguments: null);
118121

119-
return await Task.FromResult(result.QueueName);
122+
return result.QueueName;
120123
}
121124

122125
/// <summary>
@@ -129,8 +132,7 @@ public async Task BindQueueAsync(string queueName, string exchangeName, string r
129132
throw new InvalidOperationException("RabbitMQ is not connected");
130133
}
131134

132-
_channel.QueueBind(queueName, exchangeName, routingKey);
133-
await Task.CompletedTask;
135+
await _channel.QueueBindAsync(queueName, exchangeName, routingKey);
134136
}
135137

136138
/// <summary>
@@ -143,7 +145,7 @@ public async Task PublishAsync(string exchangeName, string routingKey, byte[] me
143145
throw new InvalidOperationException("RabbitMQ is not connected");
144146
}
145147

146-
var basicProperties = _channel.CreateBasicProperties();
148+
var basicProperties = new BasicProperties();
147149

148150
if (properties != null)
149151
{
@@ -167,8 +169,12 @@ public async Task PublishAsync(string exchangeName, string routingKey, byte[] me
167169
}
168170
}
169171

170-
_channel.BasicPublish(exchangeName, routingKey, basicProperties, message);
171-
await Task.CompletedTask;
172+
await _channel.BasicPublishAsync(
173+
exchange: exchangeName,
174+
routingKey: routingKey,
175+
mandatory: false,
176+
basicProperties: basicProperties,
177+
body: new ReadOnlyMemory<byte>(message));
172178
}
173179

174180
/// <summary>
@@ -192,7 +198,7 @@ public async Task ConsumeAsync(string queueName, Func<byte[], MessageProperties,
192198
var consumer = new AsyncEventingBasicConsumer(_channel);
193199
_consumers[queueName] = consumer;
194200

195-
consumer.Received += async (model, ea) =>
201+
consumer.ReceivedAsync += async (model, ea) =>
196202
{
197203
try
198204
{
@@ -224,11 +230,11 @@ public async Task ConsumeAsync(string queueName, Func<byte[], MessageProperties,
224230
{
225231
if (success)
226232
{
227-
_channel.BasicAck(ea.DeliveryTag, false);
233+
await _channel.BasicAckAsync(ea.DeliveryTag, false);
228234
}
229235
else
230236
{
231-
_channel.BasicNack(ea.DeliveryTag, false, true); // Requeue / 重新入队
237+
await _channel.BasicNackAsync(ea.DeliveryTag, false, true); // Requeue / 重新入队
232238
}
233239
}
234240
}
@@ -237,13 +243,12 @@ public async Task ConsumeAsync(string queueName, Func<byte[], MessageProperties,
237243
_logger.LogError(ex, $"Error processing message from queue {queueName}");
238244
if (!autoAck)
239245
{
240-
_channel.BasicNack(ea.DeliveryTag, false, true);
246+
await _channel.BasicNackAsync(ea.DeliveryTag, false, true);
241247
}
242248
}
243249
};
244250

245-
_channel.BasicConsume(queueName, autoAck, consumer);
246-
await Task.CompletedTask;
251+
await _channel.BasicConsumeAsync(queueName, autoAck, consumer);
247252
}
248253

249254
/// <summary>
@@ -256,8 +261,7 @@ public async Task AckAsync(ulong deliveryTag)
256261
throw new InvalidOperationException("RabbitMQ is not connected");
257262
}
258263

259-
_channel.BasicAck(deliveryTag, false);
260-
await Task.CompletedTask;
264+
await _channel.BasicAckAsync(deliveryTag, false);
261265
}
262266

263267
/// <summary>
@@ -270,8 +274,7 @@ public async Task RejectAsync(ulong deliveryTag, bool requeue = false)
270274
throw new InvalidOperationException("RabbitMQ is not connected");
271275
}
272276

273-
_channel.BasicNack(deliveryTag, false, requeue);
274-
await Task.CompletedTask;
277+
await _channel.BasicNackAsync(deliveryTag, false, requeue);
275278
}
276279

277280
/// <summary>

Cyaim.WebSocketServer/Cluster/Cyaim.WebSocketServer.Cluster.RabbitMQ/Cyaim.WebSocketServer.Cluster.RabbitMQ.csproj

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,9 @@
3737

3838
<!-- .NET 6.0+ 框架已内置这些包,无需显式引用,框架会自动使用内置版本 -->
3939

40+
<!-- RabbitMQ.Client 7.0+ 支持 .NET Standard 2.0+,统一使用 7.2.0 -->
4041
<ItemGroup>
41-
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
42+
<PackageReference Include="RabbitMQ.Client" Version="7.2.0" />
4243
<ProjectReference Include="..\..\Cyaim.WebSocketServer\Cyaim.WebSocketServer.csproj" />
4344
</ItemGroup>
4445

Cyaim.WebSocketServer/Cluster/Cyaim.WebSocketServer.Cluster.RabbitMQ/RabbitMQClusterTransport.cs

Lines changed: 43 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ namespace Cyaim.WebSocketServer.Cluster.RabbitMQ
1515
/// <summary>
1616
/// RabbitMQ-based cluster transport
1717
/// 基于 RabbitMQ 的集群传输
18+
/// Supports RabbitMQ.Client 7.0+ (uses AsyncEventingBasicConsumer)
19+
/// 支持 RabbitMQ.Client 7.0+(使用 AsyncEventingBasicConsumer)
1820
/// </summary>
1921
public class RabbitMQClusterTransport : IClusterTransport
2022
{
@@ -31,8 +33,10 @@ public class RabbitMQClusterTransport : IClusterTransport
3133
private IConnection _connection;
3234
/// <summary>
3335
/// RabbitMQ channel / RabbitMQ 通道
36+
/// RabbitMQ.Client 7.0+ uses IChannel instead of IModel
37+
/// RabbitMQ.Client 7.0+ 使用 IChannel 替代 IModel
3438
/// </summary>
35-
private IModel _channel;
39+
private IChannel _channel;
3640
/// <summary>
3741
/// Queue name for this node / 此节点的队列名称
3842
/// </summary>
@@ -86,29 +90,32 @@ public async Task StartAsync()
8690
try
8791
{
8892
var factory = new ConnectionFactory { Uri = new Uri(_connectionString) };
89-
_connection = factory.CreateConnection();
90-
_channel = _connection.CreateModel();
93+
_connection = await factory.CreateConnectionAsync();
94+
_channel = await _connection.CreateChannelAsync();
9195

9296
// Declare exchange / 声明交换机
93-
_channel.ExchangeDeclare(ExchangeName, ExchangeType.Topic, durable: true, autoDelete: false);
97+
await _channel.ExchangeDeclareAsync(ExchangeName, ExchangeType.Topic, durable: true, autoDelete: false);
9498

9599
// Declare queue for this node / 为此节点声明队列
96-
_queueName = _channel.QueueDeclare(
100+
var queueDeclareResult = await _channel.QueueDeclareAsync(
97101
queue: $"cluster:node:{_nodeId}",
98102
durable: false,
99103
exclusive: false,
100104
autoDelete: true,
101-
arguments: null).QueueName;
105+
arguments: null);
106+
_queueName = queueDeclareResult.QueueName;
102107

103108
// Bind queue to node-specific routing key / 将队列绑定到节点特定的路由键
104-
_channel.QueueBind(_queueName, ExchangeName, _nodeId);
109+
await _channel.QueueBindAsync(_queueName, ExchangeName, _nodeId);
105110

106111
// Also bind to broadcast / 同时绑定到广播
107-
_channel.QueueBind(_queueName, ExchangeName, BroadcastRoutingKey);
112+
await _channel.QueueBindAsync(_queueName, ExchangeName, BroadcastRoutingKey);
108113

109114
// Start consuming / 开始消费
110-
var consumer = new EventingBasicConsumer(_channel);
111-
consumer.Received += (model, ea) =>
115+
// Use AsyncEventingBasicConsumer for RabbitMQ.Client 7.0+
116+
// 使用 AsyncEventingBasicConsumer 支持 RabbitMQ.Client 7.0+
117+
var consumer = new AsyncEventingBasicConsumer(_channel);
118+
consumer.ReceivedAsync += async (model, ea) =>
112119
{
113120
try
114121
{
@@ -119,7 +126,7 @@ public async Task StartAsync()
119126
// Skip messages from self / 跳过来自自己的消息
120127
if (clusterMessage.FromNodeId == _nodeId)
121128
{
122-
_channel.BasicAck(ea.DeliveryTag, false);
129+
await _channel.BasicAckAsync(ea.DeliveryTag, false);
123130
return;
124131
}
125132

@@ -129,16 +136,16 @@ public async Task StartAsync()
129136
Message = clusterMessage
130137
});
131138

132-
_channel.BasicAck(ea.DeliveryTag, false);
139+
await _channel.BasicAckAsync(ea.DeliveryTag, false);
133140
}
134141
catch (Exception ex)
135142
{
136143
_logger.LogError(ex, "Failed to process RabbitMQ message");
137-
_channel.BasicNack(ea.DeliveryTag, false, true);
144+
await _channel.BasicNackAsync(ea.DeliveryTag, false, true);
138145
}
139146
};
140147

141-
_channel.BasicConsume(
148+
await _channel.BasicConsumeAsync(
142149
queue: _queueName,
143150
autoAck: false,
144151
consumer: consumer);
@@ -164,10 +171,18 @@ public async Task StopAsync()
164171

165172
try
166173
{
167-
_channel?.Close();
168-
_channel?.Dispose();
169-
_connection?.Close();
170-
_connection?.Dispose();
174+
if (_channel != null)
175+
{
176+
await _channel.CloseAsync();
177+
_channel.Dispose();
178+
_channel = null;
179+
}
180+
if (_connection != null)
181+
{
182+
await _connection.CloseAsync();
183+
_connection.Dispose();
184+
_connection = null;
185+
}
171186

172187
_nodes.Clear();
173188
}
@@ -196,22 +211,23 @@ public async Task SendAsync(string nodeId, ClusterMessage message)
196211

197212
try
198213
{
199-
if (_channel == null || !_channel.IsOpen)
214+
if (_channel == null)
200215
{
201216
_logger.LogWarning("RabbitMQ channel is not available");
202217
return;
203218
}
204219

205-
var properties = _channel.CreateBasicProperties();
220+
var properties = new BasicProperties();
206221
properties.Persistent = false;
207222
properties.MessageId = message.MessageId;
208223
properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds());
209224

210-
_channel.BasicPublish(
225+
await _channel.BasicPublishAsync(
211226
exchange: ExchangeName,
212227
routingKey: nodeId,
228+
mandatory: false,
213229
basicProperties: properties,
214-
body: messageBytes);
230+
body: new ReadOnlyMemory<byte>(messageBytes));
215231

216232
_logger.LogDebug($"Sent message to node {nodeId} via RabbitMQ");
217233
}
@@ -236,22 +252,23 @@ public async Task BroadcastAsync(ClusterMessage message)
236252

237253
try
238254
{
239-
if (_channel == null || !_channel.IsOpen)
255+
if (_channel == null)
240256
{
241257
_logger.LogWarning("RabbitMQ channel is not available");
242258
return;
243259
}
244260

245-
var properties = _channel.CreateBasicProperties();
261+
var properties = new BasicProperties();
246262
properties.Persistent = false;
247263
properties.MessageId = message.MessageId;
248264
properties.Timestamp = new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeSeconds());
249265

250-
_channel.BasicPublish(
266+
await _channel.BasicPublishAsync(
251267
exchange: ExchangeName,
252268
routingKey: BroadcastRoutingKey,
269+
mandatory: false,
253270
basicProperties: properties,
254-
body: messageBytes);
271+
body: new ReadOnlyMemory<byte>(messageBytes));
255272

256273
_logger.LogDebug("Broadcasted message via RabbitMQ");
257274
}

0 commit comments

Comments
 (0)