Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/specify_cli/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def ensure_executable_scripts(project_path: Path, tracker: StepTracker | None =
with script.open("rb") as f:
if f.read(2) != b"#!":
continue
except Exception:
except OSError:
continue
st = script.stat()
mode = st.st_mode
Expand Down
4 changes: 2 additions & 2 deletions src/specify_cli/_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def get_speckit_version() -> str:
"""Get current spec-kit version."""
try:
return importlib.metadata.version("specify-cli")
except Exception:
except importlib.metadata.PackageNotFoundError:
# Fallback: try reading from pyproject.toml
try:
import tomllib
Expand All @@ -114,7 +114,7 @@ def get_speckit_version() -> str:
with open(pyproject_path, "rb") as f:
data = tomllib.load(f)
return data.get("project", {}).get("version", "unknown")
except Exception:
except (OSError, tomllib.TOMLDecodeError, TypeError):
# Intentionally ignore any errors while reading/parsing pyproject.toml.
# If this lookup fails for any reason, we fall back to returning "unknown" below.
pass
Expand Down
2 changes: 1 addition & 1 deletion src/specify_cli/_console.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def _maybe_refresh(self):
if self._refresh_cb:
try:
self._refresh_cb()
except Exception:
except Exception: # nosec B110
pass

def render(self):
Expand Down
8 changes: 5 additions & 3 deletions src/specify_cli/_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import os
import shutil
import stat
import subprocess
import subprocess # nosec B404
import tempfile
import yaml
from pathlib import Path, PurePosixPath, PureWindowsPath
Expand Down Expand Up @@ -86,10 +86,12 @@ def run_command(

try:
if capture:
result = subprocess.run(cmd, check=check_return, capture_output=True, text=True)
result = subprocess.run( # nosec B603
cmd, check=check_return, capture_output=True, text=True
)
return result.stdout.strip()
else:
subprocess.run(cmd, check=check_return)
subprocess.run(cmd, check=check_return) # nosec B603
return None
except subprocess.CalledProcessError as e:
if check_return:
Expand Down
15 changes: 8 additions & 7 deletions src/specify_cli/_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import re
import shlex
import shutil
import subprocess
import subprocess # nosec B404
import sys
import urllib.error
import urllib.parse
Expand Down Expand Up @@ -407,7 +407,7 @@ def _uv_tool_list_contains_specify_cli(stdout: str) -> bool:
if not line:
continue
first_token = line.split(None, 1)[0]
if first_token == "specify-cli":
if first_token == "specify-cli": # nosec B105
return True
return False

Expand Down Expand Up @@ -527,7 +527,7 @@ def _detect_install_method(
if uv_bin is not None:
consulted.append("uv tool list")
try:
result = subprocess.run(
result = subprocess.run( # nosec B603
[uv_bin, "tool", "list"],
capture_output=True,
text=True,
Expand All @@ -547,7 +547,7 @@ def _detect_install_method(
if pipx_bin is not None:
consulted.append("pipx list --json")
try:
result = subprocess.run(
result = subprocess.run( # nosec B603
[pipx_bin, "list", "--json"],
capture_output=True,
text=True,
Expand Down Expand Up @@ -819,7 +819,7 @@ def _run_installer(plan: _UpgradePlan) -> _InstallerResult:
timeout = None

try:
completed = subprocess.run(
completed = subprocess.run( # nosec B603
plan.installer_argv,
shell=False,
check=False,
Expand Down Expand Up @@ -882,7 +882,7 @@ def _verify_upgrade(plan: _UpgradePlan) -> str | None:
if specify_bin is None:
return None
try:
result = subprocess.run(
result = subprocess.run( # nosec B603
[specify_bin, "--version"],
shell=False,
check=False,
Expand Down Expand Up @@ -1156,7 +1156,8 @@ def self_check() -> None:
# Graceful-failure path (FR-008). `failure_reason` is one of the
# enumerated strings produced by _fetch_latest_release_tag() — it
# never contains a URL, headers, response body, or traceback.
assert failure_reason is not None
if failure_reason is None:
failure_reason = "unknown"
console.print(f"Installed: {installed}")
console.print(f"[yellow]Could not check latest release:[/yellow] {failure_reason}")
return
Expand Down
27 changes: 20 additions & 7 deletions src/specify_cli/authentication/azure_devops.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@
import base64
import json as _json
import os
import subprocess
import subprocess # nosec B404
from typing import TYPE_CHECKING
from urllib.parse import quote

from .base import AuthProvider

Expand All @@ -15,6 +16,17 @@

# Azure DevOps resource ID for OAuth / Azure AD token acquisition.
_ADO_RESOURCE_ID = "499b84ac-1321-427f-aa17-267ca6975798"
_MICROSOFT_LOGIN_HOST = "login.microsoftonline.com"


def _is_safe_tenant_segment(tenant_id: str) -> bool:
"""Return True when *tenant_id* cannot alter the Azure token URL path."""
tenant_id = tenant_id.strip()
if not tenant_id or tenant_id in (".", ".."):
return False
if any(ord(ch) < 32 or ord(ch) == 127 for ch in tenant_id):
return False
return quote(tenant_id, safe="") == tenant_id


class AzureDevOpsAuth(AuthProvider):
Expand Down Expand Up @@ -56,7 +68,7 @@ def resolve_token(self, entry: AuthConfigEntry) -> str | None:
def _acquire_via_az_cli() -> str | None:
"""Run ``az account get-access-token`` and return the access token."""
try:
result = subprocess.run( # noqa: S603, S607
result = subprocess.run( # noqa: S603, S607 # nosec B603 B607
[
"az",
"account",
Expand Down Expand Up @@ -91,10 +103,11 @@ def _acquire_via_client_credentials(entry: AuthConfigEntry) -> str | None:
if not client_secret:
return None

url = (
f"https://login.microsoftonline.com/{entry.tenant_id}"
"/oauth2/v2.0/token"
)
tenant_id = entry.tenant_id.strip()
if not _is_safe_tenant_segment(tenant_id):
return None

url = f"https://{_MICROSOFT_LOGIN_HOST}/{tenant_id}/oauth2/v2.0/token"
from urllib.parse import urlencode
body = urlencode({
"grant_type": "client_credentials",
Expand All @@ -109,7 +122,7 @@ def _acquire_via_client_credentials(entry: AuthConfigEntry) -> str | None:
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
try:
with urllib.request.urlopen(req, timeout=30) as resp: # noqa: S310
with urllib.request.urlopen(req, timeout=30) as resp: # nosec B310
payload = _json.loads(resp.read().decode("utf-8"))
token = payload.get("access_token", "").strip()
return token or None
Expand Down
24 changes: 23 additions & 1 deletion src/specify_cli/authentication/http.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,26 @@
_config_cache: list[AuthConfigEntry] | None = None # None = not yet loaded


def _validate_http_url(url: str) -> str:
"""Return *url* when it is an absolute HTTP(S) URL with a host.

Authenticated download helpers must never hand ``file:``, custom schemes,
relative paths, or malformed hostless URLs to ``urllib``. Keeping this as a
single validator makes both authenticated and unauthenticated fallbacks use
the same network-only boundary.
"""
if not isinstance(url, str):
raise ValueError("URL must be a string.")
if url != url.strip() or any(ord(ch) < 32 or ord(ch) == 127 for ch in url):
raise ValueError("URL must be an absolute http(s) URL with a hostname.")
parsed = urlparse(url)
if parsed.scheme not in ("http", "https") or not parsed.hostname:
raise ValueError("URL must be an absolute http(s) URL with a hostname.")
if parsed.username or parsed.password:
raise ValueError("URL must not include embedded credentials.")
return url


def _load_config() -> list[AuthConfigEntry]:
"""Load auth config, using override if set (for testing).

Expand Down Expand Up @@ -101,6 +121,7 @@ def build_request(url: str, extra_headers: dict[str, str] | None = None) -> urll
Uses the first matching entry from ``auth.json`` whose token resolves.
Returns a plain request when no entry matches or the file doesn't exist.
"""
url = _validate_http_url(url)
headers: dict[str, str] = {}
if extra_headers:
# Strip Authorization from extra_headers to prevent bypass
Expand Down Expand Up @@ -150,6 +171,7 @@ def open_url(
*redirect_validator*, when provided, is called with ``(old_url, new_url)``
before following each redirect and may raise to reject the redirect.
"""
url = _validate_http_url(url)
entries = find_entries_for_url(url, _load_config())

def _make_req(auth_headers: dict[str, str]) -> urllib.request.Request:
Expand Down Expand Up @@ -185,4 +207,4 @@ def _make_req(auth_headers: dict[str, str]) -> urllib.request.Request:
if redirect_validator is not None:
opener = urllib.request.build_opener(_StripAuthOnRedirect((), redirect_validator))
return opener.open(req, timeout=timeout)
return urllib.request.urlopen(req, timeout=timeout) # noqa: S310
return urllib.request.urlopen(req, timeout=timeout) # nosec B310
2 changes: 1 addition & 1 deletion src/specify_cli/bundler/services/installer.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,5 +206,5 @@ def _rollback(
for component in reversed(done):
try:
installer.remove(project_root, component)
except Exception: # noqa: BLE001 - best-effort rollback
except Exception: # noqa: BLE001 - best-effort rollback # nosec B112
continue
3 changes: 2 additions & 1 deletion src/specify_cli/integration_status.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,8 @@ def build_integration_status_report(project_root: Path) -> dict[str, Any]:
)
return _build_report(None, [], findings, {}, None)

assert raw_state is not None
if raw_state is None:
return _build_report(None, [], findings, {}, None)
raw_default_key = default_integration_key(raw_state)
raw_installed_value = raw_state.get("installed_integrations")
raw_installed_is_list = isinstance(raw_installed_value, list)
Expand Down
6 changes: 3 additions & 3 deletions src/specify_cli/integrations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ def dispatch_command(
Raises ``NotImplementedError`` if the integration does not
support CLI dispatch.
"""
import subprocess
import subprocess # nosec B404

prompt = self.build_command_invocation(command_name, args)
# When streaming to the terminal, request text output so the
Expand Down Expand Up @@ -299,7 +299,7 @@ def dispatch_command(
# can Ctrl+C at any time. The timeout parameter is only
# applied in the captured (non-streaming) branch below.
try:
result = subprocess.run(
result = subprocess.run( # nosec B603
exec_args,
text=True,
cwd=cwd,
Expand All @@ -316,7 +316,7 @@ def dispatch_command(
"stderr": "",
}

result = subprocess.run(
result = subprocess.run( # nosec B603
exec_args,
capture_output=True,
text=True,
Expand Down
6 changes: 3 additions & 3 deletions src/specify_cli/integrations/copilot/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ def dispatch_command(
In skills mode, the prompt includes the skill invocation
(``/speckit-<stem>``).
"""
import subprocess
import subprocess # nosec B404

stem = command_name
if stem.startswith("speckit."):
Expand Down Expand Up @@ -256,7 +256,7 @@ def dispatch_command(

if stream:
try:
result = subprocess.run(
result = subprocess.run( # nosec B603
cli_args,
text=True,
cwd=cwd,
Expand All @@ -273,7 +273,7 @@ def dispatch_command(
"stderr": "",
}

result = subprocess.run(
result = subprocess.run( # nosec B603
cli_args,
capture_output=True,
text=True,
Expand Down
2 changes: 1 addition & 1 deletion src/specify_cli/integrations/generic/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def _resolve_commands_dir(
import shlex
tokens = shlex.split(raw)
for i, token in enumerate(tokens):
if token == "--commands-dir" and i + 1 < len(tokens):
if token == "--commands-dir" and i + 1 < len(tokens): # nosec B105
return tokens[i + 1]
if token.startswith("--commands-dir="):
return token.split("=", 1)[1]
Expand Down
6 changes: 3 additions & 3 deletions src/specify_cli/presets/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -780,7 +780,7 @@ def _reconcile_composed_commands(self, command_names: List[str]) -> None:
context_note=f"\n<!-- Extension: {ext_id} -->\n<!-- Config: .specify/extensions/{ext_id}/ -->\n",
)
registered = True
except Exception:
except Exception: # nosec B110
# Extension registration failed; fall back to
# generic path-based registration below.
pass
Expand Down Expand Up @@ -905,7 +905,7 @@ def _register_command_from_path(
cmd_tmpl["aliases"] = aliases
break
break
except Exception:
except Exception: # nosec B110
pass # best-effort alias loading
self._register_for_non_skill_agents(
registrar, [cmd_tmpl], source_id, cmd_path.parent
Expand Down Expand Up @@ -1080,7 +1080,7 @@ def _reconcile_skills(self, command_names: List[str]) -> None:
if integration is not None and hasattr(integration, "post_process_skill_content"):
skill_content = integration.post_process_skill_content(skill_content)
skill_file.write_text(skill_content, encoding="utf-8")
except Exception:
except Exception: # nosec B110
pass # best-effort override skill restoration

# Register skills only for the specific commands being reconciled,
Expand Down
2 changes: 1 addition & 1 deletion src/specify_cli/workflows/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def load_custom_steps(project_root: Path) -> list[str]:
k for k in _sys.modules if k.startswith(submodule_prefix)
]:
_sys.modules.pop(_mod_key, None)
except Exception: # noqa: BLE001
except Exception: # noqa: BLE001 # nosec B112
# Silently skip broken step packages at load time
continue

Expand Down
20 changes: 11 additions & 9 deletions src/specify_cli/workflows/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,9 @@ def __init__(self, data: dict[str, Any], source_path: Path | None = None) -> Non
# Advisory pre-conditions (spec-kit version / integrations a workflow
# expects). Validated by ``validate_workflow`` (recognized keys only;
# see ``_RECOGNIZED_REQUIRES_KEYS``) but NOT enforced at run time — they
# are not a security boundary. In particular there is no
# ``requires.permissions`` capability gate: shell steps always run with
# the user's privileges.
# are not a security boundary. In particular there is no granular
# ``requires.permissions`` capability sandbox; shell steps are guarded
# by their own explicit workflow-trust prompt/env gate.
#
# Holds the raw parsed value, so before ``validate_workflow`` runs it may
# be a non-mapping (``None`` for a bare ``requires:``, a list for
Expand Down Expand Up @@ -204,9 +204,10 @@ def validate_workflow(definition: WorkflowDefinition) -> list[str]:
# integrations a workflow expects). Only a fixed set of keys is recognized;
# reject anything else so authoring typos surface here instead of being
# silently ignored at runtime. In particular ``requires.permissions`` is
# rejected explicitly: it reads like a runtime capability gate, but no such
# gate exists — a ``shell`` step always runs with the user's privileges, so
# declaring it would give a false sense of sandboxing.
# rejected explicitly: it reads like a granular runtime capability sandbox,
# but no such sandbox exists. A ``shell`` step has its own explicit trust
# gate, and declaring ``requires.permissions`` would still give a false
# sense of per-workflow permission enforcement.
#
# Mirror ``inputs`` validation: an omitted block defaults to ``{}`` and is
# valid, but any present-but-non-mapping value — ``requires:`` (YAML null),
Expand All @@ -219,9 +220,10 @@ def validate_workflow(definition: WorkflowDefinition) -> list[str]:
if key == "permissions":
errors.append(
"'requires.permissions' is not a recognized or "
"enforced capability gate — shell steps always run "
"with the user's privileges. Remove it and gate "
"sensitive steps with a 'gate' step instead."
"enforced capability sandbox. Shell steps require "
"explicit workflow trust before execution, but "
"'requires.permissions' does not grant or constrain "
"runtime permissions."
)
elif key not in _RECOGNIZED_REQUIRES_KEYS:
errors.append(
Expand Down
Loading