Skip to content

Commit 41a8b71

Browse files
committed
feat:封装集群中发送消息方式
1 parent 40bbd3b commit 41a8b71

8 files changed

Lines changed: 950 additions & 59 deletions

File tree

Cyaim.WebSocketServer/Cluster/Cyaim.WebSocketServer.Cluster.Hybrid/HybridClusterTransport.cs

Lines changed: 124 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@ public class HybridClusterTransport : IClusterTransport
2626
private readonly NodeInfo _nodeInfo;
2727
private readonly ConcurrentDictionary<string, NodeInfo> _knownNodes;
2828
private readonly CancellationTokenSource _cancellationTokenSource;
29-
private readonly ConcurrentDictionary<string, DateTime> _processedMessageIds; // 已处理的消息ID,用于去重
29+
// 已处理的消息ID,用于去重
30+
// Key format: "MessageId:FromNodeId" for broadcast messages, "MessageId:FromNodeId:ToNodeId" for targeted messages
31+
// 键格式:广播消息为 "MessageId:FromNodeId",定向消息为 "MessageId:FromNodeId:ToNodeId"
32+
private readonly ConcurrentDictionary<string, DateTime> _processedMessageIds;
3033
private readonly Timer _messageIdCleanupTimer; // 定期清理过期的消息ID
3134
private bool _disposed = false;
3235
private bool _started = false;
@@ -217,6 +220,20 @@ public async Task SendAsync(string nodeId, ClusterMessage message)
217220
message.FromNodeId = _nodeId;
218221
message.ToNodeId = nodeId;
219222

223+
// Ensure MessageId is set for deduplication / 确保设置 MessageId 以便去重
224+
// If MessageId is empty, generate a unique one based on node and timestamp
225+
// 如果 MessageId 为空,基于节点和时间戳生成唯一ID
226+
if (string.IsNullOrEmpty(message.MessageId))
227+
{
228+
message.MessageId = $"{_nodeId}:{nodeId}:{DateTime.UtcNow.Ticks}:{Guid.NewGuid():N}";
229+
}
230+
231+
// Ensure Timestamp is set / 确保设置时间戳
232+
if (message.Timestamp == default)
233+
{
234+
message.Timestamp = DateTime.UtcNow;
235+
}
236+
220237
var routingKey = $"node.{nodeId}";
221238
var messageBytes = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message));
222239

@@ -254,6 +271,20 @@ public async Task BroadcastAsync(ClusterMessage message)
254271
message.FromNodeId = _nodeId;
255272
message.ToNodeId = null; // Broadcast / 广播
256273

274+
// Ensure MessageId is set for deduplication / 确保设置 MessageId 以便去重
275+
// If MessageId is empty, generate a unique one
276+
// 如果 MessageId 为空,生成唯一ID
277+
if (string.IsNullOrEmpty(message.MessageId))
278+
{
279+
message.MessageId = $"{_nodeId}:broadcast:{DateTime.UtcNow.Ticks}:{Guid.NewGuid():N}";
280+
}
281+
282+
// Ensure Timestamp is set / 确保设置时间戳
283+
if (message.Timestamp == default)
284+
{
285+
message.Timestamp = DateTime.UtcNow;
286+
}
287+
257288
var messageBytes = Encoding.UTF8.GetBytes(JsonSerializer.Serialize(message));
258289

259290
var properties = new MessageProperties
@@ -274,6 +305,73 @@ public async Task BroadcastAsync(ClusterMessage message)
274305
}
275306
}
276307

