Skip to content

Commit b6fc81b

Browse files
committed
feat: 实现redis节点自动发现和支持FreeRedis #15
1 parent ee542d8 commit b6fc81b

16 files changed

Lines changed: 2986 additions & 0 deletions
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<TargetFrameworks>netstandard2.1;net6.0;net7.0;net8.0;net9.0</TargetFrameworks>
4+
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
5+
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance>
6+
<Authors>Psyche</Authors>
7+
<Company>Cyaim Studio</Company>
8+
<PackageProjectUrl>https://github.com/Cyaim/WebSocketServer</PackageProjectUrl>
9+
<PackageLicenseFile>LICENSE</PackageLicenseFile>
10+
<RepositoryUrl>https://github.com/Cyaim/WebSocketServer</RepositoryUrl>
11+
<Copyright>Copyright © Cyaim Studio</Copyright>
12+
<Version>1.7.8</Version>
13+
<PackageIcon>WebSocketRepository_Logo.png</PackageIcon>
14+
<Description>
15+
WebSocketServer FreeRedis cluster transport extension
16+
WebSocketServer FreeRedis 集群传输扩展
17+
</Description>
18+
</PropertyGroup>
19+
20+
<ItemGroup Condition="'$(TargetFramework)' == 'net6.0' Or '$(TargetFramework)' == 'net7.0' Or '$(TargetFramework)' == 'net8.0' Or '$(TargetFramework)' == 'net9.0'">
21+
<FrameworkReference Include="Microsoft.AspNetCore.App" />
22+
</ItemGroup>
23+
24+
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.1'">
25+
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.10" />
26+
<PackageReference Include="System.Text.Json" Version="9.0.10" />
27+
</ItemGroup>
28+
29+
<ItemGroup Condition="'$(TargetFramework)' == 'net6.0' Or '$(TargetFramework)' == 'net7.0' Or '$(TargetFramework)' == 'net8.0' Or '$(TargetFramework)' == 'net9.0'">
30+
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.1" />
31+
<PackageReference Include="System.Text.Json" Version="8.0.6" />
32+
</ItemGroup>
33+
34+
<ItemGroup>
35+
<PackageReference Include="FreeRedis" Version="1.2.0" />
36+
<ProjectReference Include="..\Cyaim.WebSocketServer\Cyaim.WebSocketServer.csproj" />
37+
</ItemGroup>
38+
39+
<ItemGroup>
40+
<None Include="..\..\LICENSE">
41+
<Pack>True</Pack>
42+
<PackagePath>\</PackagePath>
43+
</None>
44+
<None Include="..\Cyaim.WebSocketServer\WebSocketRepository_Logo.png">
45+
<Pack>True</Pack>
46+
<PackagePath></PackagePath>
47+
</None>
48+
</ItemGroup>
49+
50+
</Project>
51+
Lines changed: 257 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,257 @@
1+
using System;
2+
using System.Collections.Concurrent;
3+
using System.Text;
4+
using System.Text.Json;
5+
using System.Threading;
6+
using System.Threading.Tasks;
7+
using Cyaim.WebSocketServer.Infrastructure.Cluster;
8+
using Cyaim.WebSocketServer.Infrastructure.Configures;
9+
using Microsoft.Extensions.Logging;
10+
using FreeRedis;
11+
using ClusterNode = Cyaim.WebSocketServer.Infrastructure.Cluster.ClusterNode;
12+
13+
namespace Cyaim.WebSocketServer.Cluster.FreeRedis
14+
{
15+
/// <summary>
16+
/// FreeRedis-based cluster transport (using Redis Pub/Sub)
17+
/// 基于 FreeRedis 的集群传输(使用 Redis 发布/订阅)
18+
/// </summary>
19+
public class FreeRedisClusterTransport : IClusterTransport
20+
{
21+
private readonly ILogger<FreeRedisClusterTransport> _logger;
22+
private readonly string _nodeId;
23+
private readonly string _connectionString;
24+
private readonly ConcurrentDictionary<string, ClusterNode> _nodes;
25+
private readonly CancellationTokenSource _cancellationTokenSource;
26+
private bool _disposed = false;
27+
28+
/// <summary>
29+
/// Redis client / Redis 客户端
30+
/// </summary>
31+
private RedisClient _redis;
32+
33+
/// <summary>
34+
/// Event triggered when message received / 消息接收时触发的事件
35+
/// </summary>
36+
public event EventHandler<ClusterMessageEventArgs> MessageReceived;
37+
/// <summary>
38+
/// Event triggered when node connected / 节点连接时触发的事件
39+
/// </summary>
40+
public event EventHandler<ClusterNodeEventArgs> NodeConnected;
41+
/// <summary>
42+
/// Event triggered when node disconnected / 节点断开连接时触发的事件
43+
/// </summary>
44+
public event EventHandler<ClusterNodeEventArgs> NodeDisconnected;
45+
46+
/// <summary>
47+
/// Constructor / 构造函数
48+
/// </summary>
49+
/// <param name="logger">Logger instance / 日志实例</param>
50+
/// <param name="nodeId">Node ID / 节点 ID</param>
51+
/// <param name="connectionString">Redis connection string / Redis 连接字符串</param>
52+
public FreeRedisClusterTransport(ILogger<FreeRedisClusterTransport> logger, string nodeId, string connectionString)
53+
{
54+
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
55+
_nodeId = nodeId ?? throw new ArgumentNullException(nameof(nodeId));
56+
_connectionString = connectionString ?? throw new ArgumentNullException(nameof(connectionString));
57+
_nodes = new ConcurrentDictionary<string, ClusterNode>();
58+
_cancellationTokenSource = new CancellationTokenSource();
59+
}
60+
61+
/// <summary>
62+
/// Start the transport service / 启动传输服务
63+
/// </summary>
64+
public async Task StartAsync()
65+
{
66+
_logger.LogInformation($"Starting FreeRedis cluster transport for node {_nodeId}");
67+
68+
try
69+
{
70+
_redis = new RedisClient(_connectionString);
71+
72+
// Subscribe to node-specific channel / 订阅节点特定通道
73+
_redis.Subscribe($"cluster:node:{_nodeId}", (channel, message) =>
74+
{
75+
try
76+
{
77+
var clusterMessage = JsonSerializer.Deserialize<ClusterMessage>(message);
78+
MessageReceived?.Invoke(this, new ClusterMessageEventArgs
79+
{
80+
FromNodeId = clusterMessage.FromNodeId,
81+
Message = clusterMessage
82+
});
83+
}
84+
catch (Exception ex)
85+
{
86+
_logger.LogError(ex, "Failed to process FreeRedis message");
87+
}
88+
});
89+
90+
// Subscribe to broadcast channel / 订阅广播通道
91+
_redis.Subscribe("cluster:broadcast", (channel, message) =>
92+
{
93+
try
94+
{
95+
var clusterMessage = JsonSerializer.Deserialize<ClusterMessage>(message);
96+
if (clusterMessage.FromNodeId != _nodeId)
97+
{
98+
MessageReceived?.Invoke(this, new ClusterMessageEventArgs
99+
{
100+
FromNodeId = clusterMessage.FromNodeId,
101+
Message = clusterMessage
102+
});
103+
}
104+
}
105+
catch (Exception ex)
106+
{
107+
_logger.LogError(ex, "Failed to process FreeRedis broadcast message");
108+
}
109+
});
110+
111+
_logger.LogInformation($"FreeRedis cluster transport started successfully for node {_nodeId}");
112+
}
113+
catch (Exception ex)
114+
{
115+
_logger.LogError(ex, "Failed to start FreeRedis cluster transport");
116+
throw;
117+
}
118+
119+
await Task.CompletedTask;
120+
}
121+
122+
/// <summary>
123+
/// Stop the transport service / 停止传输服务
124+
/// </summary>
125+
public async Task StopAsync()
126+
{
127+
_logger.LogInformation($"Stopping FreeRedis cluster transport for node {_nodeId}");
128+
_cancellationTokenSource.Cancel();
129+
130+
try
131+
{
132+
if (_redis != null)
133+
{
134+
_redis.UnSubscribe($"cluster:node:{_nodeId}");
135+
_redis.UnSubscribe("cluster:broadcast");
136+
_redis.Dispose();
137+
_redis = null;
138+
}
139+
140+
_nodes.Clear();
141+
}
142+
catch (Exception ex)
143+
{
144+
_logger.LogError(ex, "Error stopping FreeRedis cluster transport");
145+
}
146+
147+
await Task.CompletedTask;
148+
}
149+
150+
/// <summary>
151+
/// Send message to specific node / 向指定节点发送消息
152+
/// </summary>
153+
/// <param name="nodeId">Target node ID / 目标节点 ID</param>
154+
/// <param name="message">Message to send / 要发送的消息</param>
155+
public async Task SendAsync(string nodeId, ClusterMessage message)
156+
{
157+
if (string.IsNullOrEmpty(nodeId))
158+
throw new ArgumentNullException(nameof(nodeId));
159+
160+
message.FromNodeId = _nodeId;
161+
message.ToNodeId = nodeId;
162+
var messageJson = JsonSerializer.Serialize(message);
163+
164+
try
165+
{
166+
if (_redis == null || !_redis.IsConnected)
167+
{
168+
_logger.LogWarning("FreeRedis client is not initialized or connected");
169+
return;
170+
}
171+
172+
_redis.Publish($"cluster:node:{nodeId}", messageJson);
173+
174+
_logger.LogDebug($"Sent message to node {nodeId} via FreeRedis");
175+
}
176+
catch (Exception ex)
177+
{
178+
_logger.LogError(ex, $"Failed to send message to node {nodeId} via FreeRedis");
179+
throw;
180+
}
181+
182+
await Task.CompletedTask;
183+
}
184+
185+
/// <summary>
186+
/// Broadcast message to all nodes / 向所有节点广播消息
187+
/// </summary>
188+
/// <param name="message">Message to broadcast / 要广播的消息</param>
189+
public async Task BroadcastAsync(ClusterMessage message)
190+
{
191+
message.FromNodeId = _nodeId;
192+
var messageJson = JsonSerializer.Serialize(message);
193+
194+
try
195+
{
196+
if (_redis == null || !_redis.IsConnected)
197+
{
198+
_logger.LogWarning("FreeRedis client is not initialized or connected");
199+
return;
200+
}
201+
202+
_redis.Publish("cluster:broadcast", messageJson);
203+
204+
_logger.LogDebug("Broadcasted message via FreeRedis");
205+
}
206+
catch (Exception ex)
207+
{
208+
_logger.LogError(ex, "Failed to broadcast message via FreeRedis");
209+
throw;
210+
}
211+
212+
await Task.CompletedTask;
213+
}
214+
215+
/// <summary>
216+
/// Check if node is connected / 检查节点是否已连接
217+
/// </summary>
218+
/// <param name="nodeId">Node ID to check / 要检查的节点 ID</param>
219+
/// <returns>True if connected, false otherwise / 已连接返回 true,否则返回 false</returns>
220+
public bool IsNodeConnected(string nodeId)
221+
{
222+
// With FreeRedis, we consider a node connected if it's registered
223+
// 对于 FreeRedis,如果节点已注册,我们认为它已连接
224+
// In practice, you might want to track node health separately
225+
// 在实践中,您可能需要单独跟踪节点健康状况
226+
return _nodes.ContainsKey(nodeId);
227+
}
228+
229+
/// <summary>
230+
/// Register a node / 注册节点
231+
/// </summary>
232+
/// <param name="node">Node information / 节点信息</param>
233+
public void RegisterNode(ClusterNode node)
234+
{
235+
if (node == null)
236+
throw new ArgumentNullException(nameof(node));
237+
238+
_nodes.AddOrUpdate(node.NodeId, node, (key, oldValue) => node);
239+
NodeConnected?.Invoke(this, new ClusterNodeEventArgs { NodeId = node.NodeId });
240+
_logger.LogDebug($"Registered node {node.NodeId} in FreeRedis transport");
241+
}
242+
243+
/// <summary>
244+
/// Dispose resources / 释放资源
245+
/// </summary>
246+
public void Dispose()
247+
{
248+
if (!_disposed)
249+
{
250+
StopAsync().GetAwaiter().GetResult();
251+
_cancellationTokenSource.Dispose();
252+
_disposed = true;
253+
}
254+
}
255+
}
256+
}
257+
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
using Cyaim.WebSocketServer.Infrastructure.Cluster;
2+
using Microsoft.Extensions.Logging;
3+
4+
namespace Cyaim.WebSocketServer.Cluster.FreeRedis
5+
{
6+
/// <summary>
7+
/// Factory for creating FreeRedis cluster transport
8+
/// 创建 FreeRedis 集群传输的工厂
9+
/// </summary>
10+
public class FreeRedisClusterTransportFactory
11+
{
12+
/// <summary>
13+
/// Create FreeRedis cluster transport / 创建 FreeRedis 集群传输
14+
/// </summary>
15+
/// <param name="logger">Logger instance / 日志实例</param>
16+
/// <param name="nodeId">Node ID / 节点 ID</param>
17+
/// <param name="connectionString">Redis connection string / Redis 连接字符串</param>
18+
/// <returns>FreeRedis cluster transport instance / FreeRedis 集群传输实例</returns>
19+
public static IClusterTransport Create(
20+
ILogger<FreeRedisClusterTransport> logger,
21+
string nodeId,
22+
string connectionString)
23+
{
24+
return new FreeRedisClusterTransport(logger, nodeId, connectionString);
25+
}
26+
}
27+
}
28+
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
<Project Sdk="Microsoft.NET.Sdk">
2+
<PropertyGroup>
3+
<TargetFrameworks>netstandard2.1;net6.0;net7.0;net8.0;net9.0</TargetFrameworks>
4+
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
5+
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance>
6+
<Authors>Psyche</Authors>
7+
<Company>Cyaim Studio</Company>
8+
<PackageProjectUrl>https://github.com/Cyaim/WebSocketServer</PackageProjectUrl>
9+
<PackageLicenseFile>LICENSE</PackageLicenseFile>
10+
<RepositoryUrl>https://github.com/Cyaim/WebSocketServer</RepositoryUrl>
11+
<Copyright>Copyright © Cyaim Studio</Copyright>
12+
<Version>1.7.8</Version>
13+
<PackageIcon>WebSocketRepository_Logo.png</PackageIcon>
14+
<Description>
15+
WebSocketServer Hybrid cluster transport implementations (StackExchange.Redis, FreeRedis and RabbitMQ.Client)
16+
WebSocketServer 混合集群传输实现(StackExchange.Redis、FreeRedis 和 RabbitMQ.Client)
17+
</Description>
18+
</PropertyGroup>
19+
20+
<ItemGroup Condition="'$(TargetFramework)' == 'net6.0' Or '$(TargetFramework)' == 'net7.0' Or '$(TargetFramework)' == 'net8.0' Or '$(TargetFramework)' == 'net9.0'">
21+
<FrameworkReference Include="Microsoft.AspNetCore.App" />
22+
</ItemGroup>
23+
24+
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.1'">
25+
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.10" />
26+
<PackageReference Include="System.Text.Json" Version="9.0.10" />
27+
</ItemGroup>
28+
29+
<ItemGroup Condition="'$(TargetFramework)' == 'net6.0' Or '$(TargetFramework)' == 'net7.0' Or '$(TargetFramework)' == 'net8.0' Or '$(TargetFramework)' == 'net9.0'">
30+
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="8.0.1" />
31+
<PackageReference Include="System.Text.Json" Version="8.0.6" />
32+
</ItemGroup>
33+
34+
<ItemGroup>
35+
<PackageReference Include="StackExchange.Redis" Version="2.7.33" />
36+
<PackageReference Include="FreeRedis" Version="1.2.0" />
37+
<PackageReference Include="RabbitMQ.Client" Version="6.8.1" />
38+
<ProjectReference Include="..\Cyaim.WebSocketServer.Cluster.Hybrid\Cyaim.WebSocketServer.Cluster.Hybrid.csproj" />
39+
</ItemGroup>
40+
41+
<ItemGroup>
42+
<None Include="..\..\LICENSE">
43+
<Pack>True</Pack>
44+
<PackagePath>\</PackagePath>
45+
</None>
46+
<None Include="..\Cyaim.WebSocketServer\WebSocketRepository_Logo.png">
47+
<Pack>True</Pack>
48+
<PackagePath></PackagePath>
49+
</None>
50+
</ItemGroup>
51+
52+
</Project>
53+

0 commit comments

Comments
 (0)