Skip to content

Commit 3285438

Browse files
committed
fix
1 parent a1182e2 commit 3285438

8 files changed

Lines changed: 824 additions & 99 deletions

File tree

Cyaim.WebSocketServer/Cluster/Cyaim.WebSocketServer.Cluster.Hybrid.MessageQueue.RabbitMQ/Cyaim.WebSocketServer.Cluster.Hybrid.MessageQueue.RabbitMQ.csproj

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
<?xml version="1.0" encoding="UTF-8"?>
1+
<?xml version="1.0" encoding="utf-8"?>
22
<Project Sdk="Microsoft.NET.Sdk">
33
<PropertyGroup>
44
<TargetFrameworks>netstandard2.1;net6.0;net7.0;net8.0;net9.0;net10.0</TargetFrameworks>
@@ -29,16 +29,12 @@
2929
</ItemGroup>
3030
<!-- .NET Standard 2.1 需要显式引用,使用最新稳定版�? -->
3131
<ItemGroup Condition="'$(TargetFramework)' == 'netstandard2.1'">
32-
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions"
33-
Version="9.0.0" />
34-
<PackageReference Include="System.Text.Json"
35-
Version="9.0.0" />
32+
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="9.0.0" />
33+
<PackageReference Include="System.Text.Json" Version="9.0.0" />
3634
</ItemGroup>
3735
<!-- RabbitMQ.Client 7.0+ 支持 .NET Standard 2.0+,统一使用 7.2.0 -->
3836
<ItemGroup>
39-
<PackageReference Include="RabbitMQ.Client"
40-
Version="7.2.0" />
41-
<ProjectReference Include="..\Cyaim.WebSocketServer.Cluster.Hybrid\Cyaim.WebSocketServer.Cluster.Hybrid.csproj" />
37+
<PackageReference Include="RabbitMQ.Client" Version="7.2.0" />
4238
</ItemGroup>
4339
<ItemGroup>
4440
<None Include="..\..\..\LICENSE">

Cyaim.WebSocketServer/Cluster/Cyaim.WebSocketServer.Cluster.Hybrid.MessageQueue.RabbitMQ/RabbitMQMessageQueueService.cs

Lines changed: 574 additions & 21 deletions
Large diffs are not rendered by default.

Cyaim.WebSocketServer/Cluster/Cyaim.WebSocketServer.Cluster.Hybrid.Redis.FreeRedis/Cyaim.WebSocketServer.Cluster.Hybrid.Redis.FreeRedis.csproj

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,6 @@
3636

3737
<ItemGroup>
3838
<PackageReference Include="FreeRedis" Version="1.2.0" />
39-
<ProjectReference Include="..\Cyaim.WebSocketServer.Cluster.Hybrid\Cyaim.WebSocketServer.Cluster.Hybrid.csproj" />
4039
</ItemGroup>
4140

4241
<ItemGroup>

Cyaim.WebSocketServer/Cluster/Cyaim.WebSocketServer.Cluster.Hybrid.Redis.FreeRedis/FreeRedisService.cs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -172,15 +172,23 @@ public async Task<List<string>> GetKeysAsync(string pattern)
172172
throw new InvalidOperationException("Redis is not connected");
173173
}
174174

175+
_logger.LogWarning($"[FreeRedisService] 搜索匹配模式的键 - Pattern: {pattern}");
176+
175177
var keys = _redis.Keys(pattern);
176-
return await Task.FromResult(keys?.ToList() ?? new List<string>());
178+
var keyList = keys?.ToList() ?? new List<string>();
179+
180+
_logger.LogWarning($"[FreeRedisService] 找到匹配的键 - Pattern: {pattern}, KeyCount: {keyList.Count}, Keys: {string.Join(", ", keyList)}");
181+
182+
return await Task.FromResult(keyList);
177183
}
178184