308+
/// <summary>
309+
/// Send message to multiple nodes / 向多个节点发送消息
310+
/// </summary>
311+
/// <param name="nodeIds">Target node IDs / 目标节点 ID 列表</param>
312+
/// <param name="message">Message to send / 要发送的消息</param>
313+
/// <returns>Dictionary of node ID and send result / 节点ID和发送结果的字典</returns>
314+
/// <remarks>
315+
/// This method sends the same message to multiple nodes. Each node will receive a copy of the message.
316+
/// If you want to send different messages to different nodes, call SendAsync multiple times.
317+
/// 此方法向多个节点发送相同的消息。每个节点都会收到消息的副本。
318+
/// 如果要向不同节点发送不同的消息,请多次调用 SendAsync。
319+
/// </remarks>
320+
public async Task<Dictionary<string, bool>> SendToMultipleNodesAsync(IEnumerable<string> nodeIds, ClusterMessage message)
321+
{
322+
if (nodeIds == null)
323+
{
324+
throw new ArgumentNullException(nameof(nodeIds));
325+
}
326+
327+
if (message == null)
328+
{
329+
throw new ArgumentNullException(nameof(message));
330+
}
331+
332+
var results = new Dictionary<string, bool>();
333+
var tasks = new List<Task>();
334+
335+
foreach (var nodeId in nodeIds)
336+
{
337+
if (string.IsNullOrEmpty(nodeId))
338+
{
339+
continue;
340+
}
341+
342+
var task = Task.Run(async () =>
343+
{
344+
try
345+
{
346+
// Create a copy of the message for each node to ensure unique MessageId per node
347+
// 为每个节点创建消息副本,确保每个节点都有唯一的 MessageId
348+
var nodeMessage = new ClusterMessage
349+
{
350+
Type = message.Type,
351+
FromNodeId = message.FromNodeId,
352+
ToNodeId = nodeId,
353+
Payload = message.Payload,
354+
MessageId = message.MessageId, // Will be auto-generated if empty / 如果为空将自动生成
355+
Timestamp = message.Timestamp
356+
};
357+
358+
await SendAsync(nodeId, nodeMessage);
359+
results[nodeId] = true;
360+
}
361+
catch (Exception ex)
362+
{
363+
_logger.LogError(ex, $"Failed to send message to node {nodeId}");
364+
results[nodeId] = false;
365+
}
366+
});
367+
368+
tasks.Add(task);
369+
}
370+
371+
await Task.WhenAll(tasks);
372+
return results;
373+
}
374+
277375
/// <summary>
278376
/// Check if node is connected / 检查节点是否已连接
279377
/// </summary>
@@ -373,23 +471,44 @@ private async Task<bool> HandleMessageAsync(byte[] body, MessageProperties prope
373471
if (!string.IsNullOrEmpty(message.ToNodeId) && message.ToNodeId != _nodeId)
374472
{
375473
// This message is for another node, ignore it / 此消息是发送给其他节点的,忽略它
376-
_logger.LogDebug($"Ignoring message {message.MessageId} - target node is {message.ToNodeId}, current node is {_nodeId}");
474+
// Note: This is expected behavior for targeted messages. Use BroadcastAsync to send to all nodes.
475+
// 注意:这是定向消息的预期行为。使用 BroadcastAsync 可以向所有节点发送消息。
476+
_logger.LogTrace($"Ignoring message {message.MessageId} (type: {message.Type}) - target node is {message.ToNodeId}, current node is {_nodeId}. This is normal for targeted messages.");
377477
return true; // Ack to remove from queue / 确认以从队列中移除
378478
}
379479

380480
// Check for duplicate messages using message ID / 使用消息ID检查重复消息
481+
// For broadcast messages, use "MessageId:FromNodeId" as key to allow same message from same sender to be processed once per node
482+
// For targeted messages, use "MessageId:FromNodeId:ToNodeId" to allow same message to different targets
483+
// 对于广播消息,使用 "MessageId:FromNodeId" 作为键,允许来自同一发送者的相同消息在每个节点上处理一次
484+
// 对于定向消息,使用 "MessageId:FromNodeId:ToNodeId" 允许相同消息发送到不同目标
381485
if (!string.IsNullOrEmpty(message.MessageId))
382486
{
487+
// Create unique key based on message type / 根据消息类型创建唯一键
488+
string deduplicationKey;
489+
if (string.IsNullOrEmpty(message.ToNodeId))
490+
{
491+
// Broadcast message: same message from same sender should be processed once per node
492+
// 广播消息:来自同一发送者的相同消息应在每个节点上处理一次
493+
deduplicationKey = $"{message.MessageId}:{message.FromNodeId}";
494+
}
495+
else
496+
{
497+
// Targeted message: same message can be sent to different nodes
498+
// 定向消息:相同消息可以发送到不同节点
499+
deduplicationKey = $"{message.MessageId}:{message.FromNodeId}:{message.ToNodeId}";
500+
}
501+
383502
// Check if we've already processed this message / 检查是否已处理过此消息
384-
if (_processedMessageIds.TryGetValue(message.MessageId, out var processedTime))
503+
if (_processedMessageIds.TryGetValue(deduplicationKey, out var processedTime))
385504
{
386505
// Message already processed, ignore it / 消息已处理,忽略它
387-
_logger.LogDebug($"Ignoring duplicate message {message.MessageId} (processed at {processedTime:yyyy-MM-dd HH:mm:ss})");
506+
_logger.LogDebug($"Ignoring duplicate message {message.MessageId} (key: {deduplicationKey}, processed at {processedTime:yyyy-MM-dd HH:mm:ss})");
388507
return true; // Ack to remove from queue / 确认以从队列中移除
389508
}
390509

391510
// Mark message as processed / 标记消息为已处理
392-
_processedMessageIds.TryAdd(message.MessageId, DateTime.UtcNow);
511+
_processedMessageIds.TryAdd(deduplicationKey, DateTime.UtcNow);
393512
}
394513

395514
// Trigger message received event / 触发消息接收事件

Cyaim.WebSocketServer/Cyaim.WebSocketServer.MessagePack/MessagePackExtensions.cs

Lines changed: 71 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,18 @@
11
using System;
2+
using System.Collections.Generic;
23
using System.Net.WebSockets;
34
using System.Text;
45
using System.Threading;
56
using System.Threading.Tasks;
67
using Cyaim.WebSocketServer.Infrastructure;
8+
using Cyaim.WebSocketServer.Infrastructure.Cluster;
79
using MessagePack;
810

911
namespace Cyaim.WebSocketServer.MessagePack
1012
{
1113
/// <summary>
1214
/// MessagePack serialization extensions for WebSocket
15+
/// MessagePack 序列化扩展方法
1316
/// </summary>
1417
public static class MessagePackExtensions
1518
{
@@ -41,7 +44,7 @@ public static async Task SendAsync<T>(
4144

4245
options ??= MessagePackSerializerOptions.Standard;
4346
var serializedData = MessagePackSerializer.Serialize(data, options);
44-
await WebSocketManager.SendAsync(serializedData, messageType, serializedData.Length <= sendBufferSize, cancellationToken: cancellationToken ?? CancellationToken.None, timeout, sendBufferSize: (uint)sendBufferSize, sockets: socket);
47+
await WebSocketManager.SendLocalAsync(serializedData, messageType, serializedData.Length <= sendBufferSize, cancellationToken: cancellationToken ?? CancellationToken.None, timeout, sendBufferSize: (uint)sendBufferSize, sockets: socket);
4548
}
4649

4750
/// <summary>
@@ -72,8 +75,74 @@ public static async Task SendAsync<T>(
7275

7376
options ??= MessagePackSerializerOptions.Standard;
7477
var serializedData = MessagePackSerializer.Serialize(data, options);
75-
await WebSocketManager.SendAsync(serializedData, messageType, serializedData.Length <= sendBufferSize, cancellationToken: cancellationToken ?? CancellationToken.None, timeout, sendBufferSize: (uint)sendBufferSize, sockets: socket);
78+
await WebSocketManager.SendLocalAsync(serializedData, messageType, serializedData.Length <= sendBufferSize, cancellationToken: cancellationToken ?? CancellationToken.None, timeout, sendBufferSize: (uint)sendBufferSize, sockets: socket);
7679
}
80+
81+
#region ClusterManager Extensions / 集群管理器扩展
82+
83+
/// <summary>
84+
/// Route object as MessagePack binary message to a connection (supports cross-node)
85+
/// 将对象序列化为 MessagePack 二进制消息路由到连接(支持跨节点)
86+
/// </summary>
87+
/// <typeparam name="T">Object type / 对象类型</typeparam>
88+
/// <param name="clusterManager">Cluster manager instance / 集群管理器实例</param>
89+
/// <param name="connectionId">Connection ID / 连接 ID</param>
90+
/// <param name="data">Object to serialize / 要序列化的对象</param>
91+
/// <param name="options">MessagePack serializer options / MessagePack 序列化选项</param>
92+
/// <returns>True if routed successfully / 路由成功返回 true</returns>
93+
public static async Task<bool> RouteMessagePackAsync<T>(
94+
this ClusterManager clusterManager,
95+
string connectionId,
96+
T data,
97+
MessagePackSerializerOptions options = null)
98+
{
99+
if (clusterManager == null)
100+
{
101+
throw new ArgumentNullException(nameof(clusterManager));
102+
}
103+
104+
if (data == null)
105+
{
106+
return false;
107+
}
108+
109+
options ??= MessagePackSerializerOptions.Standard;
110+
var bytes = MessagePackSerializer.Serialize(data, options);
111+
return await clusterManager.RouteMessageAsync(connectionId, bytes, (int)WebSocketMessageType.Binary);
112+
}
113+
114+
/// <summary>
115+
/// Route object as MessagePack binary message to multiple connections (supports cross-node)
116+
/// 将对象序列化为 MessagePack 二进制消息路由到多个连接(支持跨节点)
117+
/// </summary>
118+
/// <typeparam name="T">Object type / 对象类型</typeparam>
119+
/// <param name="clusterManager">Cluster manager instance / 集群管理器实例</param>
120+
/// <param name="connectionIds">Connection IDs / 连接 ID 列表</param>
121+
/// <param name="data">Object to serialize / 要序列化的对象</param>
122+
/// <param name="options">MessagePack serializer options / MessagePack 序列化选项</param>
123+
/// <returns>Dictionary of connection ID to routing result / 连接ID到路由结果的字典</returns>
124+
public static async Task<Dictionary<string, bool>> RouteMessagePacksAsync<T>(
125+
this ClusterManager clusterManager,
126+
IEnumerable<string> connectionIds,
127+
T data,
128+
MessagePackSerializerOptions options = null)
129+
{
130+
if (clusterManager == null)
131+
{
132+
throw new ArgumentNullException(nameof(clusterManager));
133+
}
134+
135+
if (data == null)
136+
{
137+
return new Dictionary<string, bool>();
138+
}
139+
140+
options ??= MessagePackSerializerOptions.Standard;
141+
var bytes = MessagePackSerializer.Serialize(data, options);
142+
return await clusterManager.RouteMessagesAsync(connectionIds, bytes, (int)WebSocketMessageType.Binary);
143+
}
144+
145+
#endregion
77146
}
78147
}
79148

Cyaim.WebSocketServer/Cyaim.WebSocketServer.MessagePack/README.md

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,11 +149,64 @@ public class MessagePackResponseScheme
149149
}
150150
```
151151

152+
## 集群支持
153+
154+
### 使用 MessagePack 发送消息到集群
155+
156+
MessagePack 扩展提供了便捷的方法,可以直接通过集群管理器发送 MessagePack 序列化的对象:
157+
158+
```csharp
159+
using Cyaim.WebSocketServer.MessagePack;
160+
using Cyaim.WebSocketServer.Infrastructure.Cluster;
161+
162+
// 获取集群管理器
163+
var clusterManager = GlobalClusterCenter.ClusterManager;
164+
165+
// 定义要发送的对象
166+
var user = new { Id = 1, Name = "Alice", Age = 30 };
167+
168+
// 向单个连接发送 MessagePack 序列化的对象(支持跨节点)
169+
await clusterManager.RouteMessagePackAsync("connection-1", user);
170+
171+
// 向多个连接批量发送 MessagePack 序列化的对象(支持跨节点)
172+
var connectionIds = new[] { "connection-1", "connection-2", "connection-3" };
173+
var results = await clusterManager.RouteMessagePacksAsync(connectionIds, user);
174+
175+
// 检查发送结果
176+
foreach (var result in results)
177+
{
178+
if (result.Value)
179+
{
180+
Console.WriteLine($"成功发送到连接 {result.Key}");
181+
}
182+
else
183+
{
184+
Console.WriteLine($"发送到连接 {result.Key} 失败");
185+
}
186+
}
187+
188+
// 使用自定义 MessagePack 选项
189+
var options = MessagePackSerializerOptions.Standard.WithCompression(MessagePackCompression.Lz4Block);
190+
await clusterManager.RouteMessagePackAsync("connection-1", user, options);
191+
```
192+
193+
### 扩展方法说明
194+
195+
- **`RouteMessagePackAsync<T>`**: 向单个连接发送 MessagePack 序列化的对象
196+
- **`RouteMessagePacksAsync<T>`**: 向多个连接批量发送 MessagePack 序列化的对象
197+
198+
这些方法会自动:
199+
200+
- 将对象序列化为 MessagePack 二进制格式
201+
- 通过集群路由系统发送到目标连接(支持跨节点)
202+
- 使用 `WebSocketMessageType.Binary` 消息类型
203+
152204
## 注意事项
153205

154206
1. **消息类型** - MessagePackChannelHandler 只处理二进制消息 (`WebSocketMessageType.Binary`)
155207
2. **兼容性** - 内部仍使用 JSON 进行参数绑定,以保持与现有端点的兼容性
156208
3. **客户端** - 需要支持 MessagePack 的客户端库
209+
4. **集群扩展** - 使用集群扩展方法需要引用 `Cyaim.WebSocketServer.Infrastructure.Cluster` 命名空间
157210

158211
## 许可证
159212

0 commit comments

Comments
 (0)