Skip to content

Commit 55b8850

Browse files
committed
fix: 提升集群的稳定性;优化路由表;修复节点断开连接不更新的问题
1 parent b668faf commit 55b8850

7 files changed

Lines changed: 770 additions & 30 deletions

File tree

Cyaim.WebSocketServer/Cyaim.WebSocketServer/Infrastructure/Cluster/ClusterManager.cs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,32 @@ public async Task StartAsync()
101101
}
102102
}
103103

104+
/// <summary>
105+
/// Gracefully shutdown the cluster with connection transfer / 优雅关闭集群并转移连接
106+
/// </summary>
107+
/// <param name="force">Force shutdown without transfer / 强制关闭不转移连接</param>
108+
/// <returns>Task / 任务</returns>
109+
public async Task ShutdownAsync(bool force = false)
110+
{
111+
_logger.LogInformation($"Shutting down cluster manager for node {_nodeId} (force: {force})");
112+
113+
try
114+
{
115+
if (!force)
116+
{
117+
// Graceful shutdown: transfer connections before stopping / 优雅关闭:在停止前转移连接
118+
await _router.GracefulShutdownAsync(_clusterOption);
119+
}
120+
121+
await StopAsync();
122+
}
123+
catch (Exception ex)
124+
{
125+
_logger.LogError(ex, $"Error during cluster shutdown for node {_nodeId}");
126+
throw;
127+
}
128+
}
129+
104130
/// <summary>
105131
/// Stop the cluster
106132
/// 停止集群

Cyaim.WebSocketServer/Cyaim.WebSocketServer/Infrastructure/Cluster/ClusterRouter.cs

Lines changed: 673 additions & 14 deletions
Large diffs are not rendered by default.

