diff --git a/src/sourceos_syncd/cli.py b/src/sourceos_syncd/cli.py index cbb8ebd..a9f65c5 100644 --- a/src/sourceos_syncd/cli.py +++ b/src/sourceos_syncd/cli.py @@ -5,6 +5,7 @@ import argparse import json import sys +from typing import Any from .evidence import load_json_file, make_evidence, validate_evidence, write_evidence_file from .orchestration_events import ( @@ -18,7 +19,9 @@ from .scorecard import evaluate_scorecard, validate_scorecard from .store_reports import append_store_event, init_store, snapshot_from_store from .content_sync import ContentViewSyncer +from .daemon import SyncDaemon, daemon_from_env from .katello_client import KatelloContentClient +from .receipt_store import ReceiptStore from .trust import TrustRequest, evaluate_trust, validate_trust_decision @@ -170,8 +173,32 @@ def add_katello_args(p: argparse.ArgumentParser) -> None: sync_apply = sync_sub.add_parser("apply", help="apply a ContentSyncPlan (dry-run unless --execute)") add_katello_args(sync_apply) sync_apply.add_argument("--execute", action="store_true", help="actually run nix copy + nixos-rebuild (default: dry-run)") + sync_apply.add_argument("--store-root", default=None, help="persist receipt to this store root") add_compact(sync_apply) + sync_daemon = sync_sub.add_parser("daemon", help="run the sync daemon (polls Katello; applies on new version)") + add_katello_args(sync_daemon) + sync_daemon.add_argument("--poll-interval", type=int, default=300, help="Katello poll interval in seconds (default: 300)") + sync_daemon.add_argument("--store-root", default=None, help="state + receipt store root (default: /var/lib/sourceos-syncd)") + sync_daemon.add_argument("--from-env", action="store_true", help="read all config from environment variables (ignores CLI flags)") + add_compact(sync_daemon) + + sync_check_health = sync_sub.add_parser("check-health", help="run health checks and exit 0 if healthy, 2 if not") + sync_check_health.add_argument("--store-root", default=None, help="store root to inspect") + sync_check_health.add_argument("--katello-url", default="https://127.0.0.1:8443", help="Foreman+Katello base URL to probe") + sync_check_health.add_argument("--no-verify-ssl", action="store_true", help="skip TLS verification") + add_compact(sync_check_health) + + receipts = subcommands.add_parser("receipts", help="inspect persisted SyncCycleReceipts") + receipts_sub = receipts.add_subparsers(dest="command", required=True) + receipts_list = receipts_sub.add_parser("list", help="list recent receipts") + receipts_list.add_argument("--store-root", default=None, help="store root") + receipts_list.add_argument("--limit", type=int, default=10, help="number of receipts to show") + add_compact(receipts_list) + receipts_last = receipts_sub.add_parser("last", help="show the most recent receipt") + receipts_last.add_argument("--store-root", default=None, help="store root") + add_compact(receipts_last) + return parser @@ -312,8 +339,105 @@ def main(argv: list[str] | None = None) -> int: sys.stdout.write(pretty_json(plan.to_dict(), pretty=pretty)) return 0 if plan.policy_gate in ("allowed", "no-op") else 2 result = syncer.execute(plan, dry_run=not args.execute) + store_root = getattr(args, "store_root", None) + if store_root and "receipt" in result: + store = ReceiptStore(root=store_root) + store.write_receipt(result["receipt"]) + if result.get("status") == "applied": + store.write_current_version(manifest.version) sys.stdout.write(pretty_json(result, pretty=pretty)) - return 0 if result["status"] in ("dry_run", "executed") else 2 + return 0 if result["status"] in ("dry_run", "applied") else 2 + + if args.area == "sync" and args.command == "daemon": + import logging as _logging + import os + _logging.basicConfig( + level=_logging.INFO, + format="%(asctime)s %(levelname)s %(name)s %(message)s", + stream=sys.stderr, + ) + if getattr(args, "from_env", False): + daemon = daemon_from_env() + else: + password = args.katello_password or os.environ.get("KATELLO_PASSWORD", "") + if not password: + sys.stderr.write(pretty_json({"error": "missing password", "message": "pass --katello-password or set KATELLO_PASSWORD"}, pretty=pretty)) + return 1 + daemon = SyncDaemon( + katello_url=args.katello_url, + katello_user=args.katello_user, + katello_password=password, + org=args.org, + content_view=args.content_view, + lifecycle_env=args.lifecycle_env, + locus=args.locus, + flake_ref=args.flake_ref, + poll_interval_s=args.poll_interval, + store_root=getattr(args, "store_root", None), + verify_ssl=not args.no_verify_ssl, + ) + return daemon.run() + + if args.area == "sync" and args.command == "check-health": + import os + import shutil + import urllib.request + store_root = getattr(args, "store_root", None) or "/var/lib/sourceos-syncd" + store = ReceiptStore(root=store_root) + checks: dict[str, Any] = {} + + # last receipt outcome + last = store.last_receipt() + if last: + checks["last_receipt_outcome"] = last.get("outcome", "unknown") + checks["last_receipt_ok"] = last.get("outcome") in ("applied", "dry_run", "planned") + else: + checks["last_receipt_outcome"] = "none" + checks["last_receipt_ok"] = True # no sync yet is not a failure + + # nix + nixos-rebuild available + checks["nix_available"] = shutil.which("nix") is not None + checks["nixos_rebuild_available"] = shutil.which("nixos-rebuild") is not None + + # Katello reachable (non-fatal: daemon tolerates network blips) + katello_url = getattr(args, "katello_url", "https://127.0.0.1:8443") + no_verify = getattr(args, "no_verify_ssl", False) + try: + import ssl + ctx = ssl.create_default_context() + if no_verify: + ctx.check_hostname = False + ctx.verify_mode = ssl.CERT_NONE + req = urllib.request.urlopen( + f"{katello_url}/api/v2/status", context=ctx, timeout=5 + ) + checks["katello_reachable"] = req.status < 500 + except Exception as exc: + checks["katello_reachable"] = False + checks["katello_error"] = str(exc) + + healthy = ( + checks.get("last_receipt_ok", True) + and checks.get("katello_reachable", False) + ) + output = {"healthy": healthy, "checks": checks} + sys.stdout.write(pretty_json(output, pretty=pretty)) + return 0 if healthy else 2 + + if args.area == "receipts" and args.command == "list": + store = ReceiptStore(root=getattr(args, "store_root", None) or "/var/lib/sourceos-syncd") + receipts = store.list_receipts(limit=args.limit) + sys.stdout.write(pretty_json(receipts, pretty=pretty)) + return 0 + + if args.area == "receipts" and args.command == "last": + store = ReceiptStore(root=getattr(args, "store_root", None) or "/var/lib/sourceos-syncd") + receipt = store.last_receipt() + if receipt is None: + sys.stderr.write(pretty_json({"error": "no receipts found"}, pretty=pretty)) + return 1 + sys.stdout.write(pretty_json(receipt, pretty=pretty)) + return 0 except Exception as exc: # noqa: BLE001 - CLI boundary should present clean error JSON. sys.stderr.write(pretty_json({"error": type(exc).__name__, "message": str(exc)}, pretty=pretty)) diff --git a/src/sourceos_syncd/daemon.py b/src/sourceos_syncd/daemon.py new file mode 100644 index 0000000..d88fbf1 --- /dev/null +++ b/src/sourceos_syncd/daemon.py @@ -0,0 +1,180 @@ +"""Daemon mode for sourceos-syncd. + +Polls Katello on a configurable interval. When a new content view version +is available, plans and applies the sync, then persists the SyncCycleReceipt +and updates the current-version state file. + +Failure handling: + - A failed sync (outcome: failed) is logged to stderr and the version file + is NOT updated, so the next poll will retry. + - A denied or skipped sync is logged but not retried (policy won't change + without operator intervention). + - Network/API errors are caught; the daemon backs off exponentially up to + MAX_BACKOFF_S and retries indefinitely. + +The daemon is intentionally simple: no threading, no async, one poll loop. +systemd handles restart-on-crash via Restart=on-failure. +""" + +from __future__ import annotations + +import json +import logging +import os +import signal +import sys +import time +from typing import Any + +from .content_sync import ContentViewSyncer +from .katello_client import KatelloContentClient +from .receipt_store import ReceiptStore + +log = logging.getLogger("sourceos-syncd.daemon") + +DEFAULT_POLL_INTERVAL_S = 300 +MIN_BACKOFF_S = 30 +MAX_BACKOFF_S = 1800 + + +class SyncDaemon: + def __init__( + self, + katello_url: str, + katello_user: str, + katello_password: str, + org: str, + content_view: str, + lifecycle_env: str, + locus: str, + flake_ref: str, + poll_interval_s: int = DEFAULT_POLL_INTERVAL_S, + store_root: str | None = None, + verify_ssl: bool = True, + ) -> None: + self._client = KatelloContentClient( + base_url=katello_url, + username=katello_user, + password=katello_password, + org=org, + verify_ssl=verify_ssl, + ) + self._content_view = content_view + self._lifecycle_env = lifecycle_env + self._locus = locus + self._flake_ref = flake_ref + self._poll_interval_s = poll_interval_s + self._store = ReceiptStore(root=store_root or "/var/lib/sourceos-syncd") + self._running = True + self._backoff_s = 0 + + signal.signal(signal.SIGTERM, self._handle_signal) + signal.signal(signal.SIGINT, self._handle_signal) + + def _handle_signal(self, signum: int, _frame: Any) -> None: + log.info("received signal %d, shutting down", signum) + self._running = False + + def run(self) -> int: + self._store.ensure_dirs() + log.info( + "daemon starting — org=%s cv=%s env=%s locus=%s poll=%ds", + self._client._org, + self._content_view, + self._lifecycle_env, + self._locus, + self._poll_interval_s, + ) + + while self._running: + try: + self._poll_once() + self._backoff_s = 0 + except Exception as exc: # noqa: BLE001 + self._backoff_s = min( + MAX_BACKOFF_S, + self._backoff_s * 2 if self._backoff_s else MIN_BACKOFF_S, + ) + log.error("poll error (%s: %s); backing off %ds", type(exc).__name__, exc, self._backoff_s) + + if not self._running: + break + + sleep_s = self._backoff_s or self._poll_interval_s + log.debug("sleeping %ds", sleep_s) + self._interruptible_sleep(sleep_s) + + log.info("daemon stopped") + return 0 + + def _poll_once(self) -> None: + current_version = self._store.read_current_version() + manifest = self._client.get_latest_version(self._content_view, self._lifecycle_env) + + syncer = ContentViewSyncer( + flake_ref=self._flake_ref, + locus=self._locus, + current_version=current_version, + ) + plan = syncer.plan(manifest) + + if plan.policy_gate == "no-op": + log.debug("already at %s — no sync needed", manifest.version) + return + + log.info( + "syncing %s → %s (locus=%s gate=%s)", + current_version or "none", + manifest.version, + self._locus, + plan.policy_gate, + ) + + result = syncer.execute(plan, dry_run=False) + receipt = result.get("receipt", {}) + + receipt_path = self._store.write_receipt(receipt) + log.info("receipt written: %s (outcome=%s)", receipt_path, receipt.get("outcome")) + + if result.get("status") == "applied": + self._store.write_current_version(manifest.version) + log.info("applied version %s", manifest.version) + elif result.get("status") in ("denied", "skipped"): + log.warning( + "sync %s: %s — will not retry until policy changes", + result["status"], + receipt.get("policyReason", ""), + ) + elif result.get("status") == "failed": + log.error("sync failed — will retry next poll; receipt: %s", receipt_path) + raise RuntimeError(f"sync failed for version {manifest.version}") + + def _interruptible_sleep(self, seconds: int) -> None: + deadline = time.monotonic() + seconds + while self._running and time.monotonic() < deadline: + time.sleep(min(5, deadline - time.monotonic())) + + +def daemon_from_env() -> SyncDaemon: + """Construct a SyncDaemon entirely from environment variables.""" + def require(name: str) -> str: + val = os.environ.get(name, "") + if not val: + raise RuntimeError(f"required env var {name} is not set") + return val + + return SyncDaemon( + katello_url=os.environ.get("KATELLO_URL", "https://127.0.0.1:8443"), + katello_user=os.environ.get("KATELLO_USER", "admin"), + katello_password=require("KATELLO_PASSWORD"), + org=os.environ.get("KATELLO_ORG", "SocioProphet"), + content_view=os.environ.get("KATELLO_CONTENT_VIEW", "sourceos-builder-aarch64"), + lifecycle_env=os.environ.get("KATELLO_LIFECYCLE_ENV", "stable"), + locus=os.environ.get("SOURCEOS_LOCUS", "local"), + flake_ref=os.environ.get( + "SOURCEOS_FLAKE_REF", "github:SociOS-Linux/source-os#builder-aarch64" + ), + poll_interval_s=int(os.environ.get("SOURCEOS_POLL_INTERVAL", str(DEFAULT_POLL_INTERVAL_S))), + store_root=os.environ.get("SOURCEOS_STORE_ROOT", "/var/lib/sourceos-syncd"), + verify_ssl=os.environ.get("SOURCEOS_NO_VERIFY_SSL", "").lower() not in ("1", "true", "yes"), + ) diff --git a/src/sourceos_syncd/receipt_store.py b/src/sourceos_syncd/receipt_store.py new file mode 100644 index 0000000..da5ee96 --- /dev/null +++ b/src/sourceos_syncd/receipt_store.py @@ -0,0 +1,75 @@ +"""Persistent local store for SyncCycleReceipts. + +Receipts are written as individual JSON files under a configurable directory. +The store also tracks the last successfully applied content view version so +the daemon can detect when a new version is available without re-querying +the state from an external source. + +Layout: + {store_root}/ + current-version — plain-text file: current applied CV version + receipts/ + {issuedAt}-{id}.json — one file per SyncCycleReceipt +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path +from typing import Any + +DEFAULT_STORE_ROOT = "/var/lib/sourceos-syncd" + + +class ReceiptStore: + def __init__(self, root: str = DEFAULT_STORE_ROOT) -> None: + self._root = Path(root) + self._receipts_dir = self._root / "receipts" + + def ensure_dirs(self) -> None: + self._root.mkdir(parents=True, exist_ok=True) + self._receipts_dir.mkdir(parents=True, exist_ok=True) + + # ── version tracking ──────────────────────────────────────────────────── + + @property + def _version_file(self) -> Path: + return self._root / "current-version" + + def read_current_version(self) -> str | None: + if not self._version_file.exists(): + return None + text = self._version_file.read_text(encoding="utf-8").strip() + return text or None + + def write_current_version(self, version: str) -> None: + self.ensure_dirs() + self._version_file.write_text(version + "\n", encoding="utf-8") + + # ── receipt persistence ────────────────────────────────────────────────── + + def write_receipt(self, receipt: dict[str, Any]) -> Path: + self.ensure_dirs() + issued = receipt.get("issuedAt", "unknown").replace(":", "-").replace("T", "_") + rid = receipt.get("id", "").split(":")[-1][:16] + filename = f"{issued}-{rid}.json" + path = self._receipts_dir / filename + path.write_text(json.dumps(receipt, indent=2, sort_keys=True) + "\n", encoding="utf-8") + return path + + def list_receipts(self, limit: int = 20) -> list[dict[str, Any]]: + if not self._receipts_dir.exists(): + return [] + files = sorted(self._receipts_dir.glob("*.json"), reverse=True)[:limit] + receipts = [] + for f in files: + try: + receipts.append(json.loads(f.read_text(encoding="utf-8"))) + except (json.JSONDecodeError, OSError): + continue + return receipts + + def last_receipt(self) -> dict[str, Any] | None: + results = self.list_receipts(limit=1) + return results[0] if results else None diff --git a/tests/test_daemon.py b/tests/test_daemon.py new file mode 100644 index 0000000..d685fc1 --- /dev/null +++ b/tests/test_daemon.py @@ -0,0 +1,160 @@ +"""Tests for SyncDaemon and ReceiptStore.""" + +from __future__ import annotations + +import json +import tempfile +from pathlib import Path +from unittest.mock import MagicMock, patch + +import pytest + +from sourceos_syncd.receipt_store import ReceiptStore +from sourceos_syncd.daemon import SyncDaemon +from sourceos_syncd.katello_client import ContentViewManifest + + +# ── ReceiptStore ────────────────────────────────────────────────────────────── + + +def _make_receipt(outcome: str = "applied", version: str = "1.0") -> dict: + return { + "id": f"urn:srcos:sync-receipt:test-{version}", + "type": "SyncCycleReceipt", + "specVersion": "0.1.0", + "cycleId": "test-cycle", + "engineId": "sourceos.sync.katello-content", + "org": "SocioProphet", + "contentView": "sourceos-builder-aarch64", + "fromVersion": None, + "toVersion": version, + "lifecycleEnv": "dev", + "locus": "local", + "outcome": outcome, + "steps": [], + "issuedAt": "2026-06-16T00:00:00Z", + "auditId": "urn:srcos:audit:test", + } + + +def test_receipt_store_write_and_read(): + with tempfile.TemporaryDirectory() as tmpdir: + store = ReceiptStore(root=tmpdir) + receipt = _make_receipt(outcome="applied", version="1.0") + path = store.write_receipt(receipt) + assert path.exists() + last = store.last_receipt() + assert last is not None + assert last["outcome"] == "applied" + + +def test_receipt_store_list_receipts(): + with tempfile.TemporaryDirectory() as tmpdir: + store = ReceiptStore(root=tmpdir) + for v in ["1.0", "1.1", "1.2"]: + receipt = _make_receipt(version=v) + receipt["issuedAt"] = f"2026-06-16T0{v.replace('.', ':')}:00Z" + store.write_receipt(receipt) + receipts = store.list_receipts(limit=10) + assert len(receipts) == 3 + + +def test_receipt_store_empty(): + with tempfile.TemporaryDirectory() as tmpdir: + store = ReceiptStore(root=tmpdir) + assert store.last_receipt() is None + assert store.list_receipts() == [] + + +def test_current_version_roundtrip(): + with tempfile.TemporaryDirectory() as tmpdir: + store = ReceiptStore(root=tmpdir) + assert store.read_current_version() is None + store.write_current_version("2.3") + assert store.read_current_version() == "2.3" + + +# ── SyncDaemon._poll_once ───────────────────────────────────────────────────── + + +def _make_daemon(tmpdir: str, current_version: str | None = None) -> SyncDaemon: + daemon = SyncDaemon( + katello_url="https://127.0.0.1:8443", + katello_user="admin", + katello_password="test", + org="SocioProphet", + content_view="sourceos-builder-aarch64", + lifecycle_env="stable", + locus="local", + flake_ref="github:SociOS-Linux/source-os#builder-aarch64", + poll_interval_s=1, + store_root=tmpdir, + verify_ssl=False, + ) + if current_version: + daemon._store.write_current_version(current_version) + return daemon + + +def _manifest(version: str = "1.0") -> ContentViewManifest: + return ContentViewManifest( + org="SocioProphet", + content_view="sourceos-builder-aarch64", + version=version, + lifecycle_env="stable", + katello_url="https://127.0.0.1:8443", + pulp_content_url="http://127.0.0.1:8101", + nix_cache_url="http://127.0.0.1:8101", + ) + + +def test_poll_once_noop_when_current(tmp_path): + daemon = _make_daemon(str(tmp_path), current_version="1.0") + with patch.object(daemon._client, "get_latest_version", return_value=_manifest("1.0")): + daemon._poll_once() + # no receipt written when no-op + assert daemon._store.last_receipt() is None + + +def test_poll_once_writes_receipt_on_apply(tmp_path): + daemon = _make_daemon(str(tmp_path)) + with patch.object(daemon._client, "get_latest_version", return_value=_manifest("1.0")): + with patch("sourceos_syncd.content_sync.subprocess.run") as mock_run: + mock_proc = MagicMock() + mock_proc.returncode = 0 + mock_proc.stdout = "" + mock_proc.stderr = "" + mock_run.return_value = mock_proc + import shutil as _shutil + with patch("sourceos_syncd.content_sync.shutil.which", return_value="/usr/bin/nix"): + daemon._poll_once() + + receipt = daemon._store.last_receipt() + assert receipt is not None + assert receipt["toVersion"] == "1.0" + assert daemon._store.read_current_version() == "1.0" + + +def test_poll_once_denied_does_not_update_version(tmp_path): + daemon = _make_daemon(str(tmp_path)) + # change locus to denied + daemon._locus = "burst_cloud" + with patch.object(daemon._client, "get_latest_version", return_value=_manifest("1.0")): + daemon._poll_once() + # receipt written, but version not updated + receipt = daemon._store.last_receipt() + assert receipt is not None + assert receipt["outcome"] == "denied" + assert daemon._store.read_current_version() is None + + +def test_poll_once_raises_on_failed(tmp_path): + daemon = _make_daemon(str(tmp_path)) + fail_result = { + "status": "failed", + "receipt": _make_receipt(outcome="failed"), + } + with patch.object(daemon._client, "get_latest_version", return_value=_manifest("1.0")): + with patch("sourceos_syncd.content_sync.ContentViewSyncer.execute", return_value=fail_result): + with pytest.raises(RuntimeError, match="sync failed"): + daemon._poll_once()