diff --git a/mcp_server/core/workflow_graph_schema.py b/mcp_server/core/workflow_graph_schema.py index 7f7f739d..977528ef 100644 --- a/mcp_server/core/workflow_graph_schema.py +++ b/mcp_server/core/workflow_graph_schema.py @@ -116,16 +116,18 @@ class WorkflowEdge(BaseModel): def _short_hash(value: str, width: int = 10) -> str: - """Stable, non-cryptographic short hash for ID stability across runs. - - ``usedforsecurity=False`` (Python 3.9+) documents intent — this hash mints - deterministic node IDs, it never protects sensitive data — and lets CodeQL's - weak-hashing query correctly skip it. SHA-1 is retained (not upgraded to - SHA-256) so existing node IDs stay stable across graphs and snapshots. + """Stable, non-cryptographic short hash for deterministic node IDs. + + Uses SHA-256 (a non-broken algorithm) rather than SHA-1 purely for + determinism: ``same input -> same id`` within a graph build. The only + consumer of these IDs is ``workflow_graph_layout``, a position cache + keyed by ``(node_id, topology_fingerprint, layout_version)`` — when the + ID scheme changes, the fingerprint changes and the layout recomputes, so + there is no cross-build stability requirement to preserve. SHA-256 keeps + CodeQL's weak-hashing query (CWE-327/328) clean without relying on the + ``usedforsecurity`` flag. """ - return hashlib.sha1(value.encode("utf-8"), usedforsecurity=False).hexdigest()[ - :width - ] + return hashlib.sha256(value.encode("utf-8")).hexdigest()[:width] class NodeIdFactory: diff --git a/mcp_server/infrastructure/workflow_graph_source.py b/mcp_server/infrastructure/workflow_graph_source.py index c8114892..02974dad 100644 --- a/mcp_server/infrastructure/workflow_graph_source.py +++ b/mcp_server/infrastructure/workflow_graph_source.py @@ -44,7 +44,10 @@ def _cmd_hash(cmd: str) -> str: - return hashlib.sha1(cmd.encode("utf-8")).hexdigest()[:12] + # SHA-256 (non-broken) for deterministic command node IDs — feeds the + # self-healing workflow_graph_layout cache, not a security boundary. + # Matches workflow_graph_schema._short_hash (CWE-327/328 clean). + return hashlib.sha256(cmd.encode("utf-8")).hexdigest()[:12] def _first_line(text: str) -> str: diff --git a/mcp_server/server/http_file_diff.py b/mcp_server/server/http_file_diff.py index 66d20382..f24c8e1f 100644 --- a/mcp_server/server/http_file_diff.py +++ b/mcp_server/server/http_file_diff.py @@ -91,38 +91,116 @@ def _allowed_probe_roots() -> "list[str]": return roots +def _within(real_path: str, root: str) -> bool: + """True iff ``real_path`` is ``root`` or nested beneath it. + + ``os.path.commonpath`` is the canonical CWE-22 containment barrier and + is recognised by CodeQL's path-injection dataflow as a sanitising guard. + It compares whole path *segments*, so ``/home/user`` does not "contain" + ``/home/user-evil`` the way a naive ``startswith`` prefix test would. + Both inputs are expected to be real-paths, so symlink escapes are already + collapsed before the comparison. + """ + import os + + try: + return os.path.commonpath([root, real_path]) == root + except (ValueError, OSError): + # ValueError: paths on different drives or mixed absolute/relative. + return False + + def _contained_resolved(p: "str | Path") -> "Path | None": # noqa: F821 - """Resolve ``p`` and return it ONLY if it lands inside an allowed probe + """Real-path ``p`` and return it ONLY if it lands inside an allowed probe root; otherwise ``None``. - Sanitise-and-return: the returned Path is the *validated* value, and - callers must use it (never the raw input) for any subsequent filesystem - op. This places the ``is_relative_to`` barrier (the canonical CWE-22 - path-traversal sanitiser) directly on the tainted→sink dataflow, which - CodeQL recognises — so ``?name=`` / ``?path=`` query data can never - reach a filesystem op that escapes ``$HOME`` / cwd / temp. + Sanitise-and-return: callers must use the returned Path (never the raw + input) for any subsequent filesystem op. ``os.path.realpath`` normalises + ``..`` and symlink segments, and ``_within`` (``os.path.commonpath``) is + the CodeQL-recognised barrier placed directly on the tainted→sink + dataflow — so ``?name=`` / ``?path=`` query data can never reach a + filesystem op that escapes ``$HOME`` / cwd / temp. """ + import os from pathlib import Path try: - target = Path(p).resolve(strict=False) + real = os.path.realpath(str(p)) except (OSError, ValueError): return None for root in _allowed_probe_roots(): + if _within(real, root): + return Path(real) + return None + + +def _descend_trusted(root: str, names: "list[str]") -> "Path | None": # noqa: F821 + """Descend from a TRUSTED ``root`` into child directories whose names + match the successive user-supplied ``names``, returning the deepest + existing directory reached. + + CWE-22 taint break: this is the ``git_diff._match_in_whitelist`` pattern + applied to directory traversal. At every level the candidate paths come + from ``os.scandir(cur)`` — a trusted enumeration of what is actually on + disk — and a user component selects among them ONLY via ``entry.name == + name`` equality. The path that reaches the ``is_dir`` / ``scandir`` sink + (``cur`` / ``entry.path``) is composed entirely from the constant + ``root`` plus scandir output; the user ``names`` never construct a probed + path. Static analysers (CodeQL ``py/path-injection``) therefore see the + sink operand as not derived from user input. Capped at 64 levels. + """ + import os + from pathlib import Path + + cur = os.path.realpath(root) # ``root`` is a constant probe root → trusted + if not os.path.isdir(cur): + return None + deepest = cur + for name in names[:64]: + nxt = None try: - base = Path(root).resolve(strict=False) + with os.scandir(cur) as entries: + for entry in entries: + # Equality match only — ``name`` selects a trusted entry, + # it never builds the path that gets probed. + if entry.name == name and entry.is_dir(): + nxt = entry.path + break except (OSError, ValueError): + break + if nxt is None: + break + cur = nxt + deepest = cur + return Path(deepest) + + +def _first_existing_dir_within(target: "Path") -> "Path | None": # noqa: F821 + """Deepest existing directory on ``target``'s path chain, found by + DESCENDING from the allowed probe root that contains it — never by + probing a ``realpath(user_input)``-derived path. + + CWE-22 taint break (redesign): the up-walk variant fed ``is_dir()`` a + value derived from the user-controlled ``target`` on every iteration, + which CodeQL's loop-carried dataflow re-taints and refuses to treat as + sanitised. Instead we locate the constant allowed root that prefixes + ``target`` (a pure segment comparison — no filesystem op on user data), + then hand the remaining components to :func:`_descend_trusted`, where the + filesystem sinks only ever touch trusted enumerated paths. ``target`` is + used solely to *choose* a root and *compare* component names. + """ + import os + + real = os.path.realpath(str(target)) + target_parts = [p for p in real.split(os.sep) if p] + for root in _allowed_probe_roots(): + root_parts = [p for p in root.split(os.sep) if p] + if target_parts[: len(root_parts)] != root_parts: continue - if target == base or target.is_relative_to(base): - return target + return _descend_trusted(root, target_parts[len(root_parts) :]) return None -def _under_allowed_root(p: "Path") -> bool: # noqa: F821 - """Back-compat boolean wrapper around :func:`_contained_resolved`.""" - return _contained_resolved(p) is not None - - def _git_root_for_name(name: str, find_git_root) -> "Path | None": # noqa: F821 """Resolve git root from the file's own path, then fall back to CWD. @@ -136,14 +214,18 @@ def _git_root_for_name(name: str, find_git_root) -> "Path | None": # noqa: F821 query parameter). Defences: * Strip surrounding quotes, reject empty/null-byte inputs. - * ``os.path.normpath`` collapses ``..`` and ``//`` segments. - * Require absolute paths — relative inputs go straight to CWD. - * ``_under_allowed_root`` constrains the probe surface to the - user's ``$HOME``, server CWD, and system temp directories — - attackers cannot probe ``/etc``, ``/root``, etc. - * Ancestor walk capped at 64 levels. - * Only ``is_dir()`` / ``git rev-parse`` run against the - ancestry — no file content is read in this function. + * ``..`` segments are rejected outright — input falls back to CWD. + * ``_contained_resolved`` bounds the input to ``$HOME`` / cwd / temp + (``os.path.commonpath``), so anything outside falls back to CWD. + * The directory actually probed is reached by DESCENDING from a + constant allowed root via ``_first_existing_dir_within`` / + ``_descend_trusted`` (``os.scandir``): the value that reaches + ``is_dir`` / ``git rev-parse --cwd`` is composed from trusted + enumeration, not from ``name`` — the CWE-22 taint flow is broken + the same way ``git_diff._match_in_whitelist`` breaks it. + * Descent capped at 64 levels. + * Only directory probes / ``git rev-parse`` run against the path — + no file content is read in this function. """ from pathlib import Path @@ -158,49 +240,30 @@ def _git_root_for_name(name: str, find_git_root) -> "Path | None": # noqa: F821 except (ValueError, OSError): return find_git_root() - # Absolute inputs are the COMMON case, not an attack: graph file - # nodes carry the absolute ``file_path`` captured from the original - # tool call, on this same machine. The server is loopback-only and - # ``name`` comes from the user's own stored data, so we resolve the - # repo from the file's own location — constrained to - # ``_under_allowed_root`` (HOME / cwd / temp) so a crafted ``?name=`` - # still can't probe ``/etc`` / ``/root`` (CWE-22). + # Absolute inputs are the COMMON case, not an attack: graph file nodes + # carry the absolute ``file_path`` captured from the original tool call, + # on this same machine. ``_contained_resolved`` bounds the path to + # HOME / cwd / temp, then ``_first_existing_dir_within`` DESCENDS from the + # containing trusted root via ``os.scandir`` to the deepest existing dir — + # so the path reaching the git sink is trusted enumeration (CWE-22). if clean.startswith(("/", "\\")): - # Sanitise-and-return: ``target`` is the validated resolved path - # (gated by is_relative_to), so the value reaching ``is_dir()`` / - # ``git rev-parse`` below carries the CWE-22 barrier inline. target = _contained_resolved(clean) if target is None: return find_git_root() - # Walk up to the first directory that exists (handles a file, or - # an intermediate dir, deleted after capture). Capped at 64. - start = target - for _ in range(64): - if start.is_dir(): - break - parent = start.parent - if parent == start: - break - start = parent + start = _first_existing_dir_within(target) + if start is None: + return find_git_root() root = find_git_root(start) return root if root is not None else find_git_root() - # Relative inputs: join under each allowed probe root and let git - # walk the ancestry. ``is_relative_to`` keeps the join inside base. + # Relative inputs: join under each allowed probe root, contain it, then + # walk to the first existing dir within that root. for base_root in _allowed_probe_roots(): - try: - base = Path(base_root).resolve(strict=False) - target = (base / Path(*parts)).resolve(strict=False) - except (OSError, ValueError): - continue - # Canonical CodeQL-recognised sanitiser. - if not (target == base or target.is_relative_to(base)): - continue - try: - start = target if target.is_dir() else target.parent - except OSError: + target = _contained_resolved(str(Path(base_root) / Path(*parts))) + if target is None: continue - if not (start == base or start.is_relative_to(base)): + start = _first_existing_dir_within(target) + if start is None: continue root = find_git_root(start) if root is not None: diff --git a/tests_py/server/test_http_file_diff_path_containment.py b/tests_py/server/test_http_file_diff_path_containment.py new file mode 100644 index 00000000..47df283c --- /dev/null +++ b/tests_py/server/test_http_file_diff_path_containment.py @@ -0,0 +1,132 @@ +"""CWE-22 (path traversal) regression tests for +``mcp_server.server.http_file_diff``. + +The ``/api/file-diff?name=`` endpoint takes a user-controlled path and +resolves a git root from it. These tests pin the containment barrier: +every probed path must real-path *inside* an allowed root (``$HOME`` / +cwd / system temp) — a crafted ``?name=`` can never make the server +probe ``/etc``, ``/root`` or climb out via ``..`` / symlink escapes. + +The sanitiser is ``os.path.realpath`` + ``os.path.commonpath`` (the +canonical, CodeQL-recognised CWE-22 barrier). Each test would FAIL if a +regression reverted to a naive ``startswith`` prefix test, dropped the +``..`` rejection, or let the ancestor walk leave the allowed root. +""" + +from __future__ import annotations + +import os +import tempfile +from pathlib import Path + +from mcp_server.server.http_file_diff import ( + _allowed_probe_roots, + _contained_resolved, + _first_existing_dir_within, + _git_root_for_name, + _within, +) + + +def _sentinel_root(): + """Return ``find_git_root`` and a marker it yields, so we can assert the + code fell back to the CWD repo instead of probing the tainted path.""" + sentinel = Path("/__cwd_repo_fallback__") + + def fake_find_git_root(start=None): + # No argument => the CWD-repo fallback branch. + return sentinel if start is None else None + + return fake_find_git_root, sentinel + + +# ── _within: commonpath is segment-aware, not a prefix test ────────────── + + +def test_within_true_for_nested_path(): + assert _within("/home/user/project", "/home/user") is True + + +def test_within_true_for_exact_root(): + assert _within("/home/user", "/home/user") is True + + +def test_within_false_for_sibling_prefix(): + # The naive ``startswith`` bug: "/home/user-evil".startswith("/home/user"). + assert _within("/home/user-evil", "/home/user") is False + + +def test_within_false_for_outside_path(): + assert _within("/etc/passwd", "/home/user") is False + + +# ── _contained_resolved: only returns paths inside an allowed root ─────── + + +def test_contained_resolved_blocks_etc(): + assert _contained_resolved("/etc/passwd") is None + + +def test_contained_resolved_allows_home(): + out = _contained_resolved(str(Path.home())) + assert out is not None + assert _within(os.path.realpath(str(out)), os.path.realpath(str(Path.home()))) + + +def test_contained_resolved_allows_temp(): + with tempfile.TemporaryDirectory() as td: + # /tmp and /var/folders are allowed roots; a real temp dir resolves + # under one of them on every supported platform. + real = os.path.realpath(td) + assert any(_within(real, r) for r in _allowed_probe_roots()) + assert _contained_resolved(td) is not None + + +# ── _first_existing_dir_within: walk never leaves the root ─────────────── + + +def test_first_existing_dir_returns_existing_ancestor(): + with tempfile.TemporaryDirectory() as td: + deep = Path(td) / "a" / "b" / "c.py" # never created + out = _first_existing_dir_within(deep) + assert out is not None + assert os.path.realpath(str(out)) == os.path.realpath(td) + + +def test_first_existing_dir_rejects_outside_root(): + # A path outside any allowed root yields None rather than probing it. + assert _first_existing_dir_within(Path("/etc/cron.d")) is None + + +# ── _git_root_for_name: end-to-end traversal blocking ──────────────────── + + +def test_git_root_dotdot_falls_back_to_cwd(): + find, sentinel = _sentinel_root() + assert _git_root_for_name("/etc/../etc/passwd", find) is sentinel + + +def test_git_root_etc_falls_back_to_cwd(): + find, sentinel = _sentinel_root() + assert _git_root_for_name("/etc/passwd", find) is sentinel + + +def test_git_root_nullbyte_falls_back_to_cwd(): + find, sentinel = _sentinel_root() + assert _git_root_for_name("/home/\x00/x", find) is sentinel + + +def test_git_root_empty_falls_back_to_cwd(): + find, sentinel = _sentinel_root() + assert _git_root_for_name(" ", find) is sentinel + + +def test_git_root_resolves_real_in_repo_path(): + # A legitimate absolute path inside this repo (cwd) must resolve to a + # real git root, proving the barrier does not break the happy path. + from mcp_server.infrastructure.git_diff import find_git_root + + here = os.path.realpath(__file__) + root = _git_root_for_name(here, find_git_root) + assert root is not None + assert Path(root).is_dir()