Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions src/sourceos_boot/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,9 @@
from typing import Any

from .adapter import DeviceClaim, SourceOSBootAdapter
from .asahi_boot_chain import AsahiBootChain, AsahiBootChainInfo, BOOT_CHAIN_TYPE
from .control_plane import build_control_plane_boot_plan
from .rollback_executor import RollbackExecutor


def load_json(path: Path) -> dict[str, Any]:
Expand Down Expand Up @@ -70,6 +72,34 @@ def plan_control_plane(args: argparse.Namespace) -> int:
return 0


def rollback_plan(args: argparse.Namespace) -> int:
chain_info = AsahiBootChainInfo(
chain_type=BOOT_CHAIN_TYPE,
m1n1_version=None,
uboot_version=None,
efi_vars_mutable=False,
)
chain = AsahiBootChain(chain_info=chain_info)
plan = chain.plan_rollback()
print(json.dumps(plan.to_dict(), indent=2, sort_keys=True))
return 0 if plan.allowed else 2


def rollback_execute(args: argparse.Namespace) -> int:
chain_info = AsahiBootChainInfo(
chain_type=BOOT_CHAIN_TYPE,
m1n1_version=None,
uboot_version=None,
efi_vars_mutable=False,
)
chain = AsahiBootChain(chain_info=chain_info)
plan = chain.plan_rollback()
executor = RollbackExecutor(timeout_s=args.timeout)
result = executor.execute(plan, dry_run=not args.execute)
print(json.dumps(result.to_dict(), indent=2, sort_keys=True))
return 0 if result.ok else 2


def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(description="SourceOS Boot helpers")
subparsers = parser.add_subparsers(dest="command", required=True)
Expand All @@ -91,6 +121,18 @@ def build_parser() -> argparse.ArgumentParser:
)
plan.add_argument("--boot-release-set", type=Path, required=True)
plan.set_defaults(func=plan_control_plane)

rollback = subparsers.add_parser("rollback", help="NixOS generation rollback planning and execution")
rollback_sub = rollback.add_subparsers(dest="rollback_command", required=True)

rp = rollback_sub.add_parser("plan", help="emit a non-mutating AsahiRollbackPlan (no changes)")
rp.set_defaults(func=rollback_plan)

rx = rollback_sub.add_parser("execute", help="execute the rollback plan (dry-run unless --execute)")
rx.add_argument("--execute", action="store_true", help="actually run nixos-rebuild --rollback (default: dry-run)")
rx.add_argument("--timeout", type=int, default=300, help="subprocess timeout in seconds (default: 300)")
rx.set_defaults(func=rollback_execute)

return parser


Expand Down
161 changes: 161 additions & 0 deletions src/sourceos_boot/rollback_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
"""Rollback executor for sourceos-boot.

Converts a non-mutating AsahiRollbackPlan into real subprocess calls.
This is intentionally a separate module from asahi_boot_chain.py to preserve
the purity boundary: the chain module is side-effect-free; this module owns
the execute boundary.

Safety invariants:
- Will not execute a denied plan.
- Will not execute if efiVarsMutable is true on the plan's chain.
- Emits a RollbackExecutionResult with full step-level detail.
- timeout_s defaults to 300 (nixos-rebuild switch --rollback can be slow
if nix store paths need to be fetched, but on a local Katello it should
be fast).
"""

from __future__ import annotations

import subprocess
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any

from .asahi_boot_chain import AsahiRollbackPlan

ROLLBACK_ENGINE_ID = "sourceos.boot.asahi-rollback"
ROLLBACK_SPEC_VERSION = "0.1.0"


@dataclass(frozen=True)
class RollbackStepResult:
step: str
status: str # ok | failed | skipped | timeout | dry_run
returncode: int | None = None
stdout: str = ""
stderr: str = ""
reason: str = ""

def to_dict(self) -> dict[str, Any]:
d: dict[str, Any] = {"step": self.step, "status": self.status}
if self.returncode is not None:
d["returncode"] = self.returncode
if self.stdout:
d["stdout"] = self.stdout
if self.stderr:
d["stderr"] = self.stderr
if self.reason:
d["reason"] = self.reason
return d


@dataclass(frozen=True)
class RollbackExecutionResult:
execution_id: str
plan: AsahiRollbackPlan
outcome: str # applied | denied | dry_run | failed
steps: list[RollbackStepResult]
duration_ms: int
issued_at: str

def to_dict(self) -> dict[str, Any]:
return {
"executionId": self.execution_id,
"engineId": ROLLBACK_ENGINE_ID,
"specVersion": ROLLBACK_SPEC_VERSION,
"outcome": self.outcome,
"plan": self.plan.to_dict(),
"steps": [s.to_dict() for s in self.steps],
"durationMs": self.duration_ms,
"issuedAt": self.issued_at,
}

@property
def ok(self) -> bool:
return self.outcome in ("applied", "dry_run")


class RollbackExecutor:
"""Executes an AsahiRollbackPlan by shelling out to nixos-rebuild --rollback."""

def __init__(self, timeout_s: int = 300) -> None:
self._timeout_s = timeout_s

