Skip to content

Commit 43270de

Browse files
authored
feat(datagen): rule2code (#11)
* feat: add script for fetching bandit rules * feat(datagen): guru2code * feat: add script for data post-processing * fix: gemini comments
1 parent 9c2fa31 commit 43270de

4 files changed

Lines changed: 715 additions & 4 deletions

File tree

datagen/rule2code/cwe2code.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,7 +275,7 @@ def datagen_for_one_cwe(cwe_id, markdown, depth, remote_api=False):
275275

276276
def main(
277277
parallel=256,
278-
output_path="outputs/rule2code/cwe2code-raw.jsonl",
278+
output_path="outputs/rule2code/cwe2code.jsonl",
279279
depth=1,
280280
remote_api=False,
281281
):

datagen/rule2code/get_bandit_rules.py

Lines changed: 186 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,189 @@
22
#
33
# SPDX-License-Identifier: Apache-2.0
44

5-
# TODO(@zhewang2001): Please refactor the corresponding code snippets and then upload it.
5+
"""
6+
Scrape all flake8-bandit (Sxxx) rules from the Ruff docs.
7+
8+
Output: bandit_rules.json ― list[{code,name,short_msg,url,full_text}]
9+
"""
10+
11+
from __future__ import annotations
12+
13+
import json
14+
import re
15+
import time
16+
from pathlib import Path
17+
from typing import Any, Dict, Iterable, Optional
18+
from urllib.parse import urljoin
19+
20+
import fire
21+
import requests
22+
from bs4 import BeautifulSoup, Tag
23+
24+
SITE = "https://docs.astral.sh"
# Base URL that every relative rule link in the listing resolves against.
RULES_DIR = f"{SITE}/ruff/rules/"
# Listing page anchored at the flake8-bandit (Sxxx) section.
LISTING = f"{RULES_DIR}#flake8-bandit-s"
# Identify this scraper to the docs server.
HEADERS = {"User-Agent": "bandit-scraper/0.2 (+https://github.com/you)"}

# Lower-cased page headings → output keys used by categorize_bandit_text.
SECTION_HEADINGS = {
    "what it does": "what_it_does",
    "why is this bad?": "why_bad",
    "example": "example_bad",
    "use instead:": "example_good",
}

# Matches a rule page title line such as "some-rule-name (S605)".
TITLE_RE = re.compile(r"^(?P<title>.+?)\s+\((?P<code>S\d{3})\)$", re.I)

BANDIT_RE = re.compile(r"\b[bB](\d{3})\b")  # matches B605, b401, …
39+
40+
41+
def load_ruff_rules(path: str | Path = "bandit_rules.json") -> Dict[str, dict]:
    """Load the scraped rules file and index it by rule code.

    Returns a mapping such as {"S605": {...}, ...} so lookups are O(1).
    """
    indexed: Dict[str, dict] = {}
    for rule in json.loads(Path(path).read_text()):
        indexed[rule["code"]] = rule
    return indexed
45+
46+
47+
def bandit_id(text: str) -> Optional[str]:
    """Extract the first Bandit id (e.g. 'B605') from *text*, or None.

    Matching is case-insensitive on the leading letter ('b401' → 'B401').
    """
    match = BANDIT_RE.search(text)
    if match is None:
        return None
    return "B" + match.group(1)
51+
52+
53+
def ruff_code(bid: str) -> str:
    """Translate a Bandit id to its flake8-bandit / Ruff code: 'B605' → 'S605'."""
    return f"S{bid[1:]}"
56+
57+
58+
def enrich(recs: Iterable[dict], rules: Dict[str, Any]) -> Iterable[dict]:
    """Yield each record with Bandit id, Ruff code, and Ruff rule attached.

    Any of the three new fields may be None when no Bandit id is found
    in the record's recommendation text (or no matching Ruff rule exists).
    """
    for record in recs:
        bid = bandit_id(record["recommendation_text"])
        code = ruff_code(bid) if bid else None
        record["bandit_id"] = bid
        record["ruff_code"] = code
        record["ruff_rule"] = rules.get(code)
        yield record
67+
68+
69+
def categorize_bandit_text(full_text: str) -> Dict[str, Optional[str]]:
    """Split a scraped Ruff rule page into named sections.

    The first non-blank line must look like "Some Title (S605)" (TITLE_RE).
    Remaining lines are bucketed under the keys in SECTION_HEADINGS as
    headings are encountered; anything before the first recognized heading
    goes into "remainder".

    Raises:
        ValueError: if the text is empty or the title line is unrecognized.
    """
    raw_lines = full_text.splitlines()
    lines = []

    # Normalize: strip trailing whitespace and collapse runs of blank
    # lines down to a single empty string.
    for line in raw_lines:
        if line.strip():
            lines.append(line.rstrip())
        elif lines and lines[-1].strip():
            lines.append("")

    if not lines:
        raise ValueError("empty text")

    m = TITLE_RE.match(lines[0].strip())
    if not m:
        raise ValueError(f"unexpected title line {lines[0]!r}")

    out = {
        "code": m.group("code"),
        "title": m.group("title"),
        "what_it_does": None,
        "why_bad": None,
        "example_bad": None,
        "example_good": None,
        "remainder": None,
    }

    current_key = "remainder"
    buf = []

    def flush():
        # Commit buffered lines to the section that was current while the
        # buffer filled (reads current_key late-bound from the enclosing loop).
        if buf:
            text = "\n".join(buf).rstrip()
            if current_key in ["example_bad", "example_good"]:
                # Code examples: drop trailing References/Note/Options
                # material that follows the example on the page.
                text = text.split("\nReferences")[0].rstrip()
                text = text.split("\nNote")[0].rstrip()
                text = text.split("\nOptions")[0].rstrip()
            elif current_key in ["what_it_does", "why_bad"]:
                # Prose sections: collapse all whitespace to single spaces.
                text = " ".join(text.split())
            if out[current_key]:
                # Same section seen more than once: append, don't overwrite.
                out[current_key] += "\n" + text
            else:
                out[current_key] = text
        buf.clear()

    for ln in lines[1:]:
        key = SECTION_HEADINGS.get(ln.strip().lower())
        if key:
            # A recognized heading closes out the previous section.
            flush()
            current_key = key
            continue
        buf.append(ln)
    flush()
    return out
123+
124+
125+
def get_soup(url: str) -> BeautifulSoup:
    """Fetch *url* and return its parsed HTML.

    Raises requests.HTTPError on a non-2xx response.
    """
    response = requests.get(url, headers=HEADERS, timeout=30)
    response.raise_for_status()
    return BeautifulSoup(response.text, "html.parser")
129+
130+
131+
def bandit_table(doc: BeautifulSoup) -> Tag:
    """Locate the rules <table> following the 'flake8-bandit-s' heading.

    Raises RuntimeError if the section heading is missing.
    """
    heading = doc.find(id="flake8-bandit-s")
    # NOTE: bs4 elements can be falsy when empty, so test truthiness
    # rather than `is None` to keep the original behavior.
    if not heading:
        raise RuntimeError("unable to find flake8-bandit section")
    return heading.find_next("table")
136+
137+
138+
def row_to_meta(tr: Tag) -> dict[str, str]:
    """Convert one listing-table row into {code, name, short_msg, url}."""
    cells = tr.find_all("td")
    link = cells[1].find("a")
    # Relative hrefs are resolved against the rules directory; the leading
    # slash is stripped so urljoin keeps the /ruff/rules/ prefix.
    absolute_url = urljoin(RULES_DIR, link["href"].lstrip("/"))
    return {
        "code": cells[0].text.strip(),
        "name": link.text.strip(),
        "short_msg": cells[2].get_text(" ", strip=True),
        "url": absolute_url,
    }
150+
151+
152+
def page_markdown(url: str) -> str:
    """Fetch a rule page and flatten it to plain text.

    <pre> blocks are swapped for placeholders before text extraction so
    get_text() cannot mangle their internal whitespace, then spliced back
    in verbatim afterwards.
    """
    soup = get_soup(url)
    article = soup.find("article") or soup
    for junk in article.select("nav, aside, footer"):
        junk.decompose()

    pre_blocks = []
    for idx, pre in enumerate(article.find_all("pre")):
        pre_blocks.append(pre.get_text(separator="", strip=False))
        pre.replace_with(f"__PRE_PLACEHOLDER_{idx}__")

    text = article.get_text("\n", strip=False)
    # Collapse 3+ consecutive newlines down to a paragraph break.
    text = re.sub(r"\n{3,}", "\n\n", text)

    for idx, snippet in enumerate(pre_blocks):
        text = text.replace(f"__PRE_PLACEHOLDER_{idx}__", snippet)

    return text
170+
171+
172+
def main(output_file: str = "bandit_rules.json") -> None:
    """Scrape every flake8-bandit rule page and write the results as JSON.

    Rows whose detail page returns an HTTP error are warned about and
    skipped rather than aborting the whole run.
    """
    listing = get_soup(LISTING)
    scraped = []
    for row in bandit_table(listing).tbody.find_all("tr"):
        meta = row_to_meta(row)
        try:
            meta["full_text"] = categorize_bandit_text(page_markdown(meta["url"]))
        except requests.HTTPError as e:
            print(f"[WARN] {meta['code']}: {e}")
            continue
        scraped.append(meta)
        time.sleep(0.3)  # be polite to the docs server
    Path(output_file).write_text(json.dumps(scraped, indent=2, ensure_ascii=False))
    print(f"✓ scraped {len(scraped)} rules → {output_file}")


if __name__ == "__main__":
    fire.Fire(main)

0 commit comments

Comments
 (0)