Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
396 changes: 356 additions & 40 deletions src/skillspector/cli.py

Large diffs are not rendered by default.

56 changes: 47 additions & 9 deletions src/skillspector/input_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
import tempfile
import zipfile
from pathlib import Path
from urllib.parse import urlparse
from urllib.parse import urljoin, urlparse

import httpx

Expand All @@ -54,13 +54,34 @@
# Hosts that file downloads may target. This set is handed to
# `_validate_url_host` before a download request is issued.
# NOTE(review): presumably any host outside this set is rejected — confirm
# against `_validate_url_host`, which is not shown here.
ALLOWED_DOWNLOAD_HOSTS = frozenset(
{
"github.com",
"codeload.github.com",
"raw.githubusercontent.com",
"gitlab.com",
"bitbucket.org",
"huggingface.co",
}
)

# Filename suffixes that mark a URL as pointing at a single file. `_is_git_url`
# returns False for a URL on an allowed git host when the URL ends with one of
# these. The comparison is made against `path.lower()`, so every entry here
# must be lower-case. Kept as a tuple because `str.endswith` accepts a tuple.
_DIRECT_FILE_URL_SUFFIXES = (
".md",
".py",
".sh",
".bash",
".zsh",
".js",
".ts",
".rb",
".go",
".rs",
".pl",
".json",
".yaml",
".yml",
".toml",
".txt",
".zip",
)


def _is_private_ip(host: str) -> bool:
"""Return True if host resolves to a private/reserved IP address."""
Expand Down Expand Up @@ -147,7 +168,11 @@ def _is_git_url(self, path: str) -> bool:
parsed = urlparse(path)
host = parsed.hostname or ""
if any(allowed in host for allowed in ALLOWED_GIT_HOSTS):
if "/raw/" in path or "/blob/" in path or path.endswith((".md", ".py", ".sh")):
if (
"/raw/" in path
or "/blob/" in path
or path.lower().endswith(_DIRECT_FILE_URL_SUFFIXES)
):
return False
return True
if path.endswith(".git"):
Expand Down Expand Up @@ -208,15 +233,12 @@ def _clone_git(self, url: str) -> Path:

def _download_file(self, url: str) -> Path:
"""Download a file from URL to a temporary directory."""
self._validate_url_host(url, ALLOWED_DOWNLOAD_HOSTS)
temp_dir = self._get_temp_dir()
parsed = urlparse(url)
filename = Path(parsed.path).name or "SKILL.md"
try:
with httpx.Client(follow_redirects=False, timeout=30) as client:
response = client.get(url)
response.raise_for_status()
content = response.content
response, final_url = self._download_with_redirect_validation(url)
parsed = urlparse(final_url)
filename = Path(parsed.path).name or "SKILL.md"
content = response.content
except httpx.HTTPError as e:
logger.warning("Download failed for %s: %s", url, e)
raise ValueError(f"Failed to download file: {e}") from e
Expand All @@ -230,6 +252,22 @@ def _download_file(self, url: str) -> Path:
file_path.write_bytes(content)
return temp_dir

def _download_with_redirect_validation(self, url: str) -> tuple[httpx.Response, str]:
    """Fetch *url*, following redirects by hand so every hop is host-checked.

    Automatic redirect following is disabled on the HTTP client; instead each
    candidate URL is passed to `_validate_url_host` with
    `ALLOWED_DOWNLOAD_HOSTS` before it is requested.

    Args:
        url: The URL to start from.

    Returns:
        A `(response, final_url)` pair, where `final_url` is the URL that
        produced the non-redirect response.

    Raises:
        ValueError: If a redirect response has no `location` header, or if
            five requests were made and the last one was still a redirect.
        httpx.HTTPError: Propagated from the request itself or from
            `raise_for_status()` on the final response.
    """
    redirect_statuses = {301, 302, 303, 307, 308}
    max_requests = 5
    target = url
    for _attempt in range(max_requests):
        # Validate before every request, not only the first one.
        self._validate_url_host(target, ALLOWED_DOWNLOAD_HOSTS)
        with httpx.Client(follow_redirects=False, timeout=30) as client:
            response = client.get(target)
            if response.status_code not in redirect_statuses:
                response.raise_for_status()
                return response, target
            location = response.headers.get("location")
            if not location:
                raise ValueError(f"Redirect response missing location: {target}")
            # Resolve relative Location values against the URL just requested.
            target = urljoin(target, location)
    raise ValueError(f"Too many redirects while downloading: {url}")

