Skip to content

Commit 0893c7f

Browse files
committed
perfect:优化集群节点性能计数器
1 parent afb8c33 commit 0893c7f

6 files changed

Lines changed: 856 additions & 70 deletions

File tree

Cyaim.WebSocketServer/Cluster/Cyaim.WebSocketServer.Cluster.Hybrid/HybridClusterTransport.cs

Lines changed: 164 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,14 +58,16 @@ public class HybridClusterTransport : IClusterTransport
5858
/// <param name="nodeId">Current node ID / 当前节点 ID</param>
5959
/// <param name="nodeInfo">Current node information / 当前节点信息</param>
6060
/// <param name="loadBalancingStrategy">Load balancing strategy / 负载均衡策略</param>
61+
/// <param name="nodeInfoProvider">Optional function to automatically get latest node info during heartbeat / 可选函数,用于在心跳时自动获取最新节点信息</param>
6162
public HybridClusterTransport(
6263
ILogger<HybridClusterTransport> logger,
6364
ILoggerFactory loggerFactory,
6465
IRedisService redisService,
6566
IMessageQueueService messageQueueService,
6667
string nodeId,
6768
NodeInfo nodeInfo,
68-
LoadBalancingStrategy loadBalancingStrategy = LoadBalancingStrategy.LeastConnections)
69+
LoadBalancingStrategy loadBalancingStrategy = LoadBalancingStrategy.LeastConnections,
70+
Func<Task<NodeInfo>> nodeInfoProvider = null)
6971
{
7072
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
7173
loggerFactory = loggerFactory ?? throw new ArgumentNullException(nameof(loggerFactory));
@@ -81,11 +83,21 @@ public HybridClusterTransport(
8183
var discoveryLogger = loggerFactory.CreateLogger<RedisNodeDiscoveryService>();
8284
var loadBalancerLogger = loggerFactory.CreateLogger<LoadBalancer>();
8385

86+
// If no provider is provided, try to create a default one that auto-detects connection count
87+
// 如果没有提供 provider,尝试创建一个默认的,自动检测连接数
88+
Func<Task<NodeInfo>> finalProvider = nodeInfoProvider;
89+
if (finalProvider == null)
90+
{
91+
finalProvider = CreateDefaultNodeInfoProvider(nodeInfo);
92+
}
93+
8494
_discoveryService = new RedisNodeDiscoveryService(
8595
discoveryLogger,
8696
redisService,
8797
nodeId,
88-
nodeInfo);
98+
nodeInfo,
99+
clusterPrefix: "websocket:cluster",
100+
nodeInfoProvider: finalProvider);
89101

90102
_loadBalancer = new LoadBalancer(loadBalancerLogger, loadBalancingStrategy);
91103

@@ -288,6 +300,42 @@ public NodeInfo GetOptimalNode(string excludeNodeId = null)
288300
return _loadBalancer.SelectNode(nodes, excludeNodeId ?? _nodeId);
289301
}
290302

