Skip to content

Commit 1ab4b8b

Browse files
committed
优化客户端连接状态检测,减少不必要的请求处理,增强异常处理逻辑
1 parent 76a1f0c commit 1ab4b8b

3 files changed

Lines changed: 97 additions & 14 deletions

File tree

api_utils/queue_worker.py

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -100,16 +100,26 @@ async def queue_worker():
100100
request_data = request_item["request_data"]
101101
http_request = request_item["http_request"]
102102
result_future = request_item["result_future"]
103-
103+
104104
if request_item.get("cancelled", False):
105105
logger.info(f"[{req_id}] (Worker) 请求已取消,跳过。")
106106
if not result_future.done():
107107
result_future.set_exception(HTTPException(status_code=499, detail=f"[{req_id}] 请求已被用户取消"))
108108
request_queue.task_done()
109109
continue
110-
110+
111111
is_streaming_request = request_data.stream
112112
logger.info(f"[{req_id}] (Worker) 取出请求。模式: {'流式' if is_streaming_request else '非流式'}")
113+
114+
# 优化:在开始处理前主动检测客户端连接状态,避免不必要的处理
115+
from api_utils.request_processor import _test_client_connection
116+
is_connected = await _test_client_connection(req_id, http_request)
117+
if not is_connected:
118+
logger.info(f"[{req_id}] (Worker) ✅ 主动检测到客户端已断开,跳过处理节省资源")
119+
if not result_future.done():
120+
result_future.set_exception(HTTPException(status_code=499, detail=f"[{req_id}] 客户端在处理前已断开连接"))
121+
request_queue.task_done()
122+
continue
113123

114124
# 流式请求间隔控制
115125
current_time = time.time()
@@ -118,8 +128,10 @@ async def queue_worker():
118128
logger.info(f"[{req_id}] (Worker) 连续流式请求,添加 {delay_time:.2f}s 延迟...")
119129
await asyncio.sleep(delay_time)
120130

121-
if await http_request.is_disconnected():
122-
logger.info(f"[{req_id}] (Worker) 客户端在等待锁时断开。取消。")
131+
# 等待锁前再次主动检测客户端连接
132+
is_connected = await _test_client_connection(req_id, http_request)
133+
if not is_connected:
134+
logger.info(f"[{req_id}] (Worker) ✅ 等待锁时检测到客户端断开,取消处理")
123135
if not result_future.done():
124136
result_future.set_exception(HTTPException(status_code=499, detail=f"[{req_id}] 客户端关闭了请求"))
125137
request_queue.task_done()
@@ -129,8 +141,10 @@ async def queue_worker():
129141
async with processing_lock:
130142
logger.info(f"[{req_id}] (Worker) 已获取处理锁。开始核心处理...")
131143

132-
if await http_request.is_disconnected():
133-
logger.info(f"[{req_id}] (Worker) 客户端在获取锁后断开。取消。")
144+
# 获取锁后最终主动检测客户端连接
145+
is_connected = await _test_client_connection(req_id, http_request)
146+
if not is_connected:
147+
logger.info(f"[{req_id}] (Worker) ✅ 获取锁后检测到客户端断开,取消处理")
134148
if not result_future.done():
135149
result_future.set_exception(HTTPException(status_code=499, detail=f"[{req_id}] 客户端关闭了请求"))
136150
elif result_future.done():
@@ -172,15 +186,16 @@ async def enhanced_disconnect_monitor():
172186
nonlocal client_disconnected_early
173187
while not completion_event.is_set():
174188
try:
175-
# 检查客户端是否断开连接
176-
if await http_request.is_disconnected():
177-
logger.info(f"[{req_id}] (Worker) 检测到客户端断开连接,提前触发done信号")
189+
# 主动检查客户端是否断开连接
190+
is_connected = await _test_client_connection(req_id, http_request)
191+
if not is_connected:
192+
logger.info(f"[{req_id}] (Worker) ✅ 流式处理中检测到客户端断开,提前触发done信号")
178193
client_disconnected_early = True
179194
# 立即设置completion_event以提前结束等待
180195
if not completion_event.is_set():
181196
completion_event.set()
182197
break
183-
await asyncio.sleep(0.5) # 更频繁的检查间隔
198+
await asyncio.sleep(0.3) # 更频繁的检查间隔
184199
except Exception as e:
185200
logger.error(f"[{req_id}] (Worker) 增强断开检测器错误: {e}")
186201
break

