Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 55 additions & 0 deletions src/app/v1/_lib/proxy/combine-abort-signals.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/**
 * Combines several AbortSignals into a single signal and returns an explicit cleanup.
 *
 * The native `AbortSignal.any` is preferred (Node.js 20.3+; V8 manages the listeners
 * internally). The polyfill is used only when the native method is unavailable (for
 * example when Next.js standalone overrides the global AbortSignal).
 *
 * On the polyfill path the caller must invoke cleanup when the request lifecycle ends;
 * otherwise the abort listeners on the source signals keep holding their closure
 * (combinedController, the array of detach callbacks and the source-signal references),
 * which prevents the session/request body from being garbage-collected. This is the
 * same class of leak as the client abort listener fixed in #1113.
 */
export interface CombinedAbortSignal {
/** Signal that becomes aborted once any of the source signals aborts. */
signal: AbortSignal;
/** Detaches the listeners added to the source signals; a no-op on the native path and safe to call more than once. */
cleanup: () => void;
}

// Shared no-op handed back as `cleanup` on the native `AbortSignal.any` path, where this module registers no listeners of its own.
const NOOP_CLEANUP = () => {};

export function combineAbortSignals(signals: AbortSignal[]): CombinedAbortSignal {
if ("any" in AbortSignal && typeof AbortSignal.any === "function") {
return { signal: AbortSignal.any(signals), cleanup: NOOP_CLEANUP };
}

const combinedController = new AbortController();
const detachers: Array<() => void> = [];
let cleaned = false;

const cleanup = () => {
if (cleaned) return;
cleaned = true;
for (const detach of detachers) {
detach();
}
detachers.length = 0;
};

// Wire every source signal to the combined controller. The abort reason of the
// source that fires first is forwarded, so `signal.reason` on the combined signal
// matches what the native `AbortSignal.any` path reports.
for (const signal of signals) {
  if (signal.aborted) {
    // A source is already aborted: abort immediately with its reason, detach the
    // listeners registered on earlier sources, and stop registering new ones.
    combinedController.abort(signal.reason);
    cleanup();
    break;
  }

  const abortHandler = () => {
    // Propagate the originating reason instead of a generic AbortError.
    combinedController.abort(signal.reason);
    // The combined signal is now settled; release the listeners on all sources.
    cleanup();
  };

  signal.addEventListener("abort", abortHandler, { once: true });
  detachers.push(() => {
    signal.removeEventListener("abort", abortHandler);
  });
}
Comment on lines +37 to +52
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

为了与原生的 AbortSignal.any 行为保持一致,建议在 polyfill 路径中传播源信号的 reason。原生实现会将组合信号的 reason 设置为第一个触发中断的信号的 reason。

  for (const signal of signals) {
    if (signal.aborted) {
      combinedController.abort(signal.reason);
      cleanup();
      break;
    }

    const abortHandler = () => {
      combinedController.abort(signal.reason);
      cleanup();
    };

    signal.addEventListener("abort", abortHandler, { once: true });
    detachers.push(() => {
      signal.removeEventListener("abort", abortHandler);
    });
  }

Comment on lines +36 to +52
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

polyfill 未传播源信号的 abort reason,与原生 AbortSignal.any 行为不一致。

原生 AbortSignal.any 在某个源 abort 时会把该源的 reason 传给组合信号;当前 polyfill 调用 combinedController.abort() 不带参数,会丢失上游传入的 reason(例如 responseController.signal.reason 或 clientAbortSignal.reason)。这会导致两条路径下 combinedSignal.reason 行为不同,下游若读取 signal.reason 做诊断/分类会拿到不一致的结果。

♻️ 建议改动
   for (const signal of signals) {
     if (signal.aborted) {
-      combinedController.abort();
+      combinedController.abort(signal.reason);
       cleanup();
       break;
     }

     const abortHandler = () => {
-      combinedController.abort();
+      combinedController.abort(signal.reason);
       cleanup();
     };
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/app/v1/_lib/proxy/combine-abort-signals.ts` around lines 36 - 52, The
polyfill currently aborts the combinedController without forwarding the source
signal's reason, causing combinedSignal.reason to differ from native
AbortSignal.any; update the logic in combine-abort-signals (the loop that checks
signal.aborted and the abortHandler closure) to read the originating
signal.reason and call combinedController.abort(reason) so the source reason is
propagated both for already-aborted signals and for the event handler path,
keeping the existing cleanup() call and detacher removal behavior.


return { signal: combinedController.signal, cleanup };
}
73 changes: 23 additions & 50 deletions src/app/v1/_lib/proxy/forwarder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import { buildProxyUrl } from "../url";
import { rectifyBillingHeader } from "./billing-header-rectifier";
import { bindClientAbortListener } from "./client-abort-listener";
import { deriveClientSafeUpstreamErrorMessage } from "./client-error-message";
import { combineAbortSignals } from "./combine-abort-signals";
import { isStandardProxyEndpointPath } from "./endpoint-family-catalog";
import { resolveEndpointPolicy, shouldEnforceStrictEndpointPoolPolicy } from "./endpoint-policy";
import {
Expand Down Expand Up @@ -2706,56 +2707,18 @@ export class ProxyForwarder {
}

// 2. 组合双路信号:response + client
let combinedSignal: AbortSignal | undefined;
const signals = [responseController.signal];
if (session.clientAbortSignal) {
signals.push(session.clientAbortSignal);
}

// ⭐ AbortSignal.any 实现(兼容所有环境)
// 原因:Next.js standalone 可能覆盖全局 AbortSignal,导致原生 any 方法不可用
if ("any" in AbortSignal && typeof AbortSignal.any === "function") {
// 优先使用原生实现(Node.js 20.3+)
combinedSignal = AbortSignal.any(signals);
logger.debug("ProxyForwarder: Using native AbortSignal.any", {
signalCount: signals.length,
});
} else {
// Polyfill: 手动实现多信号组合逻辑
logger.debug("ProxyForwarder: Using AbortSignal.any polyfill", {
signalCount: signals.length,
reason: "Native AbortSignal.any not available",
});

const combinedController = new AbortController();
const cleanupHandlers: Array<() => void> = [];

// 为每个信号添加监听器
for (const signal of signals) {
// 如果已经有信号中断,立即中断组合信号
if (signal.aborted) {
combinedController.abort();
break;
}

// 监听信号中断事件
const abortHandler = () => {
// 中断组合信号
combinedController.abort();
// 清理所有监听器(避免内存泄漏)
cleanupHandlers.forEach((cleanup) => cleanup());
};

signal.addEventListener("abort", abortHandler, { once: true });

// 记录清理函数
cleanupHandlers.push(() => {
signal.removeEventListener("abort", abortHandler);
});
}

combinedSignal = combinedController.signal;
}
// 优先 Node 20.3+ 原生 AbortSignal.any(V8 内部管理 listener,无需手动 cleanup);
// Next.js standalone 覆盖全局时 fallback 到 polyfill,由调用方在请求结束时调用
// cleanupCombinedSignal 解绑源信号上的 listener,避免持有 session/请求体闭包。
const { signal: combinedSignal, cleanup: cleanupCombinedSignal } = combineAbortSignals(signals);
logger.debug("ProxyForwarder: Combined abort signals", {
signalCount: signals.length,
});
Comment on lines +2719 to +2721
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Debug log loses native/polyfill visibility

The original code emitted distinct logger.debug messages for the native path ("Using native AbortSignal.any") and the polyfill path ("Using AbortSignal.any polyfill"). The consolidated message no longer tells you which branch was actually taken, which matters when debugging the Next.js-standalone override scenario that motivated this fix.

Consider tagging the log entry with a usingPolyfill boolean so the information is preserved without restoring two separate log calls:

Suggested change
logger.debug("ProxyForwarder: Combined abort signals", {
signalCount: signals.length,
});
logger.debug("ProxyForwarder: Combined abort signals", {
signalCount: signals.length,
usingPolyfill: !("any" in AbortSignal && typeof AbortSignal.any === "function"),
});
Prompt To Fix With AI
This is a comment left during a code review.
Path: src/app/v1/_lib/proxy/forwarder.ts
Line: 2719-2721

Comment:
**Debug log loses native/polyfill visibility**

The original code emitted distinct `logger.debug` messages for the native path (`"Using native AbortSignal.any"`) and the polyfill path (`"Using AbortSignal.any polyfill"`). The consolidated message no longer tells you which branch was actually taken, which matters when debugging the Next.js-standalone override scenario that motivated this fix.

Consider tagging the log entry with a `usingPolyfill` boolean so the information is preserved without restoring two separate log calls:

```suggestion
    logger.debug("ProxyForwarder: Combined abort signals", {
      signalCount: signals.length,
      usingPolyfill: !("any" in AbortSignal && typeof AbortSignal.any === "function"),
    });
```

How can I resolve this? If you propose a fix, please make it concise.


const init: UndiciFetchOptions = {
method: session.method,
Expand Down Expand Up @@ -2849,6 +2812,9 @@ export class ProxyForwarder {
clearTimeout(responseTimeoutId);
}

// Polyfill 路径上需要主动解绑源信号的 abort listener(response-handler 不会执行)。
cleanupCombinedSignal();
Comment on lines +2815 to +2816
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Defer combined-signal cleanup until fallback paths finish

In forwardSingleAttempt, this unconditional cleanupCombinedSignal() runs at catch entry even though the same catch block can continue with HTTP/2→HTTP/1.1 or proxy→direct fallback fetches. In the polyfill path, cleanup detaches responseController/clientAbortSignal listeners from the already-created init.signal, so those fallback requests are no longer abortable by timeout or client disconnect and can hang instead of returning the expected timeout/499 behavior. This should only clean up on terminal exits, or a fresh combined signal should be created before retrying.

Useful? React with 👍 / 👎.

Comment on lines +2815 to +2816
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

polyfill 路径上 cleanup 调用过早,会破坏 HTTP/2→HTTP/1.1 与代理→直连 fallback 的信号取消。

本 catch 块下方还有两条 fallback 路径会复用同一个 init(因此复用同一个 combinedSignal):

  • 第 ~2991-3026 行 HTTP/2→HTTP/1.1 fallback:const http1FallbackInit = { ...init },随后 fetchWithoutAutoDecode(proxyUrl, http1FallbackInit, ...)
  • 第 ~3097-3109 行 代理→直连 fallback:const fallbackInit = { ...init },随后 fetchWithoutAutoDecode(proxyUrl, fallbackInit, ...)

进入 catch 后第 2816 行立刻执行 cleanupCombinedSignal()。在 polyfill 分支这会把源信号上的 abort listener 全部解绑,且 cleaned 标志位永不复位。之后 H1/直连 fallback 在第 3040-3048、3126-3134 行重新启动响应超时(setTimeout(() => responseController.abort(), ...)),但 responseController.signal 上已没有 listener 把 abort 传播给 combinedController——combinedSignal.aborted 始终为 false,fallback 的 fetch 不会被首字节/总响应超时取消,session.clientAbortSignal 触发的客户端中断同样无法传到 fallback fetch,最终只能依赖 undici 的 headersTimeout/bodyTimeout 兜底,明显退化于 PR 之前的行为。原生 AbortSignal.any 路径因为 cleanup 是 noop 不受影响。

建议把 cleanup 从 catch 顶部移除,改为在每个真正抛出的分支前调用(响应超时、流式 idle、客户端中断、SSL/通用错误的最终 throw 处),fallback 成功路径已经由 releaseAgent 内的幂等 cleanup 兜底;fallback 自身的内层 catch 在 re-throw 前也需要补一次 cleanup。

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/app/v1/_lib/proxy/forwarder.ts` around lines 2815 - 2816, The
cleanupCombinedSignal() call is currently invoked immediately at the top of the
catch block which unbinds abort listeners from the original signal and breaks
the later fallback flows that reuse init/combinedSignal (e.g. the
HTTP/2→HTTP/1.1 path that creates http1FallbackInit and the proxy→direct
fallback that creates fallbackInit and calls fetchWithoutAutoDecode). Move the
cleanupCombinedSignal() invocation out of the catch head and instead call it
right before any final throw or when a branch truly finishes (responses timed
out, streaming idle abort, client abort, SSL/general fatal errors), and also
ensure each inner fallback catch calls cleanupCombinedSignal() before
re-throwing; rely on releaseAgent’s idempotent cleanup for successful fallback
paths. Use the existing symbols cleanupCombinedSignal, combinedSignal,
init/http1FallbackInit/fallbackInit, fetchWithoutAutoDecode,
responseController.abort, combinedController and session.clientAbortSignal to
locate where to add/remove the calls.


// Release agent ref count on fetch failure (request never started streaming)
const releaseKey = proxyConfig?.cacheKey ?? directConnectionCacheKey;
const releaseDispatcherId = proxyConfig?.dispatcherId ?? directConnectionDispatcherId;
Expand Down Expand Up @@ -3281,6 +3247,8 @@ export class ProxyForwarder {
if (errorReleaseKey && errorReleaseDispatcherId) {
getGlobalAgentPool().releaseAgent(errorReleaseKey, errorReleaseDispatcherId);
}
// 同上:response-handler 不会跑,polyfill 路径上的源信号 listener 必须在此解绑。
cleanupCombinedSignal();
}
}

Expand Down Expand Up @@ -3308,14 +3276,19 @@ export class ProxyForwarder {

// Attach agent release callback for in-flight reference counting.
// response-handler must call this in its finally block after the stream is fully consumed.
// 同时复用此回调作为 combineAbortSignals polyfill 的 cleanup 入口:response-handler 已经
// 保证在请求结束时(成功/异常)幂等地调用 releaseAgent,把 cleanup 合并到这里就不必再
// 改造 response-handler 的所有 finally 调用点。两个动作互不影响,cleanup 内部自带 cleaned
// 标志,重复调用安全。
const agentCacheKeyToRelease = proxyConfig?.cacheKey ?? directConnectionCacheKey;
const agentDispatcherIdToRelease = proxyConfig?.dispatcherId ?? directConnectionDispatcherId;
if (agentCacheKeyToRelease && agentDispatcherIdToRelease) {
const pool = getGlobalAgentPool();
sessionWithTimeout.releaseAgent = () => {
const pool = agentCacheKeyToRelease && agentDispatcherIdToRelease ? getGlobalAgentPool() : null;
sessionWithTimeout.releaseAgent = () => {
if (pool && agentCacheKeyToRelease && agentDispatcherIdToRelease) {
pool.releaseAgent(agentCacheKeyToRelease, agentDispatcherIdToRelease);
};
}
}
cleanupCombinedSignal();
};
Comment on lines +3286 to +3291
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

虽然将 cleanupCombinedSignal 合并到 releaseAgent 很好地复用了 response-handler 的清理点,但目前实现仍存在两处潜在的泄漏/失效风险:

  1. Hedge 失败者泄漏:在 sendStreamingWithHedge 的 runAttempt 中,被丢弃的尝试(losers)不会进入 response-handler。目前代码仅对 losers 调用了 body.cancel(),未调用 releaseAgent(),这会导致 polyfill 路径下的 listener 泄漏,且所有路径下的 agent 引用计数也无法释放。
  2. Shadow Session 同步问题:如果 Hedge 的胜出者是 shadow session,syncWinningAttemptSession 目前没有同步 releaseAgent 回主 session。这会导致 response-handler 在主 session 上调用 releaseAgent 时,实际上清理的是第一轮尝试(或为空),而胜出尝试的 listener 依然残留在源信号上。

建议在 runAttempt 的 loser 处理路径中显式调用 releaseAgent(),并在 syncWinningAttemptSession 中同步该回调。注意:由于此处于性能敏感的 Hedge 路径,且 releaseAgent 可能涉及 Redis 会话释放等非关键 I/O,根据项目规则,应将其作为 fire-and-forget 任务执行,以避免阻塞主逻辑。

References
  1. In performance-sensitive code paths like provider failover, non-critical I/O operations (e.g., releasing a session in Redis) should be executed as fire-and-forget tasks to avoid blocking the main logic.


return response;
}
Expand Down
90 changes: 90 additions & 0 deletions tests/unit/proxy/combine-abort-signals.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
import { afterEach, beforeEach, describe, expect, it } from "vitest";
import { combineAbortSignals } from "@/app/v1/_lib/proxy/combine-abort-signals";

type MutableAbortSignal = { any?: unknown };

// Unit tests for combineAbortSignals: one suite for delegation to the native
// AbortSignal.any, one for the listener-based polyfill used when it is unavailable.
describe("combineAbortSignals", () => {
describe("native AbortSignal.any path", () => {
it("delegates to AbortSignal.any when available and cleanup is noop", () => {
const c1 = new AbortController();
const c2 = new AbortController();

const { signal, cleanup } = combineAbortSignals([c1.signal, c2.signal]);

expect(signal.aborted).toBe(false);
c1.abort();
expect(signal.aborted).toBe(true);

// cleanup should be safe to call (noop) — no listeners owned by us.
expect(() => cleanup()).not.toThrow();
});
});

describe("polyfill path (AbortSignal.any unavailable)", () => {
let originalAny: unknown;

beforeEach(() => {
originalAny = (AbortSignal as MutableAbortSignal).any;
// Assign undefined so the helper's `typeof ... === "function"` check falls through to the polyfill;
// `delete` does not take effect on the static in some V8 versions, so assignment is more reliable.
(AbortSignal as MutableAbortSignal).any = undefined;
});

afterEach(() => {
(AbortSignal as MutableAbortSignal).any = originalAny;
});

it("aborts combined signal when any source aborts", () => {
const c1 = new AbortController();
const c2 = new AbortController();

const { signal } = combineAbortSignals([c1.signal, c2.signal]);
expect(signal.aborted).toBe(false);
c2.abort();
expect(signal.aborted).toBe(true);
});

it("source-side abort listeners are detached after cleanup is invoked", () => {
const c1 = new AbortController();
const c2 = new AbortController();

const { signal, cleanup } = combineAbortSignals([c1.signal, c2.signal]);
expect(signal.aborted).toBe(false);

// Simulate a request completing normally: the caller triggers cleanup in its finally block.
cleanup();

// Aborting the sources afterwards must no longer propagate to the combined signal (listeners are detached).
c1.abort();
c2.abort();
expect(signal.aborted).toBe(false);
});

it("auto-cleans listeners when a source aborts (does not require explicit cleanup)", () => {
const c1 = new AbortController();
const c2 = new AbortController();

const { signal, cleanup } = combineAbortSignals([c1.signal, c2.signal]);
c1.abort();
expect(signal.aborted).toBe(true);

// A second cleanup must be idempotent (the finally block at request end will still call it).
expect(() => cleanup()).not.toThrow();
expect(() => cleanup()).not.toThrow();
});

it("immediately aborts and cleans up when a source signal is already aborted", () => {
const c1 = new AbortController();
c1.abort();
const c2 = new AbortController();

const { signal, cleanup } = combineAbortSignals([c1.signal, c2.signal]);
expect(signal.aborted).toBe(true);

// A source that aborts later must not trigger any further path (cleanup already ran).
c2.abort();
expect(signal.aborted).toBe(true);
expect(() => cleanup()).not.toThrow();
});
});
});
Loading