Skip to content

Commit 1b708df

Browse files
committed
同步日志入库
1 parent 4f3eef5 commit 1b708df

1 file changed

Lines changed: 90 additions & 55 deletions

File tree

app/tasks/sqlmap_worker.py

Lines changed: 90 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
SqlmapScanPayload,
1111
ScanStatus,
1212
SqlmapScanResult,
13+
SqlmapScanLog,
1314
)
1415
from app.core.sqlmap_core import celery_task_add
1516

@@ -59,83 +60,118 @@ def normalize_sqlmap_result(raw: dict) -> dict:
5960
return result
6061

6162

62-
# 轮询获取运行状态信息
63+
def fetch_sqlmap_logs(session, task: SqlmapScanPayload):
64+
resp = requests.get(
65+
f"{SQLMAP_API}/scan/{task.task_id}/log",
66+
auth=AUTH,
67+
)
68+
if not resp.ok:
69+
return
70+
71+
logs = resp.json().get("log", [])
72+
73+
# 已存在日志(避免重复写)
74+
existing = {
75+
(l.log_time, l.message)
76+
for l in session.query(SqlmapScanLog)
77+
.filter(SqlmapScanLog.task_id == task.task_id)
78+
.all()
79+
}
80+
81+
for log in logs:
82+
key = (log.get("time"), log.get("message"))
83+
if key in existing:
84+
continue
85+
86+
session.add(
87+
SqlmapScanLog(
88+
task_id=task.task_id,
89+
level=log.get("level", "INFO"),
90+
message=log.get("message"),
91+
log_time=log.get("time"),
92+
celery_task_id=task.celery_task_id,
93+
)
94+
)
95+
96+
97+
def fetch_sqlmap_result(session, task_id: str):
98+
resp = requests.get(
99+
f"{SQLMAP_API}/scan/{task_id}/data",
100+
auth=AUTH,
101+
)
102+
if not resp.ok:
103+
return
104+
105+
data = resp.json().get("data", [])
106+
107+
result = SqlmapScanResult(
108+
target_url="",
109+
vulnerable=bool(data),
110+
raw_output=data,
111+
started_at=datetime.utcnow(),
112+
finished_at=datetime.utcnow(),
113+
command="sqlmap api scan",
114+
)
115+
116+
session.add(result)
117+
118+
119+
# 轮询运行状态任务
63120
@shared_task(
64121
bind=True,
65-
autoretry_for=(Exception,),
122+
autoretry_for=(requests.RequestException,),
66123
retry_backoff=5,
67124
retry_kwargs={"max_retries": 3},
68125
)
69-
def poll_single_sqlmap_task(self, task_id: str):
126+
def poll_single_sqlmap_task(self, sqlmap_task_id: str):
70127
session = SessionLocal()
128+
71129
try:
72130
task = (
73131
session.query(SqlmapScanPayload)
74-
.filter(SqlmapScanPayload.task_id == task_id)
132+
.filter(SqlmapScanPayload.task_id == sqlmap_task_id)
75133
.first()
76134
)
77-
78135
if not task:
79136
return
80137

81-
# 查询 sqlmap task 状态
82-
resp = requests.get(
83-
f"{SQLMAP_API}/scan/{task_id}/status",
84-
timeout=10,
138+
# 查询扫描状态
139+
status_resp = requests.get(
140+
f"{SQLMAP_API}/scan/{sqlmap_task_id}/status",
85141
auth=AUTH,
86142
)
87-
resp.raise_for_status()
88-
status_data = resp.json()
89143

90-
status = status_data.get("status")
91-
92-
if status == "running":
93-
task.status = ScanStatus.running
144+
if status_resp.status_code != 200:
145+
task.status = ScanStatus.failed
94146
session.commit()
95147
return
96148

97-
if status != "terminated":
149+
status_json = status_resp.json()
150+
if not status_json.get("success"):
151+
task.status = ScanStatus.failed
152+
session.commit()
98153
return
99154

100-
# 获取扫描结果
101-
result_resp = requests.get(
102-
f"{SQLMAP_API}/scan/{task_id}/data",
103-
timeout=30,
104-
auth=AUTH,
105-
)
106-
result_resp.raise_for_status()
107-
data = result_resp.json()
108-
109-
print(data)
110-
111-
# # 展平sqlmap返回日志
112-
# normalized = normalize_sqlmap_result(data)
113-
#
114-
# print(normalized)
115-
#
116-
# # 解析 sqlmap 返回
117-
# scan_result = SqlmapScanResult(
118-
# target_url=normalized["data"]["target"]["url"],
119-
# dbms=normalized["data"]["dbms"].get("name"),
120-
# vulnerable=bool(normalized["data"]["injections"]),
121-
# injection_points=normalized["data"]["injections"],
122-
# dump_data=None, # 后续支持 sqlmap dump 再填
123-
# raw_output=normalized,
124-
# command="",
125-
# started_at=datetime.utcnow(),
126-
# finished_at=datetime.utcnow(),
127-
# )
128-
#
129-
# session.add(scan_result)
130-
# task.status = ScanStatus.success
131-
#
132-
# session.commit()
133-
134-
except Exception:
135-
session.rollback()
136-
task.status = ScanStatus.failed
155+
sqlmap_status = status_json["status"]
156+
157+
# 状态同步
158+
if sqlmap_status == "running":
159+
task.status = ScanStatus.running
160+
161+
elif sqlmap_status in ("terminated", "not running"):
162+
task.status = ScanStatus.success
163+
task.finished_at = datetime.utcnow()
164+
fetch_sqlmap_result(session, sqlmap_task_id)
165+
166+
elif sqlmap_status == "error":
167+
task.status = ScanStatus.failed
168+
task.finished_at = datetime.utcnow()
169+
170+
# 同步写入日志
171+
fetch_sqlmap_logs(session, task)
172+
137173
session.commit()
138-
raise
174+
139175
finally:
140176
session.close()
141177

@@ -177,7 +213,6 @@ def sqlmap_scan_task(self, payload: dict):
177213

178214
return {
179215
"celery_task_id": self.request.id,
180-
"sqlmap_task_id": sqlmap_task_id,
181216
}
182217

183218
except Exception as e:

0 commit comments

Comments
 (0)