Skip to content

Commit b668faf

Browse files
committed
perf: 提升集群稳定性;支持2节点集群;
1 parent b1f8677 commit b668faf

2 files changed

Lines changed: 211 additions & 5 deletions

File tree

Cyaim.WebSocketServer/Cyaim.WebSocketServer/Infrastructure/Cluster/RaftNode.cs

Lines changed: 124 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -297,14 +297,52 @@ private async Task StartElectionAsync()
297297

298298
_logger.LogInformation($"Election result: {votesReceived}/{votesNeeded} votes received");
299299

300-
if (votesReceived >= votesNeeded && State == RaftNodeState.Candidate)
300+
var totalNodes = knownNodes.Count + 1;
301+
var isTwoNodeCluster = totalNodes == 2;
302+
303+
// Special handling for 2-node clusters or when majority cannot be achieved / 2节点集群或无法达到多数的特殊处理
304+
if (votesReceived < votesNeeded && State == RaftNodeState.Candidate)
301305
{
302-
_logger.LogInformation($"Node {_nodeId} won election with {votesReceived} votes, becoming leader");
303-
BecomeLeader();
306+
if (isTwoNodeCluster)
307+
{
308+
_logger.LogInformation($"Two-node cluster detected. Using network quality-based leader selection.");
309+
310+
// Add a small random delay to avoid simultaneous elections / 添加小的随机延迟以避免同时选举
311+
var randomDelay = new Random().Next(100, 300);
312+
await Task.Delay(randomDelay);
313+
314+
// Re-check state in case we received a heartbeat from the other node / 重新检查状态,以防我们从另一个节点收到心跳
315+
if (State != RaftNodeState.Candidate)
316+
{
317+
_logger.LogInformation($"Node {_nodeId} state changed during delay, aborting leader selection");
318+
return;
319+
}
320+
321+
// For 2-node clusters, select leader based on network quality / 对于2节点集群,基于网络质量选择Leader
322+
var shouldBecomeLeader = await ShouldBecomeLeaderBasedOnNetworkQuality(knownNodes);
323+
324+
if (shouldBecomeLeader)
325+
{
326+
_logger.LogInformation($"Node {_nodeId} selected as leader based on network quality in 2-node cluster");
327+
BecomeLeader();
328+
return;
329+
}
330+
else
331+
{
332+
_logger.LogInformation($"Node {_nodeId} will wait for other node to become leader based on network quality");
333+
// Wait a bit longer before retrying / 在重试前等待更长时间
334+
await Task.Delay(1000);
335+
}
336+
}
337+
else
338+
{
339+
_logger.LogInformation($"Node {_nodeId} did not receive enough votes ({votesReceived}/{votesNeeded}), will retry");
340+
}
304341
}
305-
else if (State == RaftNodeState.Candidate)
342+
else if (votesReceived >= votesNeeded && State == RaftNodeState.Candidate)
306343
{
307-
_logger.LogInformation($"Node {_nodeId} did not receive enough votes ({votesReceived}/{votesNeeded}), will retry");
344+
_logger.LogInformation($"Node {_nodeId} won election with {votesReceived} votes, becoming leader");
345+
BecomeLeader();
308346
}
309347
}
310348

@@ -755,6 +793,87 @@ private List<string> GetKnownNodeIds()
755793
return nodeIds;
756794
}
757795

796+
/// <summary>
/// Decide whether this node should take leadership when it has at most one known peer
/// (used for two-node clusters that cannot reach a vote majority).
/// </summary>
/// <remarks>
/// Only this node's own view of the link to the peer is available here; there is no second,
/// peer-side measurement to compare it with. The decision is therefore made by an ordinal
/// comparison of node IDs (the smaller ID leads). The link quality is still sampled and
/// logged for diagnostics when the transport supports it.
/// </remarks>
/// <param name="knownNodes">IDs of the other known nodes.</param>
/// <returns>
/// <c>true</c> when <paramref name="knownNodes"/> is empty, or when it holds exactly one node
/// and this node's ID sorts before it (ordinal comparison); otherwise <c>false</c>.
/// </returns>
private async Task<bool> ShouldBecomeLeaderBasedOnNetworkQuality(List<string> knownNodes)
{
    // No peers at all: nothing to arbitrate with, so this node leads.
    if (knownNodes.Count == 0)
    {
        return true;
    }

    if (_transport is Transports.WebSocketClusterTransport wsTransport)
    {
        try
        {
            // Diagnostic only: a one-sided score cannot be compared against itself,
            // so it does not influence the decision below.
            foreach (var nodeId in knownNodes)
            {
                var quality = await wsTransport.GetNetworkQualityAsync(nodeId);
                _logger.LogDebug($"Network quality to node {nodeId}: {quality}/100");
            }
        }
        catch (Exception ex)
        {
            // A failed measurement must not block leader selection.
            _logger.LogWarning(ex, "Error evaluating network quality, using node ID as tie-breaker");
        }
    }

    // The node-ID rule is only defined for exactly one peer.
    if (knownNodes.Count != 1)
    {
        return false;
    }

    // NOTE(review): a peer that is currently unreachable still "wins" when its ID sorts first,
    // so the surviving node keeps returning false here. Confirm whether that is intended.
    var otherNodeId = knownNodes[0];
    var shouldLead = string.Compare(_nodeId, otherNodeId, StringComparison.Ordinal) < 0;
    _logger.LogInformation($"Two-node leader selection by node ID comparison: {_nodeId} {(shouldLead ? "<" : ">=")} {otherNodeId}");
    return shouldLead;
}
876+
758877
/// <summary>
759878
/// Check if this node is the leader / 检查此节点是否为领导者
760879
/// </summary>