179185
/// <summary>
180186
/// Get all values for keys matching pattern / 获取所有匹配模式的键的值
181187
/// </summary>
182188
public async Task<Dictionary<string, string>> GetValuesAsync(string pattern)
183189
{
190+
_logger.LogWarning($"[FreeRedisService] 获取匹配模式的键值 - Pattern: {pattern}");
191+
184192
var keys = await GetKeysAsync(pattern);
185193
var result = new Dictionary<string, string>();
186194

@@ -192,14 +200,20 @@ public async Task<Dictionary<string, string>> GetValuesAsync(string pattern)
192200
if (value != null)
193201
{
194202
result[key] = value;
203+
_logger.LogWarning($"[FreeRedisService] 获取键值成功 - Key: {key}, ValueLength: {value.Length}");
204+
}
205+
else
206+
{
207+
_logger.LogWarning($"[FreeRedisService] 键值为空 - Key: {key}");
195208
}
196209
}
197210
catch (Exception ex)
198211
{
199-
_logger.LogWarning(ex, $"Failed to get value for key {key}");
212+
_logger.LogError(ex, $"[FreeRedisService] 获取键值失败 - Key: {key}, Error: {ex.Message}, StackTrace: {ex.StackTrace}");
200213
}
201214
}
202215

216+
_logger.LogWarning($"[FreeRedisService] 获取键值完成 - Pattern: {pattern}, ResultCount: {result.Count}");
203217
return result;
204218
}
205219

Cyaim.WebSocketServer/Cluster/Cyaim.WebSocketServer.Cluster.Hybrid/Cyaim.WebSocketServer.Cluster.Hybrid.csproj

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,6 @@
3939

4040
<!-- .NET 6.0+ 框架已内置这些包,无需显式引用,框架会自动使用内置版本 -->
4141

42-
<ItemGroup>
43-
<ProjectReference Include="..\..\Cyaim.WebSocketServer\Cyaim.WebSocketServer.csproj" />
44-
</ItemGroup>
45-
4642
<ItemGroup>
4743
<None Include="..\..\..\LICENSE">
4844
<Pack>True</Pack>

Cyaim.WebSocketServer/Cluster/Cyaim.WebSocketServer.Cluster.Hybrid/HybridClusterTransport.cs

Lines changed: 94 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,30 @@ public async Task StartAsync()
138138

139139
// Connect to message queue / 连接到消息队列
140140
await _messageQueueService.ConnectAsync();
141+
142+
// Verify connection is ready before proceeding / 验证连接已就绪再继续
143+
// This ensures channel is fully established before RaftNode starts / 这确保在 RaftNode 启动前 channel 已完全建立
144+
int verifyAttempts = 0;
145+
while (verifyAttempts < 10) // Try up to 10 times / 最多尝试 10 次
146+
{
147+
try
148+
{
149+
// Try to ensure connection is ready / 尝试确保连接已就绪
150+
await _messageQueueService.VerifyConnectionAsync();
151+
break; // Connection verified / 连接已验证
152+
}
153+
catch (Exception ex)
154+
{
155+
verifyAttempts++;
156+
if (verifyAttempts >= 10)
157+
{
158+
_logger.LogError(ex, $"[HybridClusterTransport] 验证 RabbitMQ 连接失败,已达到最大重试次数 - NodeId: {_nodeId}");
159+
throw;
160+
}
161+
_logger.LogWarning($"[HybridClusterTransport] 验证 RabbitMQ 连接失败,重试中 ({verifyAttempts}/10) - NodeId: {_nodeId}, Error: {ex.Message}");
162+
await Task.Delay(200); // Wait 200ms before retry / 重试前等待 200ms
163+
}
164+
}
141165

142166
// Declare exchange / 声明交换机
143167
await _messageQueueService.DeclareExchangeAsync(ExchangeName, "topic", durable: true);
@@ -156,10 +180,19 @@ public async Task StartAsync()
156180
await _messageQueueService.BindQueueAsync(_queueName, ExchangeName, BroadcastRoutingKey);
157181

