Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 125 additions & 1 deletion src/sourceos_syncd/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import argparse
import json
import sys
from typing import Any

from .evidence import load_json_file, make_evidence, validate_evidence, write_evidence_file
from .orchestration_events import (
Expand All @@ -18,7 +19,9 @@
from .scorecard import evaluate_scorecard, validate_scorecard
from .store_reports import append_store_event, init_store, snapshot_from_store
from .content_sync import ContentViewSyncer
from .daemon import SyncDaemon, daemon_from_env
from .katello_client import KatelloContentClient
from .receipt_store import ReceiptStore
from .trust import TrustRequest, evaluate_trust, validate_trust_decision


Expand Down Expand Up @@ -170,8 +173,32 @@ def add_katello_args(p: argparse.ArgumentParser) -> None:
sync_apply = sync_sub.add_parser("apply", help="apply a ContentSyncPlan (dry-run unless --execute)")
add_katello_args(sync_apply)
sync_apply.add_argument("--execute", action="store_true", help="actually run nix copy + nixos-rebuild (default: dry-run)")
sync_apply.add_argument("--store-root", default=None, help="persist receipt to this store root")
add_compact(sync_apply)

sync_daemon = sync_sub.add_parser("daemon", help="run the sync daemon (polls Katello; applies on new version)")
add_katello_args(sync_daemon)
sync_daemon.add_argument("--poll-interval", type=int, default=300, help="Katello poll interval in seconds (default: 300)")
sync_daemon.add_argument("--store-root", default=None, help="state + receipt store root (default: /var/lib/sourceos-syncd)")
sync_daemon.add_argument("--from-env", action="store_true", help="read all config from environment variables (ignores CLI flags)")
add_compact(sync_daemon)

sync_check_health = sync_sub.add_parser("check-health", help="run health checks and exit 0 if healthy, 2 if not")
sync_check_health.add_argument("--store-root", default=None, help="store root to inspect")
sync_check_health.add_argument("--katello-url", default="https://127.0.0.1:8443", help="Foreman+Katello base URL to probe")
sync_check_health.add_argument("--no-verify-ssl", action="store_true", help="skip TLS verification")
add_compact(sync_check_health)

receipts = subcommands.add_parser("receipts", help="inspect persisted SyncCycleReceipts")
receipts_sub = receipts.add_subparsers(dest="command", required=True)
receipts_list = receipts_sub.add_parser("list", help="list recent receipts")
receipts_list.add_argument("--store-root", default=None, help="store root")
receipts_list.add_argument("--limit", type=int, default=10, help="number of receipts to show")
add_compact(receipts_list)
receipts_last = receipts_sub.add_parser("last", help="show the most recent receipt")
receipts_last.add_argument("--store-root", default=None, help="store root")
add_compact(receipts_last)

return parser


Expand Down Expand Up @@ -312,8 +339,105 @@ def main(argv: list[str] | None = None) -> int:
sys.stdout.write(pretty_json(plan.to_dict(), pretty=pretty))
return 0 if plan.policy_gate in ("allowed", "no-op") else 2
result = syncer.execute(plan, dry_run=not args.execute)
store_root = getattr(args, "store_root", None)
if store_root and "receipt" in result:
store = ReceiptStore(root=store_root)
store.write_receipt(result["receipt"])
if result.get("status") == "applied":
store.write_current_version(manifest.version)
sys.stdout.write(pretty_json(result, pretty=pretty))
return 0 if result["status"] in ("dry_run", "executed") else 2
return 0 if result["status"] in ("dry_run", "applied") else 2

if args.area == "sync" and args.command == "daemon":
import logging as _logging
import os
_logging.basicConfig(
level=_logging.INFO,
format="%(asctime)s %(levelname)s %(name)s %(message)s",
stream=sys.stderr,
)
if getattr(args, "from_env", False):
daemon = daemon_from_env()
else:
password = args.katello_password or os.environ.get("KATELLO_PASSWORD", "")
if not password:
sys.stderr.write(pretty_json({"error": "missing password", "message": "pass --katello-password or set KATELLO_PASSWORD"}, pretty=pretty))
return 1
daemon = SyncDaemon(
katello_url=args.katello_url,
katello_user=args.katello_user,
katello_password=password,
org=args.org,
content_view=args.content_view,
lifecycle_env=args.lifecycle_env,
locus=args.locus,
flake_ref=args.flake_ref,
poll_interval_s=args.poll_interval,
store_root=getattr(args, "store_root", None),
verify_ssl=not args.no_verify_ssl,
)
return daemon.run()

if args.area == "sync" and args.command == "check-health":
import os
import shutil
import urllib.request
store_root = getattr(args, "store_root", None) or "/var/lib/sourceos-syncd"
store = ReceiptStore(root=store_root)
checks: dict[str, Any] = {}

# last receipt outcome
last = store.last_receipt()
if last:
checks["last_receipt_outcome"] = last.get("outcome", "unknown")
checks["last_receipt_ok"] = last.get("outcome") in ("applied", "dry_run", "planned")
else:
checks["last_receipt_outcome"] = "none"
checks["last_receipt_ok"] = True # no sync yet is not a failure

# nix + nixos-rebuild available
checks["nix_available"] = shutil.which("nix") is not None
checks["nixos_rebuild_available"] = shutil.which("nixos-rebuild") is not None

# Katello reachable (non-fatal: daemon tolerates network blips)
katello_url = getattr(args, "katello_url", "https://127.0.0.1:8443")
no_verify = getattr(args, "no_verify_ssl", False)
try:
import ssl
ctx = ssl.create_default_context()
if no_verify:
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
req = urllib.request.urlopen(
f"{katello_url}/api/v2/status", context=ctx, timeout=5
)
checks["katello_reachable"] = req.status < 500
except Exception as exc:
checks["katello_reachable"] = False
checks["katello_error"] = str(exc)

healthy = (
checks.get("last_receipt_ok", True)
and checks.get("katello_reachable", False)
)
output = {"healthy": healthy, "checks": checks}
sys.stdout.write(pretty_json(output, pretty=pretty))
return 0 if healthy else 2

if args.area == "receipts" and args.command == "list":
store = ReceiptStore(root=getattr(args, "store_root", None) or "/var/lib/sourceos-syncd")
receipts = store.list_receipts(limit=args.limit)
sys.stdout.write(pretty_json(receipts, pretty=pretty))
return 0

if args.area == "receipts" and args.command == "last":
store = ReceiptStore(root=getattr(args, "store_root", None) or "/var/lib/sourceos-syncd")
receipt = store.last_receipt()
if receipt is None:
sys.stderr.write(pretty_json({"error": "no receipts found"}, pretty=pretty))
return 1
sys.stdout.write(pretty_json(receipt, pretty=pretty))
return 0

except Exception as exc: # noqa: BLE001 - CLI boundary should present clean error JSON.
sys.stderr.write(pretty_json({"error": type(exc).__name__, "message": str(exc)}, pretty=pretty))
Expand Down
180 changes: 180 additions & 0 deletions src/sourceos_syncd/daemon.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
"""Daemon mode for sourceos-syncd.

Polls Katello on a configurable interval. When a new content view version
is available, plans and applies the sync, then persists the SyncCycleReceipt
and updates the current-version state file.

Failure handling:
- A failed sync (outcome: failed) is logged to stderr and the version file
is NOT updated; the failure is raised as a poll error, so the retry happens
after the backoff delay instead of the regular poll interval.
- A denied or skipped sync is logged but not retried (policy won't change
without operator intervention).
- Network/API errors are caught; the daemon backs off exponentially up to
MAX_BACKOFF_S and retries indefinitely.

The daemon is intentionally simple: no threading, no async, one poll loop.
systemd handles restart-on-crash via Restart=on-failure.
"""

from __future__ import annotations

import json
import logging
import os
import signal
import sys
import time
from typing import Any

from .content_sync import ContentViewSyncer
from .katello_client import KatelloContentClient
from .receipt_store import ReceiptStore

log = logging.getLogger("sourceos-syncd.daemon")

# Delay between polls when the previous poll did not raise, in seconds.
DEFAULT_POLL_INTERVAL_S = 300
# Bounds, in seconds, for the delay used after a poll raised: the first
# failure waits MIN_BACKOFF_S, each further failure doubles the wait, and the
# wait never exceeds MAX_BACKOFF_S.
MIN_BACKOFF_S = 30
MAX_BACKOFF_S = 1800


class SyncDaemon:
    """Single-threaded poll loop that applies new content view versions.

    Each poll asks the Katello client for the latest version of the configured
    content view, plans a sync with ``ContentViewSyncer`` and, unless the plan
    is a no-op, executes it, writes the resulting receipt through
    ``ReceiptStore`` and records the applied version.
    """

    # Longest single ``time.sleep`` call; bounds how long a stop request can
    # go unnoticed while the daemon is waiting between polls.
    _SLEEP_SLICE_S = 5

    def __init__(
        self,
        katello_url: str,
        katello_user: str,
        katello_password: str,
        org: str,
        content_view: str,
        lifecycle_env: str,
        locus: str,
        flake_ref: str,
        poll_interval_s: int = DEFAULT_POLL_INTERVAL_S,
        store_root: str | None = None,
        verify_ssl: bool = True,
    ) -> None:
        """Build the Katello client and receipt store and install signal handlers.

        Args:
            katello_url: Base URL handed to ``KatelloContentClient``.
            katello_user: Username handed to ``KatelloContentClient``.
            katello_password: Password handed to ``KatelloContentClient``.
            org: Organization handed to ``KatelloContentClient``.
            content_view: Content view name queried on every poll.
            lifecycle_env: Lifecycle environment queried on every poll.
            locus: Locus handed to ``ContentViewSyncer``.
            flake_ref: Flake reference handed to ``ContentViewSyncer``.
            poll_interval_s: Seconds to wait after a poll that did not raise.
            store_root: Root directory for ``ReceiptStore``; falls back to
                ``/var/lib/sourceos-syncd`` when ``None`` or empty.
            verify_ssl: Forwarded to ``KatelloContentClient``.

        Side effects:
            Registers ``SIGTERM`` and ``SIGINT`` handlers that request a
            graceful stop of :meth:`run`.
        """
        self._client = KatelloContentClient(
            base_url=katello_url,
            username=katello_user,
            password=katello_password,
            org=org,
            verify_ssl=verify_ssl,
        )
        # Kept locally so logging does not depend on the client's private state.
        self._org = org
        self._content_view = content_view
        self._lifecycle_env = lifecycle_env
        self._locus = locus
        self._flake_ref = flake_ref
        self._poll_interval_s = poll_interval_s
        self._store = ReceiptStore(root=store_root or "/var/lib/sourceos-syncd")
        self._running = True
        # 0 means "no failure pending"; otherwise the current backoff in seconds.
        self._backoff_s = 0

        signal.signal(signal.SIGTERM, self._handle_signal)
        signal.signal(signal.SIGINT, self._handle_signal)

    def _handle_signal(self, signum: int, _frame: Any) -> None:
        """Signal handler: log the signal and ask the poll loop to stop."""
        log.info("received signal %d, shutting down", signum)
        self._running = False

    def run(self) -> int:
        """Poll until a stop is requested.

        Any exception raised by a poll is logged and converted into a backoff
        delay (``MIN_BACKOFF_S`` first, then doubling, capped at
        ``MAX_BACKOFF_S``); a poll that returns normally resets the backoff.

        Returns:
            Always ``0`` once the loop has been stopped.
        """
        self._store.ensure_dirs()
        log.info(
            "daemon starting — org=%s cv=%s env=%s locus=%s poll=%ds",
            self._org,
            self._content_view,
            self._lifecycle_env,
            self._locus,
            self._poll_interval_s,
        )

        while self._running:
            try:
                self._poll_once()
                self._backoff_s = 0
            except Exception as exc:  # noqa: BLE001 - the loop must survive any poll error.
                self._backoff_s = min(
                    MAX_BACKOFF_S,
                    self._backoff_s * 2 if self._backoff_s else MIN_BACKOFF_S,
                )
                log.error("poll error (%s: %s); backing off %ds", type(exc).__name__, exc, self._backoff_s)

            # A signal may have arrived during the poll; skip the wait then.
            if not self._running:
                break

            sleep_s = self._backoff_s or self._poll_interval_s
            log.debug("sleeping %ds", sleep_s)
            self._interruptible_sleep(sleep_s)

        log.info("daemon stopped")
        return 0

    def _poll_once(self) -> None:
        """Run one plan/apply cycle against the latest content view version.

        Raises:
            RuntimeError: When the sync result has status ``failed``, so that
                :meth:`run` applies its backoff before the next attempt.
        """
        current_version = self._store.read_current_version()
        manifest = self._client.get_latest_version(self._content_view, self._lifecycle_env)

        syncer = ContentViewSyncer(
            flake_ref=self._flake_ref,
            locus=self._locus,
            current_version=current_version,
        )
        plan = syncer.plan(manifest)

        if plan.policy_gate == "no-op":
            log.debug("already at %s — no sync needed", manifest.version)
            return

        log.info(
            "syncing %s → %s (locus=%s gate=%s)",
            current_version or "none",
            manifest.version,
            self._locus,
            plan.policy_gate,
        )

        result = syncer.execute(plan, dry_run=False)
        receipt = result.get("receipt", {})

        receipt_path = self._store.write_receipt(receipt)
        log.info("receipt written: %s (outcome=%s)", receipt_path, receipt.get("outcome"))

        status = result.get("status")
        if status == "applied":
            # Only a successful apply advances the recorded version.
            self._store.write_current_version(manifest.version)
            log.info("applied version %s", manifest.version)
        elif status in ("denied", "skipped"):
            # NOTE(review): the recorded version is not advanced here, so the
            # next poll plans the same version again — confirm that
            # ContentViewSyncer does not re-execute it on every poll.
            log.warning(
                "sync %s: %s — will not retry until policy changes",
                status,
                receipt.get("policyReason", ""),
            )
        elif status == "failed":
            log.error("sync failed — will retry next poll; receipt: %s", receipt_path)
            raise RuntimeError(f"sync failed for version {manifest.version}")
        else:
            # Surface statuses this loop does not know about instead of
            # dropping them silently.
            log.warning("unexpected sync status %r; receipt: %s", status, receipt_path)

    def _interruptible_sleep(self, seconds: int) -> None:
        """Wait up to ``seconds``, returning early once a stop is requested.

        The wait is split into slices of at most ``_SLEEP_SLICE_S`` seconds so
        the ``_running`` flag is re-checked regularly.
        """
        deadline = time.monotonic() + seconds
        while self._running:
            # Read the clock once per iteration: a second read could land past
            # the deadline and hand time.sleep a negative value, which raises.
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            time.sleep(min(self._SLEEP_SLICE_S, remaining))


def daemon_from_env() -> SyncDaemon:
    """Construct a SyncDaemon entirely from environment variables.

    Variables read (default in parentheses):
        KATELLO_URL (``https://127.0.0.1:8443``), KATELLO_USER (``admin``),
        KATELLO_PASSWORD (required), KATELLO_ORG (``SocioProphet``),
        KATELLO_CONTENT_VIEW (``sourceos-builder-aarch64``),
        KATELLO_LIFECYCLE_ENV (``stable``), SOURCEOS_LOCUS (``local``),
        SOURCEOS_FLAKE_REF (``github:SociOS-Linux/source-os#builder-aarch64``),
        SOURCEOS_POLL_INTERVAL (``DEFAULT_POLL_INTERVAL_S``),
        SOURCEOS_STORE_ROOT (``/var/lib/sourceos-syncd``),
        SOURCEOS_NO_VERIFY_SSL (TLS verification is disabled when set to
        ``1``, ``true`` or ``yes``, case-insensitively).

    Raises:
        RuntimeError: If KATELLO_PASSWORD is unset or empty.
        ValueError: If SOURCEOS_POLL_INTERVAL is set but is not a positive
            integer.
    """
    env = os.environ

    def require(name: str) -> str:
        """Return the non-empty value of ``name`` or raise RuntimeError."""
        val = env.get(name, "")
        if not val:
            raise RuntimeError(f"required env var {name} is not set")
        return val

    def poll_interval() -> int:
        """Parse SOURCEOS_POLL_INTERVAL, naming the variable in any error."""
        raw = env.get("SOURCEOS_POLL_INTERVAL")
        if raw is None:
            return DEFAULT_POLL_INTERVAL_S
        try:
            value = int(raw)
        except ValueError as exc:
            raise ValueError(
                f"SOURCEOS_POLL_INTERVAL must be an integer number of seconds, got {raw!r}"
            ) from exc
        if value < 1:
            # A zero or negative interval would make the daemon poll without
            # ever sleeping between attempts.
            raise ValueError(
                f"SOURCEOS_POLL_INTERVAL must be a positive number of seconds, got {raw!r}"
            )
        return value

    # Validate everything that can fail before building the daemon, whose
    # constructor installs signal handlers.
    password = require("KATELLO_PASSWORD")
    interval_s = poll_interval()
    no_verify = env.get("SOURCEOS_NO_VERIFY_SSL", "").lower() in ("1", "true", "yes")

    return SyncDaemon(
        katello_url=env.get("KATELLO_URL", "https://127.0.0.1:8443"),
        katello_user=env.get("KATELLO_USER", "admin"),
        katello_password=password,
        org=env.get("KATELLO_ORG", "SocioProphet"),
        content_view=env.get("KATELLO_CONTENT_VIEW", "sourceos-builder-aarch64"),
        lifecycle_env=env.get("KATELLO_LIFECYCLE_ENV", "stable"),
        locus=env.get("SOURCEOS_LOCUS", "local"),
        flake_ref=env.get(
            "SOURCEOS_FLAKE_REF", "github:SociOS-Linux/source-os#builder-aarch64"
        ),
        poll_interval_s=interval_s,
        store_root=env.get("SOURCEOS_STORE_ROOT", "/var/lib/sourceos-syncd"),
        verify_ssl=not no_verify,
    )
Loading
Loading