def _extract_zip(self, zip_path: Path) -> Path:
"""Extract a zip file to a temporary directory with path traversal protection."""
if not zip_path.exists():
Expand Down
4 changes: 4 additions & 0 deletions src/skillspector/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ class Finding:
tags: list[str] = field(default_factory=list)
context: str | None = None
matched_text: str | None = None
transitive_depth: int = 0
source_url: str | None = None

def to_dict(self) -> dict[str, object]:
"""Return a JSON-serializable dict representation (full finding shape)."""
Expand All @@ -104,6 +106,8 @@ def to_dict(self) -> dict[str, object]:
# Tags surface markers like "llm-unconfirmed" (a high-severity static
# finding the LLM filter did not confirm but which is preserved anyway).
"tags": list(self.tags),
"transitive_depth": self.transitive_depth,
"source_url": self.source_url,
}

def __str__(self) -> str:
Expand Down
7 changes: 7 additions & 0 deletions src/skillspector/nodes/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ def _build_sarif(
results: list[SarifResult] = []
seen_rule_ids: dict[str, str] = {}

def _finding_properties(finding: Finding) -> dict[str, object]:
    """Return the property bag for a SARIF result built from *finding*.

    The bag carries the finding's transitive depth and source URL under
    camelCase keys.
    """
    properties: dict[str, object] = {}
    properties["transitiveDepth"] = finding.transitive_depth
    properties["sourceUrl"] = finding.source_url
    return properties

for finding in findings:
if not finding.rule_id or not finding.message:
continue
Expand All @@ -225,6 +228,7 @@ def _build_sarif(
)
)
],
properties=_finding_properties(finding),
)
)
if finding.rule_id not in seen_rule_ids:
Expand All @@ -251,6 +255,7 @@ def _build_sarif(
)
)
],
properties=_finding_properties(finding),
suppressions=[SarifSuppression(kind="external", justification=sf.reason)],
)
)
Expand Down Expand Up @@ -552,6 +557,8 @@ def _format_markdown(
lines.append(f"### {emoji} {sev}: {f.rule_id}\n")
end = f"–{f.end_line}" if f.end_line and f.end_line != f.start_line else ""
lines.append(f"**Location:** `{f.file}:{f.start_line}{end}` ")
if f.transitive_depth > 0 and f.source_url:
lines.append(f"**Transitive:** depth={f.transitive_depth}, source={f.source_url} ")
lines.append(f"**Confidence:** {f.confidence:.0%} ")
lines.append("")
lines.append(f"**Message:** {f.message}")
Expand Down
1 change: 1 addition & 0 deletions src/skillspector/sarif_models.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ class SarifResult(BaseModel):
# When present, the result is suppressed; SARIF consumers (e.g. GitHub code
# scanning) exclude suppressed results from counts but keep them for audit.
suppressions: list[SarifSuppression] | None = None
properties: dict[str, object] | None = None


class SarifReportingDescriptor(BaseModel):
Expand Down
2 changes: 2 additions & 0 deletions src/skillspector/suppression.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ def finding_fingerprint(finding: Finding) -> str:
str(finding.start_line or ""),
str(finding.end_line or ""),
(finding.message or "").strip(),
finding.source_url or "",
str(finding.transitive_depth or 0),
]
)
digest = hashlib.sha256(raw.encode("utf-8")).hexdigest()[:16]
Expand Down
Loading