Cyaim.WebSocketServer/Cyaim.WebSocketServer/Infrastructure/Cluster/Transports/WebSocketClusterTransport.cs

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,93 @@ public bool IsNodeConnected(string nodeId)
386386
return false;
387387
}
388388

389+
/// <summary>
/// Estimate the network latency to a node from its connection state and registered address.
/// </summary>
/// <remarks>
/// No message is sent and no round trip is timed; the value is a fixed heuristic.
/// The method completes synchronously and never throws: any failure is logged and reported as -1.
/// </remarks>
/// <param name="nodeId">Target node ID.</param>
/// <returns>
/// Estimated latency in milliseconds: 5 when the node's registered address is a loopback
/// address ("localhost", "127.0.0.1" or "::1"), 50 for any other open connection, or -1 when
/// the node is not connected, has no open connection, or the check fails.
/// </returns>
public Task<long> MeasureLatencyAsync(string nodeId)
{
    const long NotConnected = -1;
    const long LoopbackLatencyMs = 5;
    const long DefaultLatencyMs = 50;

    try
    {
        if (!IsNodeConnected(nodeId)
            || !_connections.TryGetValue(nodeId, out var connection)
            || connection.State != WebSocketState.Open)
        {
            return Task.FromResult(NotConnected);
        }

        // Host names are case-insensitive; the IP literals are compared exactly.
        var isLoopback = _nodes.TryGetValue(nodeId, out var node)
            && node != null
            && (string.Equals(node.Address, "localhost", StringComparison.OrdinalIgnoreCase)
                || string.Equals(node.Address, "127.0.0.1", StringComparison.Ordinal)
                || string.Equals(node.Address, "::1", StringComparison.Ordinal));

        return Task.FromResult(isLoopback ? LoopbackLatencyMs : DefaultLatencyMs);
    }
    catch (Exception ex)
    {
        _logger.LogWarning(ex, $"Failed to measure latency to node {nodeId}");
        return Task.FromResult(NotConnected);
    }
}
442+
443+
/// <summary>
444+
/// Get network quality score for a node (0-100, higher is better) / 获取节点的网络质量分数(0-100,越高越好)
445+
/// </summary>
446+
/// <param name="nodeId">Target node ID / 目标节点 ID</param>
447+
/// <returns>Quality score (0-100) / 质量分数(0-100)</returns>
448+
public async Task<int> GetNetworkQualityAsync(string nodeId)
449+
{
450+
if (!IsNodeConnected(nodeId))
451+
{
452+
return 0;
453+
}
454+
455+
try
456+
{
457+
var latency = await MeasureLatencyAsync(nodeId);
458+
if (latency < 0)
459+
{
460+
return 0;
461+
}
462+
463+
// Calculate quality score based on latency / 基于延迟计算质量分数
464+
// Lower latency = higher quality / 延迟越低 = 质量越高
465+
// 0ms = 100, 100ms = 50, 200ms+ = 0
466+
var quality = Math.Max(0, 100 - (int)(latency / 2));
467+
return quality;
468+
}
469+
catch (Exception ex)
470+
{
471+
_logger.LogWarning(ex, $"Failed to get network quality for node {nodeId}");
472+
return 0;
473+
}
474+
}
475+
389476
/// <summary>
390477
/// Register a node to connect to / 注册要连接的节点
391478
/// </summary>

0 commit comments

Comments
 (0)