Skip to content

Commit 744b1f4

Browse files
committed
feat(email): implement cancellation handling in email services
1 parent 8b8ef7c commit 744b1f4

15 files changed

Lines changed: 680 additions & 100 deletions

src/core/register.py

Lines changed: 126 additions & 55 deletions
Large diffs are not rendered by default.

src/services/base.py

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
import time
1010
from dataclasses import dataclass
1111
from datetime import datetime
12-
from typing import Optional, Dict, Any, List
12+
from typing import Optional, Dict, Any, List, Callable
1313
from enum import Enum
1414

1515
from ..config.constants import EmailServiceType, OPENAI_EMAIL_SENDERS, OTP_CODE_PATTERN, OTP_CODE_SEMANTIC_PATTERN
@@ -139,6 +139,10 @@ def __init__(self, message: str = "当前邮件批次未发现 OpenAI 发件人"
139139
self.error_code = error_code
140140

141141

142+
class EmailServiceCancelledError(EmailServiceError):
143+
"""邮箱服务在轮询过程中收到取消信号。"""
144+
145+
142146
class EmailServiceStatus(Enum):
143147
"""邮箱服务状态"""
144148
HEALTHY = "healthy"
@@ -168,6 +172,7 @@ def __init__(self, service_type: EmailServiceType, name: str = None):
168172
self._provider_backoff = reset_adaptive_backoff()
169173
self._used_verification_codes: Dict[str, set] = {}
170174
self._seen_verification_messages: Dict[str, set] = {}
175+
self.check_cancelled: Optional[Callable[[], bool]] = None
171176

172177
_EMAIL_ADDRESS_PATTERN = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
173178

@@ -190,6 +195,35 @@ def apply_provider_backoff_state(self, state: Optional[EmailProviderBackoffState
190195
"""注入外部持久化的邮箱供应商退避状态"""
191196
self._provider_backoff = state or reset_adaptive_backoff()
192197

198+
def set_check_cancelled(self, callback: Optional[Callable[[], bool]]) -> None:
199+
"""注入外部取消检查回调。"""
200+
self.check_cancelled = callback if callable(callback) else None
201+
202+
def _is_cancelled_requested(self) -> bool:
203+
"""检查邮箱服务是否收到取消请求。"""
204+
callback = self.check_cancelled
205+
if not callable(callback):
206+
return False
207+
try:
208+
return bool(callback())
209+
except Exception as e:
210+
logger.warning(f"检查邮箱服务取消状态失败: {e}")
211+
return False
212+
213+
def _raise_if_cancelled(self, message: str = "任务已取消") -> None:
214+
"""在轮询/等待阶段响应取消请求。"""
215+
if self._is_cancelled_requested():
216+
raise EmailServiceCancelledError(message)
217+
218+
def _sleep_with_cancel(self, seconds: float, chunk_seconds: float = 0.2) -> None:
219+
"""可响应取消的短分片休眠。"""
220+
remaining = max(0.0, float(seconds))
221+
while remaining > 0:
222+
self._raise_if_cancelled()
223+
sleep_for = min(chunk_seconds, remaining)
224+
time.sleep(sleep_for)
225+
remaining -= sleep_for
226+
193227
@abc.abstractmethod
194228
def create_email(self, config: Dict[str, Any] = None) -> Dict[str, Any]:
195229
"""
@@ -520,6 +554,7 @@ def wait_for_email(
520554
last_email_id = None
521555

522556
while time.time() - start_time < timeout:
557+
self._raise_if_cancelled("等待邮件时任务已取消")
523558
try:
524559
emails = self.list_emails()
525560
for email_info in emails:
@@ -562,7 +597,7 @@ def wait_for_email(
562597
except Exception as e:
563598
logger.warning(f"等待邮件时出错: {e}")
564599

565-
time.sleep(check_interval)
600+
self._sleep_with_cancel(check_interval)
566601

567602
return None
568603

src/services/cloud_mail.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,7 @@ def get_verification_code(
236236
seen_mail_ids: set = set()
237237

238238
while time.time() - start_time < timeout:
239+
self._raise_if_cancelled("等待 Cloud Mail 验证码时任务已取消")
239240
try:
240241
token = self._get_public_token()
241242
mails = self._make_request(
@@ -253,7 +254,7 @@ def get_verification_code(
253254
mails = mails["list"]
254255

255256
if not isinstance(mails, list):
256-
time.sleep(poll_interval)
257+
self._sleep_with_cancel(poll_interval)
257258
continue
258259

259260
if mails:
@@ -302,7 +303,7 @@ def get_verification_code(
302303
raise
303304
logger.debug(f"检查 Cloud Mail 邮件时出错: {e}")
304305

305-
time.sleep(poll_interval)
306+
self._sleep_with_cancel(poll_interval)
306307

307308
logger.warning(f"等待 Cloud Mail 验证码超时: {email}")
308309
return None

src/services/duck_mail.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,6 +263,7 @@ def get_verification_code(
263263
seen_message_ids = set()
264264

265265
while time.time() - start_time < timeout:
266+
self._raise_if_cancelled("等待 DuckMail 验证码时任务已取消")
266267
try:
267268
response = self._make_request(
268269
"GET",
@@ -324,7 +325,7 @@ def get_verification_code(
324325
raise
325326
logger.debug(f"DuckMail 轮询验证码失败: {e}")
326327

327-
time.sleep(poll_interval)
328+
self._sleep_with_cancel(poll_interval)
328329

329330
return None
330331

src/services/freemail.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,10 +214,11 @@ def get_verification_code(
214214
seen_mail_ids: set = set()
215215

216216
while time.time() - start_time < timeout:
217+
self._raise_if_cancelled("等待 Freemail 验证码时任务已取消")
217218
try:
218219
mails = self._make_request("GET", "/api/emails", params={"mailbox": email, "limit": 20})
219220
if not isinstance(mails, list):
220-
time.sleep(poll_interval)
221+
self._sleep_with_cancel(poll_interval)
221222
continue
222223

223224
ordered_mails = self._sort_items_by_message_time(
@@ -299,7 +300,7 @@ def get_verification_code(
299300
raise
300301
logger.debug(f"检查 Freemail 邮件时出错: {e}")
301302

302-
time.sleep(poll_interval)
303+
self._sleep_with_cancel(poll_interval)
303304

304305
logger.warning(f"等待 Freemail 验证码超时: {email}")
305306
return None

src/services/imap_mail.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
from email.header import decode_header
1313
from typing import Any, Dict, Optional
1414

15-
from .base import BaseEmailService, EmailServiceError, OTPNoOpenAISenderEmailServiceError, get_email_code_settings
15+
from .base import BaseEmailService, OTPNoOpenAISenderEmailServiceError, get_email_code_settings
1616
from ..config.constants import (
1717
EmailServiceType,
1818
OTP_CODE_SEMANTIC_PATTERN,
@@ -124,11 +124,12 @@ def get_verification_code(
124124
mail.select("INBOX")
125125

126126
while time.time() - start_time < timeout:
127+
self._raise_if_cancelled("等待 IMAP 验证码时任务已取消")
127128
try:
128129
# 搜索所有未读邮件
129130
status, data = mail.search(None, "UNSEEN")
130131
if status != "OK" or not data or not data[0]:
131-
time.sleep(poll_interval)
132+
self._sleep_with_cancel(poll_interval)
132133
continue
133134

134135
msg_ids = data[0].split()
@@ -181,13 +182,13 @@ def get_verification_code(
181182
except Exception:
182183
pass
183184

184-
time.sleep(poll_interval)
185+
self._sleep_with_cancel(poll_interval)
185186

186187
except Exception as e:
187188
if isinstance(e, OTPNoOpenAISenderEmailServiceError):
188189
raise
189190
logger.warning(f"IMAP 连接/轮询失败: {e}")
190-
self.update_status(False, str(e))
191+
self.update_status(False, e)
191192
finally:
192193
if mail:
193194
try:

src/services/moe_mail.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -308,13 +308,14 @@ def get_verification_code(
308308
seen_message_ids = set()
309309

310310
while time.time() - start_time < timeout:
311+
self._raise_if_cancelled("等待自定义域名邮箱验证码时任务已取消")
311312
try:
312313
# 获取邮件列表
313314
response = self._make_request("GET", f"/api/emails/{target_email_id}")
314315

315316
messages = response.get("messages", [])
316317
if not isinstance(messages, list):
317-
time.sleep(poll_interval)
318+
self._sleep_with_cancel(poll_interval)
318319
continue
319320

320321
ordered_messages = self._sort_items_by_message_time(
@@ -380,7 +381,7 @@ def get_verification_code(
380381
logger.debug(f"检查邮件时出错: {e}")
381382

382383
# 等待一段时间再检查
383-
time.sleep(poll_interval)
384+
self._sleep_with_cancel(poll_interval)
384385

385386
logger.warning(f"等待验证码超时: {email}")
386387
return None

src/services/outlook/service.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -337,6 +337,7 @@ def get_verification_code(
337337
poll_count = 0
338338

339339
while time.time() - start_time < actual_timeout:
340+
self._raise_if_cancelled("等待 Outlook 验证码时任务已取消")
340341
poll_count += 1
341342

342343
# 渐进式邮件检查:前 3 次只检查未读
@@ -387,7 +388,7 @@ def get_verification_code(
387388
logger.warning(f"[{email}] 检查出错: {e}")
388389

389390
# 等待下次轮询
390-
time.sleep(poll_interval)
391+
self._sleep_with_cancel(poll_interval)
391392

392393
elapsed = int(time.time() - start_time)
393394
logger.warning(f"[{email}] 验证码超时 ({actual_timeout}s),共轮询 {poll_count} 次")

src/services/temp_mail.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,7 @@ def get_verification_code(
309309
# jwt = cached.get("jwt")
310310

311311
while time.time() - start_time < timeout:
312+
self._raise_if_cancelled("等待 TempMail 验证码时任务已取消")
312313
try:
313314
# if jwt:
314315
# response = self._make_request(
@@ -327,7 +328,7 @@ def get_verification_code(
327328
# /user_api/mails 和 /admin/mails 返回格式相同: {"results": [...], "total": N}
328329
mails = response.get("results", [])
329330
if not isinstance(mails, list):
330-
time.sleep(poll_interval)
331+
self._sleep_with_cancel(poll_interval)
331332
continue
332333

333334
ordered_mails = self._sort_items_by_message_time(
@@ -389,7 +390,7 @@ def get_verification_code(
389390
raise
390391
logger.debug(f"检查 TempMail 邮件时出错: {e}")
391392

392-
time.sleep(poll_interval)
393+
self._sleep_with_cancel(poll_interval)
393394

394395
logger.warning(f"等待 TempMail 验证码超时: {email}")
395396
return None

src/services/tempmail.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,7 @@ def get_verification_code(
217217
seen_ids = set()
218218

219219
while time.time() - start_time < timeout:
220+
self._raise_if_cancelled("等待 Tempmail 验证码时任务已取消")
220221
try:
221222
# 获取邮件列表
222223
response = self.http_client.get(
@@ -226,7 +227,7 @@ def get_verification_code(
226227
)
227228

228229
if response.status_code != 200:
229-
time.sleep(poll_interval)
230+
self._sleep_with_cancel(poll_interval)
230231
continue
231232

232233
data = response.json()
@@ -239,7 +240,7 @@ def get_verification_code(
239240
email_list = data.get("emails", []) if isinstance(data, dict) else []
240241

241242
if not isinstance(email_list, list):
242-
time.sleep(poll_interval)
243+
self._sleep_with_cancel(poll_interval)
243244
continue
244245

245246
ordered_emails = self._sort_items_by_message_time(
@@ -302,7 +303,7 @@ def get_verification_code(
302303
logger.debug(f"检查邮件时出错: {e}")
303304

304305
# 等待一段时间再检查
305-
time.sleep(poll_interval)
306+
self._sleep_with_cancel(poll_interval)
306307

307308
logger.warning(f"等待验证码超时: {email}")
308309
return None

0 commit comments

Comments
 (0)