Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 11 additions & 9 deletions mcp_server/core/workflow_graph_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,16 +116,18 @@ class WorkflowEdge(BaseModel):


def _short_hash(value: str, width: int = 10) -> str:
"""Stable, non-cryptographic short hash for ID stability across runs.

``usedforsecurity=False`` (Python 3.9+) documents intent — this hash mints
deterministic node IDs, it never protects sensitive data — and lets CodeQL's
weak-hashing query correctly skip it. SHA-1 is retained (not upgraded to
SHA-256) so existing node IDs stay stable across graphs and snapshots.
"""Stable, non-cryptographic short hash for deterministic node IDs.

Uses SHA-256 (a non-broken algorithm) rather than SHA-1 purely for
determinism: ``same input -> same id`` within a graph build. The only
consumer of these IDs is ``workflow_graph_layout``, a position cache
keyed by ``(node_id, topology_fingerprint, layout_version)`` — when the
ID scheme changes, the fingerprint changes and the layout recomputes, so
there is no cross-build stability requirement to preserve. SHA-256 keeps
CodeQL's weak-hashing query (CWE-327/328) clean without relying on the
``usedforsecurity`` flag.
"""
return hashlib.sha1(value.encode("utf-8"), usedforsecurity=False).hexdigest()[
:width
]
return hashlib.sha256(value.encode("utf-8")).hexdigest()[:width]


class NodeIdFactory:
Expand Down
5 changes: 4 additions & 1 deletion mcp_server/infrastructure/workflow_graph_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@


def _cmd_hash(cmd: str) -> str:
return hashlib.sha1(cmd.encode("utf-8")).hexdigest()[:12]
# SHA-256 (non-broken) for deterministic command node IDs — feeds the
# self-healing workflow_graph_layout cache, not a security boundary.
# Matches workflow_graph_schema._short_hash (CWE-327/328 clean).
return hashlib.sha256(cmd.encode("utf-8")).hexdigest()[:12]


def _first_line(text: str) -> str:
Expand Down
179 changes: 121 additions & 58 deletions mcp_server/server/http_file_diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,38 +91,116 @@ def _allowed_probe_roots() -> "list[str]":
return roots


def _within(real_path: str, root: str) -> bool:
"""True iff ``real_path`` is ``root`` or nested beneath it.

``os.path.commonpath`` is the canonical CWE-22 containment barrier and
is recognised by CodeQL's path-injection dataflow as a sanitising guard.
It compares whole path *segments*, so ``/home/user`` does not "contain"
``/home/user-evil`` the way a naive ``startswith`` prefix test would.
Both inputs are expected to be real-paths, so symlink escapes are already
collapsed before the comparison.
"""
import os

try:
return os.path.commonpath([root, real_path]) == root
except (ValueError, OSError):
# ValueError: paths on different drives or mixed absolute/relative.
return False


def _contained_resolved(p: "str | Path") -> "Path | None": # noqa: F821
"""Resolve ``p`` and return it ONLY if it lands inside an allowed probe
"""Real-path ``p`` and return it ONLY if it lands inside an allowed probe
root; otherwise ``None``.

Sanitise-and-return: the returned Path is the *validated* value, and
callers must use it (never the raw input) for any subsequent filesystem
op. This places the ``is_relative_to`` barrier (the canonical CWE-22
path-traversal sanitiser) directly on the tainted→sink dataflow, which
CodeQL recognises — so ``?name=`` / ``?path=`` query data can never
reach a filesystem op that escapes ``$HOME`` / cwd / temp.
Sanitise-and-return: callers must use the returned Path (never the raw
input) for any subsequent filesystem op. ``os.path.realpath`` normalises
``..`` and symlink segments, and ``_within`` (``os.path.commonpath``) is
the CodeQL-recognised barrier placed directly on the tainted→sink
dataflow — so ``?name=`` / ``?path=`` query data can never reach a
filesystem op that escapes ``$HOME`` / cwd / temp.
"""
import os
from pathlib import Path

