Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions src/sourceos_syncd/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,13 +175,15 @@ def add_katello_args(p: argparse.ArgumentParser) -> None:
add_katello_args(sync_apply)
sync_apply.add_argument("--execute", action="store_true", help="actually run nix copy + nixos-rebuild (default: dry-run)")
sync_apply.add_argument("--store-root", default=None, help="persist receipt to this store root")
sync_apply.add_argument("--agentplane-run-ref", default=None, help="agentplane RunArtifact URN that triggered this sync cycle (optional)")
add_compact(sync_apply)

sync_daemon = sync_sub.add_parser("daemon", help="run the sync daemon (polls Katello; applies on new version)")
add_katello_args(sync_daemon)
sync_daemon.add_argument("--poll-interval", type=int, default=300, help="Katello poll interval in seconds (default: 300)")
sync_daemon.add_argument("--store-root", default=None, help="state + receipt store root (default: /var/lib/sourceos-syncd)")
sync_daemon.add_argument("--from-env", action="store_true", help="read all config from environment variables (ignores CLI flags)")
sync_daemon.add_argument("--agentplane-run-ref", default=None, help="agentplane RunArtifact URN to embed in SyncCycleReceipts (optional)")
add_compact(sync_daemon)

sync_check_health = sync_sub.add_parser("check-health", help="run health checks and exit 0 if healthy, 2 if not")
Expand Down Expand Up @@ -341,6 +343,7 @@ def main(argv: list[str] | None = None) -> int:
locus=args.locus,
current_version=args.current_version,
signing_public_key=getattr(args, "signing_public_key", None),
agentplane_run_ref=getattr(args, "agentplane_run_ref", None),
)
plan = syncer.plan(manifest)
if args.command == "plan":
Expand Down Expand Up @@ -386,6 +389,7 @@ def main(argv: list[str] | None = None) -> int:
store_root=getattr(args, "store_root", None),
verify_ssl=not args.no_verify_ssl,
signing_public_key=getattr(args, "signing_public_key", None),
agentplane_run_ref=getattr(args, "agentplane_run_ref", None),
)
return daemon.run()

Expand Down
3 changes: 3 additions & 0 deletions src/sourceos_syncd/content_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,13 @@ def __init__(
locus: str = "local",
current_version: str | None = None,
signing_public_key: str | None = None,
agentplane_run_ref: str | None = None,
) -> None:
self._flake_ref = flake_ref
self._locus = locus
self._current_version = current_version
self._signing_public_key = signing_public_key
self._agentplane_run_ref = agentplane_run_ref