api_utils/request_processor.py

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,44 @@ async def _analyze_model_requirements(req_id: str, context: dict, request: ChatC
9696
return context
9797

9898

99+
async def _test_client_connection(req_id: str, http_request: Request) -> bool:
100+
"""通过发送测试数据包来主动检测客户端连接状态"""
101+
try:
102+
# 尝试发送一个小的测试数据包
103+
test_chunk = "data: {\"type\":\"ping\"}\n\n"
104+
105+
# 获取底层的响应对象
106+
if hasattr(http_request, '_receive'):
107+
# 检查接收通道是否还活跃
108+
try:
109+
# 尝试非阻塞地检查是否有断开消息
110+
import asyncio
111+
receive_task = asyncio.create_task(http_request._receive())
112+
done, pending = await asyncio.wait([receive_task], timeout=0.01)
113+
114+
if done:
115+
message = receive_task.result()
116+
if message.get("type") == "http.disconnect":
117+
return False
118+
else:
119+
# 取消未完成的任务
120+
receive_task.cancel()
121+
try:
122+
await receive_task
123+
except asyncio.CancelledError:
124+
pass
125+
126+
except Exception:
127+
# 如果检查过程中出现异常,可能表示连接有问题
128+
return False
129+
130+
# 如果上述检查都通过,认为连接正常
131+
return True
132+
133+
except Exception as e:
134+
# 任何异常都认为连接已断开
135+
return False
136+
99137
async def _setup_disconnect_monitoring(req_id: str, http_request: Request, result_future: Future) -> Tuple[Event, asyncio.Task, Callable]:
100138
"""设置客户端断开连接监控"""
101139
from server import logger
@@ -105,13 +143,24 @@ async def _setup_disconnect_monitoring(req_id: str, http_request: Request, resul
105143
async def check_disconnect_periodically():
106144
while not client_disconnected_event.is_set():
107145
try:
146+
# 使用主动检测方法
147+
is_connected = await _test_client_connection(req_id, http_request)
148+
if not is_connected:
149+
logger.info(f"[{req_id}] 主动检测到客户端断开连接。")
150+
client_disconnected_event.set()
151+
if not result_future.done():
152+
result_future.set_exception(HTTPException(status_code=499, detail=f"[{req_id}] 客户端关闭了请求"))
153+
break
154+
155+
# 备用检查:使用原有的is_disconnected方法
108156
if await http_request.is_disconnected():
109-
logger.info(f"[{req_id}] 客户端断开,设置事件。")
157+
logger.info(f"[{req_id}] 备用检测到客户端断开连接。")
110158
client_disconnected_event.set()
111159
if not result_future.done():
112160
result_future.set_exception(HTTPException(status_code=499, detail=f"[{req_id}] 客户端关闭了请求"))
113161
break
114-
await asyncio.sleep(0.5) # 更频繁的检查间隔,从1.0秒改为0.5秒
162+
163+
await asyncio.sleep(0.3) # 更频繁的检查间隔,从0.5秒改为0.3秒
115164
except asyncio.CancelledError:
116165
break
117166
except Exception as e:
@@ -755,7 +804,16 @@ async def _process_request_refactored(
755804
result_future: Future
756805
) -> Optional[Tuple[Event, Locator, Callable[[str], bool]]]:
757806
"""核心请求处理函数 - 重构版本"""
758-
807+
808+
# 优化:在开始任何处理前主动检测客户端连接状态
809+
is_connected = await _test_client_connection(req_id, http_request)
810+
if not is_connected:
811+
from server import logger
812+
logger.info(f"[{req_id}] ✅ 核心处理前检测到客户端断开,提前退出节省资源")
813+
if not result_future.done():
814+
result_future.set_exception(HTTPException(status_code=499, detail=f"[{req_id}] 客户端在处理开始前已断开连接"))
815+
return None
816+
759817
context = await _initialize_request_context(req_id, request)
760818
context = await _analyze_model_requirements(req_id, context, request)
761819

@@ -788,7 +846,10 @@ async def _process_request_refactored(
788846
context['parsed_model_list'],
789847
check_client_disconnected
790848
)
791-
849+
850+
# 优化:在提交提示前再次检查客户端连接,避免不必要的后台请求
851+
check_client_disconnected("提交提示前最终检查")
852+
792853
await page_controller.submit_prompt(prepared_prompt, check_client_disconnected)
793854

794855
# 响应处理仍然需要在这里,因为它决定了是流式还是非流式,并设置future

api_utils/routes.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,13 @@ async def chat_completions(
202202
raise HTTPException(status_code=504, detail=f"[{req_id}] 请求处理超时。")
203203
except asyncio.CancelledError:
204204
raise HTTPException(status_code=499, detail=f"[{req_id}] 请求被客户端取消。")
205+
except HTTPException as http_exc:
206+
# 对于客户端断开连接的情况,使用更友好的日志级别
207+
if http_exc.status_code == 499:
208+
logger.info(f"[{req_id}] 客户端断开连接: {http_exc.detail}")
209+
else:
210+
logger.warning(f"[{req_id}] HTTP异常: {http_exc.detail}")
211+
raise http_exc
205212
except Exception as e:
206213
logger.exception(f"[{req_id}] 等待Worker响应时出错")
207214
raise HTTPException(status_code=500, detail=f"[{req_id}] 服务器内部错误: {e}")

0 commit comments

Comments
 (0)