158182
// Start consuming messages / 开始消费消息
159-
await _messageQueueService.ConsumeAsync(_queueName, HandleMessageAsync, autoAck: false);
183+
// Pass currentNodeId to enable early filtering of self-messages / 传递 currentNodeId 以启用早期过滤自己的消息
184+
await _messageQueueService.ConsumeAsync(_queueName, HandleMessageAsync, autoAck: false, currentNodeId: _nodeId);
185+
186+
// Wait a bit for initial node discovery / 等待初始节点发现
187+
_logger.LogWarning($"[HybridClusterTransport] 等待初始节点发现... - NodeId: {_nodeId}");
188+
await Task.Delay(2000); // Wait 2 seconds for initial discovery / 等待 2 秒进行初始发现
189+
190+
// Log discovered nodes / 记录发现的节点
191+
var discoveredNodes = GetKnownNodeIds();
192+
_logger.LogWarning($"[HybridClusterTransport] 初始节点发现完成 - NodeId: {_nodeId}, DiscoveredNodeCount: {discoveredNodes.Count}, DiscoveredNodes: {string.Join(", ", discoveredNodes)}");
160193

161194
_started = true;
162-
_logger.LogWarning($"[HybridClusterTransport] 混合集群传输启动成功 - NodeId: {_nodeId}, QueueName: {_queueName}, ExchangeName: {ExchangeName}");
195+
_logger.LogWarning($"[HybridClusterTransport] 混合集群传输启动成功 - NodeId: {_nodeId}, QueueName: {_queueName}, ExchangeName: {ExchangeName}, KnownNodes: {discoveredNodes.Count}");
163196
}
164197
catch (Exception ex)
165198
{
@@ -246,7 +279,7 @@ public async Task SendAsync(string nodeId, ClusterMessage message)
246279

247280
await _messageQueueService.PublishAsync(ExchangeName, routingKey, messageBytes, properties);
248281

249-
_logger.LogWarning($"[HybridClusterTransport] 消息发送成功 - TargetNodeId: {nodeId}, MessageId: {message.MessageId}, MessageType: {message.Type}, CurrentNodeId: {_nodeId}, MessageSize: {messageBytes.Length} bytes");
282+
_logger.LogTrace($"[HybridClusterTransport] 消息发送成功 - TargetNodeId: {nodeId}, MessageId: {message.MessageId}, MessageType: {message.Type}, CurrentNodeId: {_nodeId}, MessageSize: {messageBytes.Length} bytes");
250283
}
251284
catch (Exception ex)
252285
{
@@ -291,12 +324,26 @@ public async Task BroadcastAsync(ClusterMessage message)
291324
{
292325
MessageId = message.MessageId,
293326
CorrelationId = message.MessageId,
294-
Timestamp = message.Timestamp
327+
Timestamp = message.Timestamp,
328+
Headers = new Dictionary<string, object>
329+
{
330+
{ "FromNodeId", _nodeId } // Add FromNodeId header to filter self-messages early / 添加 FromNodeId header 以便早期过滤自己的消息
331+
}
295332
};
296333

297-
await _messageQueueService.PublishAsync(ExchangeName, BroadcastRoutingKey, messageBytes, properties);
298-
299-
_logger.LogWarning($"[HybridClusterTransport] 广播消息成功 - MessageId: {message.MessageId}, MessageType: {message.Type}, CurrentNodeId: {_nodeId}, MessageSize: {messageBytes.Length} bytes, 已知节点数: {_knownNodes.Count}");
334+
// Only broadcast if there are other nodes (excluding self) / 只有在有其他节点时才广播(排除自己)
335+
// Note: _knownNodes does not include self, so _knownNodes.Count is the count of other nodes
336+
// 注意:_knownNodes 不包含自己,所以 _knownNodes.Count 就是其他节点的数量
337+
var otherNodesCount = _knownNodes.Count;
338+
if (otherNodesCount > 0)
339+
{
340+
await _messageQueueService.PublishAsync(ExchangeName, BroadcastRoutingKey, messageBytes, properties);
341+
_logger.LogTrace($"[HybridClusterTransport] 广播消息成功 - MessageId: {message.MessageId}, MessageType: {message.Type}, CurrentNodeId: {_nodeId}, MessageSize: {messageBytes.Length} bytes, 已知节点数: {_knownNodes.Count}, 其他节点数: {otherNodesCount}");
342+
}
343+
else
344+
{
345+
_logger.LogTrace($"[HybridClusterTransport] 跳过广播消息(没有其他节点)- MessageId: {message.MessageId}, MessageType: {message.Type}, CurrentNodeId: {_nodeId}, 已知节点数: {_knownNodes.Count}, 其他节点数: {otherNodesCount}");
346+
}
300347
}
301348
catch (Exception ex)
302349
{
@@ -405,6 +452,17 @@ public NodeInfo GetOptimalNode(string excludeNodeId = null)
405452
return _loadBalancer.SelectNode(nodes, excludeNodeId ?? _nodeId);
406453
}
407454