try:
target = Path(p).resolve(strict=False)
real = os.path.realpath(str(p))
except (OSError, ValueError):
return None
for root in _allowed_probe_roots():
if _within(real, root):
return Path(real)
return None


def _descend_trusted(root: str, names: "list[str]") -> "Path | None": # noqa: F821
"""Descend from a TRUSTED ``root`` into child directories whose names
match the successive user-supplied ``names``, returning the deepest
existing directory reached.

CWE-22 taint break: this is the ``git_diff._match_in_whitelist`` pattern
applied to directory traversal. At every level the candidate paths come
from ``os.scandir(cur)`` — a trusted enumeration of what is actually on
disk — and a user component selects among them ONLY via ``entry.name ==
name`` equality. The path that reaches the ``is_dir`` / ``scandir`` sink
(``cur`` / ``entry.path``) is composed entirely from the constant
``root`` plus scandir output; the user ``names`` never construct a probed
path. Static analysers (CodeQL ``py/path-injection``) therefore see the
sink operand as not derived from user input. Capped at 64 levels.
"""
import os
from pathlib import Path

cur = os.path.realpath(root) # ``root`` is a constant probe root → trusted
if not os.path.isdir(cur):
return None
deepest = cur
for name in names[:64]:
nxt = None
try:
base = Path(root).resolve(strict=False)
with os.scandir(cur) as entries:
for entry in entries:
# Equality match only — ``name`` selects a trusted entry,
# it never builds the path that gets probed.
if entry.name == name and entry.is_dir():
nxt = entry.path
break
except (OSError, ValueError):
break
if nxt is None:
break
cur = nxt
deepest = cur
return Path(deepest)


def _first_existing_dir_within(target: "Path") -> "Path | None": # noqa: F821
"""Deepest existing directory on ``target``'s path chain, found by
DESCENDING from the allowed probe root that contains it — never by
probing a ``realpath(user_input)``-derived path.

CWE-22 taint break (redesign): the up-walk variant fed ``is_dir()`` a
value derived from the user-controlled ``target`` on every iteration,
which CodeQL's loop-carried dataflow re-taints and refuses to treat as
sanitised. Instead we locate the constant allowed root that prefixes
``target`` (a pure segment comparison — no filesystem op on user data),
then hand the remaining components to :func:`_descend_trusted`, where the
filesystem sinks only ever touch trusted enumerated paths. ``target`` is
used solely to *choose* a root and *compare* component names.
"""
import os

real = os.path.realpath(str(target))
target_parts = [p for p in real.split(os.sep) if p]
for root in _allowed_probe_roots():
root_parts = [p for p in root.split(os.sep) if p]
if target_parts[: len(root_parts)] != root_parts:
continue
if target == base or target.is_relative_to(base):
return target
return _descend_trusted(root, target_parts[len(root_parts) :])
return None


def _under_allowed_root(p: "Path") -> bool: # noqa: F821
"""Back-compat boolean wrapper around :func:`_contained_resolved`."""
return _contained_resolved(p) is not None


