|
1 | | -import requests |
| 1 | +import os |
2 | 2 | from datetime import datetime |
| 3 | + |
| 4 | +import requests |
3 | 5 | from celery import shared_task |
| 6 | +from fastapi import HTTPException |
| 7 | + |
4 | 8 | from app.database.celery_sync_database import SessionLocal |
5 | 9 | from app.models.sqlmap_result import ( |
6 | 10 | SqlmapScanPayload, |
7 | 11 | ScanStatus, |
8 | 12 | SqlmapScanResult, |
9 | 13 | ) |
10 | | -import os |
11 | 14 |
|
12 | 15 | SQLMAP_API = os.getenv("SQLMAP_API") |
13 | 16 | AUTH = (os.getenv("SQLMAP_USERNAME"), os.getenv("SQLMAP_PASSWORD")) # Basic Auth |
14 | 17 |
|
15 | 18 |
|
| 19 | +# 展平sqlmap日志 |
16 | 20 | def normalize_sqlmap_result(raw: dict) -> dict: |
17 | 21 | result = { |
18 | 22 | "success": raw.get("success", False), |
@@ -130,3 +134,31 @@ def poll_single_sqlmap_task(self, task_id: str): |
130 | 134 | raise |
131 | 135 | finally: |
132 | 136 | session.close() |
| 137 | + |
| 138 | + |
| 139 | +# 用户手动创建扫描任务 |
| 140 | +@shared_task( |
| 141 | + bind=True, |
| 142 | + autoretry_for=(Exception,), |
| 143 | + retry_backoff=5, |
| 144 | + retry_kwargs={"max_retries": 3}, |
| 145 | +) |
| 146 | +def sqlmap_scan_task(self, payload: dict): |
| 147 | + session = SessionLocal() |
| 148 | + r = requests.get(f"{SQLMAP_API}/task/new", auth=AUTH) |
| 149 | + if not r.ok: |
| 150 | + raise HTTPException(500, "sqlmap task 创建失败") |
| 151 | + |
| 152 | + taskid = r.json()["taskid"] |
| 153 | + |
| 154 | + # 2. 启动扫描 |
| 155 | + start = requests.post( |
| 156 | + f"{SQLMAP_API}/scan/{taskid}/start", |
| 157 | + json=payload, # json转换问题 |
| 158 | + auth=AUTH, |
| 159 | + ) |
| 160 | + |
| 161 | + if not start.ok: |
| 162 | + raise HTTPException(500, start.text) |
| 163 | + |
| 164 | + return {"taskid": taskid} |
0 commit comments