Cyaim.WebSocketServer/Cyaim.WebSocketServer/Infrastructure/Handlers/MvcHandler/MvcChannelHandler.cs

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,22 @@ public async Task ConnectionEntry(HttpContext context, ILogger<WebSocketRouteMid
206206
var currentNodeId = Infrastructure.Cluster.GlobalClusterCenter.ClusterContext?.NodeId;
207207
_metricsCollector?.RecordConnectionEstablished(currentNodeId, context.Request.Path);
208208

209+
// Register connection with cluster manager if cluster is enabled
210+
// 如果启用了集群,向集群管理器注册连接
211+
var clusterManager = Infrastructure.Cluster.GlobalClusterCenter.ClusterManager;
212+
if (clusterManager != null)
213+
{
214+
try
215+
{
216+
await clusterManager.RegisterConnectionAsync(context.Connection.Id, context.Request.Path);
217+
logger.LogDebug($"Registered connection {context.Connection.Id} with cluster manager");
218+
}
219+
catch (Exception ex)
220+
{
221+
logger.LogWarning(ex, $"Failed to register connection {context.Connection.Id} with cluster manager");
222+
}
223+
}
224+
209225
// 执行BeforeReceivingData管道
210226
_ = await InvokePipeline(RequestPipelineStage.Connected, context, webSocket, null, null);
211227

@@ -1039,6 +1055,22 @@ private async Task MvcChannel_OnDisconnected(HttpContext context, WebSocketClose
10391055
if (wsExists)
10401056
{
10411057
Clients.TryRemove(context.Connection.Id, out var _);
1058+
1059+
// Unregister connection from cluster manager if cluster is enabled
1060+
// 如果启用了集群,从集群管理器注销连接
1061+
var clusterManager = Infrastructure.Cluster.GlobalClusterCenter.ClusterManager;
1062+
if (clusterManager != null)
1063+
{
1064+
try
1065+
{
1066+
await clusterManager.UnregisterConnectionAsync(context.Connection.Id);
1067+
logger.LogDebug($"Unregistered connection {context.Connection.Id} from cluster manager");
1068+
}
1069+
catch (Exception ex)
1070+
{
1071+
logger.LogWarning(ex, $"Failed to unregister connection {context.Connection.Id} from cluster manager");
1072+
}
1073+
}
10421074
}
10431075

10441076
ParallelForwardLimitSlim?.Dispose();

Cyaim.WebSocketServer/Dashboard/Cyaim.WebSocketServer.Dashboard/Controllers/ClientController.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,11 +88,15 @@ public ActionResult<ApiResponse<List<ClientConnectionInfo>>> GetAll([FromQuery]
8888
// If not found locally or on remote node, create basic info / 如果未在本地找到或在远程节点,创建基本信息
8989
if (clientInfo == null)
9090
{
91+
// If connection is in routing table, assume it's Open (disconnected connections are unregistered)
92+
// 如果连接在路由表中,假设它是 Open 状态(断开的连接会被注销)
93+
var state = "Open"; // Assume open if in routing table / 如果在路由表中,假设为 Open
94+
9195
clientInfo = new ClientConnectionInfo
9296
{
9397
ConnectionId = connectionId,
9498
NodeId = targetNodeId,
95-
State = "Unknown", // Remote connections state is unknown / 远程连接状态未知
99+
State = state,
96100
BytesSent = 0,
97101
BytesReceived = 0,
98102
MessagesSent = 0,

Cyaim.WebSocketServer/Dashboard/Cyaim.WebSocketServer.Dashboard/Controllers/StatisticsController.cs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,11 +108,15 @@ public ActionResult<ApiResponse<List<ClientConnectionInfo>>> GetConnections()
108108
// If not found locally or on remote node, create basic info / 如果未在本地找到或在远程节点,创建基本信息
109109
if (clientInfo == null)
110110
{
111+
// If connection is in routing table, assume it's Open (disconnected connections are unregistered)
112+
// 如果连接在路由表中,假设它是 Open 状态(断开的连接会被注销)
113+
var state = "Open"; // Assume open if in routing table / 如果在路由表中,假设为 Open
114+
111115
clientInfo = new ClientConnectionInfo
112116
{
113117
ConnectionId = connectionId,
114118
NodeId = targetNodeId,
115-
State = "Unknown",
119+
State = state,
116120
BytesSent = 0,
117121
BytesReceived = 0,
118122
MessagesSent = 0,

Cyaim.WebSocketServer/Dashboard/Cyaim.WebSocketServer.Dashboard/Services/DashboardHelperService.cs

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -23,22 +23,27 @@ public Dictionary<string, string> GetAllClusterConnections()
2323
var clusterManager = GlobalClusterCenter.ClusterManager;
2424
var currentNodeId = GlobalClusterCenter.ClusterContext?.NodeId ?? "unknown";
2525

26-
// Get local connections / 获取本地连接
27-
if (MvcChannelHandler.Clients != null)
26+
// Priority 1: Get connections from cluster routing table (most authoritative) / 优先级1:从集群路由表获取连接(最权威)
27+
// This includes all connections from all nodes in the cluster / 这包括集群中所有节点的所有连接
28+
if (clusterManager != null && clusterManager.ConnectionRoutes != null)
2829
{
29-
foreach (var kvp in MvcChannelHandler.Clients)
30+
foreach (var kvp in clusterManager.ConnectionRoutes)
3031
{
31-
connections[kvp.Key] = currentNodeId;
32+
connections[kvp.Key] = kvp.Value;
3233
}
3334
}
3435

35-
// Get connections from cluster routing table / 从集群路由表获取连接
36-
if (clusterManager != null && clusterManager.ConnectionRoutes != null)
36+
// Priority 2: Add local connections that might not be in routing table yet / 优先级2:添加可能尚未在路由表中的本地连接
37+
// This ensures we don't miss any local connections that haven't been registered yet / 这确保我们不会遗漏任何尚未注册的本地连接
38+
if (MvcChannelHandler.Clients != null)
3739
{
38-
foreach (var kvp in clusterManager.ConnectionRoutes)
40+
foreach (var kvp in MvcChannelHandler.Clients)
3941
{
40-
// Merge with local connections, cluster routes take precedence / 与本地连接合并,集群路由优先
41-
connections[kvp.Key] = kvp.Value;
42+
// Only add if not already in connections (cluster routes take precedence) / 仅在尚未在连接中时添加(集群路由优先)
43+
if (!connections.ContainsKey(kvp.Key))
44+
{
45+
connections[kvp.Key] = currentNodeId;
46+
}
4247
}
4348
}
4449

Cyaim.WebSocketServer/Sample/Cyaim.WebSocketServer.Sample.Dashboard/Program.cs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -229,21 +229,31 @@
229229
});
230230
});
231231

232-
// 在应用关闭时停止集群
232+
// 在应用关闭时停止集群(优雅关闭)
233233
lifetime.ApplicationStopping.Register(() =>
234234
{
235235
Task.Run(async () =>
236236
{
237237
try
238238
{
239-
await clusterManager.StopAsync();
240-
logger.LogInformation($"集群节点 {nodeId} 已停止");
239+
// Graceful shutdown: transfer connections before stopping / 优雅关闭:在停止前转移连接
240+
await clusterManager.ShutdownAsync(force: false);
241+
logger.LogInformation($"集群节点 {nodeId} 已优雅关闭");
241242
}
242243
catch (Exception ex)
243244
{
244-
logger.LogError(ex, "停止集群时发生错误");
245+
logger.LogError(ex, "优雅关闭集群时发生错误,尝试强制关闭");
246+
try
247+
{
248+
// Force shutdown if graceful shutdown fails / 如果优雅关闭失败,强制关闭
249+
await clusterManager.ShutdownAsync(force: true);
250+
}
251+
catch (Exception forceEx)
252+
{
253+
logger.LogError(forceEx, "强制关闭集群时发生错误");
254+
}
245255
}
246-
}).Wait(TimeSpan.FromSeconds(5)); // 等待最多5秒
256+
}).Wait(TimeSpan.FromSeconds(30)); // 等待最多30秒(优雅关闭需要更多时间)
247257
});
248258
}
249259
catch (Exception ex)

0 commit comments

Comments
 (0)