455+
/// <summary>
456+
/// Get all known node IDs / 获取所有已知节点 ID
457+
/// </summary>
458+
/// <returns>List of known node IDs / 已知节点 ID 列表</returns>
459+
public List<string> GetKnownNodeIds()
460+
{
461+
var nodeIds = new List<string>(_knownNodes.Keys);
462+
_logger.LogTrace($"[HybridClusterTransport] 获取已知节点ID - CurrentNodeId: {_nodeId}, KnownNodeCount: {nodeIds.Count}, NodeIds: {string.Join(", ", nodeIds)}");
463+
return nodeIds;
464+
}
465+
408466
/// <summary>
409467
/// Update current node information in Redis / 更新 Redis 中的当前节点信息
410468
/// </summary>
@@ -448,22 +506,44 @@ private async Task<bool> HandleMessageAsync(byte[] body, MessageProperties prope
448506
{
449507
try
450508
{
509+
_logger.LogTrace($"[HybridClusterTransport] HandleMessageAsync 开始处理 - CurrentNodeId: {_nodeId}, BodyLength: {body.Length}, MessageId: {properties.MessageId}, CorrelationId: {properties.CorrelationId}");
510+
451511
var messageJson = Encoding.UTF8.GetString(body);
512+
_logger.LogTrace($"[HybridClusterTransport] 消息JSON解析 - CurrentNodeId: {_nodeId}, MessageJsonLength: {messageJson.Length}, First100Chars: {(messageJson.Length > 100 ? messageJson.Substring(0, 100) : messageJson)}");
513+
452514
var message = JsonSerializer.Deserialize<ClusterMessage>(messageJson);
453515

454516
if (message == null)
455517
{
456-
_logger.LogWarning($"[HybridClusterTransport] 收到空或无效消息 - CurrentNodeId: {_nodeId}");
518+
_logger.LogWarning($"[HybridClusterTransport] 收到空或无效消息 - CurrentNodeId: {_nodeId}, MessageJson: {messageJson}");
457519
return true; // Ack to remove from queue / 确认以从队列中移除
458520
}
459521

460-
_logger.LogWarning($"[HybridClusterTransport] 收到集群消息 - MessageType: {message.Type}, FromNodeId: {message.FromNodeId}, ToNodeId: {message.ToNodeId}, MessageId: {message.MessageId}, CurrentNodeId: {_nodeId}, MessageSize: {body.Length} bytes");
522+
_logger.LogTrace($"[HybridClusterTransport] 收到集群消息 - MessageType: {message.Type}, FromNodeId: {message.FromNodeId}, ToNodeId: {message.ToNodeId}, MessageId: {message.MessageId}, CurrentNodeId: {_nodeId}, MessageSize: {body.Length} bytes");
461523

462-
// Ignore messages from self / 忽略来自自己的消息
524+
// Ignore messages from self FIRST to avoid processing own messages / 首先忽略来自自己的消息,避免处理自己的消息
463525
if (message.FromNodeId == _nodeId)
464526
{
465-
_logger.LogWarning($"[HybridClusterTransport] 忽略来自自己的消息 - MessageId: {message.MessageId}, CurrentNodeId: {_nodeId}");
466-
return true;
527+
// Check for duplicate messages from self to avoid processing same message multiple times / 检查来自自己的重复消息,避免多次处理相同消息
528+
if (!string.IsNullOrEmpty(message.MessageId))
529+
{
530+
string deduplicationKey = string.IsNullOrEmpty(message.ToNodeId)
531+
? $"{message.MessageId}:{message.FromNodeId}"
532+
: $"{message.MessageId}:{message.FromNodeId}:{message.ToNodeId}";
533+
534+
if (_processedMessageIds.TryGetValue(deduplicationKey, out var processedTime))
535+
{
536+
// Already processed this message from self, ignore it / 已处理过此来自自己的消息,忽略它
537+
_logger.LogTrace($"[HybridClusterTransport] 忽略来自自己的重复消息 - MessageId: {message.MessageId}, Key: {deduplicationKey}, ProcessedAt: {processedTime:yyyy-MM-dd HH:mm:ss}, CurrentNodeId: {_nodeId}");
538+
return true; // Ack to remove from queue / 确认以从队列中移除
539+
}
540+
541+
// Mark as processed / 标记为已处理
542+
_processedMessageIds.TryAdd(deduplicationKey, DateTime.UtcNow);
543+
}
544+
545+
_logger.LogTrace($"[HybridClusterTransport] 忽略来自自己的消息 - MessageId: {message.MessageId}, CurrentNodeId: {_nodeId}, MessageType: {message.Type}");
546+
return true; // Ack to remove from queue / 确认以从队列中移除
467547
}
468548

469549
// Check if message is for this node / 检查消息是否发送给当前节点
@@ -509,7 +589,7 @@ private async Task<bool> HandleMessageAsync(byte[] body, MessageProperties prope
509589
if (_processedMessageIds.TryGetValue(deduplicationKey, out var processedTime))
510590
{
511591
// Message already processed, ignore it / 消息已处理,忽略它
512-
_logger.LogDebug($"Ignoring duplicate message {message.MessageId} (key: {deduplicationKey}, processed at {processedTime:yyyy-MM-dd HH:mm:ss})");
592+
_logger.LogTrace($"[HybridClusterTransport] 忽略重复消息 - MessageId: {message.MessageId}, Key: {deduplicationKey}, ProcessedAt: {processedTime:yyyy-MM-dd HH:mm:ss}, CurrentNodeId: {_nodeId}");
513593
return true; // Ack to remove from queue / 确认以从队列中移除
514594
}
515595

@@ -518,7 +598,7 @@ private async Task<bool> HandleMessageAsync(byte[] body, MessageProperties prope
518598
}
519599

520600
// Trigger message received event / 触发消息接收事件
521-
_logger.LogWarning($"[HybridClusterTransport] 触发消息接收事件 - MessageType: {message.Type}, FromNodeId: {message.FromNodeId}, ToNodeId: {message.ToNodeId}, MessageId: {message.MessageId}, CurrentNodeId: {_nodeId}");
601+
_logger.LogTrace($"[HybridClusterTransport] 触发消息接收事件 - MessageType: {message.Type}, FromNodeId: {message.FromNodeId}, ToNodeId: {message.ToNodeId}, MessageId: {message.MessageId}, CurrentNodeId: {_nodeId}");
522602
MessageReceived?.Invoke(this, new ClusterMessageEventArgs
523603
{
524604
FromNodeId = message.FromNodeId,

0 commit comments

Comments
 (0)