Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 89 additions & 25 deletions src/app/v1/_lib/proxy/forwarder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
getPreferredProviderEndpoints,
} from "@/lib/provider-endpoints/endpoint-selector";
import { getGlobalAgentPool, getProxyAgentForProvider } from "@/lib/proxy-agent";
import { RateLimitService } from "@/lib/rate-limit/service";
import { SessionManager } from "@/lib/session-manager";
import {
detectUpstreamErrorFromSseOrJsonText,
Expand Down Expand Up @@ -1077,7 +1078,7 @@ export class ProxyForwarder {
});
}

failedProviderIds.push(currentProvider.id);
ProxyForwarder.markProviderFailed(session, failedProviderIds, currentProvider.id);
attemptCount = maxAttemptsPerProvider;
} else {
endpointCandidates.push({ endpointId: null, baseUrl: currentProvider.url });
Expand Down Expand Up @@ -1140,7 +1141,7 @@ export class ProxyForwarder {
vendorId: currentProvider.providerVendorId,
providerType: currentProvider.providerType,
});
failedProviderIds.push(currentProvider.id);
ProxyForwarder.markProviderFailed(session, failedProviderIds, currentProvider.id);
attemptCount = maxAttemptsPerProvider;
}

Expand Down Expand Up @@ -1708,7 +1709,7 @@ export class ProxyForwarder {
const env = getEnvConfig();

// 无论是否计入熔断器,都要加入 failedProviderIds(避免重复选择同一供应商)
failedProviderIds.push(currentProvider.id);
ProxyForwarder.markProviderFailed(session, failedProviderIds, currentProvider.id);

if (env.ENABLE_CIRCUIT_BREAKER_ON_NETWORK_ERRORS) {
logger.warn(
Expand Down Expand Up @@ -1806,7 +1807,7 @@ export class ProxyForwarder {
}

// 重试耗尽:加入失败列表并切换供应商
failedProviderIds.push(currentProvider.id);
ProxyForwarder.markProviderFailed(session, failedProviderIds, currentProvider.id);
break; // ⭐ 跳出内层循环,进入供应商切换逻辑
}

Expand Down Expand Up @@ -1878,7 +1879,7 @@ export class ProxyForwarder {
}
}

failedProviderIds.push(currentProvider.id);
ProxyForwarder.markProviderFailed(session, failedProviderIds, currentProvider.id);
break; // 跳出内层循环,进入供应商切换逻辑
}

Expand Down Expand Up @@ -1927,7 +1928,7 @@ export class ProxyForwarder {
currentProvider.providerVendorId,
currentProvider.providerType
);
failedProviderIds.push(currentProvider.id);
ProxyForwarder.markProviderFailed(session, failedProviderIds, currentProvider.id);
break;
}

Expand Down Expand Up @@ -2023,7 +2024,7 @@ export class ProxyForwarder {
}

