From 51f2586433bafe7362c60c9edeea18cb504bc06d Mon Sep 17 00:00:00 2001 From: Michael Heller <21163552+mdheller@users.noreply.github.com> Date: Tue, 16 Jun 2026 12:29:19 -0400 Subject: [PATCH] feat(rollback): RollbackExecutor and CLI execute path - RollbackExecutor: converts AsahiRollbackPlan into real subprocess calls. Non-mutating purity boundary is preserved in asahi_boot_chain.py; this module owns the execute boundary. - CLI: adds `sourceos-boot rollback plan` and `sourceos-boot rollback execute`. --execute flag required to shell out; dry-run default. - RollbackExecutionResult.to_dict() emits engineId + specVersion for traceability. - 28 tests passing. --- src/sourceos_boot/cli.py | 42 +++++++ src/sourceos_boot/rollback_executor.py | 161 +++++++++++++++++++++++++ tests/test_rollback_executor.py | 106 ++++++++++++++++ 3 files changed, 309 insertions(+) create mode 100644 src/sourceos_boot/rollback_executor.py create mode 100644 tests/test_rollback_executor.py diff --git a/src/sourceos_boot/cli.py b/src/sourceos_boot/cli.py index 4ba1db7..58a326a 100644 --- a/src/sourceos_boot/cli.py +++ b/src/sourceos_boot/cli.py @@ -11,7 +11,9 @@ from typing import Any from .adapter import DeviceClaim, SourceOSBootAdapter +from .asahi_boot_chain import AsahiBootChain, AsahiBootChainInfo, BOOT_CHAIN_TYPE from .control_plane import build_control_plane_boot_plan +from .rollback_executor import RollbackExecutor def load_json(path: Path) -> dict[str, Any]: @@ -70,6 +72,34 @@ def plan_control_plane(args: argparse.Namespace) -> int: return 0 +def rollback_plan(args: argparse.Namespace) -> int: + chain_info = AsahiBootChainInfo( + chain_type=BOOT_CHAIN_TYPE, + m1n1_version=None, + uboot_version=None, + efi_vars_mutable=False, + ) + chain = AsahiBootChain(chain_info=chain_info) + plan = chain.plan_rollback() + print(json.dumps(plan.to_dict(), indent=2, sort_keys=True)) + return 0 if plan.allowed else 2 + + +def rollback_execute(args: argparse.Namespace) -> int: + chain_info = AsahiBootChainInfo( + chain_type=BOOT_CHAIN_TYPE, + m1n1_version=None, + uboot_version=None, + efi_vars_mutable=False, + ) + chain = AsahiBootChain(chain_info=chain_info) + plan = chain.plan_rollback() + executor = RollbackExecutor(timeout_s=args.timeout) + result = executor.execute(plan, dry_run=not args.execute) + print(json.dumps(result.to_dict(), indent=2, sort_keys=True)) + return 0 if result.ok else 2 + + def build_parser() -> argparse.ArgumentParser: parser = argparse.ArgumentParser(description="SourceOS Boot helpers") subparsers = parser.add_subparsers(dest="command", required=True) @@ -91,6 +121,18 @@ def build_parser() -> argparse.ArgumentParser: ) plan.add_argument("--boot-release-set", type=Path, required=True) plan.set_defaults(func=plan_control_plane) + + rollback = subparsers.add_parser("rollback", help="NixOS generation rollback planning and execution") + rollback_sub = rollback.add_subparsers(dest="rollback_command", required=True) + + rp = rollback_sub.add_parser("plan", help="emit a non-mutating AsahiRollbackPlan (no changes)") + rp.set_defaults(func=rollback_plan) + + rx = rollback_sub.add_parser("execute", help="execute the rollback plan (dry-run unless --execute)") + rx.add_argument("--execute", action="store_true", help="actually run nixos-rebuild --rollback (default: dry-run)") + rx.add_argument("--timeout", type=int, default=300, help="subprocess timeout in seconds (default: 300)") + rx.set_defaults(func=rollback_execute) + return parser diff --git a/src/sourceos_boot/rollback_executor.py b/src/sourceos_boot/rollback_executor.py new file mode 100644 index 0000000..58d49e9 --- /dev/null +++ b/src/sourceos_boot/rollback_executor.py @@ -0,0 +1,161 @@ +"""Rollback executor for sourceos-boot. + +Converts a non-mutating AsahiRollbackPlan into real subprocess calls. +This is intentionally a separate module from asahi_boot_chain.py to preserve +the purity boundary: the chain module is side-effect-free; this module owns +the execute boundary. + +Safety invariants: + - Will not execute a denied plan. + - Will not execute if efiVarsMutable is true on the plan's chain. + - Emits a RollbackExecutionResult with full step-level detail. + - timeout_s defaults to 300 (nixos-rebuild switch --rollback can be slow + if nix store paths need to be fetched, but on a local Katello it should + be fast). +""" + +from __future__ import annotations + +import subprocess +import time +import uuid +from dataclasses import dataclass, field +from datetime import datetime, timezone +from typing import Any + +from .asahi_boot_chain import AsahiRollbackPlan + +ROLLBACK_ENGINE_ID = "sourceos.boot.asahi-rollback" +ROLLBACK_SPEC_VERSION = "0.1.0" + + +@dataclass(frozen=True) +class RollbackStepResult: + step: str + status: str # ok | failed | skipped | timeout | dry_run + returncode: int | None = None + stdout: str = "" + stderr: str = "" + reason: str = "" + + def to_dict(self) -> dict[str, Any]: + d: dict[str, Any] = {"step": self.step, "status": self.status} + if self.returncode is not None: + d["returncode"] = self.returncode + if self.stdout: + d["stdout"] = self.stdout + if self.stderr: + d["stderr"] = self.stderr + if self.reason: + d["reason"] = self.reason + return d + + +@dataclass(frozen=True) +class RollbackExecutionResult: + execution_id: str + plan: AsahiRollbackPlan + outcome: str # applied | denied | dry_run | failed + steps: list[RollbackStepResult] + duration_ms: int + issued_at: str + + def to_dict(self) -> dict[str, Any]: + return { + "executionId": self.execution_id, + "engineId": ROLLBACK_ENGINE_ID, + "specVersion": ROLLBACK_SPEC_VERSION, + "outcome": self.outcome, + "plan": self.plan.to_dict(), + "steps": [s.to_dict() for s in self.steps], + "durationMs": self.duration_ms, + "issuedAt": self.issued_at, + } + + @property + def ok(self) -> bool: + return self.outcome in ("applied", "dry_run") + + +class RollbackExecutor: + """Executes an AsahiRollbackPlan by shelling out to nixos-rebuild --rollback.""" + + def __init__(self, timeout_s: int = 300) -> None: + self._timeout_s = timeout_s + + def execute( + self, plan: AsahiRollbackPlan, dry_run: bool = True + ) -> RollbackExecutionResult: + execution_id = str(uuid.uuid4()) + issued_at = datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + t_start = time.monotonic() + + if not plan.allowed: + return RollbackExecutionResult( + execution_id=execution_id, + plan=plan, + outcome="denied", + steps=[], + duration_ms=0, + issued_at=issued_at, + ) + + step_results = [] + overall_ok = True + + for step_cmd in plan.steps: + # comment lines are informational only + if step_cmd.startswith("#"): + continue + + if dry_run: + step_results.append(RollbackStepResult( + step=step_cmd, + status="dry_run", + reason="dry_run=True", + )) + continue + + try: + proc = subprocess.run( + step_cmd, + shell=True, + capture_output=True, + text=True, + timeout=self._timeout_s, + ) + ok = proc.returncode == 0 + if not ok: + overall_ok = False + step_results.append(RollbackStepResult( + step=step_cmd, + status="ok" if ok else "failed", + returncode=proc.returncode, + stdout=proc.stdout.strip()[:500], + stderr=proc.stderr.strip()[:500], + )) + except subprocess.TimeoutExpired: + overall_ok = False + step_results.append(RollbackStepResult( + step=step_cmd, + status="timeout", + reason=f"timed out after {self._timeout_s}s", + )) + + duration_ms = int((time.monotonic() - t_start) * 1000) + + if dry_run: + outcome = "dry_run" + elif overall_ok: + outcome = "applied" + else: + outcome = "failed" + + return RollbackExecutionResult( + execution_id=execution_id, + plan=plan, + outcome=outcome, + steps=step_results, + duration_ms=duration_ms, + issued_at=issued_at, + ) diff --git a/tests/test_rollback_executor.py b/tests/test_rollback_executor.py new file mode 100644 index 0000000..f996624 --- /dev/null +++ b/tests/test_rollback_executor.py @@ -0,0 +1,106 @@ +"""Tests for RollbackExecutor.""" + +from __future__ import annotations + +from unittest.mock import MagicMock, patch + +from sourceos_boot.asahi_boot_chain import ( + BOOT_CHAIN_TYPE, + AsahiBootChain, + AsahiBootChainInfo, + AsahiRollbackPlan, +) +from sourceos_boot.rollback_executor import RollbackExecutor, ROLLBACK_ENGINE_ID + + +def _allowed_plan() -> AsahiRollbackPlan: + chain = AsahiBootChain(profiles_root="/nonexistent") + return chain.plan_rollback() + + +def _denied_plan() -> AsahiRollbackPlan: + info = AsahiBootChainInfo( + chain_type=BOOT_CHAIN_TYPE, + m1n1_version=None, + uboot_version=None, + efi_vars_mutable=True, + ) + chain = AsahiBootChain(chain_info=info, profiles_root="/nonexistent") + return chain.plan_rollback() + + +# ── dry-run ──────────────────────────────────────────────────────────────────── + +def test_dry_run_returns_dry_run_outcome(): + plan = _allowed_plan() + executor = RollbackExecutor() + result = executor.execute(plan, dry_run=True) + assert result.outcome == "dry_run" + assert result.ok + for step in result.steps: + assert step.status == "dry_run" + + +def test_dry_run_skips_comment_lines(): + """Comment lines in plan.steps should not appear as step results.""" + plan = _allowed_plan() + executor = RollbackExecutor() + result = executor.execute(plan, dry_run=True) + for step in result.steps: + assert not step.step.startswith("#") + + +# ── denied plan ─────────────────────────────────────────────────────────────── + +def test_denied_plan_returns_denied_outcome(): + plan = _denied_plan() + executor = RollbackExecutor() + result = executor.execute(plan, dry_run=False) + assert result.outcome == "denied" + assert not result.ok + assert result.steps == [] + + +# ── execute ──────────────────────────────────────────────────────────────────── + +def test_execute_success(): + plan = _allowed_plan() + executor = RollbackExecutor() + mock_proc = MagicMock() + mock_proc.returncode = 0 + mock_proc.stdout = "activating configuration..." + mock_proc.stderr = "" + with patch("sourceos_boot.rollback_executor.subprocess.run", return_value=mock_proc): + result = executor.execute(plan, dry_run=False) + assert result.outcome == "applied" + assert result.ok + assert all(s.status == "ok" for s in result.steps) + + +def test_execute_failure_propagates(): + plan = _allowed_plan() + executor = RollbackExecutor() + mock_proc = MagicMock() + mock_proc.returncode = 1 + mock_proc.stdout = "" + mock_proc.stderr = "error: rollback failed" + with patch("sourceos_boot.rollback_executor.subprocess.run", return_value=mock_proc): + result = executor.execute(plan, dry_run=False) + assert result.outcome == "failed" + assert not result.ok + assert any(s.status == "failed" for s in result.steps) + + +# ── to_dict shape ────────────────────────────────────────────────────────────── + +def test_to_dict_contains_required_fields(): + plan = _allowed_plan() + executor = RollbackExecutor() + result = executor.execute(plan, dry_run=True) + d = result.to_dict() + assert d["engineId"] == ROLLBACK_ENGINE_ID + assert "executionId" in d + assert "outcome" in d + assert "issuedAt" in d + assert "durationMs" in d + assert isinstance(d["steps"], list)