Skip to content

Commit 7b45b29

Browse files
committed
统一客户端断开检测与响应处理,增强流式与非流式请求的生命周期管理
1 parent 1ab4b8b commit 7b45b29

1 file changed

Lines changed: 59 additions & 24 deletions

File tree

api_utils/queue_worker.py

Lines changed: 59 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -175,8 +175,9 @@ async def queue_worker():
175175
current_request_was_streaming = False
176176
logger.warning(f"[{req_id}] (Worker) _process_request_refactored returned unexpected type: {type(returned_value)}")
177177

178-
# 优化的流式响应生命周期管理
178+
# 统一的客户端断开检测和响应处理
179179
if completion_event:
180+
# 流式模式:等待流式生成器完成信号
180181
logger.info(f"[{req_id}] (Worker) 等待流式生成器完成信号...")
181182

182183
# 创建一个增强的客户端断开检测器,支持提前done信号触发
@@ -202,16 +203,50 @@ async def enhanced_disconnect_monitor():
202203

203204
# 启动增强的断开连接监控
204205
disconnect_monitor_task = asyncio.create_task(enhanced_disconnect_monitor())
206+
else:
207+
# 非流式模式:等待处理完成并检测客户端断开
208+
logger.info(f"[{req_id}] (Worker) 非流式模式,等待处理完成...")
209+
210+
client_disconnected_early = False
205211

206-
try:
212+
async def non_streaming_disconnect_monitor():
213+
nonlocal client_disconnected_early
214+
while not result_future.done():
215+
try:
216+
# 主动检查客户端是否断开连接
217+
is_connected = await _test_client_connection(req_id, http_request)
218+
if not is_connected:
219+
logger.info(f"[{req_id}] (Worker) ✅ 非流式处理中检测到客户端断开,取消处理")
220+
client_disconnected_early = True
221+
# 取消result_future
222+
if not result_future.done():
223+
result_future.set_exception(HTTPException(status_code=499, detail=f"[{req_id}] 客户端在非流式处理中断开连接"))
224+
break
225+
await asyncio.sleep(0.3) # 更频繁的检查间隔
226+
except Exception as e:
227+
logger.error(f"[{req_id}] (Worker) 非流式断开检测器错误: {e}")
228+
break
229+
230+
# 启动非流式断开连接监控
231+
disconnect_monitor_task = asyncio.create_task(non_streaming_disconnect_monitor())
232+
233+
# 等待处理完成(流式或非流式)
234+
try:
235+
if completion_event:
236+
# 流式模式:等待completion_event
207237
from server import RESPONSE_COMPLETION_TIMEOUT
208238
await asyncio.wait_for(completion_event.wait(), timeout=RESPONSE_COMPLETION_TIMEOUT/1000 + 60)
209239
logger.info(f"[{req_id}] (Worker) ✅ 流式生成器完成信号收到。客户端提前断开: {client_disconnected_early}")
240+
else:
241+
# 非流式模式:等待result_future完成
242+
from server import RESPONSE_COMPLETION_TIMEOUT
243+
await asyncio.wait_for(asyncio.shield(result_future), timeout=RESPONSE_COMPLETION_TIMEOUT/1000 + 60)
244+
logger.info(f"[{req_id}] (Worker) ✅ 非流式处理完成。客户端提前断开: {client_disconnected_early}")
210245

211-
# 如果客户端提前断开,跳过按钮状态处理
212-
if client_disconnected_early:
213-
logger.info(f"[{req_id}] (Worker) 客户端提前断开,跳过按钮状态处理")
214-
elif submit_btn_loc and client_disco_checker:
246+
# 如果客户端提前断开,跳过按钮状态处理
247+
if client_disconnected_early:
248+
logger.info(f"[{req_id}] (Worker) 客户端提前断开,跳过按钮状态处理")
249+
elif submit_btn_loc and client_disco_checker and completion_event:
215250
# 等待发送按钮禁用确认流式响应完全结束
216251
logger.info(f"[{req_id}] (Worker) 流式响应完成,检查并处理发送按钮状态...")
217252
wait_timeout_ms = 30000 # 30 seconds
@@ -250,25 +285,25 @@ async def enhanced_disconnect_monitor():
250285
await save_error_snapshot(f"stream_post_submit_button_handling_timeout_{req_id}")
251286
except ClientDisconnectedError:
252287
logger.info(f"[{req_id}] 客户端在流式响应后按钮状态处理时断开连接。")
253-
elif current_request_was_streaming:
254-
logger.warning(f"[{req_id}] (Worker) 流式请求但 submit_btn_loc 或 client_disco_checker 未提供。跳过按钮禁用等待。")
288+
elif completion_event and current_request_was_streaming:
289+
logger.warning(f"[{req_id}] (Worker) 流式请求但 submit_btn_loc 或 client_disco_checker 未提供。跳过按钮禁用等待。")
255290

256-
except asyncio.TimeoutError:
257-
logger.warning(f"[{req_id}] (Worker) ⚠️ 等待流式生成器完成信号超时。")
258-
if not result_future.done():
259-
result_future.set_exception(HTTPException(status_code=504, detail=f"[{req_id}] Stream generation timed out waiting for completion signal."))
260-
except Exception as ev_wait_err:
261-
logger.error(f"[{req_id}] (Worker) ❌ 等待流式完成事件时出错: {ev_wait_err}")
262-
if not result_future.done():
263-
result_future.set_exception(HTTPException(status_code=500, detail=f"[{req_id}] Error waiting for stream completion: {ev_wait_err}"))
264-
finally:
265-
# 清理断开连接监控任务
266-
if not disconnect_monitor_task.done():
267-
disconnect_monitor_task.cancel()
268-
try:
269-
await disconnect_monitor_task
270-
except asyncio.CancelledError:
271-
pass
291+
except asyncio.TimeoutError:
292+
logger.warning(f"[{req_id}] (Worker) ⚠️ 等待处理完成超时。")
293+
if not result_future.done():
294+
result_future.set_exception(HTTPException(status_code=504, detail=f"[{req_id}] Processing timed out waiting for completion."))
295+
except Exception as ev_wait_err:
296+
logger.error(f"[{req_id}] (Worker) ❌ 等待处理完成时出错: {ev_wait_err}")
297+
if not result_future.done():
298+
result_future.set_exception(HTTPException(status_code=500, detail=f"[{req_id}] Error waiting for completion: {ev_wait_err}"))
299+
finally:
300+
# 清理断开连接监控任务
301+
if 'disconnect_monitor_task' in locals() and not disconnect_monitor_task.done():
302+
disconnect_monitor_task.cancel()
303+
try:
304+
await disconnect_monitor_task
305+
except asyncio.CancelledError:
306+
pass
272307

273308
except Exception as process_err:
274309
logger.error(f"[{req_id}] (Worker) _process_request_refactored execution error: {process_err}")

0 commit comments

Comments
 (0)