// 加入失败列表并切换供应商
failedProviderIds.push(currentProvider.id);
ProxyForwarder.markProviderFailed(session, failedProviderIds, currentProvider.id);
break; // 跳出内层循环,进入供应商切换逻辑
}
}
Expand Down Expand Up @@ -3397,6 +3398,7 @@ export class ProxyForwarder {
let lastError: Error | null = null;
let lastErrorCategory: ErrorCategory | null = null;
const attempts = new Set<StreamingHedgeAttempt>();
const failedProviderIds: number[] = [];

let resolveResult: ((result: { response?: Response; error?: Error }) => void) | null = null;
const resultPromise = new Promise<{ response?: Response; error?: Error }>((resolve) => {
Expand Down Expand Up @@ -3444,6 +3446,7 @@ export class ProxyForwarder {
attemptNumber: attempt.sequence,
modelRedirect: getAttemptModelRedirect(attempt),
});
ProxyForwarder.markProviderFailed(session, failedProviderIds, attempt.provider.id);
}
try {
attempt.responseController?.abort(new Error(reason));
Expand Down Expand Up @@ -3511,21 +3514,24 @@ export class ProxyForwarder {
}

launchingAlternative = (async () => {
const alternativeProvider = await ProxyForwarder.selectAlternative(
session,
Array.from(launchedProviderIds)
);
if (!alternativeProvider) {
noMoreProviders = true;
// No alternative providers available — let in-flight attempt(s) continue.
// If all attempts already completed, settle with last error.
if (attempts.size === 0) {
await finishIfExhausted();
while (!settled && !winnerCommitted && !noMoreProviders) {
const alternativeProvider = await ProxyForwarder.selectAlternative(
session,
Array.from(launchedProviderIds)
);
if (!alternativeProvider) {
noMoreProviders = true;
// No alternative providers available — let in-flight attempt(s) continue.
// If all attempts already completed, settle with last error.
if (attempts.size === 0) {
await finishIfExhausted();
}
return;
}
return;
}

await startAttempt(alternativeProvider, false);
const launched = await startAttempt(alternativeProvider, false);
if (launched) return;
}
})()
.catch(async (error) => {
const normalizedError = error instanceof Error ? error : new Error(String(error));
Expand Down Expand Up @@ -3767,6 +3773,7 @@ export class ProxyForwarder {
attempt.thresholdTimer = null;
}
attempts.delete(attempt);
ProxyForwarder.markProviderFailed(session, failedProviderIds, attempt.provider.id);

if (errorCategory === ErrorCategory.PROVIDER_ERROR && statusCode !== 404) {
await recordFailure(attempt.provider.id, error);
Expand Down Expand Up @@ -3916,11 +3923,38 @@ export class ProxyForwarder {
settleSuccess(response);
};

const startAttempt = async (provider: Provider, useOriginalSession: boolean) => {
if (settled || winnerCommitted || launchedProviderIds.has(provider.id)) return;
const startAttempt = async (
provider: Provider,
useOriginalSession: boolean
): Promise<boolean> => {
if (settled || winnerCommitted || launchedProviderIds.has(provider.id)) return false;

launchedProviderIds.add(provider.id);

if (!useOriginalSession && session.sessionId) {
const limit = provider.limitConcurrentSessions || 0;
const checkResult = await RateLimitService.checkAndTrackProviderSession(
provider.id,
session.sessionId,
limit
);

if (!checkResult.allowed) {
ProxyForwarder.markProviderFailed(session, failedProviderIds, provider.id);
session.addProviderToChain(provider, {
reason: "concurrent_limit_failed",
circuitState: getCircuitState(provider.id),
attemptNumber: launchedProviderCount + 1,
errorMessage: checkResult.reason || "并发限制已达到",
});
return false;
}

if (checkResult.referenced) {
session.recordProviderSessionRef(provider.id);
}
}

let endpointSelection: {
endpointId: number | null;
baseUrl: string;
Expand All @@ -3931,9 +3965,9 @@ export class ProxyForwarder {
} catch (endpointError) {
lastError = endpointError as Error;
lastErrorCategory = null;
await launchAlternative();
ProxyForwarder.markProviderFailed(session, failedProviderIds, provider.id);
await finishIfExhausted();
return;
return false;
}

launchedProviderCount += 1;
Expand Down Expand Up @@ -3984,6 +4018,7 @@ export class ProxyForwarder {
armAttemptThreshold(attempt);

runAttempt(attempt);
return true;
};

if (session.clientAbortSignal) {
Expand Down Expand Up @@ -4012,7 +4047,10 @@ export class ProxyForwarder {
);
}

await startAttempt(initialProvider, true);
const initialLaunched = await startAttempt(initialProvider, true);
if (!initialLaunched) {
await launchAlternative();
}
await finishIfExhausted();
const result = await resultPromise;
if (result.error) {
Expand Down Expand Up @@ -4250,6 +4288,32 @@ export class ProxyForwarder {
await SessionManager.clearSessionProvider(session.sessionId);
}

private static markProviderFailed(
session: ProxySession,
failedProviderIds: number[],
providerId: number
): void {
if (failedProviderIds.includes(providerId)) {
return;
}

failedProviderIds.push(providerId);

if (!session.sessionId) {
return;
}

const providerSessionRefConsumer = (
session as { consumeProviderSessionRef?: (providerId: number) => boolean }
).consumeProviderSessionRef;

if (!providerSessionRefConsumer?.call(session, providerId)) {
return;
}

void RateLimitService.releaseProviderSession(providerId, session.sessionId);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve provider session membership for concurrent requests

markProviderFailed now always calls releaseProviderSession, which does a raw ZREM by sessionId; because provider concurrency is tracked as one ZSET member per session (via checkAndTrackProviderSession), this removes the provider session for all in-flight requests sharing that session. If one concurrent request fails over while another request from the same session is still actively using that provider, the active session count is under-reported and new sessions can be admitted past the configured provider concurrency limit until the next refresh/retrack.

Useful? React with 👍 / 👎.

}
Comment on lines +4291 to +4315
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Verify ProxySession exposes recordProviderSessionRef and consumeProviderSessionRef on its public type.
fd -t f 'session.ts' src/app/v1/_lib/proxy/ --exec rg -nP -C2 'recordProviderSessionRef|consumeProviderSessionRef|providerSessionRefs' {}

Repository: ding113/claude-code-hub

Length of output: 842


统一使用直接方法调用而非防御式类型断言

验证表明 recordProviderSessionRef 和 consumeProviderSessionRef 两个方法都已在 ProxySession 中公开定义(分别在 session.ts 的第 320 和 330 行),因此 forwarder.ts 第 4306-4310 行的防御式类型断言:

const providerSessionRefConsumer = (
  session as { consumeProviderSessionRef?: (providerId: number) => boolean }
).consumeProviderSessionRef;

是冗余的。建议统一使用直接调用方式,保持与第 3954 行 session.recordProviderSessionRef(provider.id) 的风格一致。

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/app/v1/_lib/proxy/forwarder.ts` around lines 4291 - 4315, The code in
markProviderFailed uses a defensive type assertion to access
consumeProviderSessionRef but ProxySession already exposes that method; remove
the cast and call the method directly (mirror the style used for
session.recordProviderSessionRef), e.g. replace the casted
providerSessionRefConsumer usage with a direct invocation of
session.consumeProviderSessionRef(providerId), keep the same null/boolean check
and the subsequent call to
RateLimitService.releaseProviderSession(providerId, session.sessionId) when it returns true.


private static buildAllProvidersUnavailableError(finalError?: Error | null): ProxyError {
const safeClientMessageCandidate =
finalError instanceof ProxyError &&
Expand Down
4 changes: 4 additions & 0 deletions src/app/v1/_lib/proxy/provider-selector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,10 @@ export class ProxyProviderResolver {
}

// === 成功 ===
if (checkResult.referenced) {
session.recordProviderSessionRef(session.provider.id);
}

logger.debug("ProviderSelector: Session tracked atomically", {
sessionId: session.sessionId,
providerName: session.provider.name,
Expand Down
23 changes: 23 additions & 0 deletions src/app/v1/_lib/proxy/session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ export class ProxySession {
*/
private providersSnapshot: Provider[] | null = null;

// 本请求已通过 Provider 并发检查获得的引用。
// 失败切换 provider 时只能释放这里记录过的引用,避免 hedge/fallback 释放未 acquire 的 Redis 计数。
private providerSessionRefs = new Set<number>();

private constructor(init: {
startTime: number;
method: string;
Expand Down Expand Up @@ -313,6 +317,25 @@ export class ProxySession {
}
}

/**
 * Remembers that this request holds a provider-session reference for the
 * given provider, so that a later consumeProviderSessionRef call can find it.
 *
 * Ids that are not positive integers are silently ignored.
 *
 * @param providerId - Id of the provider whose reference was acquired.
 */
recordProviderSessionRef(providerId: number): void {
  // Re-create the backing set if it is missing on this instance.
  if (!this.providerSessionRefs) {
    this.providerSessionRefs = new Set<number>();
  }

  const isPositiveInteger = Number.isInteger(providerId) && providerId > 0;
  if (!isPositiveInteger) {
    return;
  }

  this.providerSessionRefs.add(providerId);
}

/**
 * Removes the recorded provider-session reference for the given provider,
 * if one exists.
 *
 * @param providerId - Id of the provider whose reference should be consumed.
 * @returns true when a reference was recorded and has now been removed;
 *          false when nothing was recorded for that provider (or the backing
 *          set is missing).
 */
consumeProviderSessionRef(providerId: number): boolean {
  // Set.prototype.delete reports whether the element was present, which is
  // exactly the "was there a reference to consume" answer callers need.
  const removed = this.providerSessionRefs?.delete(providerId);
  return removed === true;
}

/**
 * Stores the resolved cache TTL on this session.
 *
 * @param ttl - The resolved TTL value, or null to store no value.
 */
setCacheTtlResolved(ttl: CacheTtlResolved | null): void {
this.cacheTtlResolved = ttl;
}
Expand Down
Loading
Loading