@@ -100,16 +100,26 @@ async def queue_worker():
100100 request_data = request_item ["request_data" ]
101101 http_request = request_item ["http_request" ]
102102 result_future = request_item ["result_future" ]
103-
103+
104104 if request_item .get ("cancelled" , False ):
105105 logger .info (f"[{ req_id } ] (Worker) 请求已取消,跳过。" )
106106 if not result_future .done ():
107107 result_future .set_exception (HTTPException (status_code = 499 , detail = f"[{ req_id } ] 请求已被用户取消" ))
108108 request_queue .task_done ()
109109 continue
110-
110+
111111 is_streaming_request = request_data .stream
112112 logger .info (f"[{ req_id } ] (Worker) 取出请求。模式: { '流式' if is_streaming_request else '非流式' } " )
113+
114+ # 优化:在开始处理前主动检测客户端连接状态,避免不必要的处理
115+ from api_utils .request_processor import _test_client_connection
116+ is_connected = await _test_client_connection (req_id , http_request )
117+ if not is_connected :
118+ logger .info (f"[{ req_id } ] (Worker) ✅ 主动检测到客户端已断开,跳过处理节省资源" )
119+ if not result_future .done ():
120+ result_future .set_exception (HTTPException (status_code = 499 , detail = f"[{ req_id } ] 客户端在处理前已断开连接" ))
121+ request_queue .task_done ()
122+ continue
113123
114124 # 流式请求间隔控制
115125 current_time = time .time ()
@@ -118,8 +128,10 @@ async def queue_worker():
118128 logger .info (f"[{ req_id } ] (Worker) 连续流式请求,添加 { delay_time :.2f} s 延迟..." )
119129 await asyncio .sleep (delay_time )
120130
121- if await http_request .is_disconnected ():
122- logger .info (f"[{ req_id } ] (Worker) 客户端在等待锁时断开。取消。" )
131+ # 等待锁前再次主动检测客户端连接
132+ is_connected = await _test_client_connection (req_id , http_request )
133+ if not is_connected :
134+ logger .info (f"[{ req_id } ] (Worker) ✅ 等待锁时检测到客户端断开,取消处理" )
123135 if not result_future .done ():
124136 result_future .set_exception (HTTPException (status_code = 499 , detail = f"[{ req_id } ] 客户端关闭了请求" ))
125137 request_queue .task_done ()
@@ -129,8 +141,10 @@ async def queue_worker():
129141 async with processing_lock :
130142 logger .info (f"[{ req_id } ] (Worker) 已获取处理锁。开始核心处理..." )
131143
132- if await http_request .is_disconnected ():
133- logger .info (f"[{ req_id } ] (Worker) 客户端在获取锁后断开。取消。" )
144+ # 获取锁后最终主动检测客户端连接
145+ is_connected = await _test_client_connection (req_id , http_request )
146+ if not is_connected :
147+ logger .info (f"[{ req_id } ] (Worker) ✅ 获取锁后检测到客户端断开,取消处理" )
134148 if not result_future .done ():
135149 result_future .set_exception (HTTPException (status_code = 499 , detail = f"[{ req_id } ] 客户端关闭了请求" ))
136150 elif result_future .done ():
@@ -161,16 +175,79 @@ async def queue_worker():
161175 current_request_was_streaming = False
162176 logger .warning (f"[{ req_id } ] (Worker) _process_request_refactored returned unexpected type: { type (returned_value )} " )
163177
164- # 关键修复:在锁内等待流式完成(与原始参考文件一致)
178+ # 统一的客户端断开检测和响应处理
165179 if completion_event :
180+ # 流式模式:等待流式生成器完成信号
166181 logger .info (f"[{ req_id } ] (Worker) 等待流式生成器完成信号..." )
167- try :
182+
183+ # 创建一个增强的客户端断开检测器,支持提前done信号触发
184+ client_disconnected_early = False
185+
186+ async def enhanced_disconnect_monitor ():
187+ nonlocal client_disconnected_early
188+ while not completion_event .is_set ():
189+ try :
190+ # 主动检查客户端是否断开连接
191+ is_connected = await _test_client_connection (req_id , http_request )
192+ if not is_connected :
193+ logger .info (f"[{ req_id } ] (Worker) ✅ 流式处理中检测到客户端断开,提前触发done信号" )
194+ client_disconnected_early = True
195+ # 立即设置completion_event以提前结束等待
196+ if not completion_event .is_set ():
197+ completion_event .set ()
198+ break
199+ await asyncio .sleep (0.3 ) # 更频繁的检查间隔
200+ except Exception as e :
201+ logger .error (f"[{ req_id } ] (Worker) 增强断开检测器错误: { e } " )
202+ break
203+
204+ # 启动增强的断开连接监控
205+ disconnect_monitor_task = asyncio .create_task (enhanced_disconnect_monitor ())
206+ else :
207+ # 非流式模式:等待处理完成并检测客户端断开
208+ logger .info (f"[{ req_id } ] (Worker) 非流式模式,等待处理完成..." )
209+
210+ client_disconnected_early = False
211+
212+ async def non_streaming_disconnect_monitor ():
213+ nonlocal client_disconnected_early
214+ while not result_future .done ():
215+ try :
216+ # 主动检查客户端是否断开连接
217+ is_connected = await _test_client_connection (req_id , http_request )
218+ if not is_connected :
219+ logger .info (f"[{ req_id } ] (Worker) ✅ 非流式处理中检测到客户端断开,取消处理" )
220+ client_disconnected_early = True
221+ # 取消result_future
222+ if not result_future .done ():
223+ result_future .set_exception (HTTPException (status_code = 499 , detail = f"[{ req_id } ] 客户端在非流式处理中断开连接" ))
224+ break
225+ await asyncio .sleep (0.3 ) # 更频繁的检查间隔
226+ except Exception as e :
227+ logger .error (f"[{ req_id } ] (Worker) 非流式断开检测器错误: { e } " )
228+ break
229+
230+ # 启动非流式断开连接监控
231+ disconnect_monitor_task = asyncio .create_task (non_streaming_disconnect_monitor ())
232+
233+ # 等待处理完成(流式或非流式)
234+ try :
235+ if completion_event :
236+ # 流式模式:等待completion_event
168237 from server import RESPONSE_COMPLETION_TIMEOUT
169238 await asyncio .wait_for (completion_event .wait (), timeout = RESPONSE_COMPLETION_TIMEOUT / 1000 + 60 )
170- logger .info (f"[{ req_id } ] (Worker) ✅ 流式生成器完成信号收到。" )
239+ logger .info (f"[{ req_id } ] (Worker) ✅ 流式生成器完成信号收到。客户端提前断开: { client_disconnected_early } " )
240+ else :
241+ # 非流式模式:等待result_future完成
242+ from server import RESPONSE_COMPLETION_TIMEOUT
243+ await asyncio .wait_for (asyncio .shield (result_future ), timeout = RESPONSE_COMPLETION_TIMEOUT / 1000 + 60 )
244+ logger .info (f"[{ req_id } ] (Worker) ✅ 非流式处理完成。客户端提前断开: { client_disconnected_early } " )
171245
172- # 等待发送按钮禁用确认流式响应完全结束
173- if submit_btn_loc and client_disco_checker :
246+ # 如果客户端提前断开,跳过按钮状态处理
247+ if client_disconnected_early :
248+ logger .info (f"[{ req_id } ] (Worker) 客户端提前断开,跳过按钮状态处理" )
249+ elif submit_btn_loc and client_disco_checker and completion_event :
250+ # 等待发送按钮禁用确认流式响应完全结束
174251 logger .info (f"[{ req_id } ] (Worker) 流式响应完成,检查并处理发送按钮状态..." )
175252 wait_timeout_ms = 30000 # 30 seconds
176253 try :
@@ -208,17 +285,25 @@ async def queue_worker():
208285 await save_error_snapshot (f"stream_post_submit_button_handling_timeout_{ req_id } " )
209286 except ClientDisconnectedError :
210287 logger .info (f"[{ req_id } ] 客户端在流式响应后按钮状态处理时断开连接。" )
211- elif current_request_was_streaming :
212- logger .warning (f"[{ req_id } ] (Worker) 流式请求但 submit_btn_loc 或 client_disco_checker 未提供。跳过按钮禁用等待。" )
288+ elif completion_event and current_request_was_streaming :
289+ logger .warning (f"[{ req_id } ] (Worker) 流式请求但 submit_btn_loc 或 client_disco_checker 未提供。跳过按钮禁用等待。" )
213290
214- except asyncio .TimeoutError :
215- logger .warning (f"[{ req_id } ] (Worker) ⚠️ 等待流式生成器完成信号超时。" )
216- if not result_future .done ():
217- result_future .set_exception (HTTPException (status_code = 504 , detail = f"[{ req_id } ] Stream generation timed out waiting for completion signal." ))
218- except Exception as ev_wait_err :
219- logger .error (f"[{ req_id } ] (Worker) ❌ 等待流式完成事件时出错: { ev_wait_err } " )
220- if not result_future .done ():
221- result_future .set_exception (HTTPException (status_code = 500 , detail = f"[{ req_id } ] Error waiting for stream completion: { ev_wait_err } " ))
291+ except asyncio .TimeoutError :
292+ logger .warning (f"[{ req_id } ] (Worker) ⚠️ 等待处理完成超时。" )
293+ if not result_future .done ():
294+ result_future .set_exception (HTTPException (status_code = 504 , detail = f"[{ req_id } ] Processing timed out waiting for completion." ))
295+ except Exception as ev_wait_err :
296+ logger .error (f"[{ req_id } ] (Worker) ❌ 等待处理完成时出错: { ev_wait_err } " )
297+ if not result_future .done ():
298+ result_future .set_exception (HTTPException (status_code = 500 , detail = f"[{ req_id } ] Error waiting for completion: { ev_wait_err } " ))
299+ finally :
300+ # 清理断开连接监控任务
301+ if 'disconnect_monitor_task' in locals () and not disconnect_monitor_task .done ():
302+ disconnect_monitor_task .cancel ()
303+ try :
304+ await disconnect_monitor_task
305+ except asyncio .CancelledError :
306+ pass
222307
223308 except Exception as process_err :
224309 logger .error (f"[{ req_id } ] (Worker) _process_request_refactored execution error: { process_err } " )
0 commit comments