def _git_root_for_name(name: str, find_git_root) -> "Path | None": # noqa: F821
"""Resolve git root from the file's own path, then fall back to CWD.

Expand All @@ -136,14 +214,18 @@ def _git_root_for_name(name: str, find_git_root) -> "Path | None": # noqa: F821
query parameter). Defences:

* Strip surrounding quotes, reject empty/null-byte inputs.
* ``os.path.normpath`` collapses ``..`` and ``//`` segments.
* Require absolute paths — relative inputs go straight to CWD.
* ``_under_allowed_root`` constrains the probe surface to the
user's ``$HOME``, server CWD, and system temp directories —
attackers cannot probe ``/etc``, ``/root``, etc.
* Ancestor walk capped at 64 levels.
* Only ``is_dir()`` / ``git rev-parse`` run against the
ancestry — no file content is read in this function.
* ``..`` segments are rejected outright — input falls back to CWD.
* ``_contained_resolved`` bounds the input to ``$HOME`` / cwd / temp
(``os.path.commonpath``), so anything outside falls back to CWD.
* The directory actually probed is reached by DESCENDING from a
constant allowed root via ``_first_existing_dir_within`` /
``_descend_trusted`` (``os.scandir``): the value that reaches
``is_dir`` / ``git rev-parse --cwd`` is composed from trusted
enumeration, not from ``name`` — the CWE-22 taint flow is broken
the same way ``git_diff._match_in_whitelist`` breaks it.
* Descent capped at 64 levels.
* Only directory probes / ``git rev-parse`` run against the path —
no file content is read in this function.
"""
from pathlib import Path

Expand All @@ -158,49 +240,30 @@ def _git_root_for_name(name: str, find_git_root) -> "Path | None": # noqa: F821
except (ValueError, OSError):
return find_git_root()

# Absolute inputs are the COMMON case, not an attack: graph file
# nodes carry the absolute ``file_path`` captured from the original
# tool call, on this same machine. The server is loopback-only and
# ``name`` comes from the user's own stored data, so we resolve the
# repo from the file's own location — constrained to
# ``_under_allowed_root`` (HOME / cwd / temp) so a crafted ``?name=``
# still can't probe ``/etc`` / ``/root`` (CWE-22).
# Absolute inputs are the COMMON case, not an attack: graph file nodes
# carry the absolute ``file_path`` captured from the original tool call,
# on this same machine. ``_contained_resolved`` bounds the path to
# HOME / cwd / temp, then ``_first_existing_dir_within`` DESCENDS from the
# containing trusted root via ``os.scandir`` to the deepest existing dir —
# so the path reaching the git sink is trusted enumeration (CWE-22).
if clean.startswith(("/", "\\")):
# Sanitise-and-return: ``target`` is the validated resolved path
# (gated by is_relative_to), so the value reaching ``is_dir()`` /
# ``git rev-parse`` below carries the CWE-22 barrier inline.
target = _contained_resolved(clean)
if target is None:
return find_git_root()
# Walk up to the first directory that exists (handles a file, or
# an intermediate dir, deleted after capture). Capped at 64.
start = target
for _ in range(64):
if start.is_dir():
break
parent = start.parent
if parent == start:
break
start = parent
start = _first_existing_dir_within(target)
if start is None:
return find_git_root()
root = find_git_root(start)
return root if root is not None else find_git_root()

# Relative inputs: join under each allowed probe root and let git
# walk the ancestry. ``is_relative_to`` keeps the join inside base.
# Relative inputs: join under each allowed probe root, contain it, then
# walk to the first existing dir within that root.
for base_root in _allowed_probe_roots():
try:
base = Path(base_root).resolve(strict=False)
target = (base / Path(*parts)).resolve(strict=False)
except (OSError, ValueError):
continue
# Canonical CodeQL-recognised sanitiser.
if not (target == base or target.is_relative_to(base)):
continue
try:
start = target if target.is_dir() else target.parent
except OSError:
target = _contained_resolved(str(Path(base_root) / Path(*parts)))
if target is None:
continue
if not (start == base or start.is_relative_to(base)):
start = _first_existing_dir_within(target)
if start is None:
continue
root = find_git_root(start)
if root is not None:
Expand Down
132 changes: 132 additions & 0 deletions tests_py/server/test_http_file_diff_path_containment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
"""CWE-22 (path traversal) regression tests for
``mcp_server.server.http_file_diff``.

The ``/api/file-diff?name=`` endpoint takes a user-controlled path and
resolves a git root from it. These tests pin the containment barrier:
every probed path must real-path *inside* an allowed root (``$HOME`` /
cwd / system temp) — a crafted ``?name=`` can never make the server
probe ``/etc``, ``/root`` or climb out via ``..`` / symlink escapes.

The sanitiser is ``os.path.realpath`` + ``os.path.commonpath`` (the
canonical, CodeQL-recognised CWE-22 barrier). Each test would FAIL if a
regression reverted to a naive ``startswith`` prefix test, dropped the
``..`` rejection, or let the ancestor walk leave the allowed root.
"""