303+
/// <summary>
304+
/// Update current node information in Redis / 更新 Redis 中的当前节点信息
305+
/// </summary>
306+
/// <param name="nodeInfo">Updated node information / 更新的节点信息</param>
307+
/// <remarks>
308+
/// This method allows you to update dynamic node information such as connection count, CPU usage, and memory usage.
309+
/// The updated information will be stored in Redis and used for load balancing.
310+
/// 此方法允许您更新动态节点信息,如连接数、CPU 使用率和内存使用率。
311+
/// 更新的信息将存储在 Redis 中,并用于负载均衡。
312+
/// </remarks>
313+
public async Task UpdateNodeInfoAsync(NodeInfo nodeInfo)
314+
{
315+
if (nodeInfo == null)
316+
{
317+
throw new ArgumentNullException(nameof(nodeInfo));
318+
}
319+
320+
// Update internal node info / 更新内部节点信息
321+
_nodeInfo.NodeId = nodeInfo.NodeId;
322+
_nodeInfo.Address = nodeInfo.Address;
323+
_nodeInfo.Port = nodeInfo.Port;
324+
_nodeInfo.Endpoint = nodeInfo.Endpoint;
325+
_nodeInfo.ConnectionCount = nodeInfo.ConnectionCount;
326+
_nodeInfo.MaxConnections = nodeInfo.MaxConnections;
327+
_nodeInfo.CpuUsage = nodeInfo.CpuUsage;
328+
_nodeInfo.MemoryUsage = nodeInfo.MemoryUsage;
329+
_nodeInfo.Status = nodeInfo.Status;
330+
if (nodeInfo.Metadata != null)
331+
{
332+
_nodeInfo.Metadata = nodeInfo.Metadata;
333+
}
334+
335+
// Update in Redis via discovery service / 通过发现服务更新到 Redis
336+
await _discoveryService.UpdateNodeInfoAsync(_nodeInfo);
337+
}
338+
291339
/// <summary>
292340
/// Handle incoming message / 处理传入消息
293341
/// </summary>
@@ -372,6 +420,120 @@ private void OnNodeRemoved(object sender, string nodeId)
372420
}
373421
}
374422

423+
/// <summary>
424+
/// Create default node info provider that auto-detects connection count
425+
/// 创建默认的节点信息提供者,自动检测连接数
426+
/// </summary>
427+
/// <param name="baseNodeInfo">Base node info to use / 要使用的基础节点信息</param>
428+
/// <returns>Node info provider function / 节点信息提供者函数</returns>
429+
private Func<Task<NodeInfo>> CreateDefaultNodeInfoProvider(NodeInfo baseNodeInfo)
430+
{
431+
return async () =>
432+
{
433+
try
434+
{
435+
// Try to get connection count from MvcChannelHandler (most common case)
436+
// 尝试从 MvcChannelHandler 获取连接数(最常见的情况)
437+
int connectionCount = 0;
438+
try
439+
{
440+
// Use reflection to avoid direct dependency / 使用反射避免直接依赖
441+
var mvcHandlerType = Type.GetType("Cyaim.WebSocketServer.Infrastructure.Handlers.MvcHandler.MvcChannelHandler, Cyaim.WebSocketServer");
442+
if (mvcHandlerType != null)
443+
{
444+
var clientsProperty = mvcHandlerType.GetProperty("Clients", System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.Static);
445+
if (clientsProperty != null)
446+
{
447+
var clients = clientsProperty.GetValue(null);
448+
if (clients is System.Collections.ICollection collection)
449+
{
450+
connectionCount = collection.Count;
451+
}
452+
}
453+
}
454+
}
455+
catch
456+
{
457+
// Ignore reflection errors / 忽略反射错误
458+
}
459+
460+
// Try to get from ClusterManager if available / 如果可用,尝试从 ClusterManager 获取
461+
if (connectionCount == 0)
462+
{
463+
try
464+
{
465+
var clusterCenterType = Type.GetType("Cyaim.WebSocketServer.Infrastructure.Cluster.GlobalClusterCenter, Cyaim.WebSocketServer");
466+
if (clusterCenterType != null)
467+
{
468+
var clusterManagerProperty = clusterCenterType.GetProperty("ClusterManager", System.Reflection.BindingFlags.Public | System.Reflection.BindingFlags.Static);
469+
if (clusterManagerProperty != null)
470+
{
471+
var clusterManager = clusterManagerProperty.GetValue(null);
472+
if (clusterManager != null)
473+
{
474+
var getLocalConnectionCountMethod = clusterManager.GetType().GetMethod("GetLocalConnectionCount");
475+
if (getLocalConnectionCountMethod != null)
476+
{
477+
var count = getLocalConnectionCountMethod.Invoke(clusterManager, null);
478+
if (count is int localCount)
479+
{
480+
connectionCount = localCount;
481+
}
482+
}
483+
}
484+
}
485+
}
486+
}
487+
catch
488+
{
489+
// Ignore reflection errors / 忽略反射错误
490+
}
491+
}
492+
493+
// Get CPU and memory usage / 获取 CPU 和内存使用率
494+
double cpuUsage = 0.0;
495+
double memoryUsage = 0.0;
496+
try
497+
{
498+
var process = System.Diagnostics.Process.GetCurrentProcess();
499+
var totalProcessorTime = process.TotalProcessorTime.TotalMilliseconds;
500+
var uptime = (DateTime.UtcNow - process.StartTime.ToUniversalTime()).TotalMilliseconds;
501+
cpuUsage = uptime > 0 ? (totalProcessorTime / uptime) * 100 : 0.0;
502+
503+
var workingSet = process.WorkingSet64;
504+
memoryUsage = (double)workingSet / (1024 * 1024); // Convert to MB / 转换为 MB
505+
}
506+
catch
507+
{
508+
// Ignore errors / 忽略错误
509+
}
510+
511+
// Return updated node info / 返回更新的节点信息
512+
return new NodeInfo
513+
{
514+
NodeId = baseNodeInfo.NodeId,
515+
Address = baseNodeInfo.Address,
516+
Port = baseNodeInfo.Port,
517+
Endpoint = baseNodeInfo.Endpoint,
518+
ConnectionCount = connectionCount,
519+
MaxConnections = baseNodeInfo.MaxConnections,
520+
CpuUsage = cpuUsage,
521+
MemoryUsage = memoryUsage,
522+
Status = baseNodeInfo.Status,
523+
Metadata = baseNodeInfo.Metadata ?? new Dictionary<string, string>(),
524+
RegisteredAt = baseNodeInfo.RegisteredAt
525+
};
526+
}
527+
catch (Exception ex)
528+
{
529+
_logger.LogWarning(ex, "Failed to auto-update node info, using cached info");
530+
// Return base info with updated heartbeat / 返回基础信息并更新心跳
531+
baseNodeInfo.LastHeartbeat = DateTime.UtcNow;
532+
return baseNodeInfo;
533+
}
534+
};
535+
}
536+
375537
/// <summary>
376538
/// Dispose / 释放资源
377539
/// </summary>

