Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions src/agents_shipgate/cli/scan/decision.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,12 @@ def _run_checks_and_decide(
derived from sanitized tools. Mutating ``reason`` here would leak
``path:line`` into ``Finding.evidence``, churning fingerprints.
"""
action_surface_warnings: list[str] = []
action_surface_facts = build_action_surface_facts(
manifest,
agent_id=tools_and_agent.agent.id,
tools=tools_and_agent.tools,
warnings=action_surface_warnings,
)
capability_facts, capability_policy_subjects = build_capability_policy_subjects(
manifest,
Expand Down Expand Up @@ -167,6 +169,7 @@ def _run_checks_and_decide(
return _ChecksDecision(
action_surface_facts=action_surface_facts,
action_surface_diff=action_surface_diff,
action_surface_warnings=action_surface_warnings,
findings=findings,
legacy_fingerprints=legacy_fingerprints,
override_resolution=override_resolution,
Expand Down
5 changes: 5 additions & 0 deletions src/agents_shipgate/cli/scan/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ class _ChecksDecision:

action_surface_facts: ActionSurfaceFacts
action_surface_diff: Any # ActionSurfaceDiff (internal/semantic)
# Fail-soft warnings raised while building the action surface (e.g.
# duplicate action_id collisions from OpenAPI specs whose paths
# normalize identically). Merged into ``report.source_warnings`` so a
# third-party spec degrades to review_required instead of crashing.
action_surface_warnings: list[str]
findings: list[Any] # list[Finding]
legacy_fingerprints: list[str]
override_resolution: Any # SeverityOverrideResolution
Expand Down
11 changes: 11 additions & 0 deletions src/agents_shipgate/cli/scan/sanitization.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,17 @@ def _sanitize_for_output(
stats=privacy_stats,
path="source_warnings[]",
)
# Fail-soft action-surface warnings (e.g. OpenAPI action_id collisions)
# are raised in Phase 5, after the Phase 3 source-warning list is
# assembled. Append them last so byte-stable output is preserved for
# the common (no-collision) case where this list is empty.
public_source_warnings.extend(
redact_data(
decision.action_surface_warnings,
stats=privacy_stats,
path="source_warnings[]",
)
)
public_api_surface = redact_data(
public_api_artifacts.surface_summary() if public_api_artifacts else None,
stats=privacy_stats,
Expand Down
114 changes: 111 additions & 3 deletions src/agents_shipgate/core/lenses/action_surface.py
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,23 @@ def build_action_surface_facts(
*,
agent_id: str,
tools: list[Tool],
warnings: list[str] | None = None,
) -> ActionSurfaceFacts:
"""Build the typed action surface from ``tools`` + manifest declarations.

Two distinct operations can derive the same ``action_id`` — most
commonly two OpenAPI operations whose paths normalize identically
(e.g. a trailing-slash variant of ``/sessions/{session_id}``), because
the OpenAPI ``operation`` token is ``METHOD path`` only and drops the
``operationId``. A third-party spec must never crash a scan, so when a
``warnings`` sink is provided the colliding ids are disambiguated
fail-soft and one ``source_warning`` is recorded per collision (same
fail-soft principle as the symlink-loop and MCP-as-tools fixes).

Callers that cannot tolerate a fail-soft snapshot (the redaction path,
which has its own public-only ordinal disambiguator) pass no sink and
keep the legacy hard :class:`ConfigError`.
"""
declarations = {entry.tool: entry for entry in manifest.action_surface.actions}
actions = [
_action_from_tool(
Expand All @@ -103,7 +119,26 @@ def build_action_surface_facts(
)
for tool in sorted(tools, key=lambda item: item.name)
]
_validate_unique_action_ids(actions)
if warnings is None:
_validate_unique_action_ids(actions)
else:
# Tools whose action_id carries a manifest-authored component. A
# collision touching one of these stays a hard ConfigError even on
# the fail-soft path — only inferred-vs-inferred collisions degrade.
# `id`, `provider`, and `operation` all override action_id
# components (see `build_action` / `_provider` / `_operation`), so a
# declaration setting any of them is an explicit identity the engine
# must never silently rewrite.
explicit_tool_names = {
entry.tool
for entry in manifest.action_surface.actions
if entry.id or entry.provider or entry.operation
}
warnings.extend(
_disambiguate_duplicate_action_ids(
actions, explicit_tool_names=explicit_tool_names
)
)
return ActionSurfaceFacts(actions=sorted(actions, key=lambda item: item.action_id))


Expand Down Expand Up @@ -547,8 +582,11 @@ def _validate_unique_action_ids(actions: list[ActionFact]) -> None:
for action_id, tool_names in by_id.items()
if len(tool_names) > 1
}
if not duplicates:
return
if duplicates:
_raise_duplicate_action_ids(duplicates)


def _raise_duplicate_action_ids(duplicates: dict[str, list[str]]) -> None:
details = "; ".join(
f"{action_id!r} used by {', '.join(tool_names)}"
for action_id, tool_names in sorted(duplicates.items())
Expand All @@ -560,6 +598,76 @@ def _validate_unique_action_ids(actions: list[ActionFact]) -> None:
)


def _disambiguate_duplicate_action_ids(
actions: list[ActionFact],
*,
explicit_tool_names: set[str],
) -> list[str]:
"""Fail-soft replacement for :func:`_validate_unique_action_ids`,
scoped to *inferred* collisions only.

A manifest-authored action identity — any ``action_surface.actions[]``
declaration setting ``id``, ``provider``, or ``operation`` — is a
contract: a collision that involves one (explicit-vs-explicit or
explicit-vs-inferred) is a config mistake to fix, so it stays a hard
:class:`ConfigError` (identical to the no-sink path).
``explicit_tool_names`` names the tools carrying such a declaration.

Only collisions between purely *inferred* ids degrade — most commonly
two OpenAPI operations whose paths normalize identically. Those are
mutated in place so each ``action_id`` is unique, returning one
human-readable warning per resolved collision. The first member of each
colliding group (in ``tool_name`` order) keeps the bare id; the rest
gain a ``#<operationId>`` suffix so distinct operations stay distinct in
the diff. An ordinal fallback covers the pathological case where two
colliding actions also share a ``tool_name``. Renamed actions get a
refreshed ``identity_hash`` so downstream identity-hash consumers stay
consistent.
"""
by_id: dict[str, list[ActionFact]] = {}
for action in actions:
by_id.setdefault(action.action_id, []).append(action)
colliding = {
action_id: group for action_id, group in by_id.items() if len(group) > 1
}

# Hard-fail any collision that touches an explicitly declared id. These
# are not third-party quirks the engine should silently rewrite.
explicit_collisions = {
action_id: sorted(item.tool_name for item in group)
for action_id, group in colliding.items()
if any(item.tool_name in explicit_tool_names for item in group)
}
if explicit_collisions:
_raise_duplicate_action_ids(explicit_collisions)

warnings: list[str] = []
for action_id, group in sorted(colliding.items()):
ordered = sorted(group, key=lambda item: item.tool_name)
warnings.append(
"Duplicate action_surface action_id "
f"{action_id!r} derived from operations "
f"{', '.join(repr(item.tool_name) for item in ordered)}; "
"disambiguated with per-operation suffixes so distinct "
"operations are not collapsed."
)
used_suffixes: dict[str, int] = {}
for index, action in enumerate(ordered):
if index == 0:
continue
suffix = action.tool_name or str(index)
seen = used_suffixes.get(suffix, 0) + 1
used_suffixes[suffix] = seen
if seen > 1:
suffix = f"{suffix}#{seen}"
new_id = f"{action_id}#{suffix}"
action.action_id = new_id
action.hashes = action.hashes.model_copy(
update={"identity_hash": _stable_hash(new_id)}
)
return warnings


def _provider(tool: Tool, declaration: ActionDeclarationConfig | None) -> str:
if declaration and declaration.provider:
return _normalize_token(declaration.provider)
Expand Down
109 changes: 109 additions & 0 deletions tests/test_action_surface_diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
from agents_shipgate.core.findings import assign_finding_ids
from agents_shipgate.core.lenses.action_surface import (
_dedupe_findings,
_stable_hash,
build_action_surface_facts,
compute_action_surface_diff,
enrich_action_surface_diff_with_source,
Expand Down Expand Up @@ -209,6 +210,11 @@ def test_action_surface_rejects_duplicate_tool_declarations():


def test_action_surface_rejects_action_id_collisions():
"""An explicit ``actions[].id`` colliding with another action's inferred
id is a manifest config error and must stay a hard ConfigError — even on
the fail-soft (warnings-sink) path the live scan uses. Only purely
inferred collisions degrade; a user-declared id is never silently
rewritten."""
manifest = _manifest(
{
"action_surface": {
Expand Down Expand Up @@ -244,6 +250,109 @@ def test_action_surface_rejects_action_id_collisions():
agent_id="agent:action-test/agent",
tools=tools,
)
# The live scan path always passes a warnings sink; an explicit-vs-inferred
# collision must NOT be silently disambiguated into review_required.
with pytest.raises(ConfigError, match="Duplicate action_surface action_id"):
build_action_surface_facts(
manifest,
agent_id="agent:action-test/agent",
tools=tools,
warnings=[],
)


def test_action_surface_rejects_collision_from_explicit_provider_operation():
"""``provider``/``operation`` declarations override action_id components
just like ``id`` does. A collision between two such manifest-authored
identities must stay a hard ConfigError even on the warnings-sink path —
not be silently rewritten to ``...:shared#beta``."""
manifest = _manifest(
{
"action_surface": {
"actions": [
{"tool": "alpha", "provider": "tools", "operation": "shared"},
{"tool": "beta", "provider": "tools", "operation": "shared"},
]
}
}
)
tools = [
Tool(
id="tool:alpha",
name="alpha",
source_type="mcp",
source_id="tools",
extraction_confidence="high",
),
Tool(
id="tool:beta",
name="beta",
source_type="mcp",
source_id="tools",
extraction_confidence="high",
),
]

for sink in (None, []):
with pytest.raises(ConfigError, match="Duplicate action_surface action_id"):
build_action_surface_facts(
manifest,
agent_id="agent:action-test/agent",
tools=tools,
warnings=sink,
)


def test_action_surface_disambiguates_openapi_action_id_collisions_fail_soft():
"""Two OpenAPI operations whose paths normalize identically (a
trailing-slash variant) collapse to one ``action_id``. With a warnings
sink the scan degrades instead of raising: ids stay distinct and one
warning is recorded. Real-world repro: block/goose's spec."""
manifest = _manifest()
tools = [
Tool(
id="tool:get_session",
name="get_session",
source_type="openapi",
source_id="goose-api",
annotations={"httpMethod": "get", "path": "/sessions/{session_id}"},
extraction_confidence="high",
),
Tool(
id="tool:get_session_detail",
name="get_session_detail",
source_type="openapi",
source_id="goose-api",
# Trailing slash normalizes to the same path as get_session.
annotations={"httpMethod": "get", "path": "/sessions/{session_id}/"},
extraction_confidence="high",
),
]

warnings: list[str] = []
facts = build_action_surface_facts(
manifest,
agent_id="agent:action-test/agent",
tools=tools,
warnings=warnings,
)

action_ids = [action.action_id for action in facts.actions]
base_id = "agent:action-test/agent:openapi:goose-api:GET /sessions/{session_id}"
# Both operations are retained as distinct actions; the first (in
# tool-name order) keeps the bare id, the second is suffixed.
assert action_ids == sorted([base_id, f"{base_id}#get_session_detail"])
assert len(set(action_ids)) == 2
assert {action.tool_name for action in facts.actions} == {
"get_session",
"get_session_detail",
}
# identity_hash tracks the renamed action_id.
suffixed = next(a for a in facts.actions if a.action_id.endswith("#get_session_detail"))
assert suffixed.hashes.identity_hash == _stable_hash(suffixed.action_id)
assert len(warnings) == 1
assert "Duplicate action_surface action_id" in warnings[0]
assert "get_session" in warnings[0] and "get_session_detail" in warnings[0]


def test_action_surface_policy_requires_typed_expected_values():
Expand Down
Loading