from __future__ import annotations

import os
import tempfile
from pathlib import Path

from mcp_server.server.http_file_diff import (
_allowed_probe_roots,
_contained_resolved,
_first_existing_dir_within,
_git_root_for_name,
_within,
)


def _sentinel_root():
"""Return ``find_git_root`` and a marker it yields, so we can assert the
code fell back to the CWD repo instead of probing the tainted path."""
sentinel = Path("/__cwd_repo_fallback__")

def fake_find_git_root(start=None):
# No argument => the CWD-repo fallback branch.
return sentinel if start is None else None

return fake_find_git_root, sentinel


# ── _within: commonpath is segment-aware, not a prefix test ──────────────


def test_within_true_for_nested_path():
assert _within("/home/user/project", "/home/user") is True


def test_within_true_for_exact_root():
assert _within("/home/user", "/home/user") is True


def test_within_false_for_sibling_prefix():
# The naive ``startswith`` bug: "/home/user-evil".startswith("/home/user").
assert _within("/home/user-evil", "/home/user") is False


def test_within_false_for_outside_path():
assert _within("/etc/passwd", "/home/user") is False


# ── _contained_resolved: only returns paths inside an allowed root ───────


def test_contained_resolved_blocks_etc():
assert _contained_resolved("/etc/passwd") is None


def test_contained_resolved_allows_home():
out = _contained_resolved(str(Path.home()))
assert out is not None
assert _within(os.path.realpath(str(out)), os.path.realpath(str(Path.home())))


def test_contained_resolved_allows_temp():
with tempfile.TemporaryDirectory() as td:
# /tmp and /var/folders are allowed roots; a real temp dir resolves
# under one of them on every supported platform.
real = os.path.realpath(td)
assert any(_within(real, r) for r in _allowed_probe_roots())
assert _contained_resolved(td) is not None


# ── _first_existing_dir_within: walk never leaves the root ───────────────


def test_first_existing_dir_returns_existing_ancestor():
with tempfile.TemporaryDirectory() as td:
deep = Path(td) / "a" / "b" / "c.py" # never created
out = _first_existing_dir_within(deep)
assert out is not None
assert os.path.realpath(str(out)) == os.path.realpath(td)


def test_first_existing_dir_rejects_outside_root():
# A path outside any allowed root yields None rather than probing it.
assert _first_existing_dir_within(Path("/etc/cron.d")) is None


# ── _git_root_for_name: end-to-end traversal blocking ────────────────────


def test_git_root_dotdot_falls_back_to_cwd():
find, sentinel = _sentinel_root()
assert _git_root_for_name("/etc/../etc/passwd", find) is sentinel


def test_git_root_etc_falls_back_to_cwd():
find, sentinel = _sentinel_root()
assert _git_root_for_name("/etc/passwd", find) is sentinel


def test_git_root_nullbyte_falls_back_to_cwd():
find, sentinel = _sentinel_root()
assert _git_root_for_name("/home/\x00/x", find) is sentinel


def test_git_root_empty_falls_back_to_cwd():
find, sentinel = _sentinel_root()
assert _git_root_for_name(" ", find) is sentinel


def test_git_root_resolves_real_in_repo_path():
# A legitimate absolute path inside this repo (cwd) must resolve to a
# real git root, proving the barrier does not break the happy path.
from mcp_server.infrastructure.git_diff import find_git_root

here = os.path.realpath(__file__)
root = _git_root_for_name(here, find_git_root)
assert root is not None
assert Path(root).is_dir()
Loading