fix(proxy): clean up client abort listeners#1113
Conversation
📝 WalkthroughWalkthrough将客户端 AbortSignal 的中止处理提取为可复用绑定工具 Changes
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches📝 Generate docstrings
🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
🧹 Nitpick comments (1)
tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts (1)
101-105: 建议让异步任务排空支持链式注册当前只排空一轮任务;如果任务执行过程中再次
register,这段 helper 可能提前返回,带来偶发不稳定。建议循环排空直到队列为空。可选改法
async function drainAsyncTasks(): Promise<void> { - const tasks = testState.asyncTasks.splice(0); - await Promise.allSettled(tasks); + while (testState.asyncTasks.length > 0) { + const tasks = testState.asyncTasks.splice(0); + await Promise.allSettled(tasks); + } await new Promise((resolve) => setTimeout(resolve, 0)); }🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts` around lines 101 - 105, drainAsyncTasks only drains a single batch so if tasks register more tasks while settling it may return early; change drainAsyncTasks to loop until testState.asyncTasks.splice(0) yields an empty array, awaiting Promise.allSettled on each batch and a microtask tick (setTimeout(...,0) or equivalent) between batches so newly-registered tasks are also drained; reference testState.asyncTasks and drainAsyncTasks (and any register helper used in tests) when implementing the loop.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts`:
- Around line 101-105: drainAsyncTasks only drains a single batch so if tasks
register more tasks while settling it may return early; change drainAsyncTasks
to loop until testState.asyncTasks.splice(0) yields an empty array, awaiting
Promise.allSettled on each batch and a microtask tick (setTimeout(...,0) or
equivalent) between batches so newly-registered tasks are also drained;
reference testState.asyncTasks and drainAsyncTasks (and any register helper used
in tests) when implementing the loop.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 67484a53-7446-4338-8b6d-ee782955fcca
📒 Files selected for processing (4)
src/app/v1/_lib/proxy/forwarder.tssrc/app/v1/_lib/proxy/response-handler.tstests/unit/proxy/proxy-forwarder-hedge-first-byte.test.tstests/unit/proxy/response-handler-abort-listener-cleanup.test.ts
🧪 测试结果
总体结果: ✅ 所有测试通过 |
There was a problem hiding this comment.
Code Review
This pull request introduces a mechanism to ensure client abort event listeners are properly cleaned up in ProxyForwarder and ProxyResponseHandler, preventing memory leaks. It adds a bindClientAbortListener helper to handle pre-aborted signals and provides a cleanup function for use in finally blocks. New unit tests verify the removal of these listeners. A review comment suggests that ProxyForwarder should also check the aborted state of the signal before adding a listener to avoid missing events if the client disconnects early.
| if (session.clientAbortSignal) { | ||
| session.clientAbortSignal.addEventListener( | ||
| "abort", | ||
| () => { | ||
| if (settled || winnerCommitted) return; | ||
| noMoreProviders = true; | ||
| lastError = new ProxyError("Request aborted by client", 499, undefined, true); | ||
| lastErrorCategory = ErrorCategory.CLIENT_ABORT; | ||
| for (const attempt of Array.from(attempts)) { | ||
| if (!attempt.settled) { | ||
| session.addProviderToChain(attempt.provider, { | ||
| ...attempt.endpointAudit, | ||
| reason: "client_abort", | ||
| attemptNumber: attempt.sequence, | ||
| errorMessage: "Client aborted request", | ||
| modelRedirect: getAttemptModelRedirect(attempt), | ||
| }); | ||
| } | ||
| const handleClientAbort = () => { | ||
| if (settled || winnerCommitted) return; | ||
| noMoreProviders = true; | ||
| lastError = new ProxyError("Request aborted by client", 499, undefined, true); | ||
| lastErrorCategory = ErrorCategory.CLIENT_ABORT; | ||
| for (const attempt of Array.from(attempts)) { | ||
| if (!attempt.settled) { | ||
| session.addProviderToChain(attempt.provider, { | ||
| ...attempt.endpointAudit, | ||
| reason: "client_abort", | ||
| attemptNumber: attempt.sequence, | ||
| errorMessage: "Client aborted request", | ||
| modelRedirect: getAttemptModelRedirect(attempt), | ||
| }); | ||
| } | ||
| abortAllAttempts(undefined, "client_abort"); | ||
| void finishIfExhausted(); | ||
| }, | ||
| { once: true } | ||
| ); | ||
| } | ||
| abortAllAttempts(undefined, "client_abort"); | ||
| void finishIfExhausted(); | ||
| }; | ||
| session.clientAbortSignal.addEventListener("abort", handleClientAbort, { once: true }); | ||
| cleanupClientAbortListener = () => { | ||
| session.clientAbortSignal?.removeEventListener("abort", handleClientAbort); | ||
| }; | ||
| } |
There was a problem hiding this comment.
The client abort listener is added without checking if the signal is already aborted. If the client disconnects before this line is reached, the listener will never fire, potentially allowing the hedge race to start unnecessarily. It is recommended to check session.clientAbortSignal.aborted immediately, similar to the implementation in bindClientAbortListener in response-handler.ts. Additionally, ensure that non-critical operations in this performance-sensitive failover path are handled as fire-and-forget tasks.
let cleanupClientAbortListener = () => {};
if (session.clientAbortSignal) {
const handleClientAbort = () => {
if (settled || winnerCommitted) return;
noMoreProviders = true;
lastError = new ProxyError("Request aborted by client", 499, undefined, true);
lastErrorCategory = ErrorCategory.CLIENT_ABORT;
for (const attempt of Array.from(attempts)) {
if (!attempt.settled) {
session.addProviderToChain(attempt.provider, {
...attempt.endpointAudit,
reason: "client_abort",
attemptNumber: attempt.sequence,
errorMessage: "Client aborted request",
modelRedirect: getAttemptModelRedirect(attempt),
});
}
}
abortAllAttempts(undefined, "client_abort");
void finishIfExhausted();
};
if (session.clientAbortSignal.aborted) {
handleClientAbort();
} else {
session.clientAbortSignal.addEventListener("abort", handleClientAbort, { once: true });
cleanupClientAbortListener = () => {
session.clientAbortSignal?.removeEventListener("abort", handleClientAbort);
};
}
}References
- In performance-sensitive code paths like provider failover, non-critical I/O operations should be executed as fire-and-forget tasks to avoid blocking the main logic.
There was a problem hiding this comment.
Code Review Summary
This PR correctly addresses memory leaks caused by retained client abort event listeners across three code paths: non-stream background tasks, streaming background tasks, and streaming hedge forwarding. The bindClientAbortListener helper is well-designed with proper handling of null signals, already-aborted signals, idempotent cleanup, and { once: true } for auto-removal on abort. All abort listener registrations in the proxy layer now have corresponding cleanup paths.
PR Size: M
- Lines changed: 439 (372 additions + 67 deletions)
- Files changed: 4
Review Coverage
- Logic and correctness - Clean
- Security (OWASP Top 10) - Clean
- Error handling - Clean
- Type safety - Clean
- Documentation accuracy - Clean
- Test coverage - Adequate
- Code clarity - Good
Automated review by Claude AI
| it("removes non-stream client abort listener after response processing completes", async () => { | ||
| const controller = new AbortController(); | ||
| const addSpy = vi.spyOn(controller.signal, "addEventListener"); | ||
| const removeSpy = vi.spyOn(controller.signal, "removeEventListener"); |
There was a problem hiding this comment.
[MEDIUM] [TEST-EDGE-CASE] bindClientAbortListener already-aborted branch is untested
Evidence:
src/app/v1/_lib/proxy/response-handler.ts:71:if (signal.aborted) { onAbort(); return () => {}; }tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts:206andtests/unit/proxy/response-handler-abort-listener-cleanup.test.ts:229only cover the non-aborted path.CLAUDE.md:13:**Test Coverage** - All new features must have unit test coverage of at least 80%
Why this is a problem: The already-aborted branch is a key race-condition fix (client disconnects before listener attachment). Without a dedicated test, it is easy to regress and reintroduce background work retention in the exact scenario this PR is targeting.
Suggested fix (add a third test case):
it("handles already-aborted clientAbortSignal without registering a listener", async () => {
const controller = new AbortController();
controller.abort(new Error("already_aborted"));
const addSpy = vi.spyOn(controller.signal, "addEventListener");
const removeSpy = vi.spyOn(controller.signal, "removeEventListener");
const session = makeSession(controller.signal, false);
const upstreamResponse = new Response(
JSON.stringify({ choices: [{ message: { content: "ok" } }] }),
{ headers: { "content-type": "application/json" } }
);
const response = await ProxyResponseHandler.dispatch(session, upstreamResponse);
await response.text();
await drainAsyncTasks();
const abortAddCalls = addSpy.mock.calls.filter(([type]) => type === "abort");
expect(abortAddCalls).toHaveLength(0);
expect(removeSpy).not.toHaveBeenCalled();
expect(testState.cancelTask).toHaveBeenCalled();
});There was a problem hiding this comment.
Code Review Summary
This PR is a focused resource-cleanup change around clientAbortSignal listener lifecycles in ProxyResponseHandler and streaming-hedge forwarding.
One edge case in the new helper (signal.aborted fast-path) is not covered by unit tests, leaving a regression gap for early disconnect races.
PR Size: M
- Lines changed: 439
- Files changed: 4
Issues Found
| Category | Critical | High | Medium | Low |
|---|---|---|---|---|
| Logic/Bugs | 0 | 0 | 0 | 0 |
| Security | 0 | 0 | 0 | 0 |
| Error Handling | 0 | 0 | 0 | 0 |
| Types | 0 | 0 | 0 | 0 |
| Comments/Docs | 0 | 0 | 0 | 0 |
| Tests | 0 | 0 | 1 | 0 |
| Simplification | 0 | 0 | 0 | 0 |
Critical Issues (Must Fix)
None.
High Priority Issues (Should Fix)
- [MEDIUM] [TEST-EDGE-CASE] Missing coverage for
bindClientAbortListeneralready-aborted branch (src/app/v1/_lib/proxy/response-handler.ts:71).
Review Coverage
- Logic and correctness
- Security (OWASP Top 10)
- Error handling
- Type safety
- Documentation accuracy
- Test coverage
- Code clarity
Automated review by Codex AI
There was a problem hiding this comment.
- Identified the active PR as
#1113and applied the labelsize/M(439 lines across 4 files). - Posted 1 inline review comment on
tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts(diff position209) for [MEDIUM] [TEST-EDGE-CASE]: missing coverage for the newsignal.abortedfast-path inbindClientAbortListener(src/app/v1/_lib/proxy/response-handler.ts:71). - Submitted the required PR review summary via
gh pr review --commentwith the issue reflected in the “Tests” category.
There was a problem hiding this comment.
🧹 Nitpick comments (3)
tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts (2)
200-251: 建议补充两个分支用例以覆盖bindClientAbortListener全部路径
src/app/v1/_lib/proxy/response-handler.ts中的bindClientAbortListener还包含两条尚未在本套件中覆盖的分支:
signal == null时直接返回 no-op cleanup;signal.aborted === true时同步调用onAbort()(会触发AsyncTaskManager.cancel与上游abortController.abort()),且不会注册监听器。当前两个用例只覆盖 "未中止 → 完成 → 清理监听器" 这条主路径。补充上述两种入口状态可以防止后续重构时回归(例如 "预先已中止" 路径若被改成异步监听则无法再触发取消)。
建议的补充用例
it("invokes cancel synchronously when client signal is already aborted", async () => { const controller = new AbortController(); controller.abort(); const addSpy = vi.spyOn(controller.signal, "addEventListener"); const session = makeSession(controller.signal, false); const upstreamResponse = new Response(JSON.stringify({ choices: [] }), { headers: { "content-type": "application/json" }, }); const response = await ProxyResponseHandler.dispatch(session, upstreamResponse); await response.text(); await drainAsyncTasks(); expect(addSpy.mock.calls.filter(([type]) => type === "abort")).toHaveLength(0); expect(testState.cancelTask).toHaveBeenCalled(); });🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts` around lines 200 - 251, Add two unit tests to cover the remaining branches of bindClientAbortListener: one where the session signal is null (assert no listeners registered and cleanup is a no-op) and one where the signal is already aborted (assert no addEventListener calls, that onAbort runs synchronously by checking testState.cancelTask was called and that no listener was registered). Use the same test scaffolding (makeSession, ProxyResponseHandler.dispatch, drainAsyncTasks, and AbortController spies on addEventListener/removeEventListener) and name tests to reflect "signal null returns no-op cleanup" and "signal already aborted invokes cancel synchronously" so bindClientAbortListener's null and aborted branches are exercised.
134-134: 测试夹具中的大字符串可精简
log和forwardedRequestBody分别构造了 256KB 与 512KB 的占位字符串,这看起来是堆保留场景复现时遗留的痕迹。当前两个用例只断言addEventListener/removeEventListener的调用关系,与字符串大小无关;保留这些分配会无谓地增加测试内存占用与运行开销,且可能误导后续读者以为它们参与了断言。建议改为短字符串或直接省略。建议的精简
- log: "x".repeat(256 * 1024), + log: "", @@ - forwardedRequestBody: "y".repeat(512 * 1024), + forwardedRequestBody: "",Also applies to: 149-149
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts` at line 134, The test fixture unnecessarily allocates huge placeholder strings for `log` and `forwardedRequestBody`; replace those 256KB/512KB repeats with short literal strings (or remove them if unused) in the failing tests so they don’t bloat memory—update the occurrences that set `log: "x".repeat(256 * 1024)` and the `forwardedRequestBody` assignment(s) (also present at the other occurrence) to use small values, ensuring the assertions around `addEventListener`/`removeEventListener` remain unchanged.src/app/v1/_lib/proxy/forwarder.ts (1)
4026-4056: 建议复用 response-handler.ts 中的bindClientAbortListener工具,避免重复实现。本段绑定/清理逻辑(无信号短路、
signal.aborted时同步触发、否则addEventListener+ 返回removeEventListener清理函数)与src/app/v1/_lib/proxy/response-handler.ts中的bindClientAbortListener完全同构。后者还自带cleaned幂等标记,这里的cleanupClientAbortListener仅在 finally 调用一次所以幂等性不是必需,但两处分头维护同一模式后续容易走偏(例如 hedge 路径修了{ once: true }但忘了同步另一处)。建议把
bindClientAbortListener提升为命名导出(或抽到共享模块),forwarder 直接复用:♻️ 建议改写
- let cleanupClientAbortListener = () => {}; - const clientAbortSignal = session.clientAbortSignal; - if (clientAbortSignal) { - const handleClientAbort = () => { - if (settled || winnerCommitted) return; - noMoreProviders = true; - lastError = new ProxyError("Request aborted by client", 499, undefined, true); - lastErrorCategory = ErrorCategory.CLIENT_ABORT; - for (const attempt of Array.from(attempts)) { - if (!attempt.settled) { - session.addProviderToChain(attempt.provider, { - ...attempt.endpointAudit, - reason: "client_abort", - attemptNumber: attempt.sequence, - errorMessage: "Client aborted request", - modelRedirect: getAttemptModelRedirect(attempt), - }); - } - } - abortAllAttempts(undefined, "client_abort"); - void finishIfExhausted(); - }; - if (clientAbortSignal.aborted) { - handleClientAbort(); - } else { - clientAbortSignal.addEventListener("abort", handleClientAbort, { once: true }); - cleanupClientAbortListener = () => { - clientAbortSignal.removeEventListener("abort", handleClientAbort); - }; - } - } + const cleanupClientAbortListener = bindClientAbortListener( + session.clientAbortSignal, + () => { + if (settled || winnerCommitted) return; + noMoreProviders = true; + lastError = new ProxyError("Request aborted by client", 499, undefined, true); + lastErrorCategory = ErrorCategory.CLIENT_ABORT; + for (const attempt of Array.from(attempts)) { + if (!attempt.settled) { + session.addProviderToChain(attempt.provider, { + ...attempt.endpointAudit, + reason: "client_abort", + attemptNumber: attempt.sequence, + errorMessage: "Client aborted request", + modelRedirect: getAttemptModelRedirect(attempt), + }); + } + } + abortAllAttempts(undefined, "client_abort"); + void finishIfExhausted(); + } + );需要在 response-handler.ts 中将
bindClientAbortListener改为命名导出,并在本文件 import:-import { setDeferredStreamingFinalization } from "./stream-finalization"; +import { bindClientAbortListener } from "./response-handler"; +import { setDeferredStreamingFinalization } from "./stream-finalization";🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/app/v1/_lib/proxy/forwarder.ts` around lines 4026 - 4056, The client-abort binding in forwarder.ts duplicates the logic in response-handler.ts; replace the inlined listener logic with a call to the shared utility bindClientAbortListener: import the named export bindClientAbortListener from src/app/v1/_lib/proxy/response-handler.ts, remove the local handleClientAbort and cleanupClientAbortListener declarations, and call bindClientAbortListener(session.clientAbortSignal, <callback>) where the callback performs the same actions (set noMoreProviders, set lastError/lastErrorCategory, add provider chain entries for unsettled attempts using getAttemptModelRedirect, call abortAllAttempts, and invoke finishIfExhausted). Ensure the callback uses the same early-returns (if settled || winnerCommitted) as the original, and that bindClientAbortListener's returned cleanup function replaces the previous cleanupClientAbortListener usage.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@src/app/v1/_lib/proxy/forwarder.ts`:
- Around line 4026-4056: The client-abort binding in forwarder.ts duplicates the
logic in response-handler.ts; replace the inlined listener logic with a call to
the shared utility bindClientAbortListener: import the named export
bindClientAbortListener from src/app/v1/_lib/proxy/response-handler.ts, remove
the local handleClientAbort and cleanupClientAbortListener declarations, and
call bindClientAbortListener(session.clientAbortSignal, <callback>) where the
callback performs the same actions (set noMoreProviders, set
lastError/lastErrorCategory, add provider chain entries for unsettled attempts
using getAttemptModelRedirect, call abortAllAttempts, and invoke
finishIfExhausted). Ensure the callback uses the same early-returns (if settled
|| winnerCommitted) as the original, and that bindClientAbortListener's returned
cleanup function replaces the previous cleanupClientAbortListener usage.
In `@tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts`:
- Around line 200-251: Add two unit tests to cover the remaining branches of
bindClientAbortListener: one where the session signal is null (assert no
listeners registered and cleanup is a no-op) and one where the signal is already
aborted (assert no addEventListener calls, that onAbort runs synchronously by
checking testState.cancelTask was called and that no listener was registered).
Use the same test scaffolding (makeSession, ProxyResponseHandler.dispatch,
drainAsyncTasks, and AbortController spies on
addEventListener/removeEventListener) and name tests to reflect "signal null
returns no-op cleanup" and "signal already aborted invokes cancel synchronously"
so bindClientAbortListener's null and aborted branches are exercised.
- Line 134: The test fixture unnecessarily allocates huge placeholder strings
for `log` and `forwardedRequestBody`; replace those 256KB/512KB repeats with
short literal strings (or remove them if unused) in the failing tests so they
don’t bloat memory—update the occurrences that set `log: "x".repeat(256 * 1024)`
and the `forwardedRequestBody` assignment(s) (also present at the other
occurrence) to use small values, ensuring the assertions around
`addEventListener`/`removeEventListener` remain unchanged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 6ffb764a-d643-427d-8bab-49ddf4271117
📒 Files selected for processing (3)
src/app/v1/_lib/proxy/forwarder.tstests/unit/proxy/proxy-forwarder-hedge-first-byte.test.tstests/unit/proxy/response-handler-abort-listener-cleanup.test.ts
🚧 Files skipped from review as they are similar to previous changes (1)
- tests/unit/proxy/proxy-forwarder-hedge-first-byte.test.ts
🧪 测试结果
总体结果: ✅ 所有测试通过 |
🧪 测试结果
总体结果: ✅ 所有测试通过 |
There was a problem hiding this comment.
🧹 Nitpick comments (3)
src/app/v1/_lib/proxy/response-handler.ts (1)
2127-2148: 流式路径的中止处理与 idleTimeoutId 释放符合预期。将
idleTimeoutId提升到 IIFE 外部作用域并在中止回调中显式清理,是必要的——否则即便 task 已 cancel,仍可能由setTimeout触发上游responseController.abort()、写入“假超时”分支并污染熔断/会话归因。finally中通过cleanupClientAbortListener()解绑、配合clearIdleTimer()与reader.releaseLock(),能确保监听器闭包对messageContext/session/provider的引用被释放,符合#1083的根因修复目标。注意:Line 2129 处
bindClientAbortListener触发时idleTimeoutId仍为null(在 IIFE 启动前startIdleTimer()还未运行),因此 pre-aborted 场景下闭包内if (idleTimeoutId)安全跳过,无 NPE 风险。建议在新增的tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts中补充“pre-aborted signal 情况下 stream 路径不会双重 cancel idle timer / 不会触发上游 abort”的断言(PR 描述中也提及该测试覆盖缺口)。Also applies to: 2776-2779
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/app/v1/_lib/proxy/response-handler.ts` around lines 2127 - 2148, The added client-abort listener correctly clears the outer-scope idleTimeoutId and cancels the async task, but we need to ensure the listener is unbound in the finally path and add a unit test for pre-aborted signals: ensure cleanupClientAbortListener (returned from bindClientAbortListener) is invoked inside the finally block alongside clearIdleTimer() and reader.releaseLock(), so the listener’s closure releases messageContext/session/provider references and prevents double-cancel; add a test in tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts that simulates a pre-aborted session.clientAbortSignal and asserts the stream path does not call clearIdleTimer()/responseController.abort() twice or trigger upstream aborts (verify AsyncTaskManager.cancel and abortController.abort are called exactly once).src/app/v1/_lib/proxy/client-abort-listener.ts (1)
9-12: 已中止信号会同步触发onAbort,建议在文档中明确并审视调用点假设。当
signal.aborted为true时,bindClientAbortListener会在bind调用栈上同步执行onAbort()。这在当前调用点(response-handler.ts非流/流式路径)是安全的,因为:
- 此时
AsyncTaskManager.register(taskId, ...)尚未执行,AsyncTaskManager.cancel(taskId)进入“未找到任务”的 no-op 分支;abortController.abort()仅设置本地控制器状态,后续if (clientAbortSignal?.aborted || abortController.signal.aborted)仍能识别并走finalizeNonStreamAbort()。但这是隐含约束,未来若新增调用方在
bind之前持有尚未初始化的资源(例如先 bind、再创建AbortController/注册 task),同步触发会引发 NPE 或漏注册。建议在 JSDoc 中显式说明“信号已中止时会立即同步调用onAbort”,并提示调用方在 bind 之前确保onAbort闭包引用的所有资源已就绪。建议补充 JSDoc
+/** + * 绑定客户端中止监听器,返回幂等的清理函数。 + * + * 注意: + * - 当 `signal` 已经处于 aborted 状态时,会在当前调用栈上 **同步** 执行 `onAbort()`, + * 随后返回 no-op 清理函数。调用方需保证在 bind 之前 `onAbort` 引用的所有资源(控制器、taskId 等)已经初始化。 + * - 监听器以 `{ once: true }` 注册;返回的清理函数通过内部 `cleaned` 标记保证幂等。 + */ export function bindClientAbortListener( signal: AbortSignal | null | undefined, onAbort: () => void ): () => void {🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@src/app/v1/_lib/proxy/client-abort-listener.ts` around lines 9 - 12, bindClientAbortListener currently calls onAbort synchronously when signal.aborted is true; add a JSDoc to bindClientAbortListener that explicitly states "if the provided AbortSignal is already aborted, onAbort will be invoked synchronously during bind," and warn callers to ensure any resources/closures referenced by onAbort (e.g., things created after bind like AbortController or AsyncTaskManager registrations) are initialized before calling bind, referencing the onAbort callback and the signal.aborted check so future callers know the invariant.tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts (1)
109-126:makeProvider的providerType: "openai"不在Provider.providerType的联合类型中。根据
src/types/provider.ts定义,有效值为"claude" | "claude-auth" | "codex" | "gemini" | "gemini-cli" | "openai-compatible",其中不包含裸"openai"。当前代码通过as Provider强制转换绕过了类型检查。虽然本测试的代码路径未触发providerType分支,但若后续在ProxyResponseHandler中添加基于providerType的条件分流逻辑,会导致该用例静默走错分支。建议将第 113 行和第 169 行的"openai"改为"openai-compatible"。建议的补丁
- providerType: "openai", + providerType: "openai-compatible",同步修改第 169 行:
- providerType: "openai", + providerType: "openai-compatible",🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts` around lines 109 - 126, The makeProvider helper constructs a Provider with providerType set to an invalid literal "openai"; update makeProvider (and the other occurrence at the second usage) to use the correct union member "openai-compatible" so the returned object conforms to the Provider type instead of relying on the unsafe cast; locate the makeProvider function and the second test instance that sets providerType and replace "openai" with "openai-compatible" to avoid silent branch-miss issues in ProxyResponseHandler.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Nitpick comments:
In `@src/app/v1/_lib/proxy/client-abort-listener.ts`:
- Around line 9-12: bindClientAbortListener currently calls onAbort
synchronously when signal.aborted is true; add a JSDoc to
bindClientAbortListener that explicitly states "if the provided AbortSignal is
already aborted, onAbort will be invoked synchronously during bind," and warn
callers to ensure any resources/closures referenced by onAbort (e.g., things
created after bind like AbortController or AsyncTaskManager registrations) are
initialized before calling bind, referencing the onAbort callback and the
signal.aborted check so future callers know the invariant.
In `@src/app/v1/_lib/proxy/response-handler.ts`:
- Around line 2127-2148: The added client-abort listener correctly clears the
outer-scope idleTimeoutId and cancels the async task, but we need to ensure the
listener is unbound in the finally path and add a unit test for pre-aborted
signals: ensure cleanupClientAbortListener (returned from
bindClientAbortListener) is invoked inside the finally block alongside
clearIdleTimer() and reader.releaseLock(), so the listener’s closure releases
messageContext/session/provider references and prevents double-cancel; add a
test in tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts that
simulates a pre-aborted session.clientAbortSignal and asserts the stream path
does not call clearIdleTimer()/responseController.abort() twice or trigger
upstream aborts (verify AsyncTaskManager.cancel and abortController.abort are
called exactly once).
In `@tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts`:
- Around line 109-126: The makeProvider helper constructs a Provider with
providerType set to an invalid literal "openai"; update makeProvider (and the
other occurrence at the second usage) to use the correct union member
"openai-compatible" so the returned object conforms to the Provider type instead
of relying on the unsafe cast; locate the makeProvider function and the second
test instance that sets providerType and replace "openai" with
"openai-compatible" to avoid silent branch-miss issues in ProxyResponseHandler.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 64e0a507-cc19-4473-a9ce-cf458a3bd13f
📒 Files selected for processing (4)
src/app/v1/_lib/proxy/client-abort-listener.tssrc/app/v1/_lib/proxy/forwarder.tssrc/app/v1/_lib/proxy/response-handler.tstests/unit/proxy/response-handler-abort-listener-cleanup.test.ts
The AbortSignal.any polyfill in forwarder.ts attached `abort` listeners
to source signals (responseController.signal and session.clientAbortSignal)
with `{ once: true }`, but `once: true` only auto-detaches when the abort
event actually fires. When a request completes normally — the common case —
those listeners were never removed and their closures kept holding the
combinedController, the cleanup array, and (transitively) the session and
the request body, preventing GC.
This is the same leak pattern as #1113 fixed for the client abort listener
in `_handleStreamingHedge`; the polyfill branch in
`forwardSingleAttempt` was not covered by that PR.
Extract the polyfill into `combine-abort-signals.ts` returning an explicit
`{ signal, cleanup }` pair. The native `AbortSignal.any` branch returns a
noop cleanup since V8 owns the listener lifecycle.
In `forwardSingleAttempt`:
- Call cleanup in the fetch failure catch path (response-handler never runs).
- Call cleanup in the inner `try { throw fromUpstreamResponse } finally`
path used for HTTP 4xx/5xx errors (response-handler also skipped here).
- Compose cleanup into the `sessionWithTimeout.releaseAgent` callback so
the existing response-handler finally invocation point handles the
successful path without touching every call site. Cleanup is idempotent
via an internal `cleaned` flag.
Add unit tests covering both branches: native delegation, polyfill abort
propagation, listener detachment after explicit cleanup, automatic
cleanup on source abort, and pre-aborted source signals.
Co-authored-by: guangdianclaw <claw@guangdian.studio>
Summary
Fixes #1083 - This PR addresses the root cause of excessive 499 errors by properly cleaning up client abort listeners, preventing memory leaks that could lead to premature connection handling issues.
Problem
Client abort listeners were being retained in several code paths:
Response Handler (
response-handler.ts):Streaming Hedge Forwarder (
forwarder.ts):This caused:
Solution
Response Handler
Introduced
bindClientAbortListener()helper that returns a cleanup function:finallyblock)Streaming Hedge Forwarder
cleanupClientAbortListener()that removes the abort listener after winner response is returnedChanges
Core Changes
src/app/v1/_lib/proxy/response-handler.ts:bindClientAbortListener()helper functionsrc/app/v1/_lib/proxy/forwarder.ts:Test Coverage
tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts(NEW):tests/unit/proxy/proxy-forwarder-hedge-first-byte.test.ts:Testing
Notes
bun run lint/lint:fixexit 0; Biome still reports existing schema-version info and two unrelated unsafe-fix suggestions in leaderboard tests.Greptile Summary
This PR fixes client abort listener leaks in
response-handler.ts(both stream and non-stream background tasks) andforwarder.ts(streaming hedge) by extracting abindClientAbortListenerhelper that returns a cleanup function called fromfinallyblocks. It also adds anoMoreProvidersguard instartAttemptto prevent new upstream attempts from launching after a client abort.Confidence Score: 5/5
Safe to merge — pure bug fix with no behavioral regressions and comprehensive test coverage.
No P0 or P1 issues found. The bindClientAbortListener helper correctly handles null signals, pre-aborted signals, normal cleanup, and double-call safety via the cleaned flag. The try/finally placement ensures cleanup even on errors. Tests cover all branches.
No files require special attention.
Important Files Changed
{ once: true }, and guards against double-cleanup via acleanedflag.finallyblocks for both non-stream and stream paths.noMoreProvidersguard to startAttempt to prevent launching upstream attempts after a client abort.Sequence Diagram
sequenceDiagram participant Client participant Handler as ResponseHandler / Forwarder participant BAL as bindClientAbortListener participant Signal as AbortSignal participant Task as AsyncTaskManager Client->>Handler: request Handler->>BAL: bindClientAbortListener(signal, onAbort) BAL->>Signal: addEventListener("abort", onAbort, {once:true}) BAL-->>Handler: cleanupFn alt normal completion Handler->>Task: process task Task-->>Handler: done Handler->>BAL: cleanupFn() BAL->>Signal: removeEventListener("abort", onAbort) else client abort mid-flight Client->>Signal: abort() Signal->>BAL: onAbort() fires BAL->>Task: AsyncTaskManager.cancel(taskId) BAL->>Handler: abortController.abort() Handler->>Handler: finally block Handler->>BAL: cleanupFn() [no-op: listener already auto-removed] else signal already aborted BAL->>BAL: onAbort() called synchronously BAL-->>Handler: () => {} (no-op cleanup) Handler->>Handler: abortController.signal.aborted check → bail early endReviews (3): Last reviewed commit: "test(proxy): cover client abort listener..." | Re-trigger Greptile