def execute(
self, plan: AsahiRollbackPlan, dry_run: bool = True
) -> RollbackExecutionResult:
execution_id = str(uuid.uuid4())
issued_at = datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
t_start = time.monotonic()

if not plan.allowed:
return RollbackExecutionResult(
execution_id=execution_id,
plan=plan,
outcome="denied",
steps=[],
duration_ms=0,
issued_at=issued_at,
)

step_results = []
overall_ok = True

for step_cmd in plan.steps:
# comment lines are informational only
if step_cmd.startswith("#"):
continue

if dry_run:
step_results.append(RollbackStepResult(
step=step_cmd,
status="dry_run",
reason="dry_run=True",
))
continue

try:
proc = subprocess.run(
step_cmd,
shell=True,
capture_output=True,
text=True,
timeout=self._timeout_s,
)
ok = proc.returncode == 0
if not ok:
overall_ok = False
step_results.append(RollbackStepResult(
step=step_cmd,
status="ok" if ok else "failed",
returncode=proc.returncode,
stdout=proc.stdout.strip()[:500],
stderr=proc.stderr.strip()[:500],
))
except subprocess.TimeoutExpired:
overall_ok = False
step_results.append(RollbackStepResult(
step=step_cmd,
status="timeout",
reason=f"timed out after {self._timeout_s}s",
))

duration_ms = int((time.monotonic() - t_start) * 1000)

if dry_run:
outcome = "dry_run"
elif overall_ok:
outcome = "applied"
else:
outcome = "failed"

return RollbackExecutionResult(
execution_id=execution_id,
plan=plan,
outcome=outcome,
steps=step_results,
duration_ms=duration_ms,
issued_at=issued_at,
)
106 changes: 106 additions & 0 deletions tests/test_rollback_executor.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
"""Tests for RollbackExecutor."""

from __future__ import annotations

from unittest.mock import MagicMock, patch

from sourceos_boot.asahi_boot_chain import (
BOOT_CHAIN_TYPE,
AsahiBootChain,
AsahiBootChainInfo,
AsahiRollbackPlan,
)
from sourceos_boot.rollback_executor import RollbackExecutor, ROLLBACK_ENGINE_ID


def _allowed_plan() -> AsahiRollbackPlan:
chain = AsahiBootChain(profiles_root="/nonexistent")
return chain.plan_rollback()


def _denied_plan() -> AsahiRollbackPlan:
info = AsahiBootChainInfo(
chain_type=BOOT_CHAIN_TYPE,
m1n1_version=None,
uboot_version=None,
efi_vars_mutable=True,
)
chain = AsahiBootChain(chain_info=info, profiles_root="/nonexistent")
return chain.plan_rollback()


# ── dry-run ────────────────────────────────────────────────────────────────────

def test_dry_run_returns_dry_run_outcome():
plan = _allowed_plan()
executor = RollbackExecutor()
result = executor.execute(plan, dry_run=True)
assert result.outcome == "dry_run"
assert result.ok
for step in result.steps:
assert step.status == "dry_run"


def test_dry_run_skips_comment_lines():
"""Comment lines in plan.steps should not appear as step results."""
plan = _allowed_plan()
executor = RollbackExecutor()
result = executor.execute(plan, dry_run=True)
for step in result.steps:
assert not step.step.startswith("#")


# ── denied plan ───────────────────────────────────────────────────────────────

def test_denied_plan_returns_denied_outcome():
plan = _denied_plan()
executor = RollbackExecutor()
result = executor.execute(plan, dry_run=False)
assert result.outcome == "denied"
assert not result.ok
assert result.steps == []


# ── execute ────────────────────────────────────────────────────────────────────

def test_execute_success():
plan = _allowed_plan()
executor = RollbackExecutor()
mock_proc = MagicMock()
mock_proc.returncode = 0
mock_proc.stdout = "activating configuration..."
mock_proc.stderr = ""
with patch("sourceos_boot.rollback_executor.subprocess.run", return_value=mock_proc):
result = executor.execute(plan, dry_run=False)
assert result.outcome == "applied"
assert result.ok
assert all(s.status == "ok" for s in result.steps)


def test_execute_failure_propagates():
plan = _allowed_plan()
executor = RollbackExecutor()
mock_proc = MagicMock()
mock_proc.returncode = 1
mock_proc.stdout = ""
mock_proc.stderr = "error: rollback failed"
with patch("sourceos_boot.rollback_executor.subprocess.run", return_value=mock_proc):
result = executor.execute(plan, dry_run=False)
assert result.outcome == "failed"
assert not result.ok
assert any(s.status == "failed" for s in result.steps)


# ── to_dict shape ──────────────────────────────────────────────────────────────

def test_to_dict_contains_required_fields():
plan = _allowed_plan()
executor = RollbackExecutor()
result = executor.execute(plan, dry_run=True)
d = result.to_dict()
assert d["engineId"] == ROLLBACK_ENGINE_ID
assert "executionId" in d
assert "outcome" in d
assert "issuedAt" in d
assert "durationMs" in d
assert isinstance(d["steps"], list)
Loading