Cyaim.WebSocketServer/Cluster/Cyaim.WebSocketServer.Cluster.Hybrid/HybridClusterTransportFactory.cs

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Threading.Tasks;
23
using Cyaim.WebSocketServer.Cluster.Hybrid.Abstractions;
34
using Cyaim.WebSocketServer.Infrastructure.Cluster;
45
using Microsoft.Extensions.Logging;
@@ -24,6 +25,7 @@ public class HybridClusterTransportFactory
2425
/// <param name="endpoint">WebSocket endpoint / WebSocket 端点</param>
2526
/// <param name="maxConnections">Maximum connections / 最大连接数</param>
2627
/// <param name="loadBalancingStrategy">Load balancing strategy / 负载均衡策略</param>
28+
/// <param name="nodeInfoProvider">Optional function to automatically get latest node info during heartbeat / 可选函数,用于在心跳时自动获取最新节点信息</param>
2729
/// <returns>Hybrid cluster transport instance / 混合集群传输实例</returns>
2830
public static IClusterTransport Create(
2931
ILogger<HybridClusterTransport> logger,
@@ -35,7 +37,8 @@ public static IClusterTransport Create(
3537
int nodePort,
3638
string endpoint = "/ws",
3739
int maxConnections = 0,
38-
LoadBalancingStrategy loadBalancingStrategy = LoadBalancingStrategy.LeastConnections)
40+
LoadBalancingStrategy loadBalancingStrategy = LoadBalancingStrategy.LeastConnections,
41+
Func<Task<NodeInfo>> nodeInfoProvider = null)
3942
{
4043
if (logger == null)
4144
throw new ArgumentNullException(nameof(logger));
@@ -69,8 +72,8 @@ public static IClusterTransport Create(
6972
messageQueueService,
7073
nodeId,
7174
nodeInfo,
72-
loadBalancingStrategy);
75+
loadBalancingStrategy,
76+
nodeInfoProvider);
7377
}
7478
}
75-
}
76-
79+
}

0 commit comments

Comments
 (0)