Skip to content

Commit 800ba20

Browse files
committed
fix: 修复集群之间消息转发;release version
1 parent 1e6a71c commit 800ba20

2 files changed

Lines changed: 41 additions & 25 deletions

File tree

Cyaim.WebSocketServer/Cyaim.WebSocketServer/Infrastructure/Cluster/ClusterRouter.cs

Lines changed: 38 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ public async Task<bool> RouteMessageAsync(string connectionId, byte[] data, int
226226
if (targetNodeId == _nodeId)
227227
{
228228
// Local connection - handle directly / 本地连接 - 直接处理
229-
_logger.LogDebug($"Routing message to local connection {connectionId}");
229+
_logger.LogDebug($"Routing message to local connection {connectionId} on node {_nodeId}");
230230

231231
if (_connectionProvider != null)
232232
{
@@ -262,27 +262,24 @@ public async Task<bool> RouteMessageAsync(string connectionId, byte[] data, int
262262
else
263263
{
264264
// Remote connection - forward via transport / 远程连接 - 通过传输转发
265+
_logger.LogDebug($"Routing message to remote connection {connectionId} on node {targetNodeId} (current node: {_nodeId})");
265266
return await ForwardToNodeAsync(targetNodeId, connectionId, data, messageType);
266267
}
267268
}
268269
else
269270
{
270-
// Connection not found - query cluster if leader / 未找到连接 - 如果是领导者则查询集群
271-
if (_raftNode.IsLeader())
272-
{
273-
var found = await QueryConnectionAsync(connectionId);
274-
if (found != null)
275-
{
276-
return await ForwardToNodeAsync(found, connectionId, data, messageType);
277-
}
278-
}
279-
else
271+
// Connection not found - query cluster to find the node / 未找到连接 - 查询集群以找到节点
272+
// 无论是否为 leader,都尝试查询连接位置(因为连接可能在其他节点)
273+
// Whether leader or not, try to query connection location (connection might be on another node)
274+
_logger.LogDebug($"Connection {connectionId} not found in local routing table, querying cluster...");
275+
var found = await QueryConnectionAsync(connectionId);
276+
if (found != null)
280277
{
281-
// Query leader for connection location / 向领导者查询连接位置
282-
// Simplified: would need to send query to leader / 简化版本:需要向领导者发送查询
278+
_logger.LogInformation($"Found connection {connectionId} on node {found} after query, forwarding message");
279+
return await ForwardToNodeAsync(found, connectionId, data, messageType);
283280
}
284281

285-
_logger.LogWarning($"Connection {connectionId} not found in routing table");
282+
_logger.LogWarning($"Connection {connectionId} not found in routing table and query returned no result. Available connections in routing table: {_connectionRoutes.Count}");
286283
return false;
287284
}
288285
}
@@ -299,6 +296,17 @@ private async Task<bool> ForwardToNodeAsync(string targetNodeId, string connecti
299296
{
300297
try
301298
{
299+
// 检查目标节点是否可用(如果传输层支持)
300+
// Check if target node is available (if transport supports it)
301+
if (_transport is Transports.WebSocketClusterTransport wsTransport)
302+
{
303+
if (!wsTransport.IsNodeConnected(targetNodeId))
304+
{
305+
_logger.LogWarning($"Cannot forward message to node {targetNodeId} for connection {connectionId}: node is not connected");
306+
return false;
307+
}
308+
}
309+
302310
var forwardMessage = new ForwardWebSocketMessage
303311
{
304312
ConnectionId = connectionId,
@@ -319,17 +327,19 @@ private async Task<bool> ForwardToNodeAsync(string targetNodeId, string connecti
319327
Payload = JsonSerializer.Serialize(forwardMessage)
320328
};
321329

330+
_logger.LogDebug($"Attempting to forward message for connection {connectionId} to node {targetNodeId}, message size: {data.Length} bytes");
331+
322332
await _transport.SendAsync(targetNodeId, message);
323333

324334
// 记录集群消息转发指标
325335
_metricsCollector?.RecordClusterMessageForwarded(_nodeId, targetNodeId);
326336

327-
_logger.LogDebug($"Forwarded message for connection {connectionId} to node {targetNodeId}");
337+
_logger.LogDebug($"Successfully forwarded message for connection {connectionId} to node {targetNodeId}");
328338
return true;
329339
}
330340
catch (Exception ex)
331341
{
332-
_logger.LogError(ex, $"Failed to forward message for connection {connectionId} to node {targetNodeId}");
342+
_logger.LogError(ex, $"Failed to forward message for connection {connectionId} to node {targetNodeId}. Error: {ex.Message}, StackTrace: {ex.StackTrace}");
333343
_metricsCollector?.RecordError("cluster_forward_failed", _nodeId);
334344
return false;
335345
}
@@ -528,15 +538,21 @@ private async Task<string> QueryConnectionAsync(string connectionId)
528538
// Broadcast query / 广播查询
529539
await _transport.BroadcastAsync(message);
530540

531-
// Wait for response (simplified - would need timeout and response handling)
532-
// 等待响应(简化版本 - 需要超时和响应处理)
533-
await Task.Delay(100);
534-
535-
if (_connectionRoutes.TryGetValue(connectionId, out var nodeId))
541+
// Wait for response with retries / 等待响应,带重试
542+
// 增加等待时间,因为网络延迟可能导致响应较慢
543+
// Increase wait time as network latency may cause slower responses
544+
for (int i = 0; i < 5; i++)
536545
{
537-
return nodeId;
546+
await Task.Delay(50); // 每次等待50ms,总共最多250ms
547+
548+
if (_connectionRoutes.TryGetValue(connectionId, out var nodeId))
549+
{
550+
_logger.LogDebug($"Found connection {connectionId} on node {nodeId} after {i + 1} query attempts");
551+
return nodeId;
552+
}
538553
}
539554

555+
_logger.LogWarning($"Connection {connectionId} not found after querying cluster");
540556
return null;
541557
}
542558

Cyaim.WebSocketServer/Cyaim.WebSocketServer/Infrastructure/Cluster/Transports/WebSocketClusterTransport.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ public async Task SendAsync(string nodeId, ClusterMessage message)
321321
if (!_nodes.TryGetValue(nodeId, out var node))
322322
{
323323
_logger.LogWarning($"Node {nodeId} not found in registered nodes. Available nodes: {string.Join(", ", _nodes.Keys)}");
324-
return;
324+
throw new InvalidOperationException($"Node {nodeId} not found in registered nodes. Available nodes: {string.Join(", ", _nodes.Keys)}");
325325
}
326326

327327
ClientWebSocket connection = null;
@@ -333,13 +333,13 @@ public async Task SendAsync(string nodeId, ClusterMessage message)
333333
catch (Exception ex)
334334
{
335335
_logger.LogWarning(ex, $"Failed to get or create connection to node {nodeId}, message will be dropped. Error: {ex.Message}");
336-
return;
336+
throw new InvalidOperationException($"Failed to get or create connection to node {nodeId}", ex);
337337
}
338338

339339
if (connection == null || connection.State != WebSocketState.Open)
340340
{
341341
_logger.LogWarning($"Cannot send message to node {nodeId}, connection not available (state: {connection?.State}). Active connections: {_connections.Count}");
342-
return;
342+
throw new InvalidOperationException($"Cannot send message to node {nodeId}, connection not available (state: {connection?.State})");
343343
}
344344

345345
_logger.LogDebug($"Sending {message.Type} message to node {nodeId}");

0 commit comments

Comments
 (0)