From d89f91c5ef8219c8c478fc5e8fe0603bd327a65f Mon Sep 17 00:00:00 2001 From: guangdianclaw Date: Mon, 27 Apr 2026 20:35:47 +0800 Subject: [PATCH] fix(proxy): clean up combined abort signal listeners MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The AbortSignal.any polyfill in forwarder.ts attached `abort` listeners to source signals (responseController.signal and session.clientAbortSignal) with `{ once: true }`, but `once: true` only auto-detaches when the abort event actually fires. When a request completes normally — the common case — those listeners were never removed and their closures kept holding the combinedController, the cleanup array, and (transitively) the session and the request body, preventing GC. This is the same leak pattern as #1113 fixed for the client abort listener in `_handleStreamingHedge`; the polyfill branch in `forwardSingleAttempt` was not covered by that PR. Extract the polyfill into `combine-abort-signals.ts` returning an explicit `{ signal, cleanup }` pair. The native `AbortSignal.any` branch returns a noop cleanup since V8 owns the listener lifecycle. In `forwardSingleAttempt`: - Call cleanup in the fetch failure catch path (response-handler never runs). - Call cleanup in the inner `try { throw fromUpstreamResponse } finally` path used for HTTP 4xx/5xx errors (response-handler also skipped here). - Compose cleanup into the `sessionWithTimeout.releaseAgent` callback so the existing response-handler finally invocation point handles the successful path without touching every call site. Cleanup is idempotent via an internal `cleaned` flag. Add unit tests covering both branches: native delegation, polyfill abort propagation, listener detachment after explicit cleanup, automatic cleanup on source abort, and pre-aborted source signals. --- .../v1/_lib/proxy/combine-abort-signals.ts | 55 ++++++++++++ src/app/v1/_lib/proxy/forwarder.ts | 73 +++++---------- .../unit/proxy/combine-abort-signals.test.ts | 90 +++++++++++++++++++ 3 files changed, 168 insertions(+), 50 deletions(-) create mode 100644 src/app/v1/_lib/proxy/combine-abort-signals.ts create mode 100644 tests/unit/proxy/combine-abort-signals.test.ts diff --git a/src/app/v1/_lib/proxy/combine-abort-signals.ts b/src/app/v1/_lib/proxy/combine-abort-signals.ts new file mode 100644 index 000000000..94086b6a4 --- /dev/null +++ b/src/app/v1/_lib/proxy/combine-abort-signals.ts @@ -0,0 +1,55 @@ +/** + * 组合多个 AbortSignal 为单个信号,并返回显式 cleanup。 + * + * 优先使用原生 `AbortSignal.any`(Node.js 20.3+ / V8 内部管理 listener)。 + * 仅在原生不可用(例如 Next.js standalone 覆盖全局 AbortSignal)时使用 polyfill。 + * + * Polyfill 路径必须由调用方在请求生命周期结束时调用 cleanup,否则源信号上的 abort + * listener 会一直持有闭包(包含 combinedController、cleanups 数组及源信号引用), + * 导致 session/请求体无法被 GC——和 #1113 修复的 client abort listener 是同一类泄漏。 + */ +export interface CombinedAbortSignal { + signal: AbortSignal; + cleanup: () => void; +} + +const NOOP_CLEANUP = () => {}; + +export function combineAbortSignals(signals: AbortSignal[]): CombinedAbortSignal { + if ("any" in AbortSignal && typeof AbortSignal.any === "function") { + return { signal: AbortSignal.any(signals), cleanup: NOOP_CLEANUP }; + } + + const combinedController = new AbortController(); + const detachers: Array<() => void> = []; + let cleaned = false; + + const cleanup = () => { + if (cleaned) return; + cleaned = true; + for (const detach of detachers) { + detach(); + } + detachers.length = 0; + }; + + for (const signal of signals) { + if (signal.aborted) { + combinedController.abort(); + cleanup(); + break; + } + + const abortHandler = () => { + combinedController.abort(); + cleanup(); + }; + + signal.addEventListener("abort", abortHandler, { once: true }); + detachers.push(() => { + signal.removeEventListener("abort", abortHandler); + }); + } + + return { signal: combinedController.signal, cleanup }; +} diff --git a/src/app/v1/_lib/proxy/forwarder.ts b/src/app/v1/_lib/proxy/forwarder.ts index 8b6f383f1..c54625203 100644 --- a/src/app/v1/_lib/proxy/forwarder.ts +++ b/src/app/v1/_lib/proxy/forwarder.ts @@ -52,6 +52,7 @@ import { buildProxyUrl } from "../url"; import { rectifyBillingHeader } from "./billing-header-rectifier"; import { bindClientAbortListener } from "./client-abort-listener"; import { deriveClientSafeUpstreamErrorMessage } from "./client-error-message"; +import { combineAbortSignals } from "./combine-abort-signals"; import { isStandardProxyEndpointPath } from "./endpoint-family-catalog"; import { resolveEndpointPolicy, shouldEnforceStrictEndpointPoolPolicy } from "./endpoint-policy"; import { @@ -2706,56 +2707,18 @@ export class ProxyForwarder { } // 2. 组合双路信号:response + client - let combinedSignal: AbortSignal | undefined; const signals = [responseController.signal]; if (session.clientAbortSignal) { signals.push(session.clientAbortSignal); } - // ⭐ AbortSignal.any 实现(兼容所有环境) - // 原因:Next.js standalone 可能覆盖全局 AbortSignal,导致原生 any 方法不可用 - if ("any" in AbortSignal && typeof AbortSignal.any === "function") { - // 优先使用原生实现(Node.js 20.3+) - combinedSignal = AbortSignal.any(signals); - logger.debug("ProxyForwarder: Using native AbortSignal.any", { - signalCount: signals.length, - }); - } else { - // Polyfill: 手动实现多信号组合逻辑 - logger.debug("ProxyForwarder: Using AbortSignal.any polyfill", { - signalCount: signals.length, - reason: "Native AbortSignal.any not available", - }); - - const combinedController = new AbortController(); - const cleanupHandlers: Array<() => void> = []; - - // 为每个信号添加监听器 - for (const signal of signals) { - // 如果已经有信号中断,立即中断组合信号 - if (signal.aborted) { - combinedController.abort(); - break; - } - - // 监听信号中断事件 - const abortHandler = () => { - // 中断组合信号 - combinedController.abort(); - // 清理所有监听器(避免内存泄漏) - cleanupHandlers.forEach((cleanup) => cleanup()); - }; - - signal.addEventListener("abort", abortHandler, { once: true }); - - // 记录清理函数 - cleanupHandlers.push(() => { - signal.removeEventListener("abort", abortHandler); - }); - } - - combinedSignal = combinedController.signal; - } + // 优先 Node 20.3+ 原生 AbortSignal.any(V8 内部管理 listener,无需手动 cleanup); + // Next.js standalone 覆盖全局时 fallback 到 polyfill,由调用方在请求结束时调用 + // cleanupCombinedSignal 解绑源信号上的 listener,避免持有 session/请求体闭包。 + const { signal: combinedSignal, cleanup: cleanupCombinedSignal } = combineAbortSignals(signals); + logger.debug("ProxyForwarder: Combined abort signals", { + signalCount: signals.length, + }); const init: UndiciFetchOptions = { method: session.method, @@ -2849,6 +2812,9 @@ export class ProxyForwarder { clearTimeout(responseTimeoutId); } + // Polyfill 路径上需要主动解绑源信号的 abort listener(response-handler 不会执行)。 + cleanupCombinedSignal(); + // Release agent ref count on fetch failure (request never started streaming) const releaseKey = proxyConfig?.cacheKey ?? directConnectionCacheKey; const releaseDispatcherId = proxyConfig?.dispatcherId ?? directConnectionDispatcherId; @@ -3281,6 +3247,8 @@ export class ProxyForwarder { if (errorReleaseKey && errorReleaseDispatcherId) { getGlobalAgentPool().releaseAgent(errorReleaseKey, errorReleaseDispatcherId); } + // 同上:response-handler 不会跑,polyfill 路径上的源信号 listener 必须在此解绑。 + cleanupCombinedSignal(); } } @@ -3308,14 +3276,19 @@ export class ProxyForwarder { // Attach agent release callback for in-flight reference counting. // response-handler must call this in its finally block after the stream is fully consumed. + // 同时复用此回调作为 combineAbortSignals polyfill 的 cleanup 入口:response-handler 已经 + // 保证在请求结束时(成功/异常)幂等地调用 releaseAgent,把 cleanup 合并到这里就不必再 + // 改造 response-handler 的所有 finally 调用点。两个动作互不影响,cleanup 内部自带 cleaned + // 标志,重复调用安全。 const agentCacheKeyToRelease = proxyConfig?.cacheKey ?? directConnectionCacheKey; const agentDispatcherIdToRelease = proxyConfig?.dispatcherId ?? directConnectionDispatcherId; - if (agentCacheKeyToRelease && agentDispatcherIdToRelease) { - const pool = getGlobalAgentPool(); - sessionWithTimeout.releaseAgent = () => { + const pool = agentCacheKeyToRelease && agentDispatcherIdToRelease ? getGlobalAgentPool() : null; + sessionWithTimeout.releaseAgent = () => { + if (pool && agentCacheKeyToRelease && agentDispatcherIdToRelease) { pool.releaseAgent(agentCacheKeyToRelease, agentDispatcherIdToRelease); - }; - } + } + cleanupCombinedSignal(); + }; return response; } diff --git a/tests/unit/proxy/combine-abort-signals.test.ts b/tests/unit/proxy/combine-abort-signals.test.ts new file mode 100644 index 000000000..7a86ce975 --- /dev/null +++ b/tests/unit/proxy/combine-abort-signals.test.ts @@ -0,0 +1,90 @@ +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { combineAbortSignals } from "@/app/v1/_lib/proxy/combine-abort-signals"; + +type MutableAbortSignal = { any?: unknown }; + +describe("combineAbortSignals", () => { + describe("native AbortSignal.any path", () => { + it("delegates to AbortSignal.any when available and cleanup is noop", () => { + const c1 = new AbortController(); + const c2 = new AbortController(); + + const { signal, cleanup } = combineAbortSignals([c1.signal, c2.signal]); + + expect(signal.aborted).toBe(false); + c1.abort(); + expect(signal.aborted).toBe(true); + + // cleanup should be safe to call (noop) — no listeners owned by us. + expect(() => cleanup()).not.toThrow(); + }); + }); + + describe("polyfill path (AbortSignal.any unavailable)", () => { + let originalAny: unknown; + + beforeEach(() => { + originalAny = (AbortSignal as MutableAbortSignal).any; + // 赋 undefined 让 helper 的 `typeof ... === "function"` check 走 polyfill; + // delete 在部分 V8 版本上对 static 不生效,赋值更可靠。 + (AbortSignal as MutableAbortSignal).any = undefined; + }); + + afterEach(() => { + (AbortSignal as MutableAbortSignal).any = originalAny; + }); + + it("aborts combined signal when any source aborts", () => { + const c1 = new AbortController(); + const c2 = new AbortController(); + + const { signal } = combineAbortSignals([c1.signal, c2.signal]); + expect(signal.aborted).toBe(false); + c2.abort(); + expect(signal.aborted).toBe(true); + }); + + it("source-side abort listeners are detached after cleanup is invoked", () => { + const c1 = new AbortController(); + const c2 = new AbortController(); + + const { signal, cleanup } = combineAbortSignals([c1.signal, c2.signal]); + expect(signal.aborted).toBe(false); + + // 模拟请求正常完成:调用方在 finally 中触发 cleanup。 + cleanup(); + + // 源信号此后再 abort,不应再传播到组合信号(listener 已解绑)。 + c1.abort(); + c2.abort(); + expect(signal.aborted).toBe(false); + }); + + it("auto-cleans listeners when a source aborts (does not require explicit cleanup)", () => { + const c1 = new AbortController(); + const c2 = new AbortController(); + + const { signal, cleanup } = combineAbortSignals([c1.signal, c2.signal]); + c1.abort(); + expect(signal.aborted).toBe(true); + + // 二次 cleanup 必须幂等(请求结束的 finally 仍会调)。 + expect(() => cleanup()).not.toThrow(); + expect(() => cleanup()).not.toThrow(); + }); + + it("immediately aborts and cleans up when a source signal is already aborted", () => { + const c1 = new AbortController(); + c1.abort(); + const c2 = new AbortController(); + + const { signal, cleanup } = combineAbortSignals([c1.signal, c2.signal]); + expect(signal.aborted).toBe(true); + + // 后到的源 abort 不应再触发任何路径(已 cleanup)。 + c2.abort(); + expect(signal.aborted).toBe(true); + expect(() => cleanup()).not.toThrow(); + }); + }); +});