Skip to content

Commit a41c256

Browse files
committed
perfect: 重构管道实现
1 parent 2b7ff10 commit a41c256

3 files changed

Lines changed: 233 additions & 101 deletions

File tree

Cyaim.WebSocketServer/Cyaim.WebSocketServer/Cyaim.WebSocketServer.csproj

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
Version="8.0.1" />
4343
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions"
4444
Version="8.0.0" />
45+
<PackageReference Include="Microsoft.Extensions.ObjectPool"
46+
Version="8.0.11" />
4547
<PackageReference Include="Microsoft.AspNetCore.Hosting.Server.Abstractions"
4648
Version="2.1.1" />
4749
<PackageReference Include="System.Text.Json"
@@ -58,13 +60,30 @@
5860
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="9.0.0" />
5961
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="9.0.0" />
6062
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="9.0.0" />
63+
<PackageReference Include="Microsoft.Extensions.ObjectPool" Version="9.0.0" />
6164
<PackageReference Include="Microsoft.AspNetCore.Hosting.Server.Abstractions" Version="2.3.0" />
6265
<PackageReference Include="System.Text.Json" Version="9.0.0" />
6366
<PackageReference Include="System.Threading.Channels" Version="9.0.0" />
6467
</ItemGroup>
6568

6669
<!-- .NET 6.0+ 框架已内置大部分包,无需显式引用,除非需要特定版本 -->
6770
<!-- 这些框架版本会自动使用框架内置的包,更安全且性能更好 -->
71+
<!-- 为 .NET 6.0+ 显式添加 ObjectPool 以确保兼容性 -->
72+
<ItemGroup Condition="'$(TargetFramework)' == 'net6.0'">
73+
<PackageReference Include="Microsoft.Extensions.ObjectPool" Version="8.0.11" />
74+
</ItemGroup>
75+
<ItemGroup Condition="'$(TargetFramework)' == 'net7.0'">
76+
<PackageReference Include="Microsoft.Extensions.ObjectPool" Version="8.0.11" />
77+
</ItemGroup>
78+
<ItemGroup Condition="'$(TargetFramework)' == 'net8.0'">
79+
<PackageReference Include="Microsoft.Extensions.ObjectPool" Version="8.0.11" />
80+
</ItemGroup>
81+
<ItemGroup Condition="'$(TargetFramework)' == 'net9.0'">
82+
<PackageReference Include="Microsoft.Extensions.ObjectPool" Version="9.0.0" />
83+
</ItemGroup>
84+
<ItemGroup Condition="'$(TargetFramework)' == 'net10.0'">
85+
<PackageReference Include="Microsoft.Extensions.ObjectPool" Version="10.0.0" />
86+
</ItemGroup>
6887
<ItemGroup>
6988
<None Include="..\..\LICENSE">
7089
<Pack>True</Pack>

Cyaim.WebSocketServer/Cyaim.WebSocketServer/Infrastructure/Handlers/MvcHandler/MvcChannelHandler.cs

Lines changed: 46 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -107,13 +107,6 @@ public MvcChannelHandler(int receiveBufferSize = 4 * 1024, int sendBufferSize =
107107

108108

109109
#region Pipeline
110-
111-
const string PipeStr_RequestForwardPipeline = "RequestForwardPipeline";
112-
113-
const string PipeStr_RequestReceivePipeline = "RequestReceivePipeline";
114-
115-
const string PipeStr_RequestPipeline = "RequestPipeline";
116-
117110
#endregion
118111

119112
/// <summary>
@@ -218,7 +211,7 @@ public async Task ConnectionEntry(HttpContext context, ILogger<WebSocketRouteMid
218211
var remoteIpAddress = context.Connection.RemoteIpAddress?.ToString();
219212
var remotePort = context.Connection.RemotePort;
220213
await clusterManager.RegisterConnectionAsync(
221-
context.Connection.Id,
214+
context.Connection.Id,
222215
context.Request.Path,
223216
remoteIpAddress,
224217
remotePort);
@@ -230,8 +223,8 @@ await clusterManager.RegisterConnectionAsync(
230223
}
231224
}
232225

233-
// 执行BeforeReceivingData管道
234-
_ = await InvokePipeline(RequestPipelineStage.Connected, context, webSocket, null, null);
226+
// 执行Connected管道
227+
_ = await InvokePipeline(RequestPipelineStage.Connected, PipelineContext.CreateReceive(context, webSocket, null, null, webSocketOptions));
235228

236229
IHostApplicationLifetime appLifetime = WebSocketRouteOption.ApplicationServices.GetRequiredService<IHostApplicationLifetime>();
237230

@@ -277,7 +270,7 @@ await clusterManager.RegisterConnectionAsync(
277270
await MvcChannel_OnDisconnected(context, webSocketCloseStatus, webSocketOptions, logger);
278271

279272
// 执行管道 Disconnected
280-
_ = await InvokePipeline(RequestPipelineStage.Disconnected, context, webSocketOptions);
273+
_ = await InvokePipeline(RequestPipelineStage.Disconnected, PipelineContext.CreateBasic(context, webSocketOptions));
281274
}
282275
}
283276

