Skip to content

Commit e5b6efa

Browse files
committed
feat: 集群新增转发Stream功能;release version
1 parent 41a8b71 commit e5b6efa

5 files changed

Lines changed: 632 additions & 9 deletions

File tree

Cyaim.WebSocketServer/Clients/Cyaim.WebSocketServer.Client/WebSocketClient.cs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -111,7 +111,7 @@ await _webSocket.SendAsync(
111111

112112
// Wait for response / 等待响应
113113
var response = await ReceiveResponseAsync(requestId, cancellationToken);
114-
114+
115115
if (response.Status != 0)
116116
{
117117
throw new Exception($"Request failed: {response.Msg ?? "Unknown error"}");
@@ -168,13 +168,13 @@ private async Task<MvcResponseScheme> ReceiveResponseAsync(string requestId, Can
168168
if (_options.Protocol == SerializationProtocol.MessagePack)
169169
{
170170
// 使用 MessagePack 反序列化
171-
var messagePackResponse = await ReceiveMessagePackResponseAsync(requestId, result, buffer, cancellationToken);
172-
171+
MessagePackResponseScheme messagePackResponse = await ReceiveMessagePackResponseAsync(requestId, result, buffer, cancellationToken);
172+
173173
// 转换为 MvcResponseScheme 格式
174174
response = new MvcResponseScheme
175175
{
176176
Id = messagePackResponse.Id,
177-
Target = messagePackResponse.Target,
177+
Target = messagePackResponse.Target ?? string.Empty,
178178
Status = messagePackResponse.Status,
179179
Msg = messagePackResponse.Msg,
180180
Body = messagePackResponse.Body
@@ -208,13 +208,13 @@ private async Task<MvcResponseScheme> ReceiveResponseAsync(string requestId, Can
208208
/// Receive MessagePack response / 接收 MessagePack 响应
209209
/// </summary>
210210
private async Task<MessagePackResponseScheme> ReceiveMessagePackResponseAsync(
211-
string requestId,
212-
WebSocketReceiveResult initialResult,
213-
byte[] buffer,
211+
string requestId,
212+
WebSocketReceiveResult initialResult,
213+
byte[] buffer,
214214
CancellationToken cancellationToken)
215215
{
216216
using var ms = new MemoryStream();
217-
217+
218218
// 接收完整消息(可能分片)
219219
do
220220
{

Cyaim.WebSocketServer/Cyaim.WebSocketServer/Infrastructure/Cluster/ClusterManager.cs

Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
using System;
22
using System.Collections.Generic;
3+
using System.IO;
34
using System.Linq;
45
using System.Reflection;
56
using System.Text;
67
using System.Text.Json;
8+
using System.Threading;
79
using System.Threading.Tasks;
810
using Cyaim.WebSocketServer.Infrastructure.Configures;
911
using Cyaim.WebSocketServer.Infrastructure.Metrics;
@@ -453,6 +455,108 @@ public async Task<Dictionary<string, bool>> RouteObjectsAsync<T>(
453455

454456
#endregion
455457

458+
#region Stream Routing Methods / 流路由方法
459+
460+
/// <summary>
461+
/// Route stream to connection(s) - automatically handles single machine or cluster mode (supports chunked transmission)
462+
/// 将流转发到连接 - 自动处理单机或集群模式(支持分块传输)
463+
/// </summary>
464+
/// <param name="connectionId">Connection ID / 连接 ID</param>
465+
/// <param name="stream">Stream to send / 要发送的流</param>
466+
/// <param name="messageType">WebSocket message type / WebSocket 消息类型</param>
467+
/// <param name="chunkSize">Chunk size in bytes (default: 64KB) / 块大小(字节,默认:64KB)</param>
468+
/// <param name="cancellationToken">Cancellation token / 取消令牌</param>
469+
/// <returns>True if routed successfully / 路由成功返回 true</returns>
470+
/// <remarks>
471+
/// This method reads the stream in chunks and sends them sequentially.
472+
/// For cluster mode, it automatically forwards chunks to the correct node.
473+
/// 此方法以块为单位读取流并顺序发送。
474+
/// 对于集群模式,它会自动将块转发到正确的节点。
475+
/// </remarks>
476+
public async Task<bool> RouteStreamAsync(
477+
string connectionId,
478+
Stream stream,
479+
WebSocketMessageType messageType = WebSocketMessageType.Binary,
480+
int chunkSize = 64 * 1024,
481+
CancellationToken cancellationToken = default)
482+
{
483+
if (string.IsNullOrEmpty(connectionId) || stream == null || !stream.CanRead)
484+
{
485+
return false;
486+
}
487+
488+
return await _router.RouteStreamAsync(connectionId, stream, (int)messageType, chunkSize, cancellationToken);
489+
}
490+
491+
/// <summary>
492+
/// Route stream to multiple connections - automatically handles single machine or cluster mode (supports chunked transmission)
493+
/// 将流转发到多个连接 - 自动处理单机或集群模式(支持分块传输)
494+
/// </summary>
495+
/// <param name="connectionIds">Connection IDs / 连接 ID 列表</param>
496+
/// <param name="stream">Stream to send / 要发送的流</param>
497+
/// <param name="messageType">WebSocket message type / WebSocket 消息类型</param>
498+
/// <param name="chunkSize">Chunk size in bytes (default: 64KB) / 块大小(字节,默认:64KB)</param>
499+
/// <param name="cancellationToken">Cancellation token / 取消令牌</param>
500+
/// <returns>Dictionary of connection ID to routing result / 连接ID到路由结果的字典</returns>
501+
/// <remarks>
502+
/// This method reads the stream once and sends chunks to all connections in parallel.
503+
/// For cluster mode, it automatically forwards chunks to the correct nodes.
504+
/// 此方法读取流一次,并并行向所有连接发送块。
505+
/// 对于集群模式,它会自动将块转发到正确的节点。
506+
/// </remarks>
507+
public async Task<Dictionary<string, bool>> RouteStreamsAsync(
508+
IEnumerable<string> connectionIds,
509+
Stream stream,
510+
WebSocketMessageType messageType = WebSocketMessageType.Binary,
511+
int chunkSize = 64 * 1024,
512+
CancellationToken cancellationToken = default)
513+
{
514+
if (connectionIds == null || stream == null || !stream.CanRead)
515+
{
516+
return new Dictionary<string, bool>();
517+
}
518+
519+
var results = new Dictionary<string, bool>();
520+
var connectionIdList = connectionIds.Where(id => !string.IsNullOrEmpty(id)).ToList();
521+
522+
if (connectionIdList.Count == 0)
523+
{
524+
return results;
525+
}
526+
527+
// Read stream once and send to all connections / 读取流一次并发送到所有连接
528+
// For multiple connections, we need to read the stream multiple times or buffer it
529+
// 对于多个连接,我们需要多次读取流或缓冲它
530+
// For simplicity, we'll buffer the stream for multiple connections
531+
// 为简单起见,我们将为多个连接缓冲流
532+
if (connectionIdList.Count > 1)
533+
{
534+
// Buffer the stream / 缓冲流
535+
using var memoryStream = new MemoryStream();
536+
await stream.CopyToAsync(memoryStream, cancellationToken);
537+
memoryStream.Position = 0;
538+
539+
var tasks = connectionIdList.Select(async connectionId =>
540+
{
541+
memoryStream.Position = 0;
542+
var success = await _router.RouteStreamAsync(connectionId, memoryStream, (int)messageType, chunkSize, cancellationToken);
543+
results[connectionId] = success;
544+
});
545+
546+
await Task.WhenAll(tasks);
547+
}
548+
else
549+
{
550+
// Single connection - stream directly / 单个连接 - 直接流式传输
551+
var connectionId = connectionIdList[0];
552+
results[connectionId] = await _router.RouteStreamAsync(connectionId, stream, (int)messageType, chunkSize, cancellationToken);
553+
}
554+
555+
return results;
556+
}
557+
558+
#endregion
559+
456560
/// <summary>
457561
/// Get connection count for this node
458562
/// 获取此节点的连接数

Cyaim.WebSocketServer/Cyaim.WebSocketServer/Infrastructure/Cluster/ClusterMessage.cs

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ public enum ClusterMessageType
3737
/// </summary>
3838
ForwardWebSocketMessage,
3939
/// <summary>
40+
/// Forward WebSocket stream / 转发 WebSocket 流
41+
/// </summary>
42+
ForwardWebSocketStream,
43+
/// <summary>
4044
/// Forward WebSocket response / 转发 WebSocket 响应
4145
/// </summary>
4246
ForwardWebSocketResponse,
@@ -286,6 +290,58 @@ public class ForwardWebSocketMessage
286290
public string Endpoint { get; set; }
287291
}
288292

293+
/// <summary>
294+
/// WebSocket stream forward message (supports chunked transmission)
295+
/// WebSocket 流转发消息(支持分块传输)
296+
/// </summary>
297+
public class ForwardWebSocketStream
298+
{
299+
/// <summary>
300+
/// Connection ID / 连接 ID
301+
/// </summary>
302+
public string ConnectionId { get; set; }
303+
304+
/// <summary>
305+
/// Target node ID / 目标节点 ID
306+
/// </summary>
307+
public string TargetNodeId { get; set; }
308+
309+
/// <summary>
310+
/// Stream ID for chunk correlation / 用于块关联的流 ID
311+
/// </summary>
312+
public string StreamId { get; set; }
313+
314+
/// <summary>
315+
/// Chunk index (0-based) / 块索引(从 0 开始)
316+
/// </summary>
317+
public int ChunkIndex { get; set; }
318+
319+
/// <summary>
320+
/// Whether this is the last chunk / 是否是最后一块
321+
/// </summary>
322+
public bool IsLastChunk { get; set; }
323+
324+
/// <summary>
325+
/// Chunk data / 块数据
326+
/// </summary>
327+
public byte[] Data { get; set; }
328+
329+
/// <summary>
330+
/// Message type (WebSocketMessageType as int) / 消息类型(WebSocketMessageType 的整数值)
331+
/// </summary>
332+
public int MessageType { get; set; }
333+
334+
/// <summary>
335+
/// Total stream size in bytes (optional, for progress tracking) / 流总大小(字节,可选,用于进度跟踪)
336+
/// </summary>
337+
public long? TotalSize { get; set; }
338+
339+
/// <summary>
340+
/// Endpoint / 端点
341+
/// </summary>
342+
public string Endpoint { get; set; }
343+
}
344+
289345
/// <summary>
290346
/// WebSocket connection registration
291347
/// WebSocket 连接注册信息

0 commit comments

Comments
 (0)