From 02518ad416aeaeca46b7e063253fdb155969005a Mon Sep 17 00:00:00 2001
From: Michael Heller <21163552+mdheller@users.noreply.github.com>
Date: Tue, 16 Jun 2026 12:03:55 -0400
Subject: [PATCH] feat(sync): add Katello content-view client and
ContentViewSyncer
katello_client.py: stdlib-only HTTP client (urllib.request) for Katello API;
queries org, content view, lifecycle env, and version endpoints; returns a
ContentViewManifest with the Pulp content URL and nix cache URL derived from
the Foreman HTTPS port
content_sync.py: ContentViewSyncer produces a non-mutating ContentSyncPlan
(policy_gate: allowed/denied/no-op); execute() is the only side-effecting
method and defaults to dry_run=True; enforces locus gate (local/trusted_private
only); skips nix/nixos-rebuild if not in PATH
cli.py: adds `sync plan` and `sync apply` subcommands under the `sync` area;
--katello-url, --org, --content-view, --lifecycle-env, --locus, --flake-ref,
--current-version, --no-verify-ssl, --execute flags
13 tests, all passing
---
src/sourceos_syncd/cli.py | 53 ++++++++
src/sourceos_syncd/content_sync.py | 174 +++++++++++++++++++++++++
src/sourceos_syncd/katello_client.py | 182 +++++++++++++++++++++++++++
tests/test_katello_client.py | 163 ++++++++++++++++++++++++
4 files changed, 572 insertions(+)
create mode 100644 src/sourceos_syncd/content_sync.py
create mode 100644 src/sourceos_syncd/katello_client.py
create mode 100644 tests/test_katello_client.py
diff --git a/src/sourceos_syncd/cli.py b/src/sourceos_syncd/cli.py
index 5b89775..cbb8ebd 100644
--- a/src/sourceos_syncd/cli.py
+++ b/src/sourceos_syncd/cli.py
@@ -17,6 +17,8 @@
from .reports import load_report, pretty_json, repair_plan, snapshot, validate_report, verify, with_fresh_diagnosis
from .scorecard import evaluate_scorecard, validate_scorecard
from .store_reports import append_store_event, init_store, snapshot_from_store
+from .content_sync import ContentViewSyncer
+from .katello_client import KatelloContentClient
from .trust import TrustRequest, evaluate_trust, validate_trust_decision
@@ -146,6 +148,30 @@ def build_parser() -> argparse.ArgumentParser:
score_validate.add_argument("--file", "-f", required=True, help="scorecard JSON file")
add_compact(score_validate)
+ sync = subcommands.add_parser("sync", help="Katello content view sync planning and apply")
+ sync_sub = sync.add_subparsers(dest="command", required=True)
+
+    def add_katello_args(p: argparse.ArgumentParser) -> None:
+        """Register the Katello flags shared by the `sync plan` and `sync apply` parsers."""
+        # (flag, default, help) for every option that takes a value.
+        # Registration order is kept so --help output is unchanged.
+        valued_options = (
+            ("--katello-url", "https://127.0.0.1:8443", "Foreman+Katello base URL"),
+            ("--katello-user", "admin", "Katello admin username"),
+            ("--katello-password", None, "Katello admin password (or set KATELLO_PASSWORD env)"),
+            ("--org", "SocioProphet", "Katello organization name"),
+            ("--content-view", "sourceos-builder-aarch64", "content view name"),
+            ("--lifecycle-env", "dev", "lifecycle environment (dev/candidate/stable)"),
+            ("--locus", "local", "execution locus (local/trusted_private)"),
+            ("--flake-ref", "github:SociOS-Linux/source-os#builder-aarch64", "NixOS flake ref"),
+            ("--current-version", None, "current content view version (skip if up to date)"),
+        )
+        for flag, default_value, help_text in valued_options:
+            p.add_argument(flag, default=default_value, help=help_text)
+        # The only boolean switch; absent means TLS certificates are verified.
+        p.add_argument("--no-verify-ssl", action="store_true", help="skip TLS verification (local dev only)")
+
+ sync_plan = sync_sub.add_parser("plan", help="query Katello and emit a ContentSyncPlan (no changes)")
+ add_katello_args(sync_plan)
+ add_compact(sync_plan)
+
+ sync_apply = sync_sub.add_parser("apply", help="apply a ContentSyncPlan (dry-run unless --execute)")
+ add_katello_args(sync_apply)
+ sync_apply.add_argument("--execute", action="store_true", help="actually run nix copy + nixos-rebuild (default: dry-run)")
+ add_compact(sync_apply)
+
return parser
@@ -262,6 +288,33 @@ def main(argv: list[str] | None = None) -> int:
sys.stdout.write(pretty_json({"valid": not errors, "errors": errors}, pretty=pretty))
return 0 if not errors else 2
+ if args.area == "sync" and args.command in ("plan", "apply"):
+ import os
+ password = args.katello_password or os.environ.get("KATELLO_PASSWORD", "")
+ if not password:
+ sys.stderr.write(pretty_json({"error": "missing password", "message": "pass --katello-password or set KATELLO_PASSWORD"}, pretty=pretty))
+ return 1
+ client = KatelloContentClient(
+ base_url=args.katello_url,
+ username=args.katello_user,
+ password=password,
+ org=args.org,
+ verify_ssl=not args.no_verify_ssl,
+ )
+ manifest = client.get_latest_version(args.content_view, args.lifecycle_env)
+ syncer = ContentViewSyncer(
+ flake_ref=args.flake_ref,
+ locus=args.locus,
+ current_version=args.current_version,
+ )
+ plan = syncer.plan(manifest)
+ if args.command == "plan":
+ sys.stdout.write(pretty_json(plan.to_dict(), pretty=pretty))
+ return 0 if plan.policy_gate in ("allowed", "no-op") else 2
+ result = syncer.execute(plan, dry_run=not args.execute)
+ sys.stdout.write(pretty_json(result, pretty=pretty))
+ return 0 if result["status"] in ("dry_run", "executed") else 2
+
except Exception as exc: # noqa: BLE001 - CLI boundary should present clean error JSON.
sys.stderr.write(pretty_json({"error": type(exc).__name__, "message": str(exc)}, pretty=pretty))
return 1
diff --git a/src/sourceos_syncd/content_sync.py b/src/sourceos_syncd/content_sync.py
new file mode 100644
index 0000000..68bbbbf
--- /dev/null
+++ b/src/sourceos_syncd/content_sync.py
@@ -0,0 +1,174 @@
+"""Content view sync planner for sourceos-syncd.
+
+Consumes a ContentViewManifest from KatelloContentClient and produces a
+ContentSyncPlan describing the nix copy and nixos-rebuild steps required to
+apply the new content view version.
+
+Boundary invariant: plan() is pure and side-effect-free. execute() is the
+only method that shells out — it requires an explicit caller opt-in and will
+refuse to run if the plan's policy_gate is not 'allowed'.
+"""
+
+from __future__ import annotations
+
+import hashlib
+import shlex
+import shutil
+import subprocess
+from dataclasses import dataclass, field
+from typing import Any
+
+from .katello_client import ContentViewManifest
+
+SYNC_SCHEMA = "sourceos.content-sync-plan/v0.1"
+
+
+@dataclass(frozen=True)
+class ContentSyncPlan:
+    """Non-mutating description of a pending content sync.
+
+    Instances are frozen; ``to_dict()`` hands out a copy of ``steps`` so that
+    callers editing the rendered dict cannot alter the plan itself.
+    """
+
+    schema: str
+    org: str
+    content_view: str
+    # Version currently installed, or None when the caller did not supply one.
+    from_version: str | None
+    to_version: str
+    lifecycle_env: str
+    nix_cache_url: str
+    flake_ref: str
+    # Gate verdict; `allowed` below treats only the value "allowed" as runnable.
+    policy_gate: str
+    policy_reason: str
+    # Command lines to run, in order; empty when nothing should be executed.
+    steps: list[str] = field(default_factory=list)
+
+    def to_dict(self) -> dict[str, Any]:
+        """Return a JSON-serialisable dict of every field, in declaration order."""
+        return {
+            "schema": self.schema,
+            "org": self.org,
+            "content_view": self.content_view,
+            "from_version": self.from_version,
+            "to_version": self.to_version,
+            "lifecycle_env": self.lifecycle_env,
+            "nix_cache_url": self.nix_cache_url,
+            "flake_ref": self.flake_ref,
+            "policy_gate": self.policy_gate,
+            "policy_reason": self.policy_reason,
+            # Copy: the dataclass is frozen but the list it holds is not.
+            "steps": list(self.steps),
+        }
+
+    @property
+    def allowed(self) -> bool:
+        """True only when the policy gate is exactly ``"allowed"``."""
+        return self.policy_gate == "allowed"
+
+
+class ContentViewSyncer:
+    """Plans and optionally executes a Katello content view sync.
+
+    The syncer enforces the locus gate: only the loci listed in
+    ``ALLOWED_LOCI`` (``local`` and ``trusted_private``) are permitted.
+    burst_cloud and attested_fog require explicit policy elevation
+    (not yet implemented).
+    """
+
+    ALLOWED_LOCI = {"local", "trusted_private"}
+
+    def __init__(
+        self,
+        flake_ref: str = "github:SociOS-Linux/source-os#builder-aarch64",
+        locus: str = "local",
+        current_version: str | None = None,
+    ) -> None:
+        """Remember the flake to build, the execution locus and the installed version."""
+        self._flake_ref = flake_ref
+        self._locus = locus
+        self._current_version = current_version
+
+    def _build_plan(
+        self,
+        manifest: ContentViewManifest,
+        policy_gate: str,
+        policy_reason: str,
+        steps: list[str] | None = None,
+    ) -> ContentSyncPlan:
+        """Assemble a ContentSyncPlan from the manifest plus a gate verdict."""
+        return ContentSyncPlan(
+            schema=SYNC_SCHEMA,
+            org=manifest.org,
+            content_view=manifest.content_view,
+            from_version=self._current_version,
+            to_version=manifest.version,
+            lifecycle_env=manifest.lifecycle_env,
+            nix_cache_url=manifest.nix_cache_url,
+            flake_ref=self._flake_ref,
+            policy_gate=policy_gate,
+            policy_reason=policy_reason,
+            steps=list(steps or []),
+        )
+
+    def plan(self, manifest: ContentViewManifest) -> ContentSyncPlan:
+        """Return a non-mutating ContentSyncPlan. No I/O performed.
+
+        The gate is ``denied`` when the locus is not in ``ALLOWED_LOCI``,
+        ``no-op`` when a current version was given and equals the manifest
+        version, and ``allowed`` otherwise (including when no current version
+        is known).
+        """
+        if self._locus not in self.ALLOWED_LOCI:
+            return self._build_plan(
+                manifest,
+                "denied",
+                f"locus '{self._locus}' not in allowed loci {sorted(self.ALLOWED_LOCI)}",
+            )
+
+        if self._current_version and self._current_version == manifest.version:
+            return self._build_plan(manifest, "no-op", "already at latest version")
+
+        # Quote every interpolated value so the step stays a single, well-formed
+        # command line even if the flake ref or URL contains quotes or spaces.
+        flake = shlex.quote(self._flake_ref)
+        cache = shlex.quote(manifest.nix_cache_url)
+        steps = [
+            # NOTE(review): --no-check-sigs disables signature checks on the copied
+            # store paths; confirm this is acceptable outside local development.
+            f"nix copy --from {cache} --no-check-sigs {flake}",
+            f"nixos-rebuild switch --flake {flake}",
+        ]
+        return self._build_plan(
+            manifest,
+            "allowed",
+            f"locus '{self._locus}' permitted; new version available",
+            steps,
+        )
+
+    @staticmethod
+    def _run_step(step: str) -> dict[str, Any]:
+        """Run one step without a shell and describe the outcome as a result dict."""
+        try:
+            argv = shlex.split(step)
+        except ValueError as exc:  # e.g. unbalanced quotes
+            return {"step": step, "status": "failed", "reason": f"unparseable step: {exc}"}
+        if not argv:
+            return {"step": step, "status": "skipped", "reason": "empty step"}
+        if shutil.which(argv[0]) is None:
+            return {"step": step, "status": "skipped", "reason": f"{argv[0]} not found in PATH"}
+
+        try:
+            # argv list + no shell: nothing in the step is re-interpreted by /bin/sh.
+            proc = subprocess.run(argv, capture_output=True, text=True, timeout=600, check=False)
+        except subprocess.TimeoutExpired:
+            return {"step": step, "status": "timeout"}
+        except OSError as exc:
+            return {"step": step, "status": "failed", "reason": str(exc)}
+        return {
+            "step": step,
+            "status": "ok" if proc.returncode == 0 else "failed",
+            "returncode": proc.returncode,
+            # Output is truncated to keep the JSON report small.
+            "stdout": proc.stdout.strip()[:500],
+            "stderr": proc.stderr.strip()[:500],
+        }
+
+    def execute(self, plan: ContentSyncPlan, dry_run: bool = True) -> dict[str, Any]:
+        """Execute the sync plan. dry_run=True (default) only reports the steps.
+
+        Returns a dict whose ``status`` is ``skipped`` (plan not allowed),
+        ``dry_run``, ``executed``, or ``failed``. On a real run the steps are
+        executed in order; once a step fails or times out the remaining steps
+        are not run (they are reported as skipped) and the overall status is
+        ``failed``. A step whose executable is missing from PATH is skipped
+        without aborting the run.
+        """
+        if not plan.allowed:
+            return {
+                "status": "skipped",
+                "reason": plan.policy_reason,
+                "policy_gate": plan.policy_gate,
+            }
+
+        if dry_run:
+            return {
+                "status": "dry_run",
+                "plan": plan.to_dict(),
+                "results": [{"step": step, "status": "dry_run"} for step in plan.steps],
+            }
+
+        results: list[dict[str, Any]] = []
+        aborted = False
+        for step in plan.steps:
+            if aborted:
+                # Never run a later step (e.g. nixos-rebuild switch) after an earlier one failed.
+                results.append({"step": step, "status": "skipped", "reason": "previous step did not succeed"})
+                continue
+            outcome = self._run_step(step)
+            results.append(outcome)
+            aborted = outcome["status"] in ("failed", "timeout")
+
+        return {
+            "status": "failed" if aborted else "executed",
+            "plan": plan.to_dict(),
+            "results": results,
+        }
diff --git a/src/sourceos_syncd/katello_client.py b/src/sourceos_syncd/katello_client.py
new file mode 100644
index 0000000..d57b2cb
--- /dev/null
+++ b/src/sourceos_syncd/katello_client.py
@@ -0,0 +1,182 @@
+"""Katello content-view client for sourceos-syncd.
+
+Provides a read-only, stdlib-only HTTP client that queries the Katello API to
+discover the current content view version for the device's lifecycle environment
+and returns the artifact URLs needed for a Nix-based system update.
+
+Boundary invariant: this module performs no disk writes, no nix invocations, no
+nixos-rebuild calls. It returns a ContentViewManifest describing what is available.
+Execution of the sync is the caller's responsibility.
+"""
+
+from __future__ import annotations
+
+import base64
+import json
+import ssl
+import urllib.request
+from dataclasses import dataclass, field
+from typing import Any
+
+
+@dataclass(frozen=True)
+class KatelloArtifact:
+    """Immutable record for one artifact listed in a ContentViewManifest."""
+
+    name: str
+    content_url: str
+    # Optional metadata; both default to None when not known.
+    # NOTE(review): presumably a hex SHA-256 digest and a size in bytes - confirm against the producer.
+    sha256: str | None = None
+    size_bytes: int | None = None
+
+
+@dataclass(frozen=True)
+class ContentViewManifest:
+    """Snapshot of one published content view version within a lifecycle environment."""
+
+    org: str
+    content_view: str
+    version: str
+    lifecycle_env: str
+    katello_url: str
+    pulp_content_url: str
+    nix_cache_url: str
+    artifacts: list[KatelloArtifact] = field(default_factory=list)
+    description: str = ""
+
+    def to_dict(self) -> dict[str, Any]:
+        """Render the manifest, including each artifact, as plain dicts and lists."""
+        artifact_rows = []
+        for artifact in self.artifacts:
+            artifact_rows.append({
+                "name": artifact.name,
+                "content_url": artifact.content_url,
+                "sha256": artifact.sha256,
+                "size_bytes": artifact.size_bytes,
+            })
+
+        # Scalar fields first, in declaration order, then the nested artifact list.
+        payload: dict[str, Any] = {}
+        for key in (
+            "org",
+            "content_view",
+            "version",
+            "lifecycle_env",
+            "katello_url",
+            "pulp_content_url",
+            "nix_cache_url",
+        ):
+            payload[key] = getattr(self, key)
+        payload["artifacts"] = artifact_rows
+        payload["description"] = self.description
+        return payload
+
+
+class KatelloClientError(RuntimeError):
+    """Raised by KatelloContentClient for HTTP/connection failures and lookups that find nothing."""
+    pass
+
+
+class KatelloContentClient:
+    """Read-only Katello API client.
+
+    Connects to a Foreman+Katello instance and discovers the latest content view
+    version for a given org, content view name, and lifecycle environment.
+
+    Uses basic auth over HTTPS. For local dev (self-signed cert), pass
+    verify_ssl=False.
+    """
+
+    API_BASE = "/katello/api/v2"
+
+    def __init__(
+        self,
+        base_url: str,
+        username: str,
+        password: str,
+        org: str = "SocioProphet",
+        verify_ssl: bool = True,
+    ) -> None:
+        """Store connection settings; no request is sent until a lookup method is called."""
+        self._base_url = base_url.rstrip("/")
+        self._org = org
+        # Pre-computed payload for the HTTP Basic Authorization header.
+        self._auth = base64.b64encode(f"{username}:{password}".encode()).decode()
+        self._ctx = ssl.create_default_context() if verify_ssl else self._insecure_ctx()
+
+    @staticmethod
+    def _insecure_ctx() -> ssl.SSLContext:
+        """Return an SSL context that skips hostname and certificate verification."""
+        ctx = ssl.create_default_context()
+        # check_hostname must be cleared before verify_mode can be set to CERT_NONE.
+        ctx.check_hostname = False
+        ctx.verify_mode = ssl.CERT_NONE
+        return ctx
+
+    def _get(self, path: str, params: dict[str, str] | None = None) -> Any:
+        """GET ``API_BASE + path`` and return the decoded JSON body.
+
+        Raises:
+            KatelloClientError: on an HTTP error status, a connection or
+                timeout failure, or a response body that is not valid JSON.
+        """
+        # Imported explicitly rather than relying on urllib.request pulling in
+        # urllib.error as a side effect.
+        from urllib.error import HTTPError, URLError
+        from urllib.parse import urlencode
+
+        url = f"{self._base_url}{self.API_BASE}{path}"
+        if params:
+            url = f"{url}?{urlencode(params)}"
+        req = urllib.request.Request(url, headers={
+            "Authorization": f"Basic {self._auth}",
+            "Accept": "application/json",
+            "Content-Type": "application/json",
+        })
+        try:
+            with urllib.request.urlopen(req, context=self._ctx, timeout=30) as resp:
+                body = resp.read()
+        except HTTPError as exc:
+            raise KatelloClientError(f"HTTP {exc.code} fetching {url}: {exc.reason}") from exc
+        except URLError as exc:
+            raise KatelloClientError(f"Connection error fetching {url}: {exc.reason}") from exc
+        except OSError as exc:
+            # Read timeouts and resets surface as plain OSError, not URLError.
+            raise KatelloClientError(f"Connection error fetching {url}: {exc}") from exc
+
+        try:
+            return json.loads(body.decode("utf-8"))
+        except ValueError as exc:
+            # JSONDecodeError and UnicodeDecodeError are both ValueError subclasses.
+            raise KatelloClientError(f"Invalid JSON response from {url}: {exc}") from exc
+
+    @staticmethod
+    def _results(data: Any) -> list[Any]:
+        """Return the ``results`` list of an API response, or [] when absent or null."""
+        if not isinstance(data, dict):
+            return []
+        return data.get("results") or []
+
+    def _lookup_id(
+        self,
+        path: str,
+        name: str,
+        not_found: str,
+        org_id: int | None = None,
+    ) -> int:
+        """Search ``path`` by exact name and return the first hit's id."""
+        params: dict[str, str] = {}
+        if org_id is not None:
+            params["organization_id"] = str(org_id)
+        # Quote the value so names containing spaces stay a single search term.
+        params["search"] = f'name="{name}"'
+        results = self._results(self._get(path, params))
+        if not results:
+            raise KatelloClientError(f"{not_found}: {name}")
+        return int(results[0]["id"])
+
+    def get_org_id(self) -> int:
+        """Return the id of the configured organization.
+
+        Raises:
+            KatelloClientError: if no organization matches the name.
+        """
+        return self._lookup_id("/organizations", self._org, "Organization not found")
+
+    def get_content_view_id(self, name: str, org_id: int) -> int:
+        """Return the id of content view ``name`` in organization ``org_id``.
+
+        Raises:
+            KatelloClientError: if no content view matches the name.
+        """
+        return self._lookup_id("/content_views", name, "Content view not found", org_id)
+
+    def get_lifecycle_env_id(self, name: str, org_id: int) -> int:
+        """Return the id of lifecycle environment ``name`` in organization ``org_id``.
+
+        Raises:
+            KatelloClientError: if no lifecycle environment matches the name.
+        """
+        return self._lookup_id("/environments", name, "Lifecycle environment not found", org_id)
+
+    def get_latest_version(
+        self,
+        content_view_name: str,
+        lifecycle_env: str,
+    ) -> ContentViewManifest:
+        """Return the latest content view version promoted to the given lifecycle env.
+
+        Issues four requests: organization, content view, lifecycle
+        environment, then the version list ordered by version descending and
+        limited to one row.
+
+        Raises:
+            KatelloClientError: if any lookup finds nothing or a request fails.
+        """
+        org_id = self.get_org_id()
+        cv_id = self.get_content_view_id(content_view_name, org_id)
+        env_id = self.get_lifecycle_env_id(lifecycle_env, org_id)
+
+        data = self._get("/content_view_versions", {
+            "content_view_id": str(cv_id),
+            "environment_id": str(env_id),
+            "order": "version DESC",
+            "per_page": "1",
+        })
+        versions = self._results(data)
+        if not versions:
+            raise KatelloClientError(
+                f"No version of '{content_view_name}' promoted to '{lifecycle_env}'"
+            )
+        latest = versions[0]
+        # A missing or null version becomes "unknown"; always hand back a string.
+        version_str = str(latest.get("version") or "unknown")
+
+        # The Pulp content server URL is derived from the Katello base URL by
+        # swapping the port: for our compose the content port is 8101 (24816
+        # inside the container).
+        # NOTE(review): a base URL using neither :8443 nor :8080 is left
+        # unchanged here, so the cache URL then points at Katello itself.
+        pulp_url = self._base_url.replace(":8443", ":8101").replace(":8080", ":8101")
+        nix_cache_url = f"{pulp_url}/{self._org.lower()}/content/sourceos/nix-cache-aarch64-linux/"
+
+        return ContentViewManifest(
+            org=self._org,
+            content_view=content_view_name,
+            version=version_str,
+            lifecycle_env=lifecycle_env,
+            katello_url=self._base_url,
+            pulp_content_url=pulp_url,
+            nix_cache_url=nix_cache_url,
+            # The API may send an explicit null description.
+            description=latest.get("description") or "",
+        )
diff --git a/tests/test_katello_client.py b/tests/test_katello_client.py
new file mode 100644
index 0000000..ba02449
--- /dev/null
+++ b/tests/test_katello_client.py
@@ -0,0 +1,163 @@
+"""Tests for KatelloContentClient and ContentViewSyncer."""
+
+from __future__ import annotations
+
+import json
+from unittest.mock import MagicMock, patch
+
+import pytest
+
+from sourceos_syncd.content_sync import ContentSyncPlan, ContentViewSyncer
+from sourceos_syncd.katello_client import (
+ ContentViewManifest,
+ KatelloClientError,
+ KatelloContentClient,
+)
+
+
+# ── ContentViewManifest ────────────────────────────────────────────────────
+
+def make_manifest(**kwargs) -> ContentViewManifest:
+    """Build a ContentViewManifest from test defaults; keyword arguments override fields."""
+    base_fields = {
+        "org": "SocioProphet",
+        "content_view": "sourceos-builder-aarch64",
+        "version": "1.0",
+        "lifecycle_env": "dev",
+        "katello_url": "https://127.0.0.1:8443",
+        "pulp_content_url": "http://127.0.0.1:8101",
+        "nix_cache_url": "http://127.0.0.1:8101/socioprophet/content/sourceos/nix-cache-aarch64-linux/",
+    }
+    return ContentViewManifest(**{**base_fields, **kwargs})
+
+
+def test_manifest_to_dict():
+    """to_dict() exposes the identifying fields and the nix cache URL key."""
+    rendered = make_manifest().to_dict()
+    expected = {
+        "org": "SocioProphet",
+        "content_view": "sourceos-builder-aarch64",
+        "version": "1.0",
+        "lifecycle_env": "dev",
+    }
+    for key, value in expected.items():
+        assert rendered[key] == value
+    assert "nix_cache_url" in rendered
+
+
+# ── ContentViewSyncer.plan ─────────────────────────────────────────────────
+
+def test_plan_allowed_new_version():
+    """A newer version on an allowed locus yields an allowed plan with both commands."""
+    manifest = make_manifest(version="1.0")
+    plan = ContentViewSyncer(locus="local", current_version="0.9").plan(manifest)
+    assert plan.policy_gate == "allowed"
+    assert plan.allowed
+    for command in ("nix copy", "nixos-rebuild"):
+        assert any(command in step for step in plan.steps)
+
+
+def test_plan_noop_same_version():
+    """When the current version equals the manifest version the plan is a no-op."""
+    manifest = make_manifest(version="1.0")
+    plan = ContentViewSyncer(locus="local", current_version="1.0").plan(manifest)
+    assert plan.policy_gate == "no-op"
+    assert plan.steps == []
+
+
+def test_plan_denied_invalid_locus():
+    """A locus outside ALLOWED_LOCI is denied and named in the reason."""
+    rejected_locus = "burst_cloud"
+    plan = ContentViewSyncer(locus=rejected_locus).plan(make_manifest())
+    assert plan.policy_gate == "denied"
+    assert not plan.allowed
+    assert rejected_locus in plan.policy_reason
+
+
+def test_plan_allowed_trusted_private():
+    """trusted_private is an allowed locus."""
+    manifest = make_manifest(version="2.0")
+    syncer = ContentViewSyncer(locus="trusted_private", current_version=None)
+    assert syncer.plan(manifest).policy_gate == "allowed"
+
+
+def test_plan_no_current_version_always_syncs():
+    """Without a known current version the plan is allowed and has two steps."""
+    manifest = make_manifest(version="1.0")
+    plan = ContentViewSyncer(locus="local", current_version=None).plan(manifest)
+    assert plan.policy_gate == "allowed"
+    assert len(plan.steps) == 2
+
+
+def test_plan_to_dict_roundtrip():
+    """The rendered plan carries the schema tag, a known gate value and a step list."""
+    rendered = ContentViewSyncer(locus="local").plan(make_manifest(version="1.0")).to_dict()
+    known_gates = ("allowed", "denied", "no-op")
+    assert rendered["schema"].startswith("sourceos.content-sync-plan")
+    assert rendered["policy_gate"] in known_gates
+    assert isinstance(rendered["steps"], list)
+
+
+# ── ContentViewSyncer.execute dry-run ─────────────────────────────────────
+
+def test_execute_dry_run_allowed():
+    """A dry run of an allowed plan marks the run and every step as dry_run."""
+    syncer = ContentViewSyncer(locus="local")
+    outcome = syncer.execute(syncer.plan(make_manifest(version="1.0")), dry_run=True)
+    assert outcome["status"] == "dry_run"
+    step_statuses = [entry["status"] for entry in outcome["results"]]
+    assert all(status == "dry_run" for status in step_statuses)
+
+
+def test_execute_dry_run_denied():
+    """Executing a denied plan is skipped and reports the policy gate."""
+    syncer = ContentViewSyncer(locus="burst_cloud")
+    denied_plan = syncer.plan(make_manifest(version="1.0"))
+    outcome = syncer.execute(denied_plan, dry_run=True)
+    assert outcome["status"] == "skipped"
+    assert "policy_gate" in outcome
+
+
+def test_execute_dry_run_noop():
+    """Executing a no-op plan is skipped."""
+    syncer = ContentViewSyncer(locus="local", current_version="1.0")
+    noop_plan = syncer.plan(make_manifest(version="1.0"))
+    assert syncer.execute(noop_plan, dry_run=True)["status"] == "skipped"
+
+
+# ── KatelloContentClient URL construction ─────────────────────────────────
+
+def test_nix_cache_url_from_https_port():
+    """Pulp content URL derived from Foreman HTTPS URL by port replacement."""
+    # One canned response per request, in the order get_latest_version issues them.
+    canned_responses = [
+        {"results": [{"id": 1}]},  # get_org_id
+        {"results": [{"id": 2}]},  # get_content_view_id
+        {"results": [{"id": 3}]},  # get_lifecycle_env_id
+        {"results": [{"id": 10, "version": "1.0", "description": "test"}]},  # versions
+    ]
+    with patch.object(KatelloContentClient, "_get") as mock_get:
+        mock_get.side_effect = canned_responses
+        client = KatelloContentClient(
+            base_url="https://127.0.0.1:8443",
+            username="admin",
+            password="secret",
+            verify_ssl=False,
+        )
+        manifest = client.get_latest_version("sourceos-builder-aarch64", "dev")
+    assert "8101" in manifest.nix_cache_url
+    assert manifest.version == "1.0"
+    assert manifest.org == "SocioProphet"
+
+
+def test_get_latest_version_raises_on_empty_results():
+    """An empty version list for the content view/environment raises KatelloClientError."""
+    with patch.object(KatelloContentClient, "_get") as mock_get:
+        client = KatelloContentClient(
+            "https://127.0.0.1:8443", "admin", "x", verify_ssl=False
+        )
+        # side_effect alone drives the mock: one response per request, in call order.
+        mock_get.side_effect = [
+            {"results": [{"id": 1}]},  # org
+            {"results": [{"id": 2}]},  # cv
+            {"results": [{"id": 3}]},  # env
+            {"results": []},           # versions - empty
+        ]
+        with pytest.raises(KatelloClientError, match="No version"):
+            client.get_latest_version("sourceos-builder-aarch64", "dev")
+
+
+def test_get_org_raises_when_not_found():
+    """An empty organization search result raises KatelloClientError."""
+    empty_response = {"results": []}
+    with patch.object(KatelloContentClient, "_get", return_value=empty_response):
+        client = KatelloContentClient(
+            "https://127.0.0.1:8443", "admin", "x", verify_ssl=False
+        )
+        with pytest.raises(KatelloClientError, match="Organization not found"):
+            client.get_org_id()