def plan(self, manifest: ContentViewManifest) -> ContentSyncPlan:
"""Return a non-mutating ContentSyncPlan. No I/O performed."""
Expand Down Expand Up @@ -260,4 +262,5 @@ def _build_receipt(
"durationMs": duration_ms,
"issuedAt": now,
"auditId": audit_id,
"agentplaneRunRef": self._agentplane_run_ref,
}
3 changes: 3 additions & 0 deletions src/sourceos_syncd/daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ def __init__(
store_root: str | None = None,
verify_ssl: bool = True,
signing_public_key: str | None = None,
agentplane_run_ref: str | None = None,
) -> None:
self._client = KatelloContentClient(
base_url=katello_url,
Expand All @@ -65,6 +66,7 @@ def __init__(
self._locus = locus
self._flake_ref = flake_ref
self._signing_public_key = signing_public_key
self._agentplane_run_ref = agentplane_run_ref
self._poll_interval_s = poll_interval_s
self._store = ReceiptStore(root=store_root or "/var/lib/sourceos-syncd")
self._running = True
Expand Down Expand Up @@ -118,6 +120,7 @@ def _poll_once(self) -> None:
locus=self._locus,
current_version=current_version,
signing_public_key=self._signing_public_key,
agentplane_run_ref=self._agentplane_run_ref,
)
plan = syncer.plan(manifest)

Expand Down
69 changes: 67 additions & 2 deletions src/sourceos_syncd/policy.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from typing import Any

POLICY_DECISION_SCHEMA = "sourceos.policy-decision/v1alpha1"
POLICY_ENGINE = "policy-fabric-local-stub"
POLICY_ENGINE = "policy-fabric-local"
DECISION_SCOPE = "policy-only"

ACTIONS = {
Expand Down Expand Up @@ -125,6 +125,57 @@ def validate_decision_boundary(decision: dict[str, Any]) -> None:
raise ValueError(f"policy decision must not perform {key}")


class RemotePolicyFabricClient:
"""Delegates policy evaluation to a remote PolicyFabric service.

Falls back gracefully (returns None) on any network error or timeout.
Set SOURCEOS_POLICY_FABRIC_URL to enable; evaluate_policy() checks this env var.

Remote endpoint: POST {base_url}/v1/evaluate
Request body: JSON matching PolicyRequest fields.
Response: JSON matching PolicyDecision fields.
"""

def __init__(self, base_url: str, timeout_s: float = 2.0) -> None:
self._base_url = base_url.rstrip("/")
self._timeout_s = timeout_s

def evaluate(self, request: PolicyRequest) -> PolicyDecision | None:
import json as _json
import urllib.request as _urllib

body = _json.dumps({
"action": request.action,
"lane": request.lane,
"subject": request.subject,
"object_id": request.object_id,
"data_class": request.data_class,
"context": request.context,
}).encode("utf-8")
req = _urllib.Request(
f"{self._base_url}/v1/evaluate",
data=body,
headers={"Content-Type": "application/json", "Accept": "application/json"},
method="POST",
)
try:
with _urllib.urlopen(req, timeout=self._timeout_s) as resp:
data = _json.loads(resp.read().decode("utf-8"))
return PolicyDecision(
decision_id=str(data["decision_id"]),
action=str(data["action"]),
lane=str(data["lane"]),
status=str(data["status"]),
reason=str(data.get("reason", "remote")),
subject=str(data.get("subject", request.subject)),
object_id=data.get("object_id"),
data_class=str(data.get("data_class", request.data_class)),
engine=self._base_url,
)
except Exception: # noqa: BLE001
return None


def _decision_id(request: PolicyRequest, status: str, reason: str) -> str:
payload = {
"action": request.action,
Expand All @@ -140,7 +191,21 @@ def _decision_id(request: PolicyRequest, status: str, reason: str) -> str:


def evaluate_policy(request: PolicyRequest) -> PolicyDecision:
"""Evaluate a local policy request with conservative SourceOS defaults."""
"""Evaluate a policy request. Delegates to remote PolicyFabric when
SOURCEOS_POLICY_FABRIC_URL is set; falls back to local eval on failure."""
import os as _os
import sys as _sys

remote_url = _os.environ.get("SOURCEOS_POLICY_FABRIC_URL")
if remote_url:
result = RemotePolicyFabricClient(remote_url).evaluate(request)
if result is not None:
return result
print(
f"[policy] remote PolicyFabric at {remote_url} unreachable; using local eval",
file=_sys.stderr,
)

if request.action not in ACTIONS:
status = "deferred"
reason = "unknown_action"
Expand Down
6 changes: 3 additions & 3 deletions src/sourceos_syncd/store_reports.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from typing import Any

from .local_store import LocalStateStore
from .policy import decision_counts, evaluate_report_policy, policy_summary
from .policy import POLICY_ENGINE, decision_counts, evaluate_report_policy, policy_summary
from .reports import controls_for, diagnose, snapshot, verify


Expand All @@ -27,8 +27,8 @@ def snapshot_from_store(root: str | Path) -> dict[str, Any]:
report["identity"]["store_root"] = str(Path(root).expanduser().resolve())

decisions = evaluate_report_policy(report["lanes"], subject="sourceos-syncd")
report["policy"]["policy_engine"] = "policy-fabric-local-stub"
report["policy"]["policy_version"] = "v0.1.0-local-stub"
report["policy"]["policy_engine"] = POLICY_ENGINE
report["policy"]["policy_version"] = "v0.1.0-local"
report["policy"]["policy_decisions"] = decision_counts(decisions)

report["diagnosis"] = diagnose(report)
Expand Down
75 changes: 75 additions & 0 deletions tests/test_agentplane_run_ref.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
from __future__ import annotations

from sourceos_syncd.cli import build_parser
from sourceos_syncd.content_sync import ContentViewSyncer, ContentSyncPlan, SYNC_SCHEMA


def _make_plan(**kwargs) -> ContentSyncPlan:
defaults = dict(
schema=SYNC_SCHEMA,
org="SocioProphet",
content_view="sourceos-builder-aarch64",
from_version="1.0",
to_version="1.1",
lifecycle_env="stable",
nix_cache_url="http://127.0.0.1:8101",
flake_ref="github:SociOS-Linux/source-os#builder-aarch64",
policy_gate="allowed",
policy_reason="test",
steps=["nix copy", "nixos-rebuild"],
)
defaults.update(kwargs)
return ContentSyncPlan(**defaults)


def test_receipt_includes_agentplane_run_ref_when_set() -> None:
syncer = ContentViewSyncer(agentplane_run_ref="urn:agentplane:run:abc123")
plan = _make_plan()
receipt = syncer._build_receipt(
cycle_id="cycle-1",
plan=plan,
outcome="dry_run",
steps=[],
duration_ms=0,
)
assert receipt["agentplaneRunRef"] == "urn:agentplane:run:abc123"


def test_receipt_agentplane_run_ref_is_none_by_default() -> None:
syncer = ContentViewSyncer()
plan = _make_plan()
receipt = syncer._build_receipt(
cycle_id="cycle-2",
plan=plan,
outcome="dry_run",
steps=[],
duration_ms=0,
)
assert receipt["agentplaneRunRef"] is None


def test_cli_sync_apply_accepts_agentplane_run_ref_flag() -> None:
parser = build_parser()
args = parser.parse_args([
"sync", "apply",
"--katello-password", "pw",
"--agentplane-run-ref", "urn:agentplane:run:xyz",
"--compact",
])
assert args.agentplane_run_ref == "urn:agentplane:run:xyz"


def test_cli_sync_apply_agentplane_run_ref_defaults_none() -> None:
parser = build_parser()
args = parser.parse_args(["sync", "apply", "--katello-password", "pw"])
assert args.agentplane_run_ref is None


def test_cli_sync_daemon_accepts_agentplane_run_ref_flag() -> None:
parser = build_parser()
args = parser.parse_args([
"sync", "daemon",
"--katello-password", "pw",
"--agentplane-run-ref", "urn:agentplane:run:daemon-ref",
])
assert args.agentplane_run_ref == "urn:agentplane:run:daemon-ref"
6 changes: 3 additions & 3 deletions tests/test_policy_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,9 @@ def test_report_policy_counts_include_all_statuses():
def test_store_backed_snapshot_includes_policy_summary(tmp_path):
init_store(tmp_path)
report = snapshot_from_store(tmp_path)
assert report["policy"]["policy_engine"] == "policy-fabric-local-stub"
assert report["policy"]["policy_version"] == "v0.1.0-local-stub"
assert report["diagnosis"]["policy"]["engine"] == "policy-fabric-local-stub"
assert report["policy"]["policy_engine"] == "policy-fabric-local"
assert report["policy"]["policy_version"] == "v0.1.0-local"
assert report["diagnosis"]["policy"]["engine"] == "policy-fabric-local"
assert report["diagnosis"]["policy"]["counts"]["allowed"] > 0
sample = report["diagnosis"]["policy"]["sample"]
assert sample
Expand Down
68 changes: 68 additions & 0 deletions tests/test_policy_remote.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
from __future__ import annotations

import json
from unittest.mock import MagicMock, patch

from sourceos_syncd.policy import POLICY_ENGINE, PolicyRequest, evaluate_policy


def _req(**kwargs) -> PolicyRequest:
return PolicyRequest(action="read", lane="normal", **kwargs)


def test_local_eval_used_when_no_env_var() -> None:
with patch("urllib.request.urlopen") as mock_url:
result = evaluate_policy(_req())
mock_url.assert_not_called()
assert result.status in ("allowed", "denied", "deferred", "redacted")
assert result.engine == POLICY_ENGINE


def test_local_eval_fallback_on_unreachable_remote(monkeypatch) -> None:
monkeypatch.setenv("SOURCEOS_POLICY_FABRIC_URL", "http://localhost:19999")
with patch("urllib.request.urlopen", side_effect=ConnectionRefusedError("refused")):
result = evaluate_policy(_req())
assert result.status in ("allowed", "denied", "deferred", "redacted")
assert result.engine == POLICY_ENGINE # fell back to local


def test_remote_decision_used_when_available(monkeypatch) -> None:
monkeypatch.setenv("SOURCEOS_POLICY_FABRIC_URL", "http://policy.local")
remote_payload = {
"decision_id": "policy-remote-abc",
"action": "read",
"lane": "normal",
"status": "allowed",
"reason": "remote-allows",
"subject": "sourceos-syncd",
"object_id": None,
"data_class": "internal",
}
mock_resp = MagicMock()
mock_resp.read.return_value = json.dumps(remote_payload).encode()
mock_resp.__enter__ = lambda s: s
mock_resp.__exit__ = MagicMock(return_value=False)
with patch("urllib.request.urlopen", return_value=mock_resp):
result = evaluate_policy(_req())
assert result.decision_id == "policy-remote-abc"
assert result.reason == "remote-allows"
assert result.engine == "http://policy.local"


def test_remote_timeout_falls_back_to_local(monkeypatch) -> None:
monkeypatch.setenv("SOURCEOS_POLICY_FABRIC_URL", "http://policy.local")
import socket
with patch("urllib.request.urlopen", side_effect=TimeoutError("timeout")):
result = evaluate_policy(_req())
assert result.engine == POLICY_ENGINE # local fallback


def test_policy_engine_constant_has_no_stub_suffix() -> None:
assert "stub" not in POLICY_ENGINE


def test_local_eval_still_correct_for_secure_lane(monkeypatch) -> None:
monkeypatch.delenv("SOURCEOS_POLICY_FABRIC_URL", raising=False)
result = evaluate_policy(PolicyRequest(action="agent_access", lane="secure"))
assert result.status == "denied"
assert result.engine == POLICY_ENGINE
Loading