Skip to content

Commit 871e496

Browse files
committed
perfect: 拆分Hybrid实现
1 parent 9242a7f commit 871e496

11 files changed

Lines changed: 1415 additions & 63 deletions

File tree

Cyaim.WebSocketServer/Cluster/Cyaim.WebSocketServer.Cluster.Hybrid.Implementations/Cyaim.WebSocketServer.Cluster.Hybrid.Implementations.csproj

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,26 @@
1212
<Version>2.0.0</Version>
1313
<PackageIcon>WebSocketRepository_Logo.png</PackageIcon>
1414
<PackageReleaseNotes>
15-
Version 2.0.0 - Major Release
15+
Version 2.0.0 - Major Release (DEPRECATED)
1616

17-
- Updated to support WebSocketServer 2.0.0
18-
- Enhanced hybrid cluster transport implementations
19-
- Better error handling and connection management
20-
- Support for .NET 9.0 and .NET 10.0
17+
⚠️ This package is DEPRECATED. Please use the new modular packages instead:
18+
⚠️ 此包已弃用。请使用新的模块化包:
19+
20+
- Cyaim.WebSocketServer.Cluster.Hybrid.Redis.StackExchange (for StackExchange.Redis)
21+
- Cyaim.WebSocketServer.Cluster.Hybrid.Redis.FreeRedis (for FreeRedis)
22+
- Cyaim.WebSocketServer.Cluster.Hybrid.MessageQueue.RabbitMQ (for RabbitMQ)
23+
24+
The new packages provide better modularity and allow you to choose only the implementations you need.
25+
新包提供更好的模块化,允许您只选择需要的实现。
2126
</PackageReleaseNotes>
2227
<Description>
28+
⚠️ DEPRECATED: This package is deprecated. Please use the new modular packages:
29+
⚠️ 已弃用:此包已弃用。请使用新的模块化包:
30+
31+
- Cyaim.WebSocketServer.Cluster.Hybrid.Redis.StackExchange
32+
- Cyaim.WebSocketServer.Cluster.Hybrid.Redis.FreeRedis
33+
- Cyaim.WebSocketServer.Cluster.Hybrid.MessageQueue.RabbitMQ
34+
2335
WebSocketServer Hybrid cluster transport implementations (StackExchange.Redis, FreeRedis and RabbitMQ.Client)
2436
WebSocketServer 混合集群传输实现(StackExchange.Redis、FreeRedis 和 RabbitMQ.Client)
2537
</Description>
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<TargetFrameworks>netstandard2.1;net6.0;net7.0;net8.0;net9.0;net10.0</TargetFrameworks>
4+
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
5+
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance>
6+
<Authors>Psyche</Authors>
7+
<Company>Cyaim Studio</Company>
8+
<PackageProjectUrl>https://github.com/Cyaim/WebSocketServer</PackageProjectUrl>
9+
<PackageLicenseFile>LICENSE</PackageLicenseFile>
10+
<RepositoryUrl>https://github.com/Cyaim/WebSocketServer</RepositoryUrl>
11+
<Copyright>Copyright © Cyaim Studio</Copyright>
12+
<Version>2.0.0</Version>
13+
<PackageIcon>WebSocketRepository_Logo.png</PackageIcon>
14+
<PackageReleaseNotes>
15+
Version 2.0.0 - Major Release
16+
17+
- RabbitMQ.Client implementation for Hybrid cluster transport
18+
- Support for .NET 9.0 and .NET 10.0
19+
- Modular and pluggable design
20+
</PackageReleaseNotes>
21+
<Description>
22+
RabbitMQ.Client implementation for WebSocketServer Hybrid cluster transport (message routing)
23+
WebSocketServer 混合集群传输的 RabbitMQ.Client 实现(消息路由)
24+
</Description>
25+
</PropertyGroup>
26+
27+
<ItemGroup Condition="'$(TargetFramework)' == 'net6.0' Or '$(TargetFramework)' == 'net7.0' Or '$(TargetFramework)' == 'net8.0' Or '$(TargetFramework)' == 'net9.0' Or '$(TargetFramework)' == 'net10.0'">
28+
<FrameworkReference Include="Microsoft.AspNetCore.App" />
29+
</ItemGroup>
30+
31+
<!-- .NET Standard 2.1 需要显式引用,使用最新稳定版本 -->
32+
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.1'">
33+
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.0" />
34+
<PackageReference Include="System.Text.Json" Version="9.0.0" />
35+
</ItemGroup>
36+
37+
<ItemGroup>
38+
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
39+
<ProjectReference Include="..\Cyaim.WebSocketServer.Cluster.Hybrid\Cyaim.WebSocketServer.Cluster.Hybrid.csproj" />
40+
</ItemGroup>
41+
42+
<ItemGroup>
43+
<None Include="..\..\..\LICENSE">
44+
<Pack>True</Pack>
45+
<PackagePath>\</PackagePath>
46+
</None>
47+
<None Include="..\..\Cyaim.WebSocketServer\WebSocketRepository_Logo.png">
48+
<Pack>True</Pack>
49+
<PackagePath></PackagePath>
50+
</None>
51+
</ItemGroup>
52+
53+
</Project>
54+
Lines changed: 283 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,283 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Threading.Tasks;
4+
using Cyaim.WebSocketServer.Cluster.Hybrid.Abstractions;
5+
using Microsoft.Extensions.Logging;
6+
using RabbitMQ.Client;
7+
using RabbitMQ.Client.Events;
8+
9+
namespace Cyaim.WebSocketServer.Cluster.Hybrid.MessageQueue.RabbitMQ
10+
{
11+
/// <summary>
12+
/// RabbitMQ.Client implementation of IMessageQueueService
13+
/// RabbitMQ.Client 的 IMessageQueueService 实现
14+
/// </summary>
15+
public class RabbitMQMessageQueueService : IMessageQueueService
16+
{
17+
private readonly ILogger<RabbitMQMessageQueueService> _logger;
18+
private readonly string _connectionString;
19+
private IConnection _connection;
20+
private IModel _channel;
21+
private readonly Dictionary<string, EventingBasicConsumer> _consumers;
22+
private bool _disposed = false;
23+
24+
/// <summary>
25+
/// Constructor / 构造函数
26+
/// </summary>
27+
/// <param name="logger">Logger instance / 日志实例</param>
28+
/// <param name="connectionString">RabbitMQ connection string / RabbitMQ 连接字符串</param>
29+
public RabbitMQMessageQueueService(ILogger<RabbitMQMessageQueueService> logger, string connectionString)
30+
{
31+
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
32+
_connectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString));
33+
_consumers = new Dictionary<string, EventingBasicConsumer>();
34+
}
35+
36+
/// <summary>
37+
/// Connect to message queue / 连接到消息队列
38+
/// </summary>
39+
public async Task ConnectAsync()
40+
{
41+
if (_connection != null && _connection.IsOpen)
42+
{
43+
return;
44+
}
45+
46+
try
47+
{
48+
var factory = new ConnectionFactory { Uri = new Uri(_connectionString) };
49+
_connection = factory.CreateConnection();
50+
_channel = _connection.CreateModel();
51+
_logger.LogInformation("Connected to RabbitMQ");
52+
}
53+
catch (Exception ex)
54+
{
55+
_logger.LogError(ex, "Failed to connect to RabbitMQ");
56+
throw;
57+
}
58+
}
59+
60+
/// <summary>
61+
/// Disconnect from message queue / 断开消息队列连接
62+
/// </summary>
63+
public async Task DisconnectAsync()
64+
{
65+
if (_channel != null)
66+
{
67+
_channel.Close();
68+
_channel.Dispose();
69+
_channel = null;
70+
}
71+
72+
if (_connection != null)
73+
{
74+
_connection.Close();
75+
_connection.Dispose();
76+
_connection = null;
77+
_logger.LogInformation("Disconnected from RabbitMQ");
78+
}
79+
80+
await Task.CompletedTask;
81+
}
82+
83+
/// <summary>
84+
/// Declare an exchange / 声明交换机
85+
/// </summary>
86+
public async Task DeclareExchangeAsync(string exchangeName, string exchangeType, bool durable = true)
87+
{
88+
if (_channel == null)
89+
{
90+
throw new InvalidOperationException("RabbitMQ is not connected");
91+
}
92+
93+
_channel.ExchangeDeclare(exchangeName, exchangeType, durable: durable, autoDelete: false);
94+
await Task.CompletedTask;
95+
}
96+
97+
/// <summary>
98+
/// Declare a queue / 声明队列
99+
/// </summary>
100+
public async Task<string> DeclareQueueAsync(string queueName, bool durable = false, bool exclusive = false, bool autoDelete = true)
101+
{
102+
if (_channel == null)
103+
{
104+
throw new InvalidOperationException("RabbitMQ is not connected");
105+
}
106+
107+
var result = _channel.QueueDeclare(
108+
queue: queueName,
109+
durable: durable,
110+
exclusive: exclusive,
111+
autoDelete: autoDelete,
112+
arguments: null);
113+
114+
return await Task.FromResult(result.QueueName);
115+
}
116+
117+
/// <summary>
118+
/// Bind queue to exchange / 将队列绑定到交换机
119+
/// </summary>
120+
public async Task BindQueueAsync(string queueName, string exchangeName, string routingKey)
121+
{
122+
if (_channel == null)
123+
{
124+
throw new InvalidOperationException("RabbitMQ is not connected");
125+
}
126+
127+
_channel.QueueBind(queueName, exchangeName, routingKey);
128+
await Task.CompletedTask;
129+
}
130+
131+
/// <summary>
132+
/// Publish message to exchange / 向交换机发布消息
133+
/// </summary>
134+
public async Task PublishAsync(string exchangeName, string routingKey, byte[] message, MessageProperties properties = null)
135+
{
136+
if (_channel == null)
137+
{
138+
throw new InvalidOperationException("RabbitMQ is not connected");
139+
}
140+
141+
var basicProperties = _channel.CreateBasicProperties();
142+
143+
if (properties != null)
144+
{
145+
basicProperties.MessageId = properties.MessageId;
146+
basicProperties.CorrelationId = properties.CorrelationId;
147+
basicProperties.ReplyTo = properties.ReplyTo;
148+
149+
if (properties.Timestamp.HasValue)
150+
{
151+
basicProperties.Timestamp = new AmqpTimestamp(
152+
(long)(properties.Timestamp.Value - new DateTime(1970, 1, 1)).TotalSeconds);
153+
}
154+
155+
if (properties.Headers != null && properties.Headers.Count > 0)
156+
{
157+
basicProperties.Headers = new Dictionary<string, object>();
158+
foreach (var header in properties.Headers)
159+
{
160+
basicProperties.Headers[header.Key] = header.Value;
161+
}
162+
}
163+
}
164+
165+
_channel.BasicPublish(exchangeName, routingKey, basicProperties, message);
166+
await Task.CompletedTask;
167+
}
168+
169+
/// <summary>
170+
/// Consume messages from queue / 从队列消费消息
171+
/// </summary>
172+
public async Task ConsumeAsync(string queueName, Func<byte[], MessageProperties, Task<bool>> handler, bool autoAck = false)
173+
{
174+
if (_channel == null)
175+
{
176+
throw new InvalidOperationException("RabbitMQ is not connected");
177+
}
178+
179+
if (_consumers.ContainsKey(queueName))
180+
{
181+
_logger.LogWarning($"Consumer for queue {queueName} already exists");
182+
return;
183+
}
184+
185+
var consumer = new EventingBasicConsumer(_channel);
186+
_consumers[queueName] = consumer;
187+
188+
consumer.Received += async (model, ea) =>
189+
{
190+
try
191+
{
192+
var properties = new MessageProperties
193+
{
194+
MessageId = ea.BasicProperties.MessageId,
195+
CorrelationId = ea.BasicProperties.CorrelationId,
196+
ReplyTo = ea.BasicProperties.ReplyTo,
197+
DeliveryTag = ea.DeliveryTag,
198+
Headers = new Dictionary<string, object>()
199+
};
200+
201+
if (ea.BasicProperties.Timestamp.UnixTime > 0)
202+
{
203+
properties.Timestamp = new DateTime(1970, 1, 1).AddSeconds(ea.BasicProperties.Timestamp.UnixTime);
204+
}
205+
206+
if (ea.BasicProperties.Headers != null)
207+
{
208+
foreach (var header in ea.BasicProperties.Headers)
209+
{
210+
properties.Headers[header.Key.ToString()] = header.Value;
211+
}
212+
}
213+
214+
var success = await handler(ea.Body.ToArray(), properties);
215+
216+
if (!autoAck)
217+
{
218+
if (success)
219+
{
220+
_channel.BasicAck(ea.DeliveryTag, false);
221+
}
222+
else
223+
{
224+
_channel.BasicNack(ea.DeliveryTag, false, true); // Requeue / 重新入队
225+
}
226+
}
227+
}
228+
catch (Exception ex)
229+
{
230+
_logger.LogError(ex, $"Error processing message from queue {queueName}");
231+
if (!autoAck)
232+
{
233+
_channel.BasicNack(ea.DeliveryTag, false, true);
234+
}
235+
}
236+
};
237+
238+
_channel.BasicConsume(queueName, autoAck, consumer);
239+
await Task.CompletedTask;
240+
}
241+
242+
/// <summary>
243+
/// Acknowledge message / 确认消息
244+
/// </summary>
245+
public async Task AckAsync(ulong deliveryTag)
246+
{
247+
if (_channel == null)
248+
{
249+
throw new InvalidOperationException("RabbitMQ is not connected");
250+
}
251+
252+
_channel.BasicAck(deliveryTag, false);
253+
await Task.CompletedTask;
254+
}
255+
256+
/// <summary>
257+
/// Reject message / 拒绝消息
258+
/// </summary>
259+
public async Task RejectAsync(ulong deliveryTag, bool requeue = false)
260+
{
261+
if (_channel == null)
262+
{
263+
throw new InvalidOperationException("RabbitMQ is not connected");
264+
}
265+
266+
_channel.BasicNack(deliveryTag, false, requeue);
267+
await Task.CompletedTask;
268+
}
269+
270+
/// <summary>
271+
/// Dispose / 释放资源
272+
/// </summary>
273+
public void Dispose()
274+
{
275+
if (_disposed)
276+
return;
277+
278+
_disposed = true;
279+
DisconnectAsync().GetAwaiter().GetResult();
280+
}
281+
}
282+
}
283+

0 commit comments

Comments
 (0)