Skip to content

Commit bc3d823

Browse files
committed
fix
1 parent 763a168 commit bc3d823

8 files changed

Lines changed: 136 additions & 195 deletions

File tree

Cyaim.WebSocketServer/Cluster/Cyaim.WebSocketServer.Cluster.Hybrid.Implementations/RabbitMQMessageQueueService.cs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,36 @@ public async Task ConnectAsync()
8686
}
8787
}
8888

89+
/// <summary>
90+
/// Verify connection is ready / 验证连接已就绪
91+
/// </summary>
92+
public async Task VerifyConnectionAsync()
93+
{
94+
await EnsureChannelAsync().ConfigureAwait(false);
95+
96+
if (_connection == null || !_connection.IsOpen)
97+
{
98+
throw new InvalidOperationException("RabbitMQ connection is not ready");
99+
}
100+
101+
if (_channel == null || !_channel.IsOpen)
102+
{
103+
throw new InvalidOperationException("RabbitMQ channel is not ready");
104+
}
105+
106+
// 简单访问一次属性,确保通道可用 / Simple property access to ensure channel is usable
107+
try
108+
{
109+
var channelNumber = _channel.ChannelNumber;
110+
_logger.LogDebug($"[RabbitMQMessageQueueService] 连接验证成功 - ChannelNumber: {channelNumber}");
111+
}
112+
catch (Exception ex)
113+
{
114+
_logger.LogError(ex, "[RabbitMQMessageQueueService] Channel 验证失败");
115+
throw new InvalidOperationException("RabbitMQ channel verification failed", ex);
116+
}
117+
}
118+
89119
/// <summary>
90120
/// Disconnect from message queue / 断开消息队列连接
91121
/// </summary>
@@ -191,7 +221,7 @@ public async Task PublishAsync(string exchangeName, string routingKey, byte[] me
191221
/// <summary>
192222
/// Consume messages from queue / 从队列消费消息
193223
/// </summary>
194-
public async Task ConsumeAsync(string queueName, Func<byte[], MessageProperties, Task<bool>> handler, bool autoAck = false)
224+
public async Task ConsumeAsync(string queueName, Func<byte[], MessageProperties, Task<bool>> handler, bool autoAck = false, string currentNodeId = null)
195225
{
196226
await EnsureChannelAsync().ConfigureAwait(false);
197227

Original file line numberDiff line numberDiff line change
@@ -1,49 +1,53 @@
1-
<?xml version="1.0" encoding="utf-8"?>
1+
<?xml version="1.0" encoding="UTF-8"?>
22
<Project Sdk="Microsoft.NET.Sdk">
3-
<PropertyGroup>
4-
<TargetFrameworks>netstandard2.1;net6.0;net7.0;net8.0;net9.0;net10.0</TargetFrameworks>
5-
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
6-
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance>
7-
<Authors>Psyche</Authors>
8-
<Company>Cyaim Studio</Company>
9-
<PackageProjectUrl>https://github.com/Cyaim/WebSocketServer</PackageProjectUrl>
10-
<PackageLicenseFile>LICENSE</PackageLicenseFile>
11-
<RepositoryUrl>https://github.com/Cyaim/WebSocketServer</RepositoryUrl>
12-
<Copyright>Copyright © Cyaim Studio</Copyright>
13-
<Version>2.0.0</Version>
14-
<PackageIcon>WebSocketRepository_Logo.png</PackageIcon>
15-
<PackageReleaseNotes>
16-
Version 2.0.0 - Major Release
17-
18-
- RabbitMQ.Client implementation for Hybrid cluster transport
19-
- Support for .NET 9.0 and .NET 10.0
20-
- Modular and pluggable design
21-
</PackageReleaseNotes>
22-
<Description>
23-
RabbitMQ.Client implementation for WebSocketServer Hybrid cluster transport (message routing)
24-
WebSocketServer 混合集群传输�?RabbitMQ.Client 实现(消息路由)
25-
</Description>
26-
</PropertyGroup>
27-
<ItemGroup Condition="'$(TargetFramework)' == 'net6.0' Or '$(TargetFramework)' == 'net7.0' Or '$(TargetFramework)' == 'net8.0' Or '$(TargetFramework)' == 'net9.0' Or '$(TargetFramework)' == 'net10.0'">
28-
<FrameworkReference Include="Microsoft.AspNetCore.App" />
29-
</ItemGroup>
30-
<!-- .NET Standard 2.1 需要显式引用,使用最新稳定版�? -->
31-
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.1'">
32-
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.0" />
33-
<PackageReference Include="System.Text.Json" Version="9.0.0" />
34-
</ItemGroup>
35-
<!-- RabbitMQ.Client 7.0+ 支持 .NET Standard 2.0+,统一使用 7.2.0 -->
36-
<ItemGroup>
37-
<PackageReference Include="RabbitMQ.Client" Version="7.2.0" />
38-
</ItemGroup>
39-
<ItemGroup>
40-
<None Include="..\..\..\LICENSE">
41-
<Pack>True</Pack>
42-
<PackagePath>\</PackagePath>
43-
</None>
44-
<None Include="..\..\Cyaim.WebSocketServer\WebSocketRepository_Logo.png">
45-
<Pack>True</Pack>
46-
<PackagePath />
47-
</None>
48-
</ItemGroup>
3+
<PropertyGroup>
4+
<TargetFrameworks>netstandard2.1;net6.0;net7.0;net8.0;net9.0;net10.0</TargetFrameworks>
5+
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
6+
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance>
7+
<Authors>Psyche</Authors>
8+
<Company>Cyaim Studio</Company>
9+
<PackageProjectUrl>https://github.com/Cyaim/WebSocketServer</PackageProjectUrl>
10+
<PackageLicenseFile>LICENSE</PackageLicenseFile>
11+
<RepositoryUrl>https://github.com/Cyaim/WebSocketServer</RepositoryUrl>
12+
<Copyright>Copyright © Cyaim Studio</Copyright>
13+
<Version>2.0.0</Version>
14+
<PackageIcon>WebSocketRepository_Logo.png</PackageIcon>
15+
<PackageReleaseNotes>
16+
Version 2.0.0 - Major Release
17+
18+
- RabbitMQ.Client implementation for Hybrid cluster transport
19+
- Support for .NET 9.0 and .NET 10.0
20+
- Modular and pluggable design
21+
</PackageReleaseNotes>
22+
<Description>
23+
RabbitMQ.Client implementation for WebSocketServer Hybrid cluster transport (message routing)
24+
WebSocketServer 混合集群传输�?RabbitMQ.Client 实现(消息路由)
25+
</Description>
26+
</PropertyGroup>
27+
<ItemGroup Condition="'$(TargetFramework)' == 'net6.0' Or '$(TargetFramework)' == 'net7.0' Or '$(TargetFramework)' == 'net8.0' Or '$(TargetFramework)' == 'net9.0' Or '$(TargetFramework)' == 'net10.0'">
28+
<FrameworkReference Include="Microsoft.AspNetCore.App" />
29+
</ItemGroup>
30+
<!-- .NET Standard 2.1 需要显式引用,使用最新稳定版 -->
31+
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.1'">
32+
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions"
33+
Version="9.0.0" />
34+
<PackageReference Include="System.Text.Json"
35+
Version="9.0.0" />
36+
</ItemGroup>
37+
<!-- RabbitMQ.Client 7.0+ 支持 .NET Standard 2.0+,统一使用 7.2.0 -->
38+
<ItemGroup>
39+
<PackageReference Include="RabbitMQ.Client"
40+
Version="7.2.0" />
41+
<ProjectReference Include="..\Cyaim.WebSocketServer.Cluster.Hybrid\Cyaim.WebSocketServer.Cluster.Hybrid.csproj" />
42+
</ItemGroup>
43+
<ItemGroup>
44+
<None Include="..\..\..\LICENSE">
45+
<Pack>True</Pack>
46+
<PackagePath>\</PackagePath>
47+
</None>
48+
<None Include="..\..\Cyaim.WebSocketServer\WebSocketRepository_Logo.png">
49+
<Pack>True</Pack>
50+
<PackagePath />
51+
</None>
52+
</ItemGroup>
4953
</Project>

Cyaim.WebSocketServer/Cluster/Cyaim.WebSocketServer.Cluster.Hybrid.MessageQueue.RabbitMQ/RabbitMQMessageQueueService.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -544,7 +544,7 @@ await _channel.BasicPublishAsync(
544544
/// <summary>
545545
/// Consume messages from queue / 从队列消费消息
546546
/// </summary>
547-
public async Task ConsumeAsync(string queueName, Func<byte[], MessageProperties, Task<bool>> handler, bool autoAck = false)
547+
public async Task ConsumeAsync(string queueName, Func<byte[], MessageProperties, Task<bool>> handler, bool autoAck = false, string currentNodeId = null)
548548
{
549549
await EnsureChannelAsync().ConfigureAwait(false);
550550

Cyaim.WebSocketServer/Cluster/Cyaim.WebSocketServer.Cluster.Hybrid.Redis.FreeRedis/Cyaim.WebSocketServer.Cluster.Hybrid.Redis.FreeRedis.csproj

Lines changed: 8 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@
1313
<PackageIcon>WebSocketRepository_Logo.png</PackageIcon>
1414
<PackageReleaseNotes>
1515
Version 2.0.0 - Major Release
16-
17-
- FreeRedis implementation for Hybrid cluster transport
18-
- Support for .NET 9.0 and .NET 10.0
19-
- Modular and pluggable design
20-
</PackageReleaseNotes>
16+
17+
- FreeRedis implementation for Hybrid cluster transport
18+
- Support for .NET 9.0 and .NET 10.0
19+
- Modular and pluggable design
20+
</PackageReleaseNotes>
2121
<Description>
2222
FreeRedis implementation for WebSocketServer Hybrid cluster transport (service discovery)
2323
WebSocketServer 混合集群传输的 FreeRedis 实现(服务发现)
@@ -36,12 +36,13 @@
3636

3737
<ItemGroup>
3838
<PackageReference Include="FreeRedis" Version="1.2.0" />
39+
<ProjectReference Include="..\Cyaim.WebSocketServer.Cluster.Hybrid\Cyaim.WebSocketServer.Cluster.Hybrid.csproj" />
3940
</ItemGroup>
4041

4142
<ItemGroup>
4243
<None Include="..\..\..\LICENSE">
43-
<Pack>True</Pack>
44-
<PackagePath>\</PackagePath>
44+
<Pack>True</Pack>
45+
<PackagePath>\</PackagePath>
4546
</None>
4647
<None Include="..\..\Cyaim.WebSocketServer\WebSocketRepository_Logo.png">
4748
<Pack>True</Pack>

Cyaim.WebSocketServer/Cluster/Cyaim.WebSocketServer.Cluster.Hybrid/Abstractions/IMessageQueueService.cs

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,15 @@ public interface IMessageQueueService : IDisposable
1414
/// </summary>
1515
Task ConnectAsync();
1616

17+
/// <summary>
18+
/// Verify connection is ready (optional but recommended) / 验证连接已就绪(可选但推荐)
19+
/// </summary>
20+
/// <remarks>
21+
/// Implementations should ensure the underlying connection and channel are usable.
22+
/// 实现应确保底层连接和通道可用。
23+
/// </remarks>
24+
Task VerifyConnectionAsync();
25+
1726
/// <summary>
1827
/// Disconnect from message queue / 断开消息队列连接
1928
/// </summary>
@@ -60,7 +69,11 @@ public interface IMessageQueueService : IDisposable
6069
/// <param name="queueName">Queue name / 队列名称</param>
6170
/// <param name="handler">Message handler / 消息处理器</param>
6271
/// <param name="autoAck">Whether to auto-acknowledge / 是否自动确认</param>
63-
Task ConsumeAsync(string queueName, Func<byte[], MessageProperties, Task<bool>> handler, bool autoAck = false);
72+
/// <param name="currentNodeId">
73+
/// Current node id, for implementations that want to early-filter self messages using headers (e.g. FromNodeId).
74+
/// 当前节点 ID,供实现使用消息头(例如 FromNodeId)进行早期自发消息过滤。
75+
/// </param>
76+
Task ConsumeAsync(string queueName, Func<byte[], MessageProperties, Task<bool>> handler, bool autoAck = false, string currentNodeId = null);
6477

6578
/// <summary>
6679
/// Acknowledge message / 确认消息

Cyaim.WebSocketServer/Cluster/Cyaim.WebSocketServer.Cluster.Hybrid/Cyaim.WebSocketServer.Cluster.Hybrid.csproj

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@
1313
<PackageIcon>WebSocketRepository_Logo.png</PackageIcon>
1414
<PackageReleaseNotes>
1515
Version 2.0.0 - Major Release
16-
17-
- Updated to support WebSocketServer 2.0.0
18-
- Enhanced hybrid cluster transport with improved reliability
19-
- Better error handling and connection management
20-
- Support for .NET 9.0 and .NET 10.0
21-
</PackageReleaseNotes>
16+
17+
- Updated to support WebSocketServer 2.0.0
18+
- Enhanced hybrid cluster transport with improved reliability
19+
- Better error handling and connection management
20+
- Support for .NET 9.0 and .NET 10.0
21+
</PackageReleaseNotes>
2222
<Description>
2323
WebSocketServer Hybrid cluster transport extension (Redis for service discovery, RabbitMQ for message routing)
2424
WebSocketServer 混合集群传输扩展(Redis 用于服务发现,RabbitMQ 用于消息路由)
@@ -39,10 +39,14 @@
3939

4040
<!-- .NET 6.0+ 框架已内置这些包,无需显式引用,框架会自动使用内置版本 -->
4141

42+
<ItemGroup>
43+
<ProjectReference Include="..\..\Cyaim.WebSocketServer\Cyaim.WebSocketServer.csproj" />
44+
</ItemGroup>
45+
4246
<ItemGroup>
4347
<None Include="..\..\..\LICENSE">
44-
<Pack>True</Pack>
45-
<PackagePath>\</PackagePath>
48+
<Pack>True</Pack>
49+
<PackagePath>\</PackagePath>
4650
</None>
4751
<None Include="..\..\Cyaim.WebSocketServer\WebSocketRepository_Logo.png" Condition="Exists('..\..\Cyaim.WebSocketServer\WebSocketRepository_Logo.png')">
4852
<Pack>True</Pack>

0 commit comments

Comments
 (0)