Skip to content

Commit 2bff400

Browse files
committed
v2.3.11b
1 parent 3fe61b0 commit 2bff400

40 files changed

Lines changed: 2203 additions & 1479 deletions

backend/.gitignore

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,5 @@ trans.sh
3232
extract_messages.py
3333
babel.cfg
3434
legacy/
35-
jwt
35+
jwt
36+
response_data

backend/TASKIQ.md

Lines changed: 6 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
## 架构
66

7-
- **Broker**: Redis Stream 作为消息队列 (redis://localhost:6379/3)
7+
- **Broker**: RabbitMQ 作为消息队列
88
- **Result Backend**: Redis 存储任务执行结果
99
- **Scheduler**: 独立进程管理定时任务
1010
- **Worker**: 独立进程执行任务
@@ -18,25 +18,17 @@
1818

1919
## 运行方式
2020

21-
### 1. 安装依赖
22-
```bash
23-
cd backend
24-
pip install -r requirements.txt
25-
# 或者直接安装新增依赖
26-
pip install taskiq-scheduler>=0.4.0
27-
```
28-
29-
### 2. 启动 Worker 进程(执行任务)
21+
### 启动 Worker 进程(执行任务)
3022
```bash
3123
cd backend
3224
taskiq worker app.tasks.broker:broker --fs-discover
3325
```
3426

3527
参数说明:
3628
- `--fs-discover`: 自动发现任务文件
37-
- `--workers N`: 指定 Worker 进程数,默认 1
29+
- `--workers N`: 指定 Worker 进程数,默认 2
3830

39-
### 3. 启动 Scheduler 进程(调度定时任务)
31+
### 启动 Scheduler 进程(调度定时任务)
4032
```bash
4133
cd backend
4234
taskiq scheduler app.tasks.scheduler:scheduler --fs-discover
@@ -46,7 +38,7 @@ taskiq scheduler app.tasks.scheduler:scheduler --fs-discover
4638
- `--skip-first-run`: 启动时跳过立即执行所有任务
4739
- `--update-interval N`: 任务调度检查间隔,默认 5 秒
4840

49-
### 4. 启动 FastAPI 主服务(原有服务)
41+
### 启动 FastAPI 主服务(原有服务)
5042
```bash
5143
cd backend
5244
python dev.py
@@ -56,72 +48,13 @@ python dev.py
5648

5749
建议使用 systemd 或 supervisor 管理进程:
5850

59-
### Worker 服务配置示例 (`taskiq-worker.service`)
60-
```ini
61-
[Unit]
62-
Description=Taskiq Worker
63-
After=network.target redis.service
64-
65-
[Service]
66-
User=www-data
67-
WorkingDirectory=/path/to/backend
68-
ExecStart=/path/to/.venv/bin/taskiq worker app.tasks.broker:broker --fs-discover --workers 2
69-
Restart=always
70-
RestartSec=5
71-
72-
[Install]
73-
WantedBy=multi-user.target
74-
```
75-
76-
### Scheduler 服务配置示例 (`taskiq-scheduler.service`)
77-
```ini
78-
[Unit]
79-
Description=Taskiq Scheduler
80-
After=network.target redis.service
81-
82-
[Service]
83-
User=www-data
84-
WorkingDirectory=/path/to/backend
85-
ExecStart=/path/to/.venv/bin/taskiq scheduler app.tasks.scheduler:scheduler --fs-discover
86-
Restart=always
87-
RestartSec=5
88-
89-
[Install]
90-
WantedBy=multi-user.target
91-
```
92-
9351
## 监控
9452

9553
### 查看任务执行结果
9654
```python
9755
from app.tasks.broker import broker
9856

9957
# 获取任务结果
100-
result = await broker.get_result(task_id)
58+
result = await broker.with_result(task_id)
10159
print(result.return_value)
10260
```
103-
104-
### 任务重试配置
105-
`broker.py` 中添加重试中间件:
106-
```python
107-
from taskiq import TaskiqMiddleware
108-
from taskiq.retry import RetryMiddleware
109-
110-
broker = RedisStreamBroker(
111-
url="redis://localhost:6379/3",
112-
).with_result_backend(result_backend).with_middlewares(
113-
RetryMiddleware(
114-
max_retries=3,
115-
delay=1.0,
116-
)
117-
)
118-
```
119-
120-
## 从 APScheduler 迁移说明
121-
122-
1. **原有 APScheduler 代码已从 `main.py` 中移除**
123-
2. **原有任务逻辑保持不变**,仅做了最小修改以适配 taskiq
124-
3. **任务调度规则完全保持一致**
125-
- 数据迁移任务:每 10 分钟执行一次
126-
- RSS 刷新任务:每天上午 10 点执行一次
127-
4. **异常处理逻辑保持不变**,任务失败时会自动重试,Redis 数据不会丢失

backend/app/configs/config.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ class Settings(BaseSettings):
3939
VITE_JS_API_TOKEN: str = ""
4040
AMAP_SECURITY_CODE: str = ""
4141
AMAP_WEB_KEY: str = ""
42+
JWT_PRIVATE_KEY: str = ""
4243

4344
model_config = SettingsConfigDict(
4445
env_file=get_env_file_path(),
@@ -65,3 +66,7 @@ def get_settings() -> Settings:
6566
print(f"SEND_BOOT_EMAIL: {get_settings().SEND_BOOT_EMAIL}")
6667
print(f"ADMIN_EMAIL: {get_settings().ADMIN_EMAIL}")
6768
print(f"FEISHU_WEBHOOK_URL: {get_settings().FEISHU_WEBHOOK_URL}")
69+
print(f"VITE_JS_API_TOKEN: {get_settings().VITE_JS_API_TOKEN}")
70+
print(f"AMAP_SECURITY_CODE: {get_settings().AMAP_SECURITY_CODE}")
71+
print(f"AMAP_WEB_KEY: {get_settings().AMAP_WEB_KEY}")
72+
print(f"JWT_PRIVATE_KEY: {get_settings().JWT_PRIVATE_KEY}")

backend/app/dependencies/csrf.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ async def csrf_middleware(request: Request, call_next):
125125
"/api/v1/auth/email/code",
126126
"/api/v1/admin/track",
127127
"/api/v1/admin/deploy",
128+
"/api/v1/qweather/tide",
128129
]
129130

130131
if any(request.url.path.startswith(path) for path in skip_paths):

backend/app/main.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,12 +33,12 @@
3333
messages,
3434
monitor,
3535
public,
36+
publish,
3637
rss,
3738
todos,
3839
users,
3940
weread,
4041
)
41-
from app.tasks import send_bootstrap_emails
4242
from app.tasks.broker import broker
4343
from app.tasks.task import send_feishu_message
4444
from app.utils.cache import close_cache_redis
@@ -84,7 +84,7 @@ async def lifespan(app: FastAPI):
8484
if lock_acquired:
8585
try:
8686
await send_feishu_message.kiq()
87-
await send_bootstrap_emails.kiq(admin_email=admin_email)
87+
# await send_bootstrap_emails.kiq(admin_email=admin_email)
8888
app_logger.info("✅启动通知任务已添加到队列")
8989
except Exception as e:
9090
logger.warning(f"Failed to queue bootstrap email: {e!s}")
@@ -128,7 +128,9 @@ async def lifespan(app: FastAPI):
128128
app.include_router(rss.router, prefix="/api/v1")
129129
app.include_router(monitor.router, prefix="/api/v1")
130130
app.include_router(aiagent.router, prefix="/api/v1")
131+
app.include_router(publish.router, prefix="/api/v1")
131132

