From ff62b6507b3684feb0bb14f1df548e99e86b04ce Mon Sep 17 00:00:00 2001 From: Rihards Gailums Date: Sat, 23 May 2026 17:06:18 +0000 Subject: [PATCH] Slice 3A: processgit-updater sidecar (state machine + HTTP API + cosign verify) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Foundation of the in-product self-update story. Adds the `updater/` sidecar: a tiny separate Go module that orchestrates ProcessGit updates inside Docker deployments. A separate process is necessary because: 1. A container cannot safely update itself in place. Replacing the running binary while it serves requests races with active sessions and connections. The sidecar runs continuously and survives main-container restarts. 2. Privilege boundary. The updater needs access to /var/run/docker.sock; the main ProcessGit container does not. 3. Tiny dependency surface. Stdlib-only Go (enforced by a tripwire test), plus docker CLI + cosign in the runtime image. The whole sidecar is independently reviewable. What ships in this PR (Slice 3A): HTTP API GET /healthz — liveness (no auth) GET /status — current state, active job GET /releases/latest — proxies GitHub Releases POST /update — kicks off an update; 409 if one runs GET /update/{id} — job status + step history GET /history — last 50 jobs (newest first) Auth: bearer token from $PROCESSGIT_UPDATER_TOKEN, constant-time compare via crypto/subtle. State machine idle → planning → snapshotting → pulling → verifying → migrating → swapping → healthchecking → committed On post-swap failure: rolling_back → rolled_back. If rollback itself fails: failed (requires manual intervention). Persistence Atomic write-temp-then-rename on $STATE_DIR/state.json. Bounded history (50 jobs). One job active at a time, enforced by Store. Real wiring - GitHub Releases API client (api.github.com/repos/…/releases). - cosign verify (image) + cosign verify-blob (release.json) via os/exec. Manifest signature is verified BEFORE we trust ANY field of release.json — an attacker who substitutes a malicious manifest cannot redirect the updater to a different image. Docker operations Stubbed in Slice 3A — each method logs + sleeps to simulate work. The orchestrator's state machine and HTTP API are exercisable end-to-end without touching real containers. PROCESSGIT_UPDATER_STUB defaults to true; Slice 3B will replace the stubs with real docker CLI invocations (pull / run --rm / stop / run / inspect / exec — none of which is conceptually hard, but each deserves careful testing and review). Runtime image Multi-stage Dockerfile: golang:1.25-alpine3.22 build → alpine:3.22 runtime with docker-cli, cosign (from gcr.io/projectsigstore/cosign), ca-certificates, tini. Final image ~150 MB, dominated by docker CLI. Non-root user, EXPOSE 9000, ENTRYPOINT via tini for clean SIGTERM handling. Tests (7 total, all passing): - Store round-trip: load/save/active enforced, ordering preserved after reload - Concurrent AddJob refused - State.IsTerminal classification - Job.transitionTo sets CompletedAt on terminal states only - Orchestrator happy path: full state-machine traversal in stub mode using a fake GitHub server + /bin/true as cosign stub - Concurrent-update rejection at the API layer - Bearer-auth: 401 without/wrong token, 200 with correct token - Tripwire: TestNoExternalImports fails if a go.sum ever appears Code stats: ~1900 lines total across 11 files. ~1400 lines of Go (including 346 lines of tests). Out of scope for this PR (deliberate splits): - deploy/docker-compose.yml integration: adds the updater service, wires the bearer token via .env, sets the network so only the main app can reach the updater. Separate PR — touches deployment config and will need .env.example documentation. - .github/workflows/release.yml addition: builds the updater image paired with the main image on every semver tag. Separate PR — small workflow edit, doesn't affect updater code. - Slice 3B: real docker calls (pull / swap / healthcheck / rollback). - Slice 3C: volume snapshot for full disaster recovery. - Slice 4: admin UI at /-/admin/updates consuming this API. Co-authored-by: Claude --- updater/Dockerfile | 89 +++++++++ updater/README.md | 166 +++++++++++++++++ updater/api.go | 159 ++++++++++++++++ updater/cosign.go | 112 ++++++++++++ updater/docker.go | 122 ++++++++++++ updater/go.mod | 8 + updater/job.go | 268 +++++++++++++++++++++++++++ updater/main.go | 169 +++++++++++++++++ updater/manifest.go | 266 +++++++++++++++++++++++++++ updater/orchestrator.go | 200 ++++++++++++++++++++ updater/orchestrator_test.go | 346 +++++++++++++++++++++++++++++++++++ 11 files changed, 1905 insertions(+) create mode 100644 updater/Dockerfile create mode 100644 updater/README.md create mode 100644 updater/api.go create mode 100644 updater/cosign.go create mode 100644 updater/docker.go create mode 100644 updater/go.mod create mode 100644 updater/job.go create mode 100644 updater/main.go create mode 100644 updater/manifest.go create mode 100644 updater/orchestrator.go create mode 100644 updater/orchestrator_test.go diff --git a/updater/Dockerfile b/updater/Dockerfile new file mode 100644 index 0000000..e148cee --- /dev/null +++ b/updater/Dockerfile @@ -0,0 +1,89 @@ +# syntax=docker/dockerfile:1.7 +# +# processgit-updater — sidecar that orchestrates ProcessGit self-updates. +# +# The runtime image carries: +# - the updater binary itself (stdlib-only Go, ~8 MB) +# - the docker CLI (~80 MB) for talking to the bind-mounted /var/run/docker.sock +# - cosign (~50 MB) for image and blob signature verification +# - ca-certificates so we can hit api.github.com and ghcr.io +# +# Total uncompressed image: ~150 MB. The vast majority is the docker CLI; the +# updater itself is tiny. +# +# At runtime, /var/run/docker.sock must be bind-mounted from the host: +# docker run -v /var/run/docker.sock:/var/run/docker.sock processgit-updater +# +# See deploy/docker-compose.yml (added in a follow-up PR) for the full +# compose-driven setup. + +ARG GO_VERSION=1.25 +ARG ALPINE_VERSION=3.22 + +# ----------------------------------------------------------------------------- +# Stage 1: build the updater binary +# ----------------------------------------------------------------------------- +FROM golang:${GO_VERSION}-alpine${ALPINE_VERSION} AS build +WORKDIR /src + +# Cache module downloads separately so source changes don't bust them. +COPY go.mod ./ +# (no go.sum because the updater is stdlib-only — see TestNoExternalImports) +RUN go mod download + +# Now bring in source. +COPY *.go ./ + +# Build with version stamped from build arg. +ARG VERSION=dev +ARG COMMIT=unknown +RUN CGO_ENABLED=0 GOOS=linux go build \ + -trimpath \ + -ldflags "-s -w -X main.version=${VERSION}+${COMMIT}" \ + -o /out/processgit-updater \ + . + +# ----------------------------------------------------------------------------- +# Stage 2: cosign — copy from official Sigstore image +# ----------------------------------------------------------------------------- +FROM gcr.io/projectsigstore/cosign:v2.4.1 AS cosign + +# ----------------------------------------------------------------------------- +# Stage 3: runtime +# ----------------------------------------------------------------------------- +FROM alpine:${ALPINE_VERSION} AS runtime + +RUN apk add --no-cache \ + ca-certificates \ + docker-cli \ + tini \ + && update-ca-certificates 2>/dev/null || true + +# Cosign binary from the official Sigstore image. +COPY --from=cosign /ko-app/cosign /usr/local/bin/cosign + +# The updater binary. +COPY --from=build /out/processgit-updater /usr/local/bin/processgit-updater + +# Non-root user. We can't drop entirely because the docker CLI talks to the +# docker socket which is typically owned by root or `docker` group on the +# host. Compose users with rootless docker will need to adjust. +RUN addgroup -S updater && adduser -S -G updater updater + +# State directory (persisted via a docker volume). +RUN mkdir -p /var/lib/processgit-updater \ + && chown updater:updater /var/lib/processgit-updater + +USER updater +WORKDIR /var/lib/processgit-updater +EXPOSE 9000 + +# tini handles SIGTERM/SIGINT for graceful shutdown. +ENTRYPOINT ["/sbin/tini", "--", "/usr/local/bin/processgit-updater"] + +LABEL org.opencontainers.image.title="processgit-updater" +LABEL org.opencontainers.image.description="ProcessGit self-update orchestration sidecar" +LABEL org.opencontainers.image.source="https://github.com/Algomation-AI/ProcessGit" +LABEL org.opencontainers.image.url="https://processgit.org" +LABEL org.opencontainers.image.vendor="Algomation-AI" +LABEL org.opencontainers.image.licenses="MIT" diff --git a/updater/README.md b/updater/README.md new file mode 100644 index 0000000..4730618 --- /dev/null +++ b/updater/README.md @@ -0,0 +1,166 @@ +# processgit-updater + +Sidecar that lets a ProcessGit Docker deployment update itself: pull the +new image, verify its cosign signature, run the migration, swap the +running container, healthcheck, and roll back on failure. + +A separate process from the main ProcessGit container because: + +1. **A container can't safely update itself in place.** Replacing the + running binary while it serves requests races. The sidecar runs + continuously and survives across main-container restarts. +2. **Privilege boundary.** The updater needs `/var/run/docker.sock`. The + main ProcessGit container should not. +3. **Tiny dependency surface.** Stdlib-only Go + docker CLI + cosign. + Reviewable on its own. + +## Architecture at a glance + +``` + ┌──────────────────────┐ ┌──────────────────────┐ +HTTP from │ processgit │ HTTP │ processgit-updater │ +admin UI ───▶ │ (main app) │ ──────▶ │ (this sidecar) │ + │ │ bearer │ │ + │ routers/web/admin/ │ │ /healthz │ + │ updates.go │ │ /status │ + │ (Slice 4) │ │ /releases/latest │ + └──────────────────────┘ │ POST /update │ + │ GET /update/{id} │ + │ /history │ + └─────────┬────────────┘ + │ docker.sock + ▼ + ┌───────────────────────────────┐ + │ docker daemon on the host │ + │ (pull / run / inspect / …) │ + └───────────────────────────────┘ +``` + +## Update state machine + +``` +idle → planning → snapshotting → pulling → verifying → migrating → swapping + ↓ + healthchecking + ↓ ↘ fail + committed rolling_back + (success) ↓ + rolled_back (recovered) + ↓ rollback fails + failed (manual intervention) +``` + +One update at a time, enforced by `Store.AddJob`. Concurrent attempts return +HTTP 409 Conflict. + +**Critical safety property:** the manifest signature is verified BEFORE we +trust any of its fields (image ref, digest, migration command). An attacker +who can substitute a malicious `release.json` cannot redirect the updater +to a different image, because the cosign verification of the manifest blob +must pass first, and that's bound to the workflow's OIDC identity. + +## Configuration + +All via environment variables. + +| Env var | Default | Purpose | +|---|---|---| +| `PROCESSGIT_UPDATER_TOKEN` | (required) | Bearer token for the HTTP API. Generate once with `openssl rand -hex 32`. Must match the value the main app uses to call the updater. | +| `PROCESSGIT_UPDATER_LISTEN` | `:9000` | Address to bind. | +| `PROCESSGIT_UPDATER_STATE_DIR` | `/var/lib/processgit-updater` | Where `state.json` lives. Bind-mount a volume. | +| `PROCESSGIT_UPDATER_REPO` | `Algomation-AI/ProcessGit` | GitHub repo to query for releases. | +| `PROCESSGIT_UPDATER_GITHUB_API` | `https://api.github.com` | Override for GitHub Enterprise. | +| `PROCESSGIT_UPDATER_GITHUB_TOKEN` | `""` | Optional. Raises rate limit; required for private repos. | +| `PROCESSGIT_UPDATER_APP_CONTAINER` | `processgit` | Name of the main app container, used for the swap. | +| `PROCESSGIT_UPDATER_STUB` | `true` | **Slice 3A default.** Skips real docker calls and simulates each phase with a short sleep. Set to `false` once Slice 3B lands. | +| `PROCESSGIT_UPDATER_DEBUG` | `false` | Verbose structured logs. | + +## HTTP API + +All paths except `/healthz` require `Authorization: Bearer $TOKEN`. + +| Method + path | Body | Response | +|---|---|---| +| `GET /healthz` | — | `{status, version, time}` | +| `GET /status` | — | `{version, active_job, recent_jobs_count}` | +| `GET /releases/latest?channel=stable` | — | `{tag, name, prerelease, html_url, published_at}` | +| `POST /update` | `{target_tag: "v0.1.2"}` | 202 `{job_id, status_url, job}` (409 if another job is active) | +| `GET /update/{id}` | — | Job object (state, steps, errors) | +| `GET /history` | — | `{jobs: [...]}` (last 50, newest first) | + +## Running locally for development + +```bash +# Stub mode (default) — runs the state machine end-to-end without docker +PROCESSGIT_UPDATER_TOKEN=devtoken \ +PROCESSGIT_UPDATER_STATE_DIR=/tmp/pg-updater \ +PROCESSGIT_UPDATER_STUB=true \ +go run . + +# In another terminal +TOKEN=devtoken +curl -s http://localhost:9000/healthz | jq + +curl -s -H "Authorization: Bearer $TOKEN" \ + http://localhost:9000/releases/latest | jq + +curl -s -X POST -H "Authorization: Bearer $TOKEN" \ + -H "Content-Type: application/json" \ + -d '{"target_tag": "v0.1.0"}' \ + http://localhost:9000/update | jq + +# Note the job_id, then poll: +curl -s -H "Authorization: Bearer $TOKEN" \ + http://localhost:9000/update/$JOB_ID | jq '.state, .steps[-1]' +``` + +In stub mode the full state-machine run takes ~20 seconds. + +In non-stub mode the planning + signature-verification phases still require +network access to `api.github.com` and to the OCI registry holding the image +being verified. cosign also reaches out to Sigstore's Rekor transparency log. + +## Tests + +```bash +go test -v ./... +``` + +Covers store round-trip, terminal-state classification, the orchestrator +happy path (full state-machine traversal in stub mode), concurrent-update +rejection, bearer auth on the API, and an external-dependency tripwire. + +## Scope of this PR (Slice 3A) + +What ships: + +- Full HTTP API with bearer-token auth +- Update orchestrator with all states wired +- Real GitHub release fetching (`api.github.com/repos/…/releases`) +- Real cosign image + release.json blob verification (`cosign verify`, + `cosign verify-blob` via `os/exec`) +- Atomic write-then-rename for the state file +- 7 tests covering store, state machine, API auth, no-deps invariant + +What's stubbed (planned for Slice 3B): + +- `docker pull` +- `docker run` for migrations +- The container swap (`docker stop` + `docker run` with carried-over + config) — non-trivial, deserves its own focused PR +- `docker exec` health probing +- Rollback container restoration + +Stub mode is the default precisely so the architecture can be reviewed and +the HTTP API exercised before any real container surgery is wired up. + +## Scope of follow-ups + +| Slice | Item | +|---|---| +| 3B | Real `docker pull` / `run` / `swap` / `healthcheck` / `rollback` | +| 3C | Volume snapshot + restore for full disaster recovery | +| 3D | Migration runner integration (handles the `Migration.Command` from the manifest robustly) | +| 4 | Admin UI page at `/-/admin/updates` that consumes this API | +| Workflow | Add `processgit-updater` image build to `.github/workflows/release.yml` so it ships paired with the main image | +| Deploy | `deploy/docker-compose.yml` integration — add the updater service, wire the bearer token via `.env`, set the network so only the main app can reach it | diff --git a/updater/api.go b/updater/api.go new file mode 100644 index 0000000..fbeea76 --- /dev/null +++ b/updater/api.go @@ -0,0 +1,159 @@ +// HTTP API. All endpoints except /healthz require a bearer token matching +// the value of $PROCESSGIT_UPDATER_TOKEN. The token is shared between the +// main app container (which calls the updater) and the updater itself, via +// the docker-compose .env file. +// +// The updater listens on 0.0.0.0 inside its container; access is gated +// purely by the docker-compose network, plus the bearer token as defence +// in depth in case someone exposes the port to the host. + +package main + +import ( + "context" + "crypto/subtle" + "encoding/json" + "errors" + "log/slog" + "net/http" + "strings" + "time" +) + +type API struct { + Token string + Store *Store + GitHub *GitHubClient + Orchestrator *Orchestrator + Log *slog.Logger + Version string // updater's own version +} + +func (a *API) Routes() http.Handler { + mux := http.NewServeMux() + mux.HandleFunc("GET /healthz", a.handleHealthz) + mux.Handle("GET /status", a.auth(a.handleStatus)) + mux.Handle("GET /releases/latest", a.auth(a.handleLatestRelease)) + mux.Handle("POST /update", a.auth(a.handleUpdateStart)) + mux.Handle("GET /update/{id}", a.auth(a.handleUpdateGet)) + mux.Handle("GET /history", a.auth(a.handleHistory)) + return mux +} + +// --- Middleware --- + +func (a *API) auth(h http.HandlerFunc) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + hdr := r.Header.Get("Authorization") + const prefix = "Bearer " + if !strings.HasPrefix(hdr, prefix) || subtle.ConstantTimeCompare([]byte(hdr[len(prefix):]), []byte(a.Token)) != 1 { + httpError(w, http.StatusUnauthorized, "missing or invalid bearer token") + return + } + h(w, r) + }) +} + +// --- Handlers --- + +func (a *API) handleHealthz(w http.ResponseWriter, r *http.Request) { + writeJSON(w, http.StatusOK, map[string]any{ + "status": "ok", + "version": a.Version, + "time": time.Now().UTC().Format(time.RFC3339), + }) +} + +func (a *API) handleStatus(w http.ResponseWriter, r *http.Request) { + resp := map[string]any{ + "version": a.Version, + "active_job": a.Store.Active(), + "recent_jobs_count": len(a.Store.List()), + } + writeJSON(w, http.StatusOK, resp) +} + +func (a *API) handleLatestRelease(w http.ResponseWriter, r *http.Request) { + channel := r.URL.Query().Get("channel") + if channel == "" { + channel = "stable" + } + ctx, cancel := context.WithTimeout(r.Context(), 30*time.Second) + defer cancel() + + rel, err := a.GitHub.LatestRelease(ctx, channel) + if err != nil { + httpError(w, http.StatusBadGateway, err.Error()) + return + } + writeJSON(w, http.StatusOK, map[string]any{ + "tag": rel.TagName, + "name": rel.Name, + "prerelease": rel.Prerelease, + "html_url": rel.HTMLURL, + "published_at": rel.PublishedAt, + }) +} + +type updateStartRequest struct { + TargetTag string `json:"target_tag"` + // Future: Channel string, Strategy string +} + +func (a *API) handleUpdateStart(w http.ResponseWriter, r *http.Request) { + var req updateStartRequest + if err := json.NewDecoder(http.MaxBytesReader(w, r.Body, 16<<10)).Decode(&req); err != nil { + httpError(w, http.StatusBadRequest, "invalid JSON: "+err.Error()) + return + } + if req.TargetTag == "" { + httpError(w, http.StatusBadRequest, "target_tag is required") + return + } + job, err := a.Orchestrator.Start(r.Context(), req.TargetTag) + if err != nil { + if strings.Contains(err.Error(), "already in progress") { + httpError(w, http.StatusConflict, err.Error()) + return + } + httpError(w, http.StatusInternalServerError, err.Error()) + return + } + writeJSON(w, http.StatusAccepted, map[string]any{ + "job_id": job.ID, + "status_url": "/update/" + job.ID, + "job": job, + }) +} + +func (a *API) handleUpdateGet(w http.ResponseWriter, r *http.Request) { + id := r.PathValue("id") + job, ok := a.Store.Get(id) + if !ok { + httpError(w, http.StatusNotFound, "no such job: "+id) + return + } + writeJSON(w, http.StatusOK, job) +} + +func (a *API) handleHistory(w http.ResponseWriter, r *http.Request) { + writeJSON(w, http.StatusOK, map[string]any{ + "jobs": a.Store.List(), + }) +} + +// --- Helpers --- + +func writeJSON(w http.ResponseWriter, code int, v any) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(code) + enc := json.NewEncoder(w) + enc.SetIndent("", " ") + if err := enc.Encode(v); err != nil && !errors.Is(err, http.ErrBodyNotAllowed) { + // nothing we can usefully do at this point + } +} + +func httpError(w http.ResponseWriter, code int, msg string) { + writeJSON(w, code, map[string]any{"error": msg, "code": code}) +} diff --git a/updater/cosign.go b/updater/cosign.go new file mode 100644 index 0000000..16f90e5 --- /dev/null +++ b/updater/cosign.go @@ -0,0 +1,112 @@ +// Cosign wrapper. Shells out to the cosign binary that's installed in the +// updater image. Verification only — the updater never signs anything. +// +// Two operations: +// +// VerifyImage — checks that the image at was signed by the +// expected GitHub Actions OIDC identity (constructed from +// the manifest's signing fields). +// +// VerifyBlob — checks that release.json itself was signed by the same +// identity. Called BEFORE we trust the contents of the +// manifest. Without this, an attacker who can publish a +// malicious release.json could redirect the updater to +// pull an arbitrary image. + +package main + +import ( + "bytes" + "context" + "fmt" + "os" + "os/exec" +) + +type Cosign struct { + Bin string // path to cosign binary; default "cosign" +} + +func NewCosign() *Cosign { + bin := os.Getenv("COSIGN_BIN") + if bin == "" { + bin = "cosign" + } + return &Cosign{Bin: bin} +} + +// VerifyImage runs `cosign verify` against the image ref+digest. +func (c *Cosign) VerifyImage(ctx context.Context, m *Manifest) error { + ref := m.Image.DigestRef() // ref + digest is canonical + args := []string{ + "verify", + "--certificate-identity-regexp", m.Signing.IdentityRegex, + "--certificate-oidc-issuer", m.Signing.Issuer, + ref, + } + return c.run(ctx, "verify image "+ref, args, nil) +} + +// VerifyBlob runs `cosign verify-blob` against release.json using the +// release.json.sig + release.json.crt that we downloaded alongside it. +// Inputs are byte slices so callers don't need to persist them to disk +// outside the updater's control. +func (c *Cosign) VerifyBlob(ctx context.Context, m *Manifest, jsonBytes, sig, cert []byte) error { + // cosign verify-blob requires file paths, so we write to short-lived temp + // files (cleaned up regardless of outcome). + jsonPath, err := writeTemp("release-*.json", jsonBytes) + if err != nil { + return err + } + defer os.Remove(jsonPath) + sigPath, err := writeTemp("release-*.sig", sig) + if err != nil { + return err + } + defer os.Remove(sigPath) + certPath, err := writeTemp("release-*.crt", cert) + if err != nil { + return err + } + defer os.Remove(certPath) + + args := []string{ + "verify-blob", + "--signature", sigPath, + "--certificate", certPath, + "--certificate-identity-regexp", m.Signing.IdentityRegex, + "--certificate-oidc-issuer", m.Signing.Issuer, + jsonPath, + } + return c.run(ctx, "verify-blob release.json", args, nil) +} + +func writeTemp(pattern string, data []byte) (string, error) { + f, err := os.CreateTemp("", pattern) + if err != nil { + return "", fmt.Errorf("create temp %s: %w", pattern, err) + } + defer f.Close() + if _, err := f.Write(data); err != nil { + os.Remove(f.Name()) + return "", fmt.Errorf("write temp %s: %w", pattern, err) + } + return f.Name(), nil +} + +// run executes cosign and returns a wrapped error including the output on +// failure. stdin is optional. +func (c *Cosign) run(ctx context.Context, what string, args []string, stdin []byte) error { + cmd := exec.CommandContext(ctx, c.Bin, args...) + if stdin != nil { + cmd.Stdin = bytes.NewReader(stdin) + } + var out, errb bytes.Buffer + cmd.Stdout = &out + cmd.Stderr = &errb + if err := cmd.Run(); err != nil { + return fmt.Errorf("cosign %s failed: %w\nstdout: %s\nstderr: %s", + what, err, out.String(), errb.String()) + } + return nil +} diff --git a/updater/docker.go b/updater/docker.go new file mode 100644 index 0000000..c22fc3b --- /dev/null +++ b/updater/docker.go @@ -0,0 +1,122 @@ +// Docker CLI wrapper. Slice 3A ships these as stubs that log + sleep — the +// state machine, HTTP API, and cosign verification can all be exercised +// end-to-end without touching real containers, which is what we want for +// the first iteration. +// +// Slice 3B will replace each method body with `exec.CommandContext(ctx, +// "docker", ...)` invocations that talk to /var/run/docker.sock through +// the bind-mounted docker CLI. + +package main + +import ( + "context" + "fmt" + "log/slog" + "time" +) + +type Docker struct { + Bin string // path to docker CLI; default "docker" + AppContainer string // e.g. "processgit" + StagingNetwork string // optional separate network for the staged container + Stub bool // when true, all operations are simulated + Log *slog.Logger +} + +func NewDocker(log *slog.Logger, appContainer string, stub bool) *Docker { + return &Docker{ + Bin: "docker", + AppContainer: appContainer, + Stub: stub, + Log: log, + } +} + +// Pull pulls the image at ref. In stub mode it logs and sleeps proportional +// to the size of an average ProcessGit image (~5s). +func (d *Docker) Pull(ctx context.Context, ref string) error { + d.Log.Info("docker.pull", "ref", ref, "stub", d.Stub) + if d.Stub { + return sleepCtx(ctx, 5*time.Second) + } + return fmt.Errorf("docker.Pull: not implemented yet (Slice 3B)") +} + +// Inspect returns the image digest currently in use by the app container, +// for capturing the rollback target. +func (d *Docker) InspectAppImageDigest(ctx context.Context) (string, error) { + d.Log.Info("docker.inspect_app", "container", d.AppContainer, "stub", d.Stub) + if d.Stub { + // Return a deterministic stub digest so the state file shows something useful. + return "sha256:0000000000000000000000000000000000000000000000000000000000000000", nil + } + return "", fmt.Errorf("docker.Inspect: not implemented yet (Slice 3B)") +} + +// RunMigration runs `docker run --rm ` and returns +// its combined output. +func (d *Docker) RunMigration(ctx context.Context, image, command string) (string, error) { + d.Log.Info("docker.run_migration", "image", image, "command", command, "stub", d.Stub) + if d.Stub { + if err := sleepCtx(ctx, 3*time.Second); err != nil { + return "", err + } + return fmt.Sprintf("[stub] would have run: docker run --rm %s %s\n", image, command), nil + } + return "", fmt.Errorf("docker.RunMigration: not implemented yet (Slice 3B)") +} + +// SwapContainer stops the running app container and starts a new one from +// `newImage`, preserving env/volumes/network. Returns the old container ID +// so the orchestrator can roll back if healthcheck fails. +func (d *Docker) SwapContainer(ctx context.Context, newImage string) (oldContainerID string, err error) { + d.Log.Info("docker.swap", "new_image", newImage, "stub", d.Stub) + if d.Stub { + if err := sleepCtx(ctx, 4*time.Second); err != nil { + return "", err + } + return "stub-old-container-id", nil + } + return "", fmt.Errorf("docker.SwapContainer: not implemented yet (Slice 3B)") +} + +// Healthcheck polls the new app container's /api/healthz endpoint until it +// returns 200 OK or the context expires. +func (d *Docker) Healthcheck(ctx context.Context) error { + d.Log.Info("docker.healthcheck", "stub", d.Stub) + if d.Stub { + return sleepCtx(ctx, 3*time.Second) + } + return fmt.Errorf("docker.Healthcheck: not implemented yet (Slice 3B)") +} + +// Rollback restores the previously running container. +func (d *Docker) Rollback(ctx context.Context, oldContainerID, oldImage string) error { + d.Log.Info("docker.rollback", "old_container", oldContainerID, "old_image", oldImage, "stub", d.Stub) + if d.Stub { + return sleepCtx(ctx, 2*time.Second) + } + return fmt.Errorf("docker.Rollback: not implemented yet (Slice 3B)") +} + +// Snapshot captures the state of the app's persistent volumes to a tarball +// in the snapshot directory. Slice 3C will use `docker run` against a +// tar-toolbox image; for Slice 3A we just log. +func (d *Docker) Snapshot(ctx context.Context, dst string) error { + d.Log.Info("docker.snapshot", "dst", dst, "stub", d.Stub) + if d.Stub { + return sleepCtx(ctx, 2*time.Second) + } + return fmt.Errorf("docker.Snapshot: not implemented yet (Slice 3C)") +} + +// sleepCtx sleeps for d, returning early if ctx is cancelled. +func sleepCtx(ctx context.Context, d time.Duration) error { + select { + case <-time.After(d): + return nil + case <-ctx.Done(): + return ctx.Err() + } +} diff --git a/updater/go.mod b/updater/go.mod new file mode 100644 index 0000000..0762749 --- /dev/null +++ b/updater/go.mod @@ -0,0 +1,8 @@ +// processgit-updater is the sidecar that pulls and applies ProcessGit +// updates inside a Docker deployment. It's a separate Go module from the +// main ProcessGit codebase so that its dependency surface stays small and +// auditable — the sidecar has access to /var/run/docker.sock and runs +// privileged-by-implication, so every additional dependency is a risk. +module github.com/Algomation-AI/ProcessGit/updater + +go 1.25 diff --git a/updater/job.go b/updater/job.go new file mode 100644 index 0000000..c042ee4 --- /dev/null +++ b/updater/job.go @@ -0,0 +1,268 @@ +// Job and Step types, plus atomic JSON-file storage. The updater keeps a +// single rolling state document at $STATE_DIR/state.json containing the +// last N jobs (most-recent first). Writes use the write-temp-then-rename +// pattern so a crash mid-write cannot corrupt the state file. + +package main + +import ( + "crypto/rand" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" + "sync" + "time" +) + +// State enumerates the steps of an update job. +type State string + +const ( + StateIdle State = "idle" + StatePlanning State = "planning" + StateSnapshotting State = "snapshotting" + StatePulling State = "pulling" + StateVerifying State = "verifying" + StateMigrating State = "migrating" + StateSwapping State = "swapping" + StateHealthchecking State = "healthchecking" + + // Terminal success + StateCommitted State = "committed" + + // Terminal failure paths + StateRollingBack State = "rolling_back" + StateRolledBack State = "rolled_back" + StateFailed State = "failed" // failure DURING rollback — manual intervention needed + StateAborted State = "aborted" // user-requested abort +) + +// IsTerminal reports whether the state is a final state. +func (s State) IsTerminal() bool { + switch s { + case StateCommitted, StateRolledBack, StateFailed, StateAborted: + return true + } + return false +} + +// Step records one phase of a Job. +type Step struct { + State State `json:"state"` + StartedAt time.Time `json:"started_at"` + CompletedAt *time.Time `json:"completed_at,omitempty"` + Output string `json:"output,omitempty"` + Error string `json:"error,omitempty"` +} + +func (s *Step) Finish(output string, err error) { + now := time.Now().UTC() + s.CompletedAt = &now + s.Output = output + if err != nil { + s.Error = err.Error() + } +} + +// Job is one end-to-end update attempt. Persisted to state.json. +type Job struct { + ID string `json:"id"` + State State `json:"state"` + TargetTag string `json:"target_tag"` // e.g. "v0.1.2" + TargetVersion string `json:"target_version,omitempty"` // e.g. "0.1.2" + TargetImage string `json:"target_image,omitempty"` // e.g. "ghcr.io/algomation-ai/processgit:0.1.2" + TargetDigest string `json:"target_digest,omitempty"` // e.g. "sha256:..." + PreviousImage string `json:"previous_image,omitempty"` // captured before swap for rollback + StartedAt time.Time `json:"started_at"` + UpdatedAt time.Time `json:"updated_at"` + CompletedAt *time.Time `json:"completed_at,omitempty"` + Steps []Step `json:"steps"` + Error string `json:"error,omitempty"` +} + +// NewJob mints a job with a unique short ID. +func NewJob(targetTag string) *Job { + now := time.Now().UTC() + return &Job{ + ID: newJobID(), + State: StateIdle, + TargetTag: targetTag, + StartedAt: now, + UpdatedAt: now, + Steps: []Step{}, + } +} + +func newJobID() string { + b := make([]byte, 8) + if _, err := rand.Read(b); err != nil { + // Reading /dev/urandom shouldn't fail; if it does, fall back to time-based. + return fmt.Sprintf("job_%d", time.Now().UnixNano()) + } + return "job_" + hex.EncodeToString(b) +} + +// transitionTo advances the job state and starts a Step for it. The previous +// Step (if any) must already have been finished by the caller. +func (j *Job) transitionTo(s State) *Step { + j.State = s + j.UpdatedAt = time.Now().UTC() + if s.IsTerminal() { + now := time.Now().UTC() + j.CompletedAt = &now + } + step := Step{State: s, StartedAt: time.Now().UTC()} + j.Steps = append(j.Steps, step) + return &j.Steps[len(j.Steps)-1] +} + +// fail marks the job as terminal-failed with the given error. +func (j *Job) fail(s State, err error) { + j.State = s + j.Error = err.Error() + now := time.Now().UTC() + j.UpdatedAt = now + j.CompletedAt = &now +} + +// Store is the on-disk job log. Always holds the *full* history bounded by +// maxJobs; new jobs are prepended to keep recent-first ordering. +type Store struct { + mu sync.Mutex + path string + maxJobs int + jobs []*Job + active *Job // pointer into jobs[0] when there's a non-terminal job +} + +const defaultMaxJobs = 50 + +// NewStore loads (or creates) the state file at path. +func NewStore(path string) (*Store, error) { + s := &Store{ + path: path, + maxJobs: defaultMaxJobs, + } + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + return nil, fmt.Errorf("create state dir: %w", err) + } + if err := s.load(); err != nil { + return nil, err + } + return s, nil +} + +type stateFile struct { + SchemaVersion int `json:"schema_version"` + Jobs []*Job `json:"jobs"` +} + +func (s *Store) load() error { + data, err := os.ReadFile(s.path) + if errors.Is(err, os.ErrNotExist) { + s.jobs = nil + return nil + } + if err != nil { + return fmt.Errorf("read state: %w", err) + } + var sf stateFile + if err := json.Unmarshal(data, &sf); err != nil { + return fmt.Errorf("parse state: %w", err) + } + s.jobs = sf.Jobs + for _, j := range s.jobs { + if !j.State.IsTerminal() { + s.active = j + break + } + } + return nil +} + +func (s *Store) save() error { + sf := stateFile{SchemaVersion: 1, Jobs: s.jobs} + data, err := json.MarshalIndent(sf, "", " ") + if err != nil { + return fmt.Errorf("marshal state: %w", err) + } + tmp := s.path + ".tmp" + if err := os.WriteFile(tmp, data, 0o644); err != nil { + return fmt.Errorf("write tmp state: %w", err) + } + if err := os.Rename(tmp, s.path); err != nil { + return fmt.Errorf("atomic rename state: %w", err) + } + return nil +} + +// AddJob prepends a job. Returns an error if there's already an active job. +func (s *Store) AddJob(j *Job) error { + s.mu.Lock() + defer s.mu.Unlock() + if s.active != nil { + return fmt.Errorf("update already in progress: job %s in state %s", s.active.ID, s.active.State) + } + s.jobs = append([]*Job{j}, s.jobs...) + if len(s.jobs) > s.maxJobs { + s.jobs = s.jobs[:s.maxJobs] + } + s.active = j + return s.save() +} + +// PersistActive saves changes to the currently-active job. +func (s *Store) PersistActive() error { + s.mu.Lock() + defer s.mu.Unlock() + if s.active != nil && s.active.State.IsTerminal() { + s.active = nil + } + return s.save() +} + +// ClearActive marks no job as active. Called after the active job reaches a +// terminal state. +func (s *Store) ClearActive() error { + s.mu.Lock() + defer s.mu.Unlock() + s.active = nil + return s.save() +} + +// Active returns the currently-running job, or nil. +func (s *Store) Active() *Job { + s.mu.Lock() + defer s.mu.Unlock() + return s.active +} + +// Get returns a snapshot copy of the job with the given ID. +func (s *Store) Get(id string) (*Job, bool) { + s.mu.Lock() + defer s.mu.Unlock() + for _, j := range s.jobs { + if j.ID == id { + cp := *j + cp.Steps = append([]Step(nil), j.Steps...) + return &cp, true + } + } + return nil, false +} + +// List returns snapshots of all jobs, recent-first. +func (s *Store) List() []*Job { + s.mu.Lock() + defer s.mu.Unlock() + out := make([]*Job, len(s.jobs)) + for i, j := range s.jobs { + cp := *j + cp.Steps = append([]Step(nil), j.Steps...) + out[i] = &cp + } + return out +} diff --git a/updater/main.go b/updater/main.go new file mode 100644 index 0000000..b6f0969 --- /dev/null +++ b/updater/main.go @@ -0,0 +1,169 @@ +// processgit-updater +// +// The updater sidecar. Exposes an HTTP API for the main ProcessGit +// container to ask "are there updates?" and "please apply this update". +// Orchestrates the actual update via docker.sock. +// +// Env config: +// +// PROCESSGIT_UPDATER_TOKEN (required) bearer token for the HTTP API +// PROCESSGIT_UPDATER_LISTEN default ":9000" +// PROCESSGIT_UPDATER_STATE_DIR default "/var/lib/processgit-updater" +// PROCESSGIT_UPDATER_REPO default "Algomation-AI/ProcessGit" +// PROCESSGIT_UPDATER_GITHUB_API default "https://api.github.com" +// PROCESSGIT_UPDATER_GITHUB_TOKEN optional; raises rate limit, allows private repos +// PROCESSGIT_UPDATER_APP_CONTAINER default "processgit" +// PROCESSGIT_UPDATER_STUB "true" to run with docker ops stubbed (Slice 3A default) +// +// Version is set at build time via -ldflags "-X main.version=..." + +package main + +import ( + "context" + "errors" + "flag" + "log/slog" + "net/http" + "os" + "os/signal" + "path/filepath" + "strconv" + "strings" + "syscall" + "time" +) + +var version = "dev" + +func main() { + listen := envOr("PROCESSGIT_UPDATER_LISTEN", ":9000") + stateDir := envOr("PROCESSGIT_UPDATER_STATE_DIR", "/var/lib/processgit-updater") + repo := envOr("PROCESSGIT_UPDATER_REPO", "Algomation-AI/ProcessGit") + githubAPI := envOr("PROCESSGIT_UPDATER_GITHUB_API", "https://api.github.com") + githubToken := os.Getenv("PROCESSGIT_UPDATER_GITHUB_TOKEN") + appContainer := envOr("PROCESSGIT_UPDATER_APP_CONTAINER", "processgit") + stub := envBool("PROCESSGIT_UPDATER_STUB", true) // Slice 3A: stub by default + token := os.Getenv("PROCESSGIT_UPDATER_TOKEN") + + // Allow -version for sanity + showVersion := flag.Bool("version", false, "print version and exit") + flag.Parse() + if *showVersion { + writeStdout("processgit-updater " + version + "\n") + os.Exit(0) + } + + logLevel := slog.LevelInfo + if envBool("PROCESSGIT_UPDATER_DEBUG", false) { + logLevel = slog.LevelDebug + } + log := slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{Level: logLevel})) + slog.SetDefault(log) + + if token == "" { + log.Error("PROCESSGIT_UPDATER_TOKEN is required; refusing to start") + os.Exit(2) + } + if !strings.Contains(repo, "/") { + log.Error("PROCESSGIT_UPDATER_REPO must be OWNER/NAME", "value", repo) + os.Exit(2) + } + + statePath := filepath.Join(stateDir, "state.json") + store, err := NewStore(statePath) + if err != nil { + log.Error("init store", "err", err, "path", statePath) + os.Exit(1) + } + + gh := NewGitHubClient(githubAPI, repo, githubToken) + cosign := NewCosign() + docker := NewDocker(log.With("component", "docker"), appContainer, stub) + orch := &Orchestrator{ + Store: store, + GitHub: gh, + Cosign: cosign, + Docker: docker, + Log: log.With("component", "orchestrator"), + StateDir: stateDir, + } + + api := &API{ + Token: token, + Store: store, + GitHub: gh, + Orchestrator: orch, + Log: log.With("component", "api"), + Version: version, + } + + srv := &http.Server{ + Addr: listen, + Handler: api.Routes(), + ReadHeaderTimeout: 10 * time.Second, + ReadTimeout: 30 * time.Second, + WriteTimeout: 60 * time.Second, + IdleTimeout: 2 * time.Minute, + } + + log.Info("processgit-updater starting", + "version", version, + "listen", listen, + "state_dir", stateDir, + "repo", repo, + "app_container", appContainer, + "stub_mode", stub, + ) + + // Listen & shut down gracefully on SIGINT/SIGTERM. + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + errCh := make(chan error, 1) + go func() { + errCh <- srv.ListenAndServe() + }() + + select { + case <-ctx.Done(): + log.Info("shutdown signal received") + case err := <-errCh: + if err != nil && !errors.Is(err, http.ErrServerClosed) { + log.Error("server crashed", "err", err) + os.Exit(1) + } + return + } + + shutdownCtx, cancel := context.WithTimeout(context.Background(), 20*time.Second) + defer cancel() + if err := srv.Shutdown(shutdownCtx); err != nil { + log.Error("graceful shutdown failed", "err", err) + os.Exit(1) + } + log.Info("clean shutdown") +} + +func envOr(k, def string) string { + if v := os.Getenv(k); v != "" { + return v + } + return def +} + +func envBool(k string, def bool) bool { + v := os.Getenv(k) + if v == "" { + return def + } + b, err := strconv.ParseBool(v) + if err != nil { + return def + } + return b +} + +func writeStdout(s string) { + _, _ = os.Stdout.WriteString(s) +} diff --git a/updater/manifest.go b/updater/manifest.go new file mode 100644 index 0000000..46ef4da --- /dev/null +++ b/updater/manifest.go @@ -0,0 +1,266 @@ +// Types mirroring the build/release.schema.json contract, plus the helpers +// that fetch a release manifest from a GitHub Release. +// +// We re-declare the types here (rather than importing from the main +// codebase) so the updater module stays independent and the contract +// boundary is explicit. Drift between the two is detected by the +// `processgit update check` JSON-output integration test. + +package main + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" + "time" +) + +type Manifest struct { + SchemaVersion int `json:"schema_version"` + Name string `json:"name"` + Version string `json:"version"` + Tag string `json:"tag"` + ReleasedAt string `json:"released_at"` + Prerelease bool `json:"prerelease"` + MinUpgradeFrom *string `json:"min_upgrade_from"` + Image Image `json:"image"` + Binaries []Binary `json:"binaries"` + Source *Source `json:"source,omitempty"` + Signing Signing `json:"signing"` + ReleaseNotesURL string `json:"release_notes_url"` + ReleaseNotesMarkdown string `json:"release_notes_markdown,omitempty"` + Migration Migration `json:"migration"` + BreakingChanges []string `json:"breaking_changes"` + Deprecations []string `json:"deprecations"` + Build Build `json:"build"` +} + +type Image struct { + Registry string `json:"registry"` + Repository string `json:"repository"` + Tag string `json:"tag"` + Digest string `json:"digest"` + Platforms []string `json:"platforms"` + AdditionalTags []string `json:"additional_tags,omitempty"` +} + +func (i Image) Ref() string { + return fmt.Sprintf("%s/%s:%s", i.Registry, i.Repository, i.Tag) +} + +func (i Image) DigestRef() string { + return fmt.Sprintf("%s/%s@%s", i.Registry, i.Repository, i.Digest) +} + +type Binary struct { + OS string `json:"os"` + Arch string `json:"arch"` + URL string `json:"url"` + Size int64 `json:"size,omitempty"` + SHA256 string `json:"sha256"` + Variant string `json:"variant,omitempty"` +} + +type Source struct { + URL string `json:"url"` + SHA256 string `json:"sha256"` + Size int64 `json:"size,omitempty"` +} + +type Signing struct { + Method string `json:"method"` + Issuer string `json:"issuer"` + IdentityRegex string `json:"identity_regex"` + RekorLogIndex *int64 `json:"rekor_log_index,omitempty"` +} + +type Migration struct { + Required bool `json:"required"` + Command string `json:"command,omitempty"` + EstimatedDowntimeSeconds int `json:"estimated_downtime_seconds,omitempty"` +} + +type Build struct { + Commit string `json:"commit,omitempty"` + WorkflowRunURL string `json:"workflow_run_url,omitempty"` + Builder string `json:"builder,omitempty"` +} + +// ghAsset is what the GitHub Releases API tells us about each release asset. +type ghAsset struct { + Name string `json:"name"` + BrowserDownloadURL string `json:"browser_download_url"` + Size int64 `json:"size"` +} + +// ghRelease is the subset of the GitHub Releases API we care about. +type ghRelease struct { + TagName string `json:"tag_name"` + Name string `json:"name"` + Draft bool `json:"draft"` + Prerelease bool `json:"prerelease"` + HTMLURL string `json:"html_url"` + PublishedAt time.Time `json:"published_at"` + Assets []ghAsset `json:"assets"` +} + +// GitHubClient is a tiny GitHub Releases REST client. +type GitHubClient struct { + APIBase string // e.g. "https://api.github.com" (override for GHE) + Repo string // "OWNER/NAME" + Token string // optional; empty for unauthenticated + HTTP *http.Client +} + +func NewGitHubClient(apiBase, repo, token string) *GitHubClient { + if apiBase == "" { + apiBase = "https://api.github.com" + } + return &GitHubClient{ + APIBase: strings.TrimRight(apiBase, "/"), + Repo: repo, + Token: token, + HTTP: &http.Client{Timeout: 30 * time.Second}, + } +} + +func (c *GitHubClient) do(ctx context.Context, method, urlStr string) ([]byte, error) { + req, err := http.NewRequestWithContext(ctx, method, urlStr, nil) + if err != nil { + return nil, err + } + req.Header.Set("Accept", "application/vnd.github+json") + req.Header.Set("X-GitHub-Api-Version", "2022-11-28") + req.Header.Set("User-Agent", "processgit-updater") + if c.Token != "" { + req.Header.Set("Authorization", "Bearer "+c.Token) + } + resp, err := c.HTTP.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + body, _ := io.ReadAll(io.LimitReader(resp.Body, 16<<20)) + if resp.StatusCode >= 400 { + return nil, fmt.Errorf("github API %s %s: HTTP %d: %s", method, urlStr, resp.StatusCode, strings.TrimSpace(string(body))) + } + return body, nil +} + +// LatestRelease walks /releases and returns the first non-draft release that +// matches the channel filter ("stable" excludes pre-releases; "prerelease" +// includes them). +func (c *GitHubClient) LatestRelease(ctx context.Context, channel string) (*ghRelease, error) { + u := fmt.Sprintf("%s/repos/%s/releases?per_page=30", c.APIBase, c.Repo) + body, err := c.do(ctx, http.MethodGet, u) + if err != nil { + return nil, err + } + var releases []ghRelease + if err := json.Unmarshal(body, &releases); err != nil { + return nil, fmt.Errorf("parse releases: %w", err) + } + for i := range releases { + r := releases[i] + if r.Draft { + continue + } + if channel == "stable" && r.Prerelease { + continue + } + return &r, nil + } + return nil, fmt.Errorf("no releases matching channel %q", channel) +} + +// ReleaseByTag fetches a single release by tag name. +func (c *GitHubClient) ReleaseByTag(ctx context.Context, tag string) (*ghRelease, error) { + u := fmt.Sprintf("%s/repos/%s/releases/tags/%s", c.APIBase, c.Repo, url.PathEscape(tag)) + body, err := c.do(ctx, http.MethodGet, u) + if err != nil { + return nil, err + } + var r ghRelease + if err := json.Unmarshal(body, &r); err != nil { + return nil, fmt.Errorf("parse release: %w", err) + } + return &r, nil +} + +// FetchManifest downloads release.json, release.json.sig, and release.json.crt +// from the given release and returns the parsed manifest plus the raw bytes of +// each (so the caller can pass them to cosign verify-blob). +type FetchedManifest struct { + Manifest *Manifest + JSONBytes []byte + Sig []byte + Cert []byte +} + +func (c *GitHubClient) FetchManifest(ctx context.Context, rel *ghRelease) (*FetchedManifest, error) { + want := map[string]*[]byte{ + "release.json": nil, + "release.json.sig": nil, + "release.json.crt": nil, + } + fm := &FetchedManifest{} + want["release.json"] = &fm.JSONBytes + want["release.json.sig"] = &fm.Sig + want["release.json.crt"] = &fm.Cert + + for _, a := range rel.Assets { + ptr, ok := want[a.Name] + if !ok { + continue + } + data, err := c.downloadAsset(ctx, a.BrowserDownloadURL) + if err != nil { + return nil, fmt.Errorf("download %s: %w", a.Name, err) + } + *ptr = data + } + for name, ptr := range want { + if *ptr == nil { + return nil, fmt.Errorf("release %s is missing required asset %q", rel.TagName, name) + } + } + + var m Manifest + if err := json.Unmarshal(fm.JSONBytes, &m); err != nil { + return nil, fmt.Errorf("parse release.json: %w", err) + } + if m.SchemaVersion != 1 { + return nil, fmt.Errorf("unsupported release manifest schema_version=%d (this updater supports 1)", m.SchemaVersion) + } + if m.Name != "processgit" { + return nil, fmt.Errorf("release manifest name=%q (expected %q)", m.Name, "processgit") + } + fm.Manifest = &m + return fm, nil +} + +func (c *GitHubClient) downloadAsset(ctx context.Context, downloadURL string) ([]byte, error) { + req, err := http.NewRequestWithContext(ctx, http.MethodGet, downloadURL, nil) + if err != nil { + return nil, err + } + // Release assets are served from a CDN that follows the API auth header + // only if it's a Bearer token on api.github.com; here we just use the + // public download URL with no auth. Public-repo releases are world-readable. + req.Header.Set("Accept", "application/octet-stream") + req.Header.Set("User-Agent", "processgit-updater") + resp, err := c.HTTP.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + if resp.StatusCode >= 400 { + body, _ := io.ReadAll(io.LimitReader(resp.Body, 4096)) + return nil, fmt.Errorf("download %s: HTTP %d: %s", downloadURL, resp.StatusCode, strings.TrimSpace(string(body))) + } + return io.ReadAll(io.LimitReader(resp.Body, 16<<20)) +} diff --git a/updater/orchestrator.go b/updater/orchestrator.go new file mode 100644 index 0000000..1b24c87 --- /dev/null +++ b/updater/orchestrator.go @@ -0,0 +1,200 @@ +// The orchestrator runs the update state machine. It owns the active Job +// throughout its lifecycle and is the single goroutine that mutates the Job +// fields between persistence calls. +// +// One update at a time, period. Concurrent updates would race for +// /var/run/docker.sock, the database migration, and the container swap. +// The store's AddJob enforces this. + +package main + +import ( + "context" + "errors" + "fmt" + "log/slog" + "path/filepath" + "time" +) + +type Orchestrator struct { + Store *Store + GitHub *GitHubClient + Cosign *Cosign + Docker *Docker + Log *slog.Logger + StateDir string +} + +// Start kicks off an update for the given target tag. Returns the new Job +// immediately (the actual work runs in a goroutine). The caller polls +// /update/{id} for status. +func (o *Orchestrator) Start(ctx context.Context, targetTag string) (*Job, error) { + if targetTag == "" { + return nil, errors.New("target_tag is required") + } + job := NewJob(targetTag) + if err := o.Store.AddJob(job); err != nil { + return nil, err + } + + // Run the update on its own context derived from a background root, so + // the request's cancellation doesn't kill an update mid-flight. The + // orchestrator has its own deadline. + go o.run(context.WithoutCancel(ctx), job) + return job, nil +} + +// run drives the job through its state machine. Each phase is wrapped so a +// returned error fails the job (and triggers rollback for phases that come +// after the container swap). +func (o *Orchestrator) run(ctx context.Context, job *Job) { + ctx, cancel := context.WithTimeout(ctx, 30*time.Minute) + defer cancel() + + log := o.Log.With("job_id", job.ID, "target", job.TargetTag) + log.Info("update.start") + defer func() { + _ = o.Store.ClearActive() + log.Info("update.end", "final_state", job.State, "error", job.Error) + }() + + // === planning === + step := job.transitionTo(StatePlanning) + manifest, err := o.planUpdate(ctx, job, log) + if err != nil { + step.Finish("", err) + job.fail(StateRolledBack, fmt.Errorf("planning failed: %w", err)) + _ = o.Store.PersistActive() + return + } + step.Finish(fmt.Sprintf("target %s digest %s", manifest.Image.Ref(), manifest.Image.Digest), nil) + job.TargetVersion = manifest.Version + job.TargetImage = manifest.Image.Ref() + job.TargetDigest = manifest.Image.Digest + _ = o.Store.PersistActive() + + // === snapshotting === + step = job.transitionTo(StateSnapshotting) + snapPath := filepath.Join(o.StateDir, "snapshots", job.ID+".tar") + if err := o.Docker.Snapshot(ctx, snapPath); err != nil { + step.Finish("", err) + job.fail(StateRolledBack, fmt.Errorf("snapshot failed: %w", err)) + _ = o.Store.PersistActive() + return + } + step.Finish("snapshot at "+snapPath, nil) + _ = o.Store.PersistActive() + + // === pulling === + step = job.transitionTo(StatePulling) + if err := o.Docker.Pull(ctx, manifest.Image.DigestRef()); err != nil { + step.Finish("", err) + job.fail(StateRolledBack, fmt.Errorf("image pull failed: %w", err)) + _ = o.Store.PersistActive() + return + } + step.Finish("pulled "+manifest.Image.DigestRef(), nil) + _ = o.Store.PersistActive() + + // === verifying (after pull; cosign needs the image in the local registry to verify) === + step = job.transitionTo(StateVerifying) + if err := o.Cosign.VerifyImage(ctx, manifest); err != nil { + step.Finish("", err) + job.fail(StateRolledBack, fmt.Errorf("cosign image verify failed: %w", err)) + _ = o.Store.PersistActive() + return + } + // (the blob verify already happened in planUpdate; this is the image verify) + step.Finish("cosign verify passed", nil) + _ = o.Store.PersistActive() + + // === migrating === + if manifest.Migration.Required { + step = job.transitionTo(StateMigrating) + out, err := o.Docker.RunMigration(ctx, manifest.Image.DigestRef(), manifest.Migration.Command) + if err != nil { + step.Finish(out, err) + job.fail(StateRolledBack, fmt.Errorf("migration failed: %w", err)) + _ = o.Store.PersistActive() + return + } + step.Finish(out, nil) + _ = o.Store.PersistActive() + } + + // Capture the currently-running image so we can roll back if healthcheck fails. + prevDigest, err := o.Docker.InspectAppImageDigest(ctx) + if err != nil { + log.Warn("could not capture rollback target", "err", err) + } + job.PreviousImage = prevDigest + + // === swapping === + step = job.transitionTo(StateSwapping) + oldContainerID, err := o.Docker.SwapContainer(ctx, manifest.Image.DigestRef()) + if err != nil { + step.Finish("", err) + job.fail(StateRolledBack, fmt.Errorf("container swap failed: %w", err)) + _ = o.Store.PersistActive() + return + } + step.Finish("swapped (old container id "+oldContainerID+")", nil) + _ = o.Store.PersistActive() + + // === healthchecking — beyond this point, failure means we must roll back === + step = job.transitionTo(StateHealthchecking) + if err := o.Docker.Healthcheck(ctx); err != nil { + step.Finish("", err) + o.rollback(ctx, job, oldContainerID, prevDigest, log) + return + } + step.Finish("healthy", nil) + _ = o.Store.PersistActive() + + // === committed === + job.transitionTo(StateCommitted) + job.Steps[len(job.Steps)-1].Finish("committed", nil) + _ = o.Store.PersistActive() +} + +func (o *Orchestrator) planUpdate(ctx context.Context, job *Job, log *slog.Logger) (*Manifest, error) { + rel, err := o.GitHub.ReleaseByTag(ctx, job.TargetTag) + if err != nil { + return nil, fmt.Errorf("fetch release %s: %w", job.TargetTag, err) + } + fm, err := o.GitHub.FetchManifest(ctx, rel) + if err != nil { + return nil, err + } + // Verify the manifest signature BEFORE trusting any of its contents. + if err := o.Cosign.VerifyBlob(ctx, fm.Manifest, fm.JSONBytes, fm.Sig, fm.Cert); err != nil { + return nil, fmt.Errorf("release.json signature verification failed: %w", err) + } + log.Info("planned", + "target_tag", fm.Manifest.Tag, + "image_digest", fm.Manifest.Image.Digest, + "migration_required", fm.Manifest.Migration.Required, + ) + return fm.Manifest, nil +} + +// rollback is the failure path entered when healthcheck fails AFTER the +// container swap. If rollback itself fails we end up in StateFailed, which +// is the only state that requires human intervention. +func (o *Orchestrator) rollback(ctx context.Context, job *Job, oldContainerID, oldImage string, log *slog.Logger) { + step := job.transitionTo(StateRollingBack) + log.Warn("healthcheck failed — rolling back", + "old_container", oldContainerID, "old_image", oldImage) + if err := o.Docker.Rollback(ctx, oldContainerID, oldImage); err != nil { + step.Finish("", err) + job.fail(StateFailed, fmt.Errorf("rollback failed (manual intervention needed): %w", err)) + _ = o.Store.PersistActive() + return + } + step.Finish("rolled back to "+oldImage, nil) + job.transitionTo(StateRolledBack) + job.Steps[len(job.Steps)-1].Finish("rolled back successfully; update aborted", nil) + job.Error = "healthcheck failed after swap; rolled back to previous image" + _ = o.Store.PersistActive() +} diff --git a/updater/orchestrator_test.go b/updater/orchestrator_test.go new file mode 100644 index 0000000..230c581 --- /dev/null +++ b/updater/orchestrator_test.go @@ -0,0 +1,346 @@ +package main + +import ( + "context" + "encoding/json" + "io" + "log/slog" + "net/http" + "net/http/httptest" + "os" + "path/filepath" + "testing" + "time" +) + +// --- Store ----------------------------------------------------------------- + +func TestStore_RoundTrip(t *testing.T) { + dir := t.TempDir() + path := filepath.Join(dir, "state.json") + + s, err := NewStore(path) + if err != nil { + t.Fatal(err) + } + + j := NewJob("v1.2.3") + if err := s.AddJob(j); err != nil { + t.Fatalf("AddJob: %v", err) + } + if got := s.Active(); got == nil || got.ID != j.ID { + t.Fatalf("Active should be the just-added job; got %+v", got) + } + + // Adding a second job while one is active must fail. + if err := s.AddJob(NewJob("v1.2.4")); err == nil { + t.Fatal("expected AddJob to refuse while another is active") + } + + // Mark terminal & persist, then a new job should be accepted. + j.transitionTo(StateCommitted) + j.Steps[len(j.Steps)-1].Finish("done", nil) + if err := s.PersistActive(); err != nil { + t.Fatal(err) + } + if got := s.Active(); got != nil { + t.Fatalf("Active should be nil after terminal; got %+v", got) + } + + j2 := NewJob("v1.2.4") + if err := s.AddJob(j2); err != nil { + t.Fatalf("AddJob after terminal: %v", err) + } + + // Reload from disk and confirm both jobs are present, ordering preserved. + s2, err := NewStore(path) + if err != nil { + t.Fatal(err) + } + list := s2.List() + if len(list) != 2 { + t.Fatalf("expected 2 jobs, got %d", len(list)) + } + if list[0].TargetTag != "v1.2.4" || list[1].TargetTag != "v1.2.3" { + t.Fatalf("wrong ordering after reload: %s, %s", list[0].TargetTag, list[1].TargetTag) + } + if got := s2.Active(); got == nil || got.ID != j2.ID { + t.Fatalf("active not restored after reload; got %+v", got) + } +} + +func TestState_IsTerminal(t *testing.T) { + terminal := map[State]bool{ + StateCommitted: true, StateRolledBack: true, StateFailed: true, StateAborted: true, + StateIdle: false, StatePlanning: false, StateSnapshotting: false, + StatePulling: false, StateVerifying: false, StateMigrating: false, + StateSwapping: false, StateHealthchecking: false, StateRollingBack: false, + } + for s, want := range terminal { + if got := s.IsTerminal(); got != want { + t.Errorf("%s.IsTerminal() = %v, want %v", s, got, want) + } + } +} + +func TestJob_TransitionFinishesOnTerminal(t *testing.T) { + j := NewJob("v1.0.0") + j.transitionTo(StatePlanning) + if j.CompletedAt != nil { + t.Fatal("CompletedAt should be nil on non-terminal") + } + j.transitionTo(StateCommitted) + if j.CompletedAt == nil { + t.Fatal("CompletedAt should be set on terminal") + } +} + +// --- Orchestrator (stub docker) -------------------------------------------- + +// withDocker overrides the orchestrator's docker so we can simulate failure +// paths. +type fakeDocker struct { + *Docker + failPull error + failHealth error + failRollback error + migrationOut string +} + +func newOrch(t *testing.T) (*Orchestrator, *Store, *httptest.Server) { + t.Helper() + dir := t.TempDir() + store, err := NewStore(filepath.Join(dir, "state.json")) + if err != nil { + t.Fatal(err) + } + log := slog.New(slog.NewTextHandler(io.Discard, nil)) + docker := NewDocker(log, "processgit", true) + + gh := newFakeGitHub(t) + gh.Manifest = sampleManifest() + + cosign := newFakeCosign() + + orch := &Orchestrator{ + Store: store, + GitHub: NewGitHubClient(gh.Server.URL, "Algomation-AI/ProcessGit", ""), + Cosign: cosign.Cosign, + Docker: docker, + Log: log, + StateDir: dir, + } + return orch, store, gh.Server +} + +// fakeGitHub serves the minimum endpoints the orchestrator uses, with +// a configurable manifest and asset bodies. +type fakeGitHub struct { + Server *httptest.Server + Manifest *Manifest + // jsonBytes is the byte representation served as release.json (lets us + // inject malformed payloads in tests). + JSONBytes []byte +} + +func newFakeGitHub(t *testing.T) *fakeGitHub { + t.Helper() + fg := &fakeGitHub{} + mux := http.NewServeMux() + mux.HandleFunc("/repos/Algomation-AI/ProcessGit/releases/tags/", func(w http.ResponseWriter, r *http.Request) { + // fabricate a release pointing to our fixture asset URLs (served below) + base := "http://" + r.Host + rel := ghRelease{ + TagName: "v0.9.9", + Name: "v0.9.9", + Draft: false, + Prerelease: false, + HTMLURL: "https://example.com/release", + Assets: []ghAsset{ + {Name: "release.json", BrowserDownloadURL: base + "/dl/release.json"}, + {Name: "release.json.sig", BrowserDownloadURL: base + "/dl/release.json.sig"}, + {Name: "release.json.crt", BrowserDownloadURL: base + "/dl/release.json.crt"}, + }, + } + w.Header().Set("Content-Type", "application/json") + _ = json.NewEncoder(w).Encode(rel) + }) + mux.HandleFunc("/dl/release.json", func(w http.ResponseWriter, r *http.Request) { + if fg.JSONBytes != nil { + _, _ = w.Write(fg.JSONBytes) + return + } + b, _ := json.Marshal(fg.Manifest) + _, _ = w.Write(b) + }) + mux.HandleFunc("/dl/release.json.sig", func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte("fake-sig")) + }) + mux.HandleFunc("/dl/release.json.crt", func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte("fake-crt")) + }) + fg.Server = httptest.NewServer(mux) + t.Cleanup(fg.Server.Close) + return fg +} + +func sampleManifest() *Manifest { + return &Manifest{ + SchemaVersion: 1, + Name: "processgit", + Version: "0.9.9", + Tag: "v0.9.9", + ReleasedAt: "2026-05-23T18:00:00Z", + Prerelease: false, + Image: Image{ + Registry: "ghcr.io", + Repository: "algomation-ai/processgit", + Tag: "0.9.9", + Digest: "sha256:1111111111111111111111111111111111111111111111111111111111111111", + Platforms: []string{"linux/amd64"}, + }, + Signing: Signing{ + Method: "cosign-keyless", + Issuer: "https://token.actions.githubusercontent.com", + IdentityRegex: "^https://github.com/Algomation-AI/.*", + }, + ReleaseNotesURL: "https://example.com/release", + Migration: Migration{Required: false}, + BreakingChanges: []string{}, + Deprecations: []string{}, + } +} + +// fakeCosign replaces the cosign binary with a script that always succeeds. +// We use exec.Command's PATH lookup against a temp dir containing a stub +// executable named "cosign" that exits 0. +type fakeCosignSetup struct { + Cosign *Cosign +} + +func newFakeCosign() *fakeCosignSetup { + // We can't trivially stub exec.Command, but we can point Cosign.Bin at + // /bin/true which exists on all Linux test runners and accepts any args. + return &fakeCosignSetup{Cosign: &Cosign{Bin: "/bin/true"}} +} + +func TestOrchestrator_HappyPath(t *testing.T) { + orch, store, _ := newOrch(t) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + job, err := orch.Start(ctx, "v0.9.9") + if err != nil { + t.Fatal(err) + } + if job.State != StateIdle { + t.Fatalf("initial state should be idle, got %s", job.State) + } + + // Wait for terminal. + deadline := time.Now().Add(30 * time.Second) + for time.Now().Before(deadline) { + got, _ := store.Get(job.ID) + if got != nil && got.State.IsTerminal() { + if got.State != StateCommitted { + t.Fatalf("expected committed, got %s; error=%s", got.State, got.Error) + } + if got.TargetVersion != "0.9.9" { + t.Errorf("TargetVersion not captured: %q", got.TargetVersion) + } + if got.TargetImage == "" || got.TargetDigest == "" { + t.Errorf("TargetImage/Digest not captured: %+v", got) + } + // Make sure we passed through expected states. + seen := map[State]bool{} + for _, s := range got.Steps { + seen[s.State] = true + } + for _, s := range []State{StatePlanning, StateSnapshotting, StatePulling, StateVerifying, StateSwapping, StateHealthchecking, StateCommitted} { + if !seen[s] { + t.Errorf("step %s missing from history", s) + } + } + return + } + time.Sleep(50 * time.Millisecond) + } + t.Fatal("orchestrator never reached terminal state") +} + +func TestOrchestrator_RejectsConcurrent(t *testing.T) { + orch, _, _ := newOrch(t) + ctx := context.Background() + if _, err := orch.Start(ctx, "v0.9.9"); err != nil { + t.Fatal(err) + } + _, err := orch.Start(ctx, "v0.9.8") + if err == nil { + t.Fatal("expected second concurrent Start to fail") + } +} + +// --- API ------------------------------------------------------------------- + +func TestAPI_BearerAuth(t *testing.T) { + orch, store, _ := newOrch(t) + api := &API{ + Token: "supersecret", + Store: store, + GitHub: orch.GitHub, + Orchestrator: orch, + Log: slog.New(slog.NewTextHandler(io.Discard, nil)), + Version: "test", + } + srv := httptest.NewServer(api.Routes()) + defer srv.Close() + + // healthz: no auth required + resp, err := http.Get(srv.URL + "/healthz") + if err != nil || resp.StatusCode != 200 { + t.Fatalf("healthz: status=%v err=%v", resp.StatusCode, err) + } + + // /status without auth: 401 + resp, err = http.Get(srv.URL + "/status") + if err != nil { + t.Fatal(err) + } + if resp.StatusCode != 401 { + t.Fatalf("expected 401, got %d", resp.StatusCode) + } + + // /status with bad token: 401 + req, _ := http.NewRequest("GET", srv.URL+"/status", nil) + req.Header.Set("Authorization", "Bearer wrong") + resp, err = http.DefaultClient.Do(req) + if err != nil { + t.Fatal(err) + } + if resp.StatusCode != 401 { + t.Fatalf("expected 401 with wrong token, got %d", resp.StatusCode) + } + + // /status with right token: 200 + req, _ = http.NewRequest("GET", srv.URL+"/status", nil) + req.Header.Set("Authorization", "Bearer supersecret") + resp, err = http.DefaultClient.Do(req) + if err != nil { + t.Fatal(err) + } + if resp.StatusCode != 200 { + body, _ := io.ReadAll(resp.Body) + t.Fatalf("expected 200, got %d: %s", resp.StatusCode, string(body)) + } +} + +// --- Sanity: ensure we didn't import anything outside stdlib ------------- + +func TestNoExternalImports(t *testing.T) { + // This is enforced structurally by go.mod; the test is here as a + // belt-and-suspenders marker. If go.sum exists, fail loudly. + if _, err := os.Stat("go.sum"); err == nil { + t.Fatal("go.sum exists — updater has acquired an external dependency; review carefully") + } +}