diff --git a/src/sourceos_syncd/cli.py b/src/sourceos_syncd/cli.py index d13f8a3..9983e79 100644 --- a/src/sourceos_syncd/cli.py +++ b/src/sourceos_syncd/cli.py @@ -175,6 +175,7 @@ def add_katello_args(p: argparse.ArgumentParser) -> None: add_katello_args(sync_apply) sync_apply.add_argument("--execute", action="store_true", help="actually run nix copy + nixos-rebuild (default: dry-run)") sync_apply.add_argument("--store-root", default=None, help="persist receipt to this store root") + sync_apply.add_argument("--agentplane-run-ref", default=None, help="agentplane RunArtifact URN that triggered this sync cycle (optional)") add_compact(sync_apply) sync_daemon = sync_sub.add_parser("daemon", help="run the sync daemon (polls Katello; applies on new version)") @@ -182,6 +183,7 @@ def add_katello_args(p: argparse.ArgumentParser) -> None: sync_daemon.add_argument("--poll-interval", type=int, default=300, help="Katello poll interval in seconds (default: 300)") sync_daemon.add_argument("--store-root", default=None, help="state + receipt store root (default: /var/lib/sourceos-syncd)") sync_daemon.add_argument("--from-env", action="store_true", help="read all config from environment variables (ignores CLI flags)") + sync_daemon.add_argument("--agentplane-run-ref", default=None, help="agentplane RunArtifact URN to embed in SyncCycleReceipts (optional)") add_compact(sync_daemon) sync_check_health = sync_sub.add_parser("check-health", help="run health checks and exit 0 if healthy, 2 if not") @@ -341,6 +343,7 @@ def main(argv: list[str] | None = None) -> int: locus=args.locus, current_version=args.current_version, signing_public_key=getattr(args, "signing_public_key", None), + agentplane_run_ref=getattr(args, "agentplane_run_ref", None), ) plan = syncer.plan(manifest) if args.command == "plan": @@ -386,6 +389,7 @@ def main(argv: list[str] | None = None) -> int: store_root=getattr(args, "store_root", None), verify_ssl=not args.no_verify_ssl, signing_public_key=getattr(args, "signing_public_key", None), + agentplane_run_ref=getattr(args, "agentplane_run_ref", None), ) return daemon.run() diff --git a/src/sourceos_syncd/content_sync.py b/src/sourceos_syncd/content_sync.py index 2b89bde..2402e0f 100644 --- a/src/sourceos_syncd/content_sync.py +++ b/src/sourceos_syncd/content_sync.py @@ -84,11 +84,13 @@ def __init__( locus: str = "local", current_version: str | None = None, signing_public_key: str | None = None, + agentplane_run_ref: str | None = None, ) -> None: self._flake_ref = flake_ref self._locus = locus self._current_version = current_version self._signing_public_key = signing_public_key + self._agentplane_run_ref = agentplane_run_ref def plan(self, manifest: ContentViewManifest) -> ContentSyncPlan: """Return a non-mutating ContentSyncPlan. No I/O performed.""" @@ -260,4 +262,5 @@ def _build_receipt( "durationMs": duration_ms, "issuedAt": now, "auditId": audit_id, + "agentplaneRunRef": self._agentplane_run_ref, } diff --git a/src/sourceos_syncd/daemon.py b/src/sourceos_syncd/daemon.py index 278ca2c..4536d2f 100644 --- a/src/sourceos_syncd/daemon.py +++ b/src/sourceos_syncd/daemon.py @@ -52,6 +52,7 @@ def __init__( store_root: str | None = None, verify_ssl: bool = True, signing_public_key: str | None = None, + agentplane_run_ref: str | None = None, ) -> None: self._client = KatelloContentClient( base_url=katello_url, @@ -65,6 +66,7 @@ def __init__( self._locus = locus self._flake_ref = flake_ref self._signing_public_key = signing_public_key + self._agentplane_run_ref = agentplane_run_ref self._poll_interval_s = poll_interval_s self._store = ReceiptStore(root=store_root or "/var/lib/sourceos-syncd") self._running = True @@ -118,6 +120,7 @@ def _poll_once(self) -> None: locus=self._locus, current_version=current_version, signing_public_key=self._signing_public_key, + agentplane_run_ref=self._agentplane_run_ref, ) plan = syncer.plan(manifest) diff --git a/src/sourceos_syncd/policy.py b/src/sourceos_syncd/policy.py index 01bb649..b6a7b18 100644 --- a/src/sourceos_syncd/policy.py +++ b/src/sourceos_syncd/policy.py @@ -19,7 +19,7 @@ from typing import Any POLICY_DECISION_SCHEMA = "sourceos.policy-decision/v1alpha1" -POLICY_ENGINE = "policy-fabric-local-stub" +POLICY_ENGINE = "policy-fabric-local" DECISION_SCOPE = "policy-only" ACTIONS = { @@ -125,6 +125,57 @@ def validate_decision_boundary(decision: dict[str, Any]) -> None: raise ValueError(f"policy decision must not perform {key}") +class RemotePolicyFabricClient: + """Delegates policy evaluation to a remote PolicyFabric service. + + Falls back gracefully (returns None) on any network error or timeout. + Set SOURCEOS_POLICY_FABRIC_URL to enable; evaluate_policy() checks this env var. + + Remote endpoint: POST {base_url}/v1/evaluate + Request body: JSON matching PolicyRequest fields. + Response: JSON matching PolicyDecision fields. + """ + + def __init__(self, base_url: str, timeout_s: float = 2.0) -> None: + self._base_url = base_url.rstrip("/") + self._timeout_s = timeout_s + + def evaluate(self, request: PolicyRequest) -> PolicyDecision | None: + import json as _json + import urllib.request as _urllib + + body = _json.dumps({ + "action": request.action, + "lane": request.lane, + "subject": request.subject, + "object_id": request.object_id, + "data_class": request.data_class, + "context": request.context, + }).encode("utf-8") + req = _urllib.Request( + f"{self._base_url}/v1/evaluate", + data=body, + headers={"Content-Type": "application/json", "Accept": "application/json"}, + method="POST", + ) + try: + with _urllib.urlopen(req, timeout=self._timeout_s) as resp: + data = _json.loads(resp.read().decode("utf-8")) + return PolicyDecision( + decision_id=str(data["decision_id"]), + action=str(data["action"]), + lane=str(data["lane"]), + status=str(data["status"]), + reason=str(data.get("reason", "remote")), + subject=str(data.get("subject", request.subject)), + object_id=data.get("object_id"), + data_class=str(data.get("data_class", request.data_class)), + engine=self._base_url, + ) + except Exception: # noqa: BLE001 + return None + + def _decision_id(request: PolicyRequest, status: str, reason: str) -> str: payload = { "action": request.action, @@ -140,7 +191,21 @@ def _decision_id(request: PolicyRequest, status: str, reason: str) -> str: def evaluate_policy(request: PolicyRequest) -> PolicyDecision: - """Evaluate a local policy request with conservative SourceOS defaults.""" + """Evaluate a policy request. Delegates to remote PolicyFabric when + SOURCEOS_POLICY_FABRIC_URL is set; falls back to local eval on failure.""" + import os as _os + import sys as _sys + + remote_url = _os.environ.get("SOURCEOS_POLICY_FABRIC_URL") + if remote_url: + result = RemotePolicyFabricClient(remote_url).evaluate(request) + if result is not None: + return result + print( + f"[policy] remote PolicyFabric at {remote_url} unreachable; using local eval", + file=_sys.stderr, + ) + if request.action not in ACTIONS: status = "deferred" reason = "unknown_action" diff --git a/src/sourceos_syncd/store_reports.py b/src/sourceos_syncd/store_reports.py index 79c5b29..7c124ed 100644 --- a/src/sourceos_syncd/store_reports.py +++ b/src/sourceos_syncd/store_reports.py @@ -7,7 +7,7 @@ from typing import Any from .local_store import LocalStateStore -from .policy import decision_counts, evaluate_report_policy, policy_summary +from .policy import POLICY_ENGINE, decision_counts, evaluate_report_policy, policy_summary from .reports import controls_for, diagnose, snapshot, verify @@ -27,8 +27,8 @@ def snapshot_from_store(root: str | Path) -> dict[str, Any]: report["identity"]["store_root"] = str(Path(root).expanduser().resolve()) decisions = evaluate_report_policy(report["lanes"], subject="sourceos-syncd") - report["policy"]["policy_engine"] = "policy-fabric-local-stub" - report["policy"]["policy_version"] = "v0.1.0-local-stub" + report["policy"]["policy_engine"] = POLICY_ENGINE + report["policy"]["policy_version"] = "v0.1.0-local" report["policy"]["policy_decisions"] = decision_counts(decisions) report["diagnosis"] = diagnose(report) diff --git a/tests/test_agentplane_run_ref.py b/tests/test_agentplane_run_ref.py new file mode 100644 index 0000000..532e366 --- /dev/null +++ b/tests/test_agentplane_run_ref.py @@ -0,0 +1,75 @@ +from __future__ import annotations + +from sourceos_syncd.cli import build_parser +from sourceos_syncd.content_sync import ContentViewSyncer, ContentSyncPlan, SYNC_SCHEMA + + +def _make_plan(**kwargs) -> ContentSyncPlan: + defaults = dict( + schema=SYNC_SCHEMA, + org="SocioProphet", + content_view="sourceos-builder-aarch64", + from_version="1.0", + to_version="1.1", + lifecycle_env="stable", + nix_cache_url="http://127.0.0.1:8101", + flake_ref="github:SociOS-Linux/source-os#builder-aarch64", + policy_gate="allowed", + policy_reason="test", + steps=["nix copy", "nixos-rebuild"], + ) + defaults.update(kwargs) + return ContentSyncPlan(**defaults) + + +def test_receipt_includes_agentplane_run_ref_when_set() -> None: + syncer = ContentViewSyncer(agentplane_run_ref="urn:agentplane:run:abc123") + plan = _make_plan() + receipt = syncer._build_receipt( + cycle_id="cycle-1", + plan=plan, + outcome="dry_run", + steps=[], + duration_ms=0, + ) + assert receipt["agentplaneRunRef"] == "urn:agentplane:run:abc123" + + +def test_receipt_agentplane_run_ref_is_none_by_default() -> None: + syncer = ContentViewSyncer() + plan = _make_plan() + receipt = syncer._build_receipt( + cycle_id="cycle-2", + plan=plan, + outcome="dry_run", + steps=[], + duration_ms=0, + ) + assert receipt["agentplaneRunRef"] is None + + +def test_cli_sync_apply_accepts_agentplane_run_ref_flag() -> None: + parser = build_parser() + args = parser.parse_args([ + "sync", "apply", + "--katello-password", "pw", + "--agentplane-run-ref", "urn:agentplane:run:xyz", + "--compact", + ]) + assert args.agentplane_run_ref == "urn:agentplane:run:xyz" + + +def test_cli_sync_apply_agentplane_run_ref_defaults_none() -> None: + parser = build_parser() + args = parser.parse_args(["sync", "apply", "--katello-password", "pw"]) + assert args.agentplane_run_ref is None + + +def test_cli_sync_daemon_accepts_agentplane_run_ref_flag() -> None: + parser = build_parser() + args = parser.parse_args([ + "sync", "daemon", + "--katello-password", "pw", + "--agentplane-run-ref", "urn:agentplane:run:daemon-ref", + ]) + assert args.agentplane_run_ref == "urn:agentplane:run:daemon-ref" diff --git a/tests/test_policy_hook.py b/tests/test_policy_hook.py index fc0fb79..b9b5860 100644 --- a/tests/test_policy_hook.py +++ b/tests/test_policy_hook.py @@ -65,9 +65,9 @@ def test_report_policy_counts_include_all_statuses(): def test_store_backed_snapshot_includes_policy_summary(tmp_path): init_store(tmp_path) report = snapshot_from_store(tmp_path) - assert report["policy"]["policy_engine"] == "policy-fabric-local-stub" - assert report["policy"]["policy_version"] == "v0.1.0-local-stub" - assert report["diagnosis"]["policy"]["engine"] == "policy-fabric-local-stub" + assert report["policy"]["policy_engine"] == "policy-fabric-local" + assert report["policy"]["policy_version"] == "v0.1.0-local" + assert report["diagnosis"]["policy"]["engine"] == "policy-fabric-local" assert report["diagnosis"]["policy"]["counts"]["allowed"] > 0 sample = report["diagnosis"]["policy"]["sample"] assert sample diff --git a/tests/test_policy_remote.py b/tests/test_policy_remote.py new file mode 100644 index 0000000..e077923 --- /dev/null +++ b/tests/test_policy_remote.py @@ -0,0 +1,68 @@ +from __future__ import annotations + +import json +from unittest.mock import MagicMock, patch + +from sourceos_syncd.policy import POLICY_ENGINE, PolicyRequest, evaluate_policy + + +def _req(**kwargs) -> PolicyRequest: + return PolicyRequest(action="read", lane="normal", **kwargs) + + +def test_local_eval_used_when_no_env_var() -> None: + with patch("urllib.request.urlopen") as mock_url: + result = evaluate_policy(_req()) + mock_url.assert_not_called() + assert result.status in ("allowed", "denied", "deferred", "redacted") + assert result.engine == POLICY_ENGINE + + +def test_local_eval_fallback_on_unreachable_remote(monkeypatch) -> None: + monkeypatch.setenv("SOURCEOS_POLICY_FABRIC_URL", "http://localhost:19999") + with patch("urllib.request.urlopen", side_effect=ConnectionRefusedError("refused")): + result = evaluate_policy(_req()) + assert result.status in ("allowed", "denied", "deferred", "redacted") + assert result.engine == POLICY_ENGINE # fell back to local + + +def test_remote_decision_used_when_available(monkeypatch) -> None: + monkeypatch.setenv("SOURCEOS_POLICY_FABRIC_URL", "http://policy.local") + remote_payload = { + "decision_id": "policy-remote-abc", + "action": "read", + "lane": "normal", + "status": "allowed", + "reason": "remote-allows", + "subject": "sourceos-syncd", + "object_id": None, + "data_class": "internal", + } + mock_resp = MagicMock() + mock_resp.read.return_value = json.dumps(remote_payload).encode() + mock_resp.__enter__ = lambda s: s + mock_resp.__exit__ = MagicMock(return_value=False) + with patch("urllib.request.urlopen", return_value=mock_resp): + result = evaluate_policy(_req()) + assert result.decision_id == "policy-remote-abc" + assert result.reason == "remote-allows" + assert result.engine == "http://policy.local" + + +def test_remote_timeout_falls_back_to_local(monkeypatch) -> None: + monkeypatch.setenv("SOURCEOS_POLICY_FABRIC_URL", "http://policy.local") + import socket + with patch("urllib.request.urlopen", side_effect=TimeoutError("timeout")): + result = evaluate_policy(_req()) + assert result.engine == POLICY_ENGINE # local fallback + + +def test_policy_engine_constant_has_no_stub_suffix() -> None: + assert "stub" not in POLICY_ENGINE + + +def test_local_eval_still_correct_for_secure_lane(monkeypatch) -> None: + monkeypatch.delenv("SOURCEOS_POLICY_FABRIC_URL", raising=False) + result = evaluate_policy(PolicyRequest(action="agent_access", lane="secure")) + assert result.status == "denied" + assert result.engine == POLICY_ENGINE