@@ -322,7 +315,7 @@ private async Task MvcForward(HttpContext context, WebSocket webSocket, WebSocke
322315
}
323316

324317
// 执行BeforeReceivingData管道
325-
_ = await InvokePipeline(RequestPipelineStage.BeforeReceivingData, context, webSocket, null, null);
318+
_ = await InvokePipeline(RequestPipelineStage.BeforeReceivingData, PipelineContext.CreateReceive(context, webSocket, null, null, webSocketOption));
326319

327320
#region 接收数据
328321
byte[] buffer = ArrayPool<byte>.Shared.Rent(ReceiveTextBufferSize);
@@ -370,12 +363,12 @@ await bandwidthLimitManager.WaitForBandwidthAsync(
370363
// 记录消息接收指标
371364
var currentNodeId = Infrastructure.Cluster.GlobalClusterCenter.ClusterContext?.NodeId;
372365
_metricsCollector?.RecordMessageReceived(result.Count, currentNodeId, context.Request.Path);
373-
366+
374367
// 记录统计信息(如果统计记录器可用)
375368
Infrastructure.Cluster.GlobalClusterCenter.StatisticsRecorder?.RecordBytesReceived(context.Connection.Id, result.Count);
376369

377370
// 执行ReceivingData管道
378-
_ = await InvokePipeline(RequestPipelineStage.ReceivingData, context, webSocket, result, buffer);
371+
_ = await InvokePipeline(RequestPipelineStage.ReceivingData, PipelineContext.CreateReceive(context, webSocket, result, buffer, webSocketOption));
379372

380373
// 已经接受完数据了
381374
if (result.EndOfMessage || result.CloseStatus.HasValue)
@@ -412,7 +405,7 @@ await bandwidthLimitManager.WaitForBandwidthAsync(
412405
#endregion
413406

414407
// 执行AfterReceivingData管道
415-
_ = await InvokePipeline(RequestPipelineStage.ReceivingData, context, webSocket, result, wsReceiveReader.GetBuffer());
408+
_ = await InvokePipeline(RequestPipelineStage.AfterReceivingData, PipelineContext.CreateReceive(context, webSocket, result, wsReceiveReader.GetBuffer(), webSocketOption));
416409

417410
if (result == null)
418411
{
@@ -480,7 +473,7 @@ await bandwidthLimitManager.WaitForBandwidthAsync(
480473
}
481474

482475
// 执行管道 BeforeForwardingData
483-
_ = await InvokePipeline(RequestPipelineStage.BeforeForwardingData, context, webSocket, result, wsReceiveReader.GetBuffer(), requestScheme, requestBody);
476+
_ = await InvokePipeline(RequestPipelineStage.BeforeForwardingData, PipelineContext.CreateForward(context, webSocket, result, wsReceiveReader.GetBuffer(), requestScheme, requestBody, webSocketOption));
484477

485478
//requestScheme = JsonSerializer.Deserialize<MvcRequestScheme>(wsReceiveReader.GetBuffer(), webSocketOption.DefaultRequestJsonSerialiazerOptions);
486479
// 改异步转发
@@ -492,7 +485,7 @@ await bandwidthLimitManager.WaitForBandwidthAsync(
492485
}
493486

494487
// 执行管道 AfterForwardingData
495-
_ = await InvokePipeline(RequestPipelineStage.AfterForwardingData, context, webSocket, result, wsReceiveReader.GetBuffer(), requestScheme, requestBody);
488+
_ = await InvokePipeline(RequestPipelineStage.AfterForwardingData, PipelineContext.CreateForward(context, webSocket, result, wsReceiveReader.GetBuffer(), requestScheme, requestBody, webSocketOption));
496489

497490
CONTINUE_RECEIVE:;
498491
}
@@ -566,13 +559,13 @@ private async Task MvcForwardSendData(WebSocket webSocket, HttpContext context,
566559
// 序列化响应以获取大小
567560
string serialJson = JsonSerializer.Serialize(invokeResult, webSocketOption.DefaultResponseJsonSerializerOptions);
568561
var responseBytes = Encoding.UTF8.GetBytes(serialJson);
569-
562+
570563
await invokeResult.SendAsync(webSocketOption.DefaultResponseJsonSerializerOptions, result.MessageType, timeout: ResponseSendTimeout, encoding: Encoding.UTF8, sendBufferSize: SendTextBufferSize, socket: webSocket).ConfigureAwait(false);
571564

572565
// 记录消息发送指标
573566
var currentNodeId = Infrastructure.Cluster.GlobalClusterCenter.ClusterContext?.NodeId;
574567
_metricsCollector?.RecordMessageSent(responseBytes.Length, currentNodeId, context.Request.Path);
575-
568+
576569
// 记录统计信息(如果统计记录器可用)
577570
Infrastructure.Cluster.GlobalClusterCenter.StatisticsRecorder?.RecordBytesSent(context.Connection.Id, responseBytes.Length);
578571
}
@@ -1069,7 +1062,7 @@ private async Task MvcChannel_OnDisconnected(HttpContext context, WebSocketClose
10691062
if (wsExists)
10701063
{
10711064
Clients.TryRemove(context.Connection.Id, out var _);
1072-
1065+
10731066
// Unregister connection from cluster manager if cluster is enabled
10741067
// 如果启用了集群,从集群管理器注销连接
10751068
var clusterManager = Infrastructure.Cluster.GlobalClusterCenter.ClusterManager;
@@ -1112,7 +1105,7 @@ public virtual async Task<bool> MvcChannel_OnBeforeConnection(HttpContext contex
11121105
{
11131106
var ipAddress = context.Connection.RemoteIpAddress?.ToString();
11141107
var isAllowed = await accessControlService.IsAllowedAsync(ipAddress);
1115-
1108+
11161109
if (!isAllowed)
11171110
{
11181111
var policy = WebSocketRouteOption.ApplicationServices.GetService<AccessControlPolicy>();
@@ -1140,7 +1133,7 @@ public virtual async Task<bool> MvcChannel_OnBeforeConnection(HttpContext contex
11401133
{
11411134
logger.LogWarning($"Access denied for IP {ipAddress} from {context.Request.Path}");
11421135
}
1143-
1136+
11441137
return false;
11451138
}
11461139
}
@@ -1206,69 +1199,48 @@ public string FindJsonPropertyValue(ReadOnlySpan<byte> jsonFragment, string Prop
12061199
#region Invoke pipeline
12071200

12081201
/// <summary>
1209-
/// 通用的管道调用入口:统一排序与异常记录
1202+
/// 统一的管道调用方法
12101203
/// </summary>
12111204
/// <param name="requestStage">处理阶段</param>
1212-
/// <param name="invoker">对每个管道项的具体调用逻辑</param>
1205+
/// <param name="context">管道上下文(从对象池获取,使用完毕后自动归还)</param>
12131206
/// <returns></returns>
1214-
private async Task<ConcurrentQueue<PipelineItem>> InvokePipelineInternal(RequestPipelineStage requestStage, Func<PipelineItem, Task> invoker)
1207+
/// <remarks>
1208+
/// 注意:context 对象会在所有管道处理器执行完毕后自动归还到对象池。
1209+
/// 管道处理器不应在异步操作中保存 context 的引用,因为方法返回后对象会被清理和重用。
1210+
/// </remarks>
1211+
private async Task<ConcurrentQueue<PipelineItem>> InvokePipeline(RequestPipelineStage requestStage, PipelineContext context)
12151212
{
1216-
if (!RequestPipeline.TryGetValue(requestStage, out ConcurrentQueue<PipelineItem> invokes) || invokes == null)
1217-
{
1218-
return await Task.FromResult<ConcurrentQueue<PipelineItem>>(null);
1219-
}
1220-
1221-
var ordered = invokes.OrderBy(x => x.Order);
1222-
foreach (PipelineItem item in ordered)
1213+
try
12231214
{
1224-
try
1215+
if (!RequestPipeline.TryGetValue(requestStage, out ConcurrentQueue<PipelineItem> invokes) || invokes == null)
12251216
{
1226-
await invoker(item);
1217+
return null;
12271218
}
1228-
catch (Exception ex)
1219+
1220+
var ordered = invokes.OrderBy(x => x.Order);
1221+
foreach (PipelineItem item in ordered)
12291222
{
1230-
item.Exception = ex;
1231-
item.ExceptionItem = item;
1223+
try
1224+
{
1225+
if (item.Item != null)
1226+
{
1227+
await item.Item.InvokeAsync(context);
1228+
}
1229+
}
1230+
catch (Exception ex)
1231+
{
1232+
item.Exception = ex;
1233+
item.ExceptionItem = item;
1234+
}
12321235
}
1233-
}
1234-
1235-
return invokes;
1236-
}
12371236

1238-
/// <summary>
1239-
/// 调用请求转发阶段的管道
1240-
/// </summary>
1241-
private Task<ConcurrentQueue<PipelineItem>> InvokePipeline(RequestPipelineStage requestStage, HttpContext context, WebSocket webSocket, WebSocketReceiveResult result, byte[] buffer, MvcRequestScheme request, JsonObject requestBody)
1242-
{
1243-
return InvokePipelineInternal(requestStage, async x =>
1244-
{
1245-
var p = (x.Item as RequestForwardPipeline) ?? throw new NotSupportedException(I18nText.AtThisStagePipelineNotSupport + PipeStr_RequestForwardPipeline);
1246-
await p?.Invoke?.Invoke(context, webSocket, result, buffer, request, requestBody);
1247-
});
1248-
}
1249-
1250-
/// <summary>
1251-
/// 调用请求接收阶段的管道
1252-
/// </summary>
1253-
private Task<ConcurrentQueue<PipelineItem>> InvokePipeline(RequestPipelineStage requestStage, HttpContext context, WebSocket webSocket, WebSocketReceiveResult result, byte[] buffer)
1254-
{
1255-
return InvokePipelineInternal(requestStage, async x =>
1256-
{
1257-
var p = (x.Item as RequestReceivePipeline) ?? throw new NotSupportedException(I18nText.AtThisStagePipelineNotSupport + PipeStr_RequestReceivePipeline);
1258-
await p?.Invoke?.Invoke(context, webSocket, result, buffer);
1259-
});
1260-
}
1261-
1262-
/// <summary>
1263-
/// 调用基础阶段的管道
1264-
/// </summary>
1265-
private Task<ConcurrentQueue<PipelineItem>> InvokePipeline(RequestPipelineStage requestStage, HttpContext context, WebSocketRouteOption webSocketOptions)
1266-
{
1267-
return InvokePipelineInternal(requestStage, async x =>
1237+
return invokes;
1238+
}
1239+
finally
12681240
{
1269-
var p = x.Item ?? throw new NotSupportedException(I18nText.AtThisStagePipelineNotSupport + PipeStr_RequestPipeline);
1270-
await p?.Invoke?.Invoke(context, webSocketOptions);
1271-
});
1241+
// 使用完毕后归还到对象池,确保即使发生异常也能正确归还
1242+
context?.Return();
1243+
}
12721244
}
12731245

12741246

0 commit comments

Comments
 (0)