133+
# 统一注册全局异常处理器
132134
register_exception_handlers(app)
133135

134136
app.add_middleware(
@@ -144,6 +146,7 @@ async def lifespan(app: FastAPI):
144146
"https://kanocifer.chat",
145147
]
146148

149+
# 配置 CORS 中间件,允许前端访问 API,并暴露 Set-Cookie 头以支持跨域认证
147150
app.add_middleware(
148151
CORSMiddleware,
149152
allow_origins=origins,
@@ -160,6 +163,12 @@ async def add_process_time_header(request: Request, call_next):
160163
start_time: float = time.perf_counter()
161164
response = await call_next(request)
162165
process_time: float = round(time.perf_counter() - start_time, 6)
166+
167+
# 记录长时间异常请求
168+
if process_time > 1.0: # 1秒以上的请求视为慢请求
169+
app_logger.warning(
170+
f"Request to {request.url.path} took {process_time}s and returned status code {response.status_code}"
171+
)
163172
response.headers["X-Process-Time"] = f"{process_time}s"
164173
return response
165174

@@ -173,6 +182,7 @@ async def add_process_time_header(request: Request, call_next):
173182
name="media",
174183
)
175184

185+
# 注册慢速API的速率限制异常处理器
176186
app.add_exception_handler(
177187
exc_class_or_status_code=RateLimitExceeded,
178188
handler=_rate_limit_exceeded_handler, # type: ignore

backend/app/routers/public.py

Lines changed: 112 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
from xml.etree.ElementTree import Element, SubElement, tostring
1414

1515
import httpx
16+
import orjson
1617
from fastapi import APIRouter, Body, Depends, Request
1718
from fastapi.responses import JSONResponse, PlainTextResponse
1819
from redis.asyncio import Redis as AsyncRedis
@@ -22,6 +23,7 @@
2223
from app.dependencies.mongo import get_mongo_db
2324
from app.dependencies.redis import get_redis
2425
from app.schemas.response import APIResponse
26+
from app.utils.qweather_jwt import encoded_jwt
2527

2628
router = APIRouter(tags=["public"])
2729

@@ -325,6 +327,7 @@ async def get_weather(
325327
request: Request,
326328
city: str = Body(..., description="City adcode"),
327329
extensions: str = Body("base", description="Weather type: base/all"),
330+
redis: AsyncRedis = Depends(get_redis),
328331
) -> JSONResponse:
329332
"""Get weather information from Amap API.
330333
@@ -335,6 +338,15 @@ async def get_weather(
335338
Returns:
336339
JSONResponse: Weather data from Amap API
337340
"""
341+
342+
# 尝试命中缓存
343+
cache_key = f"weather:{city}:{extensions}"
344+
cached_data = await redis.get(cache_key)
345+
if cached_data:
346+
return APIResponse.ok(
347+
data=orjson.loads(cached_data),
348+
message="Weather information retrieved from cache",
349+
)
338350
url = "https://restapi.amap.com/v3/weather/weatherInfo"
339351
params = {
340352
"key": get_settings().AMAP_WEB_KEY,
@@ -345,6 +357,10 @@ async def get_weather(
345357
response = await client.get(url, params=params)
346358
data = response.json()
347359

360+
# Redis缓存天气数据,过期时间60分钟
361+
await redis.set(
362+
f"weather:{city}:{extensions}", orjson.dumps(data), ex=60 * 60
363+
)
348364
return APIResponse.ok(
349365
data=data,
350366
message="Weather information retrieved successfully",
@@ -383,27 +399,99 @@ async def reverse_geocode(
383399
)
384400

385401

386-
# @router.post("/geocode/regeo")
387-
# @limiter.limit("100/hour")
388-
# async def reverse_geocode(
389-
# request: Request,
390-
# params: dict = Body(..., description="Reverse geocode parameters"),
391-
# ) -> JSONResponse:
392-
# """Reverse geocode coordinates to address using Amap API.
393-
394-
# Args:
395-
# params: Reverse geocode parameters including location coordinates
396-
397-
# Returns:
398-
# JSONResponse: Address information including city adcode
399-
# """
400-
# url = "https://restapi.amap.com/v3/geocode/regeo"
401-
# params["key"] = get_settings().AMAP_WEB_KEY
402-
# async with httpx.AsyncClient() as client:
403-
# response = await client.get(url, params=params)
404-
# data = response.json()
405-
406-
# return APIResponse.ok(
407-
# data=data,
408-
# message="Reverse geocode completed successfully",
409-
# )
402+
@router.get("/qweather/tide")
403+
@limiter.limit("100/hour")
404+
async def get_qweather(
405+
request: Request,
406+
redis: AsyncRedis = Depends(get_redis),
407+
) -> JSONResponse:
408+
"""Get weather information from QWeather API."""
409+
410+
# 尝试命中缓存
411+
now = datetime.now().strftime("%Y%m%d")
412+
cache_key = f"qweather:tide:P2352:{now}"
413+
cached_data = await redis.get(cache_key)
414+
if cached_data:
415+
return APIResponse.ok(
416+
data=orjson.loads(cached_data),
417+
message="QWeather information retrieved from cache",
418+
)
419+
try:
420+
url = "https://qk2tupqwuj.re.qweatherapi.com/v7/ocean/tide"
421+
headers = {
422+
"Authorization": f"Bearer {encoded_jwt}",
423+
}
424+
now = datetime.now().strftime("%Y%m%d")
425+
payload = {
426+
"location": "P2352", # 黄埔港
427+
"date": now,
428+
}
429+
async with httpx.AsyncClient() as client:
430+
response = await client.get(url, headers=headers, params=payload)
431+
try:
432+
response.raise_for_status()
433+
except httpx.HTTPStatusError:
434+
return APIResponse.error(
435+
message=f"QWeather API error: {response.status_code} - {response.text}",
436+
code=response.status_code,
437+
)
438+
data = response.json()
439+
440+
# 缓存数据8小时,过期后自动删除
441+
cache_key = f"qweather:tide:{payload['location']}:{payload['date']}"
442+
await redis.set(cache_key, orjson.dumps(data), ex=8 * 3600)
443+
return APIResponse.ok(
444+
data=data,
445+
message="QWeather information retrieved successfully",
446+
)
447+
except httpx.HTTPError as e:
448+
return APIResponse.error(
449+
message=f"Failed to fetch QWeather data: {e!s}",
450+
code=503,
451+
)
452+
except Exception as e:
453+
return APIResponse.error(
454+
message=f"Internal server error: {e!s}",
455+
code=500,
456+
)
457+
458+
459+
@router.get("/qweather/location")
460+
@limiter.limit("100/hour")
461+
async def get_qweather_location(
462+
request: Request,
463+
location: str,
464+
type: str = "scenic",
465+
) -> JSONResponse:
466+
"""Get location information from QWeather API."""
467+
try:
468+
url = "https://qk2tupqwuj.re.qweatherapi.com/geo/v2/poi/lookup"
469+
headers = {
470+
"Authorization": f"Bearer {encoded_jwt}",
471+
}
472+
params = {"location": location, "type": type}
473+
async with httpx.AsyncClient() as client:
474+
response = await client.get(url, headers=headers, params=params)
475+
try:
476+
response.raise_for_status()
477+
except httpx.HTTPStatusError:
478+
return APIResponse.error(
479+
message=f"QWeather API error: {response.status_code} - {response.text}",
480+
code=response.status_code,
481+
)
482+
data = response.json()
483+
484+
return APIResponse.ok(
485+
data=data,
486+
message="QWeather location information retrieved successfully",
487+
)
488+
except httpx.HTTPError as e:
489+
return APIResponse.error(
490+
message=f"Failed to fetch QWeather location: {e!s}",
491+
code=503,
492+
)
493+
except Exception as e:
494+
return APIResponse.error(
495+
message=f"Internal server error: {e!s}",
496+
code=500,
497+
)

backend/app/routers/publish.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
from __future__ import annotations
2+
3+
from fastapi import APIRouter
4+
5+
router = APIRouter(prefix="/publish", tags=["publish"])

0 commit comments

Comments
 (0)