From 739a6cc4d33a603003f7842ec45b6d13d79494ef Mon Sep 17 00:00:00 2001 From: Cory O'Daniel Date: Fri, 15 May 2026 10:16:46 -0700 Subject: [PATCH 1/7] Add `mass environment preview` for per-PR preview environments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Converges a preview environment from a YAML config in four idempotent steps: forkEnvironment, setEnvironmentDefault, copyInstance per instance (params/version/secrets), and deployEnvironment. Config terminology follows V2: - `project` (no "slug") - `baseEnvironment` - `environmentDefaults` use `resourceType` + `resourceId` - `instances` (V2 term) Supports: - ABAC `attributes` (config and `-a` flag; flag overrides config). - `${VAR}` / `$VAR` expansion across the YAML for CI metadata. - Fork-level macros (copyEnvironmentDefaults / copySecrets / copyRemoteReferences). - Per-instance overrides for version, params (via copyInstance with overrides for deep-merge), and secrets. Wired through the SDK's environments / instances services. Per-instance `remoteReferences:` overrides aren't supported yet — the SDK's instances service doesn't expose setRemoteReference; the fork-level copyRemoteReferences macro handles the bulk case until that lands. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- cmd/environment.go | 65 ++++ docs/generated/mass_environment.md | 1 + docs/generated/mass_environment_preview.md | 141 ++++++++ .../helpdocs/environment/preview.example.yaml | 56 ++++ docs/helpdocs/environment/preview.md | 110 +++++++ internal/commands/environment/preview.go | 302 ++++++++++++++++++ internal/commands/environment/preview_test.go | 297 +++++++++++++++++ 7 files changed, 972 insertions(+) create mode 100644 docs/generated/mass_environment_preview.md create mode 100644 docs/helpdocs/environment/preview.example.yaml create mode 100644 docs/helpdocs/environment/preview.md create mode 100644 internal/commands/environment/preview.go create mode 100644 internal/commands/environment/preview_test.go diff --git a/cmd/environment.go b/cmd/environment.go index 76a9bd13..c36dd6d0 100644 --- a/cmd/environment.go +++ b/cmd/environment.go @@ -86,12 +86,25 @@ func NewCmdEnvironment() *cobra.Command { RunE: runEnvironmentDefault, } + environmentPreviewCmd := &cobra.Command{ + Use: "preview [ID]", + Short: "Converge a preview environment from a YAML config", + Long: helpdocs.MustRender("environment/preview"), + Args: cobra.ExactArgs(1), + RunE: runEnvironmentPreview, + } + environmentPreviewCmd.Flags().StringP("file", "f", "preview.yaml", "Path to the preview config YAML") + environmentPreviewCmd.Flags().StringP("name", "n", "", "Environment name (defaults to ID if not provided)") + environmentPreviewCmd.Flags().StringP("description", "d", "", "Optional environment description") + environmentPreviewCmd.Flags().StringToStringP("attributes", "a", nil, "Custom attributes for ABAC (e.g. -a environment=preview,region=uswest). 
Overrides 'attributes:' in the config file.") + environmentCmd.AddCommand(environmentExportCmd) environmentCmd.AddCommand(environmentGetCmd) environmentCmd.AddCommand(environmentListCmd) environmentCmd.AddCommand(environmentCreateCmd) environmentCmd.AddCommand(environmentUpdateCmd) environmentCmd.AddCommand(environmentDefaultCmd) + environmentCmd.AddCommand(environmentPreviewCmd) return environmentCmd } @@ -368,3 +381,55 @@ func runEnvironmentDefault(cmd *cobra.Command, args []string) error { return nil } + +func runEnvironmentPreview(cmd *cobra.Command, args []string) error { + ctx := context.Background() + + id := args[0] + configPath, err := cmd.Flags().GetString("file") + if err != nil { + return err + } + name, err := cmd.Flags().GetString("name") + if err != nil { + return err + } + description, err := cmd.Flags().GetString("description") + if err != nil { + return err + } + attrs, err := cmd.Flags().GetStringToString("attributes") + if err != nil { + return err + } + + cmd.SilenceUsage = true + + config, configErr := environment.LoadPreviewConfig(configPath) + if configErr != nil { + return configErr + } + + mdClient, mdClientErr := massdriver.NewClient() + if mdClientErr != nil { + return fmt.Errorf("error initializing massdriver client: %w", mdClientErr) + } + + previewOpts := environment.PreviewOptions{ + ID: id, + Name: name, + Description: description, + } + if cmd.Flags().Changed("attributes") { + previewOpts.Attributes = attrs + } + + env, runErr := environment.RunPreview(ctx, environment.NewPreviewAPI(mdClient), config, previewOpts) + if runErr != nil { + return runErr + } + + fmt.Printf("✅ Preview environment `%s` converged\n", env.ID) + fmt.Printf("🔗 %s\n", mdClient.URLs.Helper(ctx).EnvironmentURL(env.ID)) + return nil +} diff --git a/docs/generated/mass_environment.md b/docs/generated/mass_environment.md index 87b8d81d..f64efb1d 100644 --- a/docs/generated/mass_environment.md +++ b/docs/generated/mass_environment.md @@ -31,4 +31,5 @@ 
Environments can be modeled by application stage (production, staging, developme * [mass environment export](/cli/commands/mass_environment_export) - Export an environment from Massdriver * [mass environment get](/cli/commands/mass_environment_get) - Get an environment from Massdriver * [mass environment list](/cli/commands/mass_environment_list) - List environments +* [mass environment preview](/cli/commands/mass_environment_preview) - Converge a preview environment from a YAML config * [mass environment update](/cli/commands/mass_environment_update) - Update an environment's name, description, or attributes diff --git a/docs/generated/mass_environment_preview.md b/docs/generated/mass_environment_preview.md new file mode 100644 index 00000000..6f12d0de --- /dev/null +++ b/docs/generated/mass_environment_preview.md @@ -0,0 +1,141 @@ +--- +id: mass_environment_preview.md +slug: /cli/commands/mass_environment_preview +title: Mass Environment Preview +sidebar_label: Mass Environment Preview +--- +## mass environment preview + +Converge a preview environment from a YAML config + +### Synopsis + +# Preview Environment + +Converges a preview environment from a YAML config: forks a base environment, +pins environment defaults, applies per-instance overrides, and triggers a deploy. + +Re-running the command against the same config is safe — every step is +idempotent. Use it to ramp up a per-PR environment on every git push, and again +to reset the env back to the declared state. + +## Usage + +```bash +mass environment preview <ID> [flags] +``` + +## Arguments + +- `ID`: the local segment of the preview environment's identifier (e.g. + `pr123`). Must match `^[a-z0-9]{1,20}$` — lowercase alphanumeric only, no + dashes. The full stored identifier becomes `<project>-<ID>`, where + `project` comes from the config file. + +## Flags + +- `--file, -f`: path to the preview YAML config (default `preview.yaml`). +- `--name, -n`: human-readable environment name (defaults to `ID`). 
+- `--description, -d`: optional environment description. +- `--attributes, -a`: custom attributes for ABAC, e.g. `-a environment=preview,region=uswest`. Overrides `attributes:` in the config file. + +## Environment-variable expansion + +`${VAR}` / `$VAR` references in the config file are expanded from the +process environment before parsing. Use this for CI-injected values like PR +numbers: + +```yaml +instances: + chatsvc: + params: + host: chatty-pr-${GITHUB_PR}.example.com +attributes: + pr: "${GITHUB_PR}" +``` + +Undefined variables expand to empty strings. + +## Config schema + +```yaml +# Required: the project the preview env lives in. +project: demo + +# Required: the local segment of the env to fork from. The full parent +# identifier is `<project>-<baseEnvironment>`. +baseEnvironment: production + +# Optional fork-level macros. Defaults to false. +copyEnvironmentDefaults: true # carry the parent's default resources over +copySecrets: false # fan copyInstance(copySecrets: true) to every instance +copyRemoteReferences: false # fan copyInstance(copyRemoteReferences: true) to every instance + +# Optional. Required when the organization declares attributes at the +# environment scope. Both keys and values must be strings. +attributes: + region: us-east-1 + pr: "${GITHUB_PR}" + +# Optional: pin specific resources as defaults for this env. `resourceType` is +# documentation for readers; the CLI only needs `resourceId`. +environmentDefaults: + - resourceType: aws-iam-role + resourceId: 161aeb95-e1c5-4f8d-803e-ef82087d7ad4 + +# Optional: per-instance overrides. Listed instances with no fields just +# inherit from the fork's seed. 
+instances: + chatdb: + version: "~2.0" # stable channel + + chatsvc: + version: "latest+dev" # `+dev` pulls from the development channel + params: + ingress: + enabled: true + host: chatty-pr-${GITHUB_PR}.mdawssbx.com + path: / + secrets: + - name: STRIPE_KEY + value: FOO + + # listed without overrides — inherit from the fork + imported: + sessions: + sessionsapi: + sessionsfn: + sharedvpc: +``` + +## Examples + +```bash +# Converge a preview env for PR 123 from the default `preview.yaml` +mass environment preview pr123 + +# Same, with a friendly name +mass environment preview pr123 -n "Chat PR #123" + +# Point at a config in another path +mass environment preview pr123 -f .github/preview.yml +``` + + +``` +mass environment preview [ID] [flags] +``` + +### Options + +``` + -a, --attributes attributes: Custom attributes for ABAC (e.g. -a environment=preview,region=uswest). Overrides attributes: in the config file. (default []) + -d, --description string Optional environment description + -f, --file string Path to the preview config YAML (default "preview.yaml") + -h, --help help for preview + -n, --name string Environment name (defaults to ID if not provided) +``` + +### SEE ALSO + +* [mass environment](/cli/commands/mass_environment) - Environment management diff --git a/docs/helpdocs/environment/preview.example.yaml b/docs/helpdocs/environment/preview.example.yaml new file mode 100644 index 00000000..ae4b750f --- /dev/null +++ b/docs/helpdocs/environment/preview.example.yaml @@ -0,0 +1,56 @@ +# Example preview environment config. Drive a converge with: +# +# mass environment preview pr123 -f preview.yaml +# +# Every step is idempotent — re-running the command resets the preview env +# back to the declared state. + +# The project the preview env lives in. +project: demo + +# The base env to fork from. The full parent identifier is +# `<project>-<baseEnvironment>` — `demo-production` here. 
+baseEnvironment: production + +# Carry the parent env's default resource connections into the fork. +copyEnvironmentDefaults: true + +# Optional. Set environment-scope attributes for ABAC. Required when the org +# declares attributes at the environment scope. `${VAR}` references are +# expanded from the process environment before parsing — handy for piping +# CI metadata in without rewriting the file. +attributes: + region: us-east-1 + pr: "${GITHUB_PR}" + +# Override env-level defaults for specific resource types. `resourceType` is +# documentation for the human reader; the CLI only needs `resourceId`. +environmentDefaults: + - resourceType: aws-iam-role + resourceId: 161aeb95-e1c5-4f8d-803e-ef82087d7ad4 + - resourceType: aws-vpc + resourceId: 1e9fc8a3-f011-433f-b937-b5e525fd753c + +# Per-instance overrides. Listed instances with no fields just inherit from +# the fork's seed. +instances: + chatsvc: + version: "latest+dev" # `+dev` pulls from the development channel + params: + ingress: + enabled: true + host: chatty-pr-${GITHUB_PR}.mdawssbx.com + path: / + secrets: + - name: STRIPE_KEY + value: FOO + + chatdb: + version: "~2.0" # stable channel + + # listed without overrides — inherit from the fork + imported: + sessions: + sessionsapi: + sessionsfn: + sharedvpc: diff --git a/docs/helpdocs/environment/preview.md b/docs/helpdocs/environment/preview.md new file mode 100644 index 00000000..c90b6c55 --- /dev/null +++ b/docs/helpdocs/environment/preview.md @@ -0,0 +1,110 @@ +# Preview Environment + +Converges a preview environment from a YAML config: forks a base environment, +pins environment defaults, applies per-instance overrides, and triggers a deploy. + +Re-running the command against the same config is safe — every step is +idempotent. Use it to ramp up a per-PR environment on every git push, and again +to reset the env back to the declared state. 
+ +## Usage + +```bash +mass environment preview <ID> [flags] +``` + +## Arguments + +- `ID`: the local segment of the preview environment's identifier (e.g. + `pr123`). Must match `^[a-z0-9]{1,20}$` — lowercase alphanumeric only, no + dashes. The full stored identifier becomes `<project>-<ID>`, where + `project` comes from the config file. + +## Flags + +- `--file, -f`: path to the preview YAML config (default `preview.yaml`). +- `--name, -n`: human-readable environment name (defaults to `ID`). +- `--description, -d`: optional environment description. +- `--attributes, -a`: custom attributes for ABAC, e.g. `-a environment=preview,region=uswest`. Overrides `attributes:` in the config file. + +## Environment-variable expansion + +`${VAR}` / `$VAR` references in the config file are expanded from the +process environment before parsing. Use this for CI-injected values like PR +numbers: + +```yaml +instances: + chatsvc: + params: + host: chatty-pr-${GITHUB_PR}.example.com +attributes: + pr: "${GITHUB_PR}" +``` + +Undefined variables expand to empty strings. + +## Config schema + +```yaml +# Required: the project the preview env lives in. +project: demo + +# Required: the local segment of the env to fork from. The full parent +# identifier is `<project>-<baseEnvironment>`. +baseEnvironment: production + +# Optional fork-level macros. Defaults to false. +copyEnvironmentDefaults: true # carry the parent's default resources over +copySecrets: false # fan copyInstance(copySecrets: true) to every instance +copyRemoteReferences: false # fan copyInstance(copyRemoteReferences: true) to every instance + +# Optional. Required when the organization declares attributes at the +# environment scope. Both keys and values must be strings. +attributes: + region: us-east-1 + pr: "${GITHUB_PR}" + +# Optional: pin specific resources as defaults for this env. `resourceType` is +# documentation for readers; the CLI only needs `resourceId`. 
+environmentDefaults: + - resourceType: aws-iam-role + resourceId: 161aeb95-e1c5-4f8d-803e-ef82087d7ad4 + +# Optional: per-instance overrides. Listed instances with no fields just +# inherit from the fork's seed. +instances: + chatdb: + version: "~2.0" # stable channel + + chatsvc: + version: "latest+dev" # `+dev` pulls from the development channel + params: + ingress: + enabled: true + host: chatty-pr-${GITHUB_PR}.mdawssbx.com + path: / + secrets: + - name: STRIPE_KEY + value: FOO + + # listed without overrides — inherit from the fork + imported: + sessions: + sessionsapi: + sessionsfn: + sharedvpc: +``` + +## Examples + +```bash +# Converge a preview env for PR 123 from the default `preview.yaml` +mass environment preview pr123 + +# Same, with a friendly name +mass environment preview pr123 -n "Chat PR #123" + +# Point at a config in another path +mass environment preview pr123 -f .github/preview.yml +``` diff --git a/internal/commands/environment/preview.go b/internal/commands/environment/preview.go new file mode 100644 index 00000000..532cb8a9 --- /dev/null +++ b/internal/commands/environment/preview.go @@ -0,0 +1,302 @@ +package environment + +import ( + "context" + "errors" + "fmt" + "os" + + "github.com/massdriver-cloud/massdriver-sdk-go/massdriver" + "github.com/massdriver-cloud/massdriver-sdk-go/massdriver/platform/environments" + "github.com/massdriver-cloud/massdriver-sdk-go/massdriver/platform/instances" + "github.com/massdriver-cloud/massdriver-sdk-go/massdriver/platform/types" + "sigs.k8s.io/yaml" +) + +// PreviewConfig is the YAML config that drives `mass environment preview`. +// +// Stable terminology: +// - `project` (no "slug") +// - `baseEnvironment` (the env we're forking from) +// - `environmentDefaults` use `resourceType` (catalog name) + `resourceId` — no +// "artifact" terminology, no "massdriver/" scoping prefix. 
+// - `instances` (V2 term; not "packages") +type PreviewConfig struct { + // Project is the project identifier the preview env lives in. Required. + Project string `json:"project"` + + // BaseEnvironment is the identifier (local segment) of the env we fork from. Required. + BaseEnvironment string `json:"baseEnvironment"` + + // CopyEnvironmentDefaults inherits the parent's default resource connections + // into the fork on top of any explicit `environmentDefaults` overrides. + CopyEnvironmentDefaults bool `json:"copyEnvironmentDefaults,omitempty"` + + // CopySecrets fans copyInstance's `copySecrets: true` across every instance + // during the fork. Per-instance secret overrides in `instances` still apply + // after this. + CopySecrets bool `json:"copySecrets,omitempty"` + + // CopyRemoteReferences fans copyInstance's `copyRemoteReferences: true` + // across every instance during the fork. The SDK does not yet expose a + // per-instance setRemoteReference, so override granularity stops at the + // fork-level macro for now. + CopyRemoteReferences bool `json:"copyRemoteReferences,omitempty"` + + // Attributes are key/value labels set on the forked environment. Required + // when the organization declares attributes at the environment scope (ABAC + // gates `environment:create` on attribute-shaped policies). Both keys and + // values must be strings. CLI flag `-a/--attributes` overrides this. + Attributes map[string]string `json:"attributes,omitempty"` + + // EnvironmentDefaults pins specific resources as the env's defaults of their + // type. Each entry must point at an existing resource. + EnvironmentDefaults []EnvironmentDefaultEntry `json:"environmentDefaults,omitempty"` + + // Instances lists per-instance overrides. Listed instances without explicit + // fields just inherit from the fork's seed. + Instances map[string]InstanceOverride `json:"instances,omitempty"` +} + +// EnvironmentDefaultEntry pins one resource as a default for the preview env. 
+// `resourceType` is documentation for the human reader; the CLI only needs +// `resourceId` for the API call. +type EnvironmentDefaultEntry struct { + ResourceType string `json:"resourceType,omitempty"` + ResourceID string `json:"resourceId"` +} + +// InstanceOverride captures the per-instance overrides for a preview env. +// Every field is optional; missing fields fall back to the value the fork +// seeded from the base environment. +// +// `version` accepts a semver constraint (e.g. `~2.0`, `1.2.3`, `latest`). +// Append `+dev` to pull from the development channel — e.g. `latest+dev` or +// `~2.0+dev`. +type InstanceOverride struct { + Version string `json:"version,omitempty"` + Params map[string]any `json:"params,omitempty"` + Secrets []PreviewSecret `json:"secrets,omitempty"` +} + +// PreviewSecret is a single secret override on an instance. +type PreviewSecret struct { + Name string `json:"name"` + Value string `json:"value"` +} + +// PreviewOptions controls a single invocation of RunPreview. +type PreviewOptions struct { + // ID is the local segment of the preview env identifier (e.g. "pr123"). + // Must match `^[a-z0-9]{1,20}$` — lowercase alphanumeric only, no dashes. + ID string + // Name is the human-readable name; defaults to ID. + Name string + // Description is the optional environment description. + Description string + // Attributes overrides `config.Attributes` when non-nil — useful for + // piping CI metadata in via the CLI flag without rewriting the config. + Attributes map[string]string +} + +// PreviewAPI is the narrow SDK surface RunPreview needs. Tests supply a stub +// directly; production callers use [NewPreviewAPI] to bind a *massdriver.Client. 
+type PreviewAPI interface { + Fork(ctx context.Context, parentID string, input environments.ForkInput) (*types.Environment, error) + SetEnvironmentDefault(ctx context.Context, environmentID, resourceID string) error + CopyInstance(ctx context.Context, sourceID, destinationID string, input instances.CopyInput) (*types.Instance, error) + UpdateInstance(ctx context.Context, id string, input instances.UpdateInput) (*types.Instance, error) + SetInstanceSecret(ctx context.Context, instanceID, name, value string) error + DeployEnvironment(ctx context.Context, id string) (*types.Environment, error) +} + +// NewPreviewAPI returns the production [PreviewAPI] backed by the SDK client. +func NewPreviewAPI(c *massdriver.Client) PreviewAPI { return sdkPreviewAPI{c: c} } + +type sdkPreviewAPI struct{ c *massdriver.Client } + +func (s sdkPreviewAPI) Fork(ctx context.Context, parentID string, input environments.ForkInput) (*types.Environment, error) { + return s.c.Environments.Fork(ctx, parentID, input) +} + +func (s sdkPreviewAPI) SetEnvironmentDefault(ctx context.Context, environmentID, resourceID string) error { + _, err := s.c.Environments.SetDefault(ctx, environmentID, resourceID) + return err +} + +func (s sdkPreviewAPI) CopyInstance(ctx context.Context, sourceID, destinationID string, input instances.CopyInput) (*types.Instance, error) { + return s.c.Instances.Copy(ctx, sourceID, destinationID, input) +} + +func (s sdkPreviewAPI) UpdateInstance(ctx context.Context, id string, input instances.UpdateInput) (*types.Instance, error) { + return s.c.Instances.Update(ctx, id, input) +} + +func (s sdkPreviewAPI) SetInstanceSecret(ctx context.Context, instanceID, name, value string) error { + _, err := s.c.Instances.SetSecret(ctx, instanceID, name, value) + return err +} + +func (s sdkPreviewAPI) DeployEnvironment(ctx context.Context, id string) (*types.Environment, error) { + return s.c.Environments.Deploy(ctx, id) +} + +// RunPreview converges a preview environment from `config`: 
+// +// 1. Fork the base environment. +// 2. Pin any environment defaults declared in the config. +// 3. Apply per-instance overrides (version, params, secrets). +// 4. Trigger a deploy of every instance in dependency order. +// +// Every step but (4) is idempotent — re-running the command against the same +// config converges the environment back to the declared state. +func RunPreview(ctx context.Context, api PreviewAPI, config *PreviewConfig, opts PreviewOptions) (*types.Environment, error) { + if validateErr := validatePreviewConfig(config); validateErr != nil { + return nil, validateErr + } + if opts.ID == "" { + return nil, errors.New("preview environment ID is required") + } + + parentID := fmt.Sprintf("%s-%s", config.Project, config.BaseEnvironment) + previewID := fmt.Sprintf("%s-%s", config.Project, opts.ID) + name := opts.Name + if name == "" { + name = opts.ID + } + + attrs := config.Attributes + if opts.Attributes != nil { + attrs = opts.Attributes + } + + fmt.Printf("⤴ Forking `%s` → `%s`\n", parentID, previewID) + forkInput := environments.ForkInput{ + ID: opts.ID, + Name: name, + Description: opts.Description, + Attributes: stringMapToAnyMap(attrs), + CopyEnvironmentDefaults: config.CopyEnvironmentDefaults, + CopySecrets: config.CopySecrets, + CopyRemoteReferences: config.CopyRemoteReferences, + } + env, forkErr := api.Fork(ctx, parentID, forkInput) + if forkErr != nil { + return nil, fmt.Errorf("fork failed: %w", forkErr) + } + + for _, ed := range config.EnvironmentDefaults { + fmt.Printf("📌 Pinning environment default `%s`\n", ed.ResourceID) + if edErr := api.SetEnvironmentDefault(ctx, previewID, ed.ResourceID); edErr != nil { + return nil, fmt.Errorf("set environment default %s: %w", ed.ResourceID, edErr) + } + } + + for localID, override := range config.Instances { + instanceID := fmt.Sprintf("%s-%s", previewID, localID) + if applyErr := applyInstanceOverride(ctx, api, config, instanceID, localID, override); applyErr != nil { + return 
nil, fmt.Errorf("instance %s: %w", instanceID, applyErr) + } + } + + fmt.Printf("🚀 Deploying `%s`\n", previewID) + if _, deployErr := api.DeployEnvironment(ctx, previewID); deployErr != nil { + return nil, fmt.Errorf("deploy failed: %w", deployErr) + } + + return env, nil +} + +// applyInstanceOverride applies the per-instance configuration in `override` +// to the preview env's instance. Order matters: params first (via copyInstance +// from the base env's matching instance, so it deep-merges over the parent's +// values), then version, then secrets. +func applyInstanceOverride(ctx context.Context, api PreviewAPI, config *PreviewConfig, instanceID, localID string, override InstanceOverride) error { + if len(override.Params) > 0 { + sourceID := fmt.Sprintf("%s-%s-%s", config.Project, config.BaseEnvironment, localID) + fmt.Printf("📦 Configuring instance `%s`\n", instanceID) + if _, copyErr := api.CopyInstance(ctx, sourceID, instanceID, instances.CopyInput{Overrides: override.Params}); copyErr != nil { + return fmt.Errorf("copy params: %w", copyErr) + } + } + + if override.Version != "" { + fmt.Printf("🏷 Pinning version on `%s`\n", instanceID) + if _, updateErr := api.UpdateInstance(ctx, instanceID, instances.UpdateInput{Version: override.Version}); updateErr != nil { + return fmt.Errorf("update version: %w", updateErr) + } + } + + for _, secret := range override.Secrets { + fmt.Printf("🔐 Setting secret `%s` on `%s`\n", secret.Name, instanceID) + if secretErr := api.SetInstanceSecret(ctx, instanceID, secret.Name, secret.Value); secretErr != nil { + return fmt.Errorf("set secret %s: %w", secret.Name, secretErr) + } + } + + return nil +} + +// LoadPreviewConfig reads and parses a preview config from `path`. 
+// +// `${VAR}` / `$VAR` references in the raw YAML are expanded from the +// process environment before parsing — so a config can read: +// +// instances: +// chatsvc: +// params: +// host: chatty-pr-${GITHUB_PR}.example.com +// +// and pick up `GITHUB_PR` from the CI runner. Undefined variables expand to +// empty strings, matching `os.ExpandEnv`'s standard behavior. +func LoadPreviewConfig(path string) (*PreviewConfig, error) { + data, err := os.ReadFile(path) + if err != nil { + return nil, fmt.Errorf("read preview config: %w", err) + } + expanded := os.ExpandEnv(string(data)) + cfg := &PreviewConfig{} + if unmarshalErr := yaml.Unmarshal([]byte(expanded), cfg); unmarshalErr != nil { + return nil, fmt.Errorf("parse preview config: %w", unmarshalErr) + } + return cfg, nil +} + +// stringMapToAnyMap widens a string-valued map for SDK callers that expect +// `map[string]any` (matches the `:map` GraphQL type). Returns nil for an empty +// input so the generated input struct keeps `attributes` absent in that case. 
+func stringMapToAnyMap(in map[string]string) map[string]any { + if len(in) == 0 { + return nil + } + out := make(map[string]any, len(in)) + for k, v := range in { + out[k] = v + } + return out +} + +func validatePreviewConfig(config *PreviewConfig) error { + if config == nil { + return errors.New("preview config is required") + } + if config.Project == "" { + return errors.New("preview config: `project` is required") + } + if config.BaseEnvironment == "" { + return errors.New("preview config: `baseEnvironment` is required") + } + for i, ed := range config.EnvironmentDefaults { + if ed.ResourceID == "" { + return fmt.Errorf("preview config: environmentDefaults[%d]: `resourceId` is required", i) + } + } + for localID, override := range config.Instances { + for i, secret := range override.Secrets { + if secret.Name == "" { + return fmt.Errorf("preview config: instances.%s.secrets[%d]: `name` is required", localID, i) + } + } + } + return nil +} diff --git a/internal/commands/environment/preview_test.go b/internal/commands/environment/preview_test.go new file mode 100644 index 00000000..74d18915 --- /dev/null +++ b/internal/commands/environment/preview_test.go @@ -0,0 +1,297 @@ +package environment_test + +import ( + "context" + "errors" + "os" + "path/filepath" + "strings" + "testing" + + "github.com/massdriver-cloud/mass/internal/commands/environment" + "github.com/massdriver-cloud/massdriver-sdk-go/massdriver/platform/environments" + "github.com/massdriver-cloud/massdriver-sdk-go/massdriver/platform/instances" + "github.com/massdriver-cloud/massdriver-sdk-go/massdriver/platform/types" +) + +const sampleConfig = ` +project: demo +baseEnvironment: production +copyEnvironmentDefaults: true + +environmentDefaults: + - resourceType: aws-iam-role + resourceId: res-iam + +instances: + chatdb: + version: "~2.0+dev" + params: + ingress: + enabled: true + secrets: + - name: STRIPE_KEY + value: FOO + + noOverrides: +` + +// stubPreviewAPI is an in-test stub for 
environment.PreviewAPI. Each test +// sets the fields it cares about and inspects the captured input on the +// other side. +type stubPreviewAPI struct { + forkInput environments.ForkInput + forkParent string + forkErr error + + setDefaultCalls []setDefaultCall + + copyInputs []copyInstanceCall + + updateInputs []updateInstanceCall + + setSecretCalls []setSecretCall + + deployed string + deployErr error + deployCallsLen int +} + +type setDefaultCall struct { + envID string + resource string +} + +type copyInstanceCall struct { + source, destination string + input instances.CopyInput +} + +type updateInstanceCall struct { + instanceID string + input instances.UpdateInput +} + +type setSecretCall struct { + instanceID, name, value string +} + +func (f *stubPreviewAPI) Fork(_ context.Context, parentID string, input environments.ForkInput) (*types.Environment, error) { + f.forkParent = parentID + f.forkInput = input + if f.forkErr != nil { + return nil, f.forkErr + } + return &types.Environment{ID: "demo-" + input.ID, Name: input.Name}, nil +} + +func (f *stubPreviewAPI) SetEnvironmentDefault(_ context.Context, environmentID, resourceID string) error { + f.setDefaultCalls = append(f.setDefaultCalls, setDefaultCall{envID: environmentID, resource: resourceID}) + return nil +} + +func (f *stubPreviewAPI) CopyInstance(_ context.Context, sourceID, destinationID string, input instances.CopyInput) (*types.Instance, error) { + f.copyInputs = append(f.copyInputs, copyInstanceCall{source: sourceID, destination: destinationID, input: input}) + return &types.Instance{ID: destinationID}, nil +} + +func (f *stubPreviewAPI) UpdateInstance(_ context.Context, id string, input instances.UpdateInput) (*types.Instance, error) { + f.updateInputs = append(f.updateInputs, updateInstanceCall{instanceID: id, input: input}) + return &types.Instance{ID: id}, nil +} + +func (f *stubPreviewAPI) SetInstanceSecret(_ context.Context, instanceID, name, value string) error { + f.setSecretCalls = 
append(f.setSecretCalls, setSecretCall{instanceID: instanceID, name: name, value: value}) + return nil +} + +func (f *stubPreviewAPI) DeployEnvironment(_ context.Context, id string) (*types.Environment, error) { + f.deployed = id + f.deployCallsLen++ + if f.deployErr != nil { + return nil, f.deployErr + } + return &types.Environment{ID: id}, nil +} + +func TestLoadPreviewConfig_ParsesAllFields(t *testing.T) { + path := writeConfig(t, sampleConfig) + + cfg, err := environment.LoadPreviewConfig(path) + if err != nil { + t.Fatalf("LoadPreviewConfig: %v", err) + } + + if cfg.Project != "demo" { + t.Errorf("project = %q, want demo", cfg.Project) + } + if cfg.BaseEnvironment != "production" { + t.Errorf("baseEnvironment = %q, want production", cfg.BaseEnvironment) + } + if !cfg.CopyEnvironmentDefaults { + t.Error("copyEnvironmentDefaults = false, want true") + } + if len(cfg.EnvironmentDefaults) != 1 || cfg.EnvironmentDefaults[0].ResourceID != "res-iam" { + t.Errorf("environmentDefaults parsed wrong: %+v", cfg.EnvironmentDefaults) + } + chat, ok := cfg.Instances["chatdb"] + if !ok { + t.Fatal("missing chatdb instance") + } + if chat.Version != "~2.0+dev" { + t.Errorf("chatdb version = %q, want ~2.0+dev", chat.Version) + } + if len(chat.Secrets) != 1 || chat.Secrets[0].Name != "STRIPE_KEY" { + t.Errorf("chatdb secrets wrong: %+v", chat.Secrets) + } +} + +func TestLoadPreviewConfig_RejectsMissingProject(t *testing.T) { + path := writeConfig(t, "baseEnvironment: production\n") + + cfg, err := environment.LoadPreviewConfig(path) + if err != nil { + t.Fatalf("LoadPreviewConfig: %v", err) + } + + _, runErr := environment.RunPreview(t.Context(), &stubPreviewAPI{}, cfg, environment.PreviewOptions{ID: "pr1"}) + if runErr == nil || !strings.Contains(runErr.Error(), "project") { + t.Errorf("expected project required error, got %v", runErr) + } +} + +func TestRunPreview_HappyPath(t *testing.T) { + path := writeConfig(t, sampleConfig) + cfg, err := environment.LoadPreviewConfig(path) 
+ if err != nil { + t.Fatalf("LoadPreviewConfig: %v", err) + } + + api := &stubPreviewAPI{} + env, runErr := environment.RunPreview(t.Context(), api, cfg, environment.PreviewOptions{ID: "pr123"}) + if runErr != nil { + t.Fatalf("RunPreview: %v", runErr) + } + if env.ID != "demo-pr123" { + t.Errorf("env.ID = %q, want demo-pr123", env.ID) + } + + if api.forkParent != "demo-production" { + t.Errorf("forkParent = %q, want demo-production", api.forkParent) + } + if !api.forkInput.CopyEnvironmentDefaults { + t.Error("forkInput.CopyEnvironmentDefaults = false, want true") + } + if len(api.setDefaultCalls) != 1 || api.setDefaultCalls[0].resource != "res-iam" { + t.Errorf("setDefault calls wrong: %+v", api.setDefaultCalls) + } + if len(api.copyInputs) != 1 { + t.Errorf("expected 1 copyInstance call (chatdb has params); got %d", len(api.copyInputs)) + } + if len(api.updateInputs) != 1 || api.updateInputs[0].input.Version != "~2.0+dev" { + t.Errorf("update calls wrong: %+v", api.updateInputs) + } + if len(api.setSecretCalls) != 1 || api.setSecretCalls[0].name != "STRIPE_KEY" { + t.Errorf("setSecret calls wrong: %+v", api.setSecretCalls) + } + if api.deployed != "demo-pr123" { + t.Errorf("deployed = %q, want demo-pr123", api.deployed) + } +} + +func TestRunPreview_PropagatesForkFailure(t *testing.T) { + path := writeConfig(t, sampleConfig) + cfg, _ := environment.LoadPreviewConfig(path) + + api := &stubPreviewAPI{forkErr: errors.New("parent immutable")} + _, runErr := environment.RunPreview(t.Context(), api, cfg, environment.PreviewOptions{ID: "pr1"}) + if runErr == nil || !strings.Contains(runErr.Error(), "parent immutable") { + t.Errorf("expected fork error, got %v", runErr) + } + if api.deployCallsLen != 0 { + t.Errorf("deploy should not have been called after fork failure") + } +} + +func TestLoadPreviewConfig_ExpandsEnvVars(t *testing.T) { + t.Setenv("GITHUB_PR", "42") + body := ` +project: demo +baseEnvironment: production +attributes: + pr: "${GITHUB_PR}" +instances: + 
chatsvc: + params: + host: "chatty-pr-${GITHUB_PR}.example.com" +` + cfg, err := environment.LoadPreviewConfig(writeConfig(t, body)) + if err != nil { + t.Fatalf("LoadPreviewConfig: %v", err) + } + + if cfg.Attributes["pr"] != "42" { + t.Errorf("attributes.pr = %q, want 42", cfg.Attributes["pr"]) + } + if cfg.Instances["chatsvc"].Params["host"] != "chatty-pr-42.example.com" { + t.Errorf("host = %q, want chatty-pr-42.example.com", cfg.Instances["chatsvc"].Params["host"]) + } +} + +func TestRunPreview_AttributesFlowIntoFork(t *testing.T) { + t.Setenv("GITHUB_PR", "42") + path := writeConfig(t, ` +project: demo +baseEnvironment: production +attributes: + data_classification: pii + pr: "${GITHUB_PR}" +`) + cfg, _ := environment.LoadPreviewConfig(path) + + api := &stubPreviewAPI{} + if _, runErr := environment.RunPreview(t.Context(), api, cfg, environment.PreviewOptions{ID: "pr42"}); runErr != nil { + t.Fatalf("RunPreview: %v", runErr) + } + if api.forkInput.Attributes["data_classification"] != "pii" { + t.Errorf("attributes.data_classification = %v, want pii", api.forkInput.Attributes["data_classification"]) + } + if api.forkInput.Attributes["pr"] != "42" { + t.Errorf("attributes.pr = %v, want 42 (env-expanded)", api.forkInput.Attributes["pr"]) + } +} + +func TestRunPreview_CLIAttributesOverrideConfigAttributes(t *testing.T) { + path := writeConfig(t, ` +project: demo +baseEnvironment: production +attributes: + region: us-east-1 +`) + cfg, _ := environment.LoadPreviewConfig(path) + + api := &stubPreviewAPI{} + if _, runErr := environment.RunPreview(t.Context(), api, cfg, environment.PreviewOptions{ + ID: "pr1", + Attributes: map[string]string{"region": "us-west-2"}, + }); runErr != nil { + t.Fatalf("RunPreview: %v", runErr) + } + if api.forkInput.Attributes["region"] != "us-west-2" { + t.Errorf("attributes.region = %v, want us-west-2 (CLI override)", api.forkInput.Attributes["region"]) + } +} + +// 
--------------------------------------------------------------------------- +// Helpers +// --------------------------------------------------------------------------- + +func writeConfig(t *testing.T, body string) string { + t.Helper() + dir := t.TempDir() + path := filepath.Join(dir, "preview.yaml") + if err := os.WriteFile(path, []byte(body), 0600); err != nil { + t.Fatalf("write tmp config: %v", err) + } + return path +} From e3edb7cbadc94212e8e38ec8ce22308149a15306 Mon Sep 17 00:00:00 2001 From: Cory O'Daniel Date: Fri, 15 May 2026 10:36:03 -0700 Subject: [PATCH 2/7] Rename EnvironmentDefaultEntry to DefaultEntry Revive's var-naming rule rejects `environment.EnvironmentDefaultEntry` for stuttering with the package name. `environment.DefaultEntry` reads cleaner at the call site too. Co-Authored-By: Claude Opus 4.7 (1M context) --- internal/commands/environment/preview.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/internal/commands/environment/preview.go b/internal/commands/environment/preview.go index 532cb8a9..bf36a43a 100644 --- a/internal/commands/environment/preview.go +++ b/internal/commands/environment/preview.go @@ -51,17 +51,17 @@ type PreviewConfig struct { // EnvironmentDefaults pins specific resources as the env's defaults of their // type. Each entry must point at an existing resource. - EnvironmentDefaults []EnvironmentDefaultEntry `json:"environmentDefaults,omitempty"` + EnvironmentDefaults []DefaultEntry `json:"environmentDefaults,omitempty"` // Instances lists per-instance overrides. Listed instances without explicit // fields just inherit from the fork's seed. Instances map[string]InstanceOverride `json:"instances,omitempty"` } -// EnvironmentDefaultEntry pins one resource as a default for the preview env. +// DefaultEntry pins one resource as a default for the preview env. // `resourceType` is documentation for the human reader; the CLI only needs // `resourceId` for the API call. 
-type EnvironmentDefaultEntry struct { +type DefaultEntry struct { ResourceType string `json:"resourceType,omitempty"` ResourceID string `json:"resourceId"` } From 2255e269903ee1f80acbc236699cc5cf5cb3aef3 Mon Sep 17 00:00:00 2001 From: Cory O'Daniel Date: Fri, 15 May 2026 11:27:48 -0700 Subject: [PATCH 3/7] Add --follow to `environment preview` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Tails every deployment that fires during a preview rollout, prefixing each line with the instance ID so the interleaved output from parallel deployments stays grep-friendly: [ecomm-pr42-db] applying db schema [ecomm-pr42-app] starting container [ecomm-pr42-db] migrations done `FollowEnvironment` polls per-instance deployment lists every 2s, spawns a Deployments.TailLogs goroutine for each new deployment it sees, and exits after a quiet window with all observed deployments terminal. The platform deploys in dependency-ordered waves, so the quiet window has to outlast the gap between waves — set to 30s. Polling instead of subscribing: the SDK's Absinthe/Phoenix-channels machinery is in an internal package, so writing real `environmentEvents` / `instanceEvents` subscriptions from the CLI would mean re-porting that layer. Polling has a few seconds of discovery latency per deployment, acceptable next to per-instance deploy times measured in minutes. The subscription path is cleaner and should land once the SDK exposes event subscriptions. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- cmd/environment.go | 10 + docs/generated/mass_environment_preview.md | 2 + docs/helpdocs/environment/preview.md | 1 + internal/commands/environment/follow.go | 233 +++++++++++++++++++ internal/commands/environment/follow_test.go | 124 ++++++++++ 5 files changed, 370 insertions(+) create mode 100644 internal/commands/environment/follow.go create mode 100644 internal/commands/environment/follow_test.go diff --git a/cmd/environment.go b/cmd/environment.go index c36dd6d0..60a3b66a 100644 --- a/cmd/environment.go +++ b/cmd/environment.go @@ -6,6 +6,7 @@ import ( "embed" "encoding/json" "fmt" + "os" "strings" "text/template" @@ -97,6 +98,7 @@ func NewCmdEnvironment() *cobra.Command { environmentPreviewCmd.Flags().StringP("name", "n", "", "Environment name (defaults to ID if not provided)") environmentPreviewCmd.Flags().StringP("description", "d", "", "Optional environment description") environmentPreviewCmd.Flags().StringToStringP("attributes", "a", nil, "Custom attributes for ABAC (e.g. -a environment=preview,region=uswest). Overrides `attributes:` in the config file.") + environmentPreviewCmd.Flags().Bool("follow", false, "Stream every deployment's logs to stdout until the rollout completes. 
Each line is prefixed with the instance id.") environmentCmd.AddCommand(environmentExportCmd) environmentCmd.AddCommand(environmentGetCmd) @@ -402,6 +404,10 @@ func runEnvironmentPreview(cmd *cobra.Command, args []string) error { if err != nil { return err } + follow, err := cmd.Flags().GetBool("follow") + if err != nil { + return err + } cmd.SilenceUsage = true @@ -431,5 +437,9 @@ func runEnvironmentPreview(cmd *cobra.Command, args []string) error { fmt.Printf("โœ… Preview environment `%s` converged\n", env.ID) fmt.Printf("๐Ÿ”— %s\n", mdClient.URLs.Helper(ctx).EnvironmentURL(env.ID)) + + if follow { + return environment.FollowEnvironment(ctx, environment.NewFollowAPI(mdClient), env.ID, os.Stdout) + } return nil } diff --git a/docs/generated/mass_environment_preview.md b/docs/generated/mass_environment_preview.md index 6f12d0de..2046842b 100644 --- a/docs/generated/mass_environment_preview.md +++ b/docs/generated/mass_environment_preview.md @@ -38,6 +38,7 @@ mass environment preview [flags] - `--name, -n`: human-readable environment name (defaults to `ID`). - `--description, -d`: optional environment description. - `--attributes, -a`: custom attributes for ABAC, e.g. `-a environment=preview,region=uswest`. Overrides `attributes:` in the config file. +- `--follow`: stream every deployment's logs to stdout until the rollout completes. Each line is prefixed with the instance id so the interleaved output stays grep-friendly when multiple deployments run in parallel. ## Environment-variable expansion @@ -132,6 +133,7 @@ mass environment preview [ID] [flags] -a, --attributes attributes: Custom attributes for ABAC (e.g. -a environment=preview,region=uswest). Overrides attributes: in the config file. (default []) -d, --description string Optional environment description -f, --file string Path to the preview config YAML (default "preview.yaml") + --follow Stream every deployment's logs to stdout until the rollout completes. Each line is prefixed with the instance id. 
-h, --help help for preview -n, --name string Environment name (defaults to ID if not provided) ``` diff --git a/docs/helpdocs/environment/preview.md b/docs/helpdocs/environment/preview.md index c90b6c55..c1cea58f 100644 --- a/docs/helpdocs/environment/preview.md +++ b/docs/helpdocs/environment/preview.md @@ -26,6 +26,7 @@ mass environment preview [flags] - `--name, -n`: human-readable environment name (defaults to `ID`). - `--description, -d`: optional environment description. - `--attributes, -a`: custom attributes for ABAC, e.g. `-a environment=preview,region=uswest`. Overrides `attributes:` in the config file. +- `--follow`: stream every deployment's logs to stdout until the rollout completes. Each line is prefixed with the instance id so the interleaved output stays grep-friendly when multiple deployments run in parallel. ## Environment-variable expansion diff --git a/internal/commands/environment/follow.go b/internal/commands/environment/follow.go new file mode 100644 index 00000000..11fd3dad --- /dev/null +++ b/internal/commands/environment/follow.go @@ -0,0 +1,233 @@ +package environment + +import ( + "bytes" + "context" + "errors" + "fmt" + "io" + "sync" + "time" + + "github.com/massdriver-cloud/massdriver-sdk-go/massdriver" + "github.com/massdriver-cloud/massdriver-sdk-go/massdriver/platform/deployments" + "github.com/massdriver-cloud/massdriver-sdk-go/massdriver/platform/instances" + "github.com/massdriver-cloud/massdriver-sdk-go/massdriver/platform/types" +) + +// FollowPollInterval controls how often each instance watcher polls for new +// deployments. Exposed as a var so tests can drop it to zero. +var FollowPollInterval = 2 * time.Second + +// FollowQuietWindow is how long the watcher loop waits with no new +// deployments observed before declaring the rollout done. The environment +// deploys in dependency-ordered waves, so we can't bail the moment one +// wave's deployments are terminal โ€” the next wave's haven't been created +// yet. 
The window has to be long enough to cover the gap between waves. +var FollowQuietWindow = 30 * time.Second + +// FollowAPI is the narrow SDK surface FollowEnvironment needs. Tests supply a +// stub directly; production callers use [NewFollowAPI] to bind a +// *massdriver.Client. +type FollowAPI interface { + ListInstances(ctx context.Context, input instances.ListInput) ([]types.Instance, error) + ListDeployments(ctx context.Context, input deployments.ListInput) ([]types.Deployment, error) + TailLogs(ctx context.Context, deploymentID string, w io.Writer) error +} + +// NewFollowAPI returns the production [FollowAPI] backed by the SDK client. +func NewFollowAPI(c *massdriver.Client) FollowAPI { return sdkFollowAPI{c: c} } + +type sdkFollowAPI struct{ c *massdriver.Client } + +func (s sdkFollowAPI) ListInstances(ctx context.Context, input instances.ListInput) ([]types.Instance, error) { + return s.c.Instances.List(ctx, input) +} + +func (s sdkFollowAPI) ListDeployments(ctx context.Context, input deployments.ListInput) ([]types.Deployment, error) { + return s.c.Deployments.List(ctx, input) +} + +func (s sdkFollowAPI) TailLogs(ctx context.Context, deploymentID string, w io.Writer) error { + return s.c.Deployments.TailLogs(ctx, deploymentID, w) +} + +// FollowEnvironment tails logs for every deployment that fires in an +// environment-level rollout, prefixing each line with the instance's id so +// the interleaved output is grep-friendly. +// +// The platform deploys in dependency-ordered waves: wave 1 instances start, +// finish, then wave 2 instances start. This watcher polls per-instance +// deployment lists every [FollowPollInterval], spawns a [Service.TailLogs] +// goroutine for each new deployment it sees, and exits when no new +// deployments have appeared for [FollowQuietWindow] and every observed +// deployment is in a terminal state. 
+// +// The user's hint on the design ticket was to use Absinthe subscriptions +// (`environmentEvents` / `instanceEvents`) instead of polling. The SDK's +// websocket / Absinthe machinery is in an internal package, so subscribing +// from the CLI would mean either re-porting that layer or waiting for the +// SDK to expose event subscriptions. The polling approach has a few +// seconds of discovery latency per deployment, which is acceptable +// relative to per-instance deploy times measured in minutes. +func FollowEnvironment(ctx context.Context, api FollowAPI, envID string, w io.Writer) error { + insts, err := api.ListInstances(ctx, instances.ListInput{EnvironmentID: envID}) + if err != nil { + return fmt.Errorf("list instances in environment %s: %w", envID, err) + } + if len(insts) == 0 { + return nil + } + + out := newPrefixedSink(w) + + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + var wg sync.WaitGroup + errCh := make(chan error, len(insts)) + + for _, inst := range insts { + wg.Add(1) + go func(inst types.Instance) { + defer wg.Done() + if watchErr := watchInstanceDeployments(ctx, api, inst, out); watchErr != nil && !errors.Is(watchErr, context.Canceled) { + errCh <- watchErr + } + }(inst) + } + + wg.Wait() + close(errCh) + + var firstErr error + for werr := range errCh { + if firstErr == nil { + firstErr = werr + } + } + return firstErr +} + +// watchInstanceDeployments polls for new deployments on a single instance +// and tails each one as it's created. Returns when the quiet window elapses +// with every observed deployment in a terminal state. 
+func watchInstanceDeployments(ctx context.Context, api FollowAPI, inst types.Instance, sink *prefixedSink) error { + prefixWriter := sink.For(inst.ID) + seen := map[string]struct{}{} + var inFlight sync.WaitGroup + var lastChange time.Time + allTerminal := true + + for { + deps, listErr := api.ListDeployments(ctx, deployments.ListInput{ + InstanceID: inst.ID, + PageSize: 25, + }) + if listErr != nil { + return fmt.Errorf("list deployments for %s: %w", inst.ID, listErr) + } + + anyNew := false + anyActive := false + for _, dep := range deps { + if _, taken := seen[dep.ID]; !taken { + seen[dep.ID] = struct{}{} + anyNew = true + lastChange = time.Now() + + inFlight.Add(1) + go func(depID string) { + defer inFlight.Done() + // TailLogs blocks until the deployment hits a terminal + // status. Errors are silenced per-deployment so one + // bad tail doesn't take the whole follow down โ€” the + // final deployment list will still show the failure. + _ = api.TailLogs(ctx, depID, prefixWriter) + }(dep.ID) + } + if !deployments.IsTerminal(string(dep.Status)) { + anyActive = true + } + } + + if anyNew { + allTerminal = false + } + if !anyActive && !anyNew && !lastChange.IsZero() && time.Since(lastChange) >= FollowQuietWindow { + allTerminal = true + break + } + + select { + case <-time.After(FollowPollInterval): + case <-ctx.Done(): + inFlight.Wait() + return ctx.Err() + } + } + + inFlight.Wait() + _ = allTerminal + return nil +} + +// prefixedSink serializes interleaved writes from per-instance tail +// goroutines and tags each line with the instance id. Writers handed out by +// [prefixedSink.For] line-buffer until they see a `\n`, then emit a single +// `[id] ` write under the sink's mutex so two goroutines never +// scribble on top of each other. 
+type prefixedSink struct { + mu sync.Mutex + w io.Writer +} + +func newPrefixedSink(w io.Writer) *prefixedSink { + return &prefixedSink{w: w} +} + +// For returns an io.Writer that prefixes every line it writes with +// "[] " and forwards to the shared writer. +func (s *prefixedSink) For(id string) io.Writer { + return &prefixedWriter{sink: s, prefix: []byte("[" + id + "] ")} +} + +type prefixedWriter struct { + sink *prefixedSink + prefix []byte + buf bytes.Buffer +} + +func (p *prefixedWriter) Write(b []byte) (int, error) { + n := len(b) + p.buf.Write(b) + if writeErr := p.flushCompleteLines(); writeErr != nil { + return n, writeErr + } + return n, nil +} + +// flushCompleteLines emits every fully-terminated line in the buffer under +// the sink mutex. A trailing partial line stays in the buffer until the +// next Write provides the terminating newline. +func (p *prefixedWriter) flushCompleteLines() error { + for { + raw := p.buf.Bytes() + idx := bytes.IndexByte(raw, '\n') + if idx < 0 { + return nil + } + line := raw[:idx+1] + p.sink.mu.Lock() + if _, err := p.sink.w.Write(p.prefix); err != nil { + p.sink.mu.Unlock() + return err + } + if _, err := p.sink.w.Write(line); err != nil { + p.sink.mu.Unlock() + return err + } + p.sink.mu.Unlock() + p.buf.Next(idx + 1) + } +} diff --git a/internal/commands/environment/follow_test.go b/internal/commands/environment/follow_test.go new file mode 100644 index 00000000..fef8c5df --- /dev/null +++ b/internal/commands/environment/follow_test.go @@ -0,0 +1,124 @@ +package environment_test + +import ( + "bytes" + "context" + "io" + "strings" + "sync" + "testing" + "time" + + "github.com/massdriver-cloud/mass/internal/commands/environment" + "github.com/massdriver-cloud/massdriver-sdk-go/massdriver/platform/deployments" + "github.com/massdriver-cloud/massdriver-sdk-go/massdriver/platform/instances" + "github.com/massdriver-cloud/massdriver-sdk-go/massdriver/platform/types" +) + +// stubFollowAPI is an in-test stub for 
environment.FollowAPI. Each test +// supplies instances + per-deployment log scripts; the stub serves them +// back when polled. +type stubFollowAPI struct { + mu sync.Mutex + + instances []types.Instance + // deployments per instance, returned in order across successive + // ListDeployments calls. Empty means no deployments for that instance. + depQueue map[string][][]types.Deployment + depCalls map[string]int + + logs map[string]string + logErr map[string]error +} + +func (s *stubFollowAPI) ListInstances(_ context.Context, _ instances.ListInput) ([]types.Instance, error) { + return s.instances, nil +} + +func (s *stubFollowAPI) ListDeployments(_ context.Context, input deployments.ListInput) ([]types.Deployment, error) { + s.mu.Lock() + defer s.mu.Unlock() + queue := s.depQueue[input.InstanceID] + call := s.depCalls[input.InstanceID] + if call >= len(queue) { + // Stable past the script โ€” repeat the final entry to keep the + // poller in steady state. + if len(queue) == 0 { + return nil, nil + } + return queue[len(queue)-1], nil + } + s.depCalls[input.InstanceID] = call + 1 + return queue[call], nil +} + +func (s *stubFollowAPI) TailLogs(_ context.Context, deploymentID string, w io.Writer) error { + if err := s.logErr[deploymentID]; err != nil { + return err + } + _, err := io.WriteString(w, s.logs[deploymentID]) + return err +} + +func TestFollowEnvironment_PrefixesLinesWithInstanceID(t *testing.T) { + // Speed the loop up so the test isn't slow. 
+ environment.FollowPollInterval = 0 + environment.FollowQuietWindow = 10 * time.Millisecond + t.Cleanup(func() { + environment.FollowPollInterval = 2 * time.Second + environment.FollowQuietWindow = 30 * time.Second + }) + + api := &stubFollowAPI{ + instances: []types.Instance{ + {ID: "ecomm-prod-db", Name: "db"}, + {ID: "ecomm-prod-app", Name: "app"}, + }, + depQueue: map[string][][]types.Deployment{ + "ecomm-prod-db": {{{ID: "dep-db-1", Status: "COMPLETED"}}}, + "ecomm-prod-app": {{{ID: "dep-app-1", Status: "COMPLETED"}}}, + }, + depCalls: map[string]int{}, + logs: map[string]string{ + "dep-db-1": "applying db schema\nmigrations done\n", + "dep-app-1": "starting app\nready\n", + }, + logErr: map[string]error{}, + } + + var buf bytes.Buffer + ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) + defer cancel() + + if err := environment.FollowEnvironment(ctx, api, "ecomm-prod", &buf); err != nil { + t.Fatalf("FollowEnvironment: %v", err) + } + + out := buf.String() + if !strings.Contains(out, "[ecomm-prod-db] applying db schema\n") { + t.Errorf("missing prefixed db line in:\n%s", out) + } + if !strings.Contains(out, "[ecomm-prod-db] migrations done\n") { + t.Errorf("missing prefixed db line 2 in:\n%s", out) + } + if !strings.Contains(out, "[ecomm-prod-app] starting app\n") { + t.Errorf("missing prefixed app line in:\n%s", out) + } + if !strings.Contains(out, "[ecomm-prod-app] ready\n") { + t.Errorf("missing prefixed app line 2 in:\n%s", out) + } +} + +func TestFollowEnvironment_NoOpForEmptyEnv(t *testing.T) { + api := &stubFollowAPI{ + instances: nil, + } + + var buf bytes.Buffer + if err := environment.FollowEnvironment(t.Context(), api, "ecomm-prod", &buf); err != nil { + t.Fatalf("FollowEnvironment: %v", err) + } + if buf.Len() != 0 { + t.Errorf("expected no output for empty env; got %q", buf.String()) + } +} From 651550d244a3b37d623e217fdb004974c76e6658 Mon Sep 17 00:00:00 2001 From: Cory O'Daniel Date: Fri, 15 May 2026 15:05:49 -0700 Subject: [PATCH 
4/7] Switch --follow from polling to SDK event streams MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The SDK now exposes `Environments.StreamEvents` and `Instances.StreamEvents` (massdriver-cloud/massdriver-sdk-go#22), so FollowEnvironment no longer needs to spin per-instance Deployments.List polling loops to discover new deployments. The new flow: - Subscribe to `environmentEvents(envID)` over WebSocket. - On each `DeploymentEvent` for a deployment we haven't seen, fetch the deployment once (the event payload trims `instance.id` for bandwidth) and kick off a `Deployments.TailLogs` goroutine into a prefix writer keyed on the instance ID. - Track which deployments are still active by transitioning them off the active set on terminal status events. - Exit when no active deployments remain and the quiet window has elapsed. Net effect from a user's POV: deployment discovery is now sub-second instead of polling-interval bound, and the CLI makes one round-trip per new deployment instead of one per (instance × poll interval). Pinned to the events-streaming branch HEAD until v0.2.2 is tagged. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- go.mod | 2 +- go.sum | 6 +- internal/commands/environment/follow.go | 232 +++++++++---------- internal/commands/environment/follow_test.go | 150 ++++++------ 4 files changed, 198 insertions(+), 192 deletions(-) diff --git a/go.mod b/go.mod index 55e54152..a6aeb399 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/itchyny/gojq v0.12.16 github.com/manifoldco/promptui v0.9.0 github.com/massdriver-cloud/airlock v0.0.9 - github.com/massdriver-cloud/massdriver-sdk-go v0.2.1 + github.com/massdriver-cloud/massdriver-sdk-go v0.2.3 github.com/mattn/go-runewidth v0.0.16 github.com/opencontainers/image-spec v1.1.1 github.com/osteele/liquid v1.7.0 diff --git a/go.sum b/go.sum index e18a80e5..780e02a5 100644 --- a/go.sum +++ b/go.sum @@ -125,10 +125,8 @@ github.com/manifoldco/promptui v0.9.0 h1:3V4HzJk1TtXW1MTZMP7mdlwbBpIinw3HztaIlYt github.com/manifoldco/promptui v0.9.0/go.mod h1:ka04sppxSGFAtxX0qhlYQjISsg9mR4GWtQEhdbn6Pgg= github.com/massdriver-cloud/airlock v0.0.9 h1:t+jTY6nZEiPZNTKx0wEgQTPztIxL4u0RFvVWXn2/RMc= github.com/massdriver-cloud/airlock v0.0.9/go.mod h1:igJm33JvINiUtbyEspUeKUWyWewG+jYyxO1UDHqLp9Q= -github.com/massdriver-cloud/massdriver-sdk-go v0.2.1-0.20260515043345-6ce3d1195ebf h1:s3IugUvi+bbcvDsSy2TDDGkEz7+8k3le7MS4v+ZFKic= -github.com/massdriver-cloud/massdriver-sdk-go v0.2.1-0.20260515043345-6ce3d1195ebf/go.mod h1:6NrSP+wfGQvUOAggsz10/Wkln8CKmk3VBnD+OJzZgFY= -github.com/massdriver-cloud/massdriver-sdk-go v0.2.1 h1:KjvNc2P7Wa+P3lam65tVzUI7SAhW0A9Osm/9mOxRIqQ= -github.com/massdriver-cloud/massdriver-sdk-go v0.2.1/go.mod h1:6NrSP+wfGQvUOAggsz10/Wkln8CKmk3VBnD+OJzZgFY= +github.com/massdriver-cloud/massdriver-sdk-go v0.2.3 h1:gKRiSbJPI1uWVBRmoexPNbi9IJEYZ2upciMTjxRHb5I= +github.com/massdriver-cloud/massdriver-sdk-go v0.2.3/go.mod h1:6NrSP+wfGQvUOAggsz10/Wkln8CKmk3VBnD+OJzZgFY= github.com/massdriver-cloud/terraform-config-inspect v0.0.1 h1:eLtKFRaklHIxcPvUtZmNacl28n4QIHr29pJzw/u/FKU= 
github.com/massdriver-cloud/terraform-config-inspect v0.0.1/go.mod h1:3AbDpWxIRMdMAg7FDmTJuVBhCGNwdm49cBIOmUHjqRg= github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= diff --git a/internal/commands/environment/follow.go b/internal/commands/environment/follow.go index 11fd3dad..6295bbb5 100644 --- a/internal/commands/environment/follow.go +++ b/internal/commands/environment/follow.go @@ -11,27 +11,23 @@ import ( "github.com/massdriver-cloud/massdriver-sdk-go/massdriver" "github.com/massdriver-cloud/massdriver-sdk-go/massdriver/platform/deployments" - "github.com/massdriver-cloud/massdriver-sdk-go/massdriver/platform/instances" "github.com/massdriver-cloud/massdriver-sdk-go/massdriver/platform/types" ) -// FollowPollInterval controls how often each instance watcher polls for new -// deployments. Exposed as a var so tests can drop it to zero. -var FollowPollInterval = 2 * time.Second - -// FollowQuietWindow is how long the watcher loop waits with no new -// deployments observed before declaring the rollout done. The environment -// deploys in dependency-ordered waves, so we can't bail the moment one -// wave's deployments are terminal โ€” the next wave's haven't been created -// yet. The window has to be long enough to cover the gap between waves. +// FollowQuietWindow is how long the watcher waits with no active +// deployments and no fresh DeploymentEvents before declaring the +// rollout done. The environment deploys in dependency-ordered waves, +// so we can't bail the moment one wave's deployments are terminal โ€” +// the next wave's haven't been created yet. The window has to outlast +// the gap between waves. var FollowQuietWindow = 30 * time.Second -// FollowAPI is the narrow SDK surface FollowEnvironment needs. Tests supply a -// stub directly; production callers use [NewFollowAPI] to bind a -// *massdriver.Client. +// FollowAPI is the narrow SDK surface FollowEnvironment needs. 
Tests +// supply a stub directly; production callers use [NewFollowAPI] to +// bind a *massdriver.Client. type FollowAPI interface { - ListInstances(ctx context.Context, input instances.ListInput) ([]types.Instance, error) - ListDeployments(ctx context.Context, input deployments.ListInput) ([]types.Deployment, error) + StreamEnvironmentEvents(ctx context.Context, environmentID string) (<-chan types.Event, error) + GetDeployment(ctx context.Context, id string) (*types.Deployment, error) TailLogs(ctx context.Context, deploymentID string, w io.Writer) error } @@ -40,12 +36,12 @@ func NewFollowAPI(c *massdriver.Client) FollowAPI { return sdkFollowAPI{c: c} } type sdkFollowAPI struct{ c *massdriver.Client } -func (s sdkFollowAPI) ListInstances(ctx context.Context, input instances.ListInput) ([]types.Instance, error) { - return s.c.Instances.List(ctx, input) +func (s sdkFollowAPI) StreamEnvironmentEvents(ctx context.Context, environmentID string) (<-chan types.Event, error) { + return s.c.Environments.StreamEvents(ctx, environmentID) } -func (s sdkFollowAPI) ListDeployments(ctx context.Context, input deployments.ListInput) ([]types.Deployment, error) { - return s.c.Deployments.List(ctx, input) +func (s sdkFollowAPI) GetDeployment(ctx context.Context, id string) (*types.Deployment, error) { + return s.c.Deployments.Get(ctx, id) } func (s sdkFollowAPI) TailLogs(ctx context.Context, deploymentID string, w io.Writer) error { @@ -53,130 +49,126 @@ func (s sdkFollowAPI) TailLogs(ctx context.Context, deploymentID string, w io.Wr } // FollowEnvironment tails logs for every deployment that fires in an -// environment-level rollout, prefixing each line with the instance's id so -// the interleaved output is grep-friendly. +// environment-level rollout, prefixing each line with the instance's +// id so the interleaved output stays grep-friendly. // -// The platform deploys in dependency-ordered waves: wave 1 instances start, -// finish, then wave 2 instances start. 
This watcher polls per-instance -// deployment lists every [FollowPollInterval], spawns a [Service.TailLogs] -// goroutine for each new deployment it sees, and exits when no new -// deployments have appeared for [FollowQuietWindow] and every observed -// deployment is in a terminal state. +// Subscribes to `environmentEvents` over WebSocket; every +// `DeploymentEvent` either kicks off a [Service.TailLogs] goroutine +// for a newly-seen deployment or updates the active set so the +// watcher knows when the rollout has gone quiet. // -// The user's hint on the design ticket was to use Absinthe subscriptions -// (`environmentEvents` / `instanceEvents`) instead of polling. The SDK's -// websocket / Absinthe machinery is in an internal package, so subscribing -// from the CLI would mean either re-porting that layer or waiting for the -// SDK to expose event subscriptions. The polling approach has a few -// seconds of discovery latency per deployment, which is acceptable -// relative to per-instance deploy times measured in minutes. +// Termination: when no deployments are active and no fresh +// DeploymentEvents have arrived for [FollowQuietWindow], the watcher +// exits. The environment deploys in dependency-ordered waves, so the +// quiet window has to outlast the gap between waves. 
func FollowEnvironment(ctx context.Context, api FollowAPI, envID string, w io.Writer) error { - insts, err := api.ListInstances(ctx, instances.ListInput{EnvironmentID: envID}) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + eventCh, err := api.StreamEnvironmentEvents(ctx, envID) if err != nil { - return fmt.Errorf("list instances in environment %s: %w", envID, err) - } - if len(insts) == 0 { - return nil + return fmt.Errorf("subscribe to environment events for %s: %w", envID, err) } - out := newPrefixedSink(w) + sink := newPrefixedSink(w) + seen := map[string]struct{}{} + active := map[string]struct{}{} + lastActivity := time.Now() - ctx, cancel := context.WithCancel(ctx) - defer cancel() + var tails sync.WaitGroup + defer tails.Wait() - var wg sync.WaitGroup - errCh := make(chan error, len(insts)) + check := time.NewTicker(quietWindowTick(FollowQuietWindow)) + defer check.Stop() - for _, inst := range insts { - wg.Add(1) - go func(inst types.Instance) { - defer wg.Done() - if watchErr := watchInstanceDeployments(ctx, api, inst, out); watchErr != nil && !errors.Is(watchErr, context.Canceled) { - errCh <- watchErr + for { + select { + case ev, ok := <-eventCh: + if !ok { + return nil } - }(inst) - } + depEv, isDeploy := ev.(*types.DeploymentEvent) + if !isDeploy { + continue + } + lastActivity = time.Now() + handleDeploymentEvent(ctx, api, depEv, seen, active, sink, &tails) - wg.Wait() - close(errCh) + case <-check.C: + if len(active) == 0 && time.Since(lastActivity) >= FollowQuietWindow { + return nil + } - var firstErr error - for werr := range errCh { - if firstErr == nil { - firstErr = werr + case <-ctx.Done(): + if errors.Is(ctx.Err(), context.Canceled) { + return nil + } + return ctx.Err() } } - return firstErr } -// watchInstanceDeployments polls for new deployments on a single instance -// and tails each one as it's created. Returns when the quiet window elapses -// with every observed deployment in a terminal state. 
-func watchInstanceDeployments(ctx context.Context, api FollowAPI, inst types.Instance, sink *prefixedSink) error { - prefixWriter := sink.For(inst.ID) - seen := map[string]struct{}{} - var inFlight sync.WaitGroup - var lastChange time.Time - allTerminal := true +// handleDeploymentEvent kicks off a tail goroutine the first time we +// see a deployment and tracks whether it's still active. A deployment +// that arrives already-terminal still gets its logs streamed — TailLogs +// surfaces the historical batch and returns cleanly when the deployment +// is past terminal. +func handleDeploymentEvent( + ctx context.Context, + api FollowAPI, + depEv *types.DeploymentEvent, + seen, active map[string]struct{}, + sink *prefixedSink, + tails *sync.WaitGroup, +) { + depID := depEv.Deployment.ID + if depID == "" { + return + } - for { - deps, listErr := api.ListDeployments(ctx, deployments.ListInput{ - InstanceID: inst.ID, - PageSize: 25, - }) - if listErr != nil { - return fmt.Errorf("list deployments for %s: %w", inst.ID, listErr) - } + if _, taken := seen[depID]; !taken { + seen[depID] = struct{}{} - anyNew := false - anyActive := false - for _, dep := range deps { - if _, taken := seen[dep.ID]; !taken { - seen[dep.ID] = struct{}{} - anyNew = true - lastChange = time.Now() - - inFlight.Add(1) - go func(depID string) { - defer inFlight.Done() - // TailLogs blocks until the deployment hits a terminal - // status. Errors are silenced per-deployment so one - // bad tail doesn't take the whole follow down — the - // final deployment list will still show the failure. - _ = api.TailLogs(ctx, depID, prefixWriter) - }(dep.ID) - } - if !deployments.IsTerminal(string(dep.Status)) { - anyActive = true - } + // Need the instance id to label this deployment's log lines. + // The event payload trims it for bandwidth; one round-trip per + // new deployment is cheap.
+ dep, getErr := api.GetDeployment(ctx, depID) + if getErr != nil || dep.Instance == nil { + return } + prefixWriter := sink.For(dep.Instance.ID) - if anyNew { - allTerminal = false - } - if !anyActive && !anyNew && !lastChange.IsZero() && time.Since(lastChange) >= FollowQuietWindow { - allTerminal = true - break - } + tails.Add(1) + go func() { + defer tails.Done() + _ = api.TailLogs(ctx, depID, prefixWriter) + }() + } - select { - case <-time.After(FollowPollInterval): - case <-ctx.Done(): - inFlight.Wait() - return ctx.Err() - } + if deployments.IsTerminal(string(depEv.Deployment.Status)) { + delete(active, depID) + } else { + active[depID] = struct{}{} } +} - inFlight.Wait() - _ = allTerminal - return nil +// quietWindowTick chooses how often to check the termination condition. +// Polling at the quiet window itself feels laggy; polling at 1/4 of it +// gives us a tighter "is the rollout actually idle" answer without +// burning cycles. +func quietWindowTick(window time.Duration) time.Duration { + tick := window / 4 + if tick < 250*time.Millisecond { + tick = 250 * time.Millisecond + } + return tick } // prefixedSink serializes interleaved writes from per-instance tail -// goroutines and tags each line with the instance id. Writers handed out by -// [prefixedSink.For] line-buffer until they see a `\n`, then emit a single -// `[id] ` write under the sink's mutex so two goroutines never -// scribble on top of each other. +// goroutines and tags each line with the instance id. Writers handed +// out by [prefixedSink.For] line-buffer until they see a `\n`, then +// emit a single `[id] ` write under the sink's mutex so two +// goroutines never scribble on top of each other. type prefixedSink struct { mu sync.Mutex w io.Writer @@ -207,9 +199,9 @@ func (p *prefixedWriter) Write(b []byte) (int, error) { return n, nil } -// flushCompleteLines emits every fully-terminated line in the buffer under -// the sink mutex. 
A trailing partial line stays in the buffer until the -// next Write provides the terminating newline. +// flushCompleteLines emits every fully-terminated line in the buffer +// under the sink mutex. A trailing partial line stays in the buffer +// until the next Write provides the terminating newline. func (p *prefixedWriter) flushCompleteLines() error { for { raw := p.buf.Bytes() diff --git a/internal/commands/environment/follow_test.go b/internal/commands/environment/follow_test.go index fef8c5df..cd5f53a2 100644 --- a/internal/commands/environment/follow_test.go +++ b/internal/commands/environment/follow_test.go @@ -3,6 +3,7 @@ package environment_test import ( "bytes" "context" + "errors" "io" "strings" "sync" @@ -10,81 +11,79 @@ import ( "time" "github.com/massdriver-cloud/mass/internal/commands/environment" - "github.com/massdriver-cloud/massdriver-sdk-go/massdriver/platform/deployments" - "github.com/massdriver-cloud/massdriver-sdk-go/massdriver/platform/instances" "github.com/massdriver-cloud/massdriver-sdk-go/massdriver/platform/types" ) // stubFollowAPI is an in-test stub for environment.FollowAPI. Each test -// supplies instances + per-deployment log scripts; the stub serves them -// back when polled. +// pushes events on `events`; the channel is closed (or ctx cancelled) to +// signal end-of-stream. TailLogs serves canned log text per deployment. type stubFollowAPI struct { mu sync.Mutex - instances []types.Instance - // deployments per instance, returned in order across successive - // ListDeployments calls. Empty means no deployments for that instance. - depQueue map[string][][]types.Deployment - depCalls map[string]int + events chan types.Event + + // instance ID returned by GetDeployment, keyed by deployment ID. 
+ depToInstance map[string]string + depGetErr error logs map[string]string logErr map[string]error } -func (s *stubFollowAPI) ListInstances(_ context.Context, _ instances.ListInput) ([]types.Instance, error) { - return s.instances, nil +func newStubFollowAPI() *stubFollowAPI { + return &stubFollowAPI{ + events: make(chan types.Event, 32), + depToInstance: map[string]string{}, + logs: map[string]string{}, + logErr: map[string]error{}, + } +} + +func (s *stubFollowAPI) StreamEnvironmentEvents(_ context.Context, _ string) (<-chan types.Event, error) { + return s.events, nil } -func (s *stubFollowAPI) ListDeployments(_ context.Context, input deployments.ListInput) ([]types.Deployment, error) { +func (s *stubFollowAPI) GetDeployment(_ context.Context, id string) (*types.Deployment, error) { s.mu.Lock() defer s.mu.Unlock() - queue := s.depQueue[input.InstanceID] - call := s.depCalls[input.InstanceID] - if call >= len(queue) { - // Stable past the script — repeat the final entry to keep the - // poller in steady state. - if len(queue) == 0 { - return nil, nil - } - return queue[len(queue)-1], nil + if s.depGetErr != nil { + return nil, s.depGetErr } - s.depCalls[input.InstanceID] = call + 1 - return queue[call], nil + return &types.Deployment{ + ID: id, + Instance: &types.Instance{ID: s.depToInstance[id]}, + }, nil } func (s *stubFollowAPI) TailLogs(_ context.Context, deploymentID string, w io.Writer) error { - if err := s.logErr[deploymentID]; err != nil { - return err + s.mu.Lock() + logErr := s.logErr[deploymentID] + logText := s.logs[deploymentID] + s.mu.Unlock() + if logErr != nil { + return logErr } - _, err := io.WriteString(w, s.logs[deploymentID]) + _, err := io.WriteString(w, logText) return err } func TestFollowEnvironment_PrefixesLinesWithInstanceID(t *testing.T) { - // Speed the loop up so the test isn't slow.
- environment.FollowPollInterval = 0 - environment.FollowQuietWindow = 10 * time.Millisecond - t.Cleanup(func() { - environment.FollowPollInterval = 2 * time.Second - environment.FollowQuietWindow = 30 * time.Second - }) - - api := &stubFollowAPI{ - instances: []types.Instance{ - {ID: "ecomm-prod-db", Name: "db"}, - {ID: "ecomm-prod-app", Name: "app"}, - }, - depQueue: map[string][][]types.Deployment{ - "ecomm-prod-db": {{{ID: "dep-db-1", Status: "COMPLETED"}}}, - "ecomm-prod-app": {{{ID: "dep-app-1", Status: "COMPLETED"}}}, - }, - depCalls: map[string]int{}, - logs: map[string]string{ - "dep-db-1": "applying db schema\nmigrations done\n", - "dep-app-1": "starting app\nready\n", - }, - logErr: map[string]error{}, - } + environment.FollowQuietWindow = 100 * time.Millisecond + t.Cleanup(func() { environment.FollowQuietWindow = 30 * time.Second }) + + api := newStubFollowAPI() + api.depToInstance["dep-db-1"] = "ecomm-prod-db" + api.depToInstance["dep-app-1"] = "ecomm-prod-app" + api.logs["dep-db-1"] = "applying db schema\nmigrations done\n" + api.logs["dep-app-1"] = "starting app\nready\n" + + // Fire a RUNNING then COMPLETED event for each deployment. + api.events <- &types.DeploymentEvent{Deployment: types.Deployment{ID: "dep-db-1", Status: "RUNNING"}} + api.events <- &types.DeploymentEvent{Deployment: types.Deployment{ID: "dep-app-1", Status: "RUNNING"}} + api.events <- &types.DeploymentEvent{Deployment: types.Deployment{ID: "dep-db-1", Status: "COMPLETED"}} + api.events <- &types.DeploymentEvent{Deployment: types.Deployment{ID: "dep-app-1", Status: "COMPLETED"}} + // Don't close `events` — the watcher exits via the quiet window after + // both deployments have transitioned to terminal status.
var buf bytes.Buffer ctx, cancel := context.WithTimeout(t.Context(), 5*time.Second) @@ -95,30 +94,47 @@ func TestFollowEnvironment_PrefixesLinesWithInstanceID(t *testing.T) { } out := buf.String() - if !strings.Contains(out, "[ecomm-prod-db] applying db schema\n") { - t.Errorf("missing prefixed db line in:\n%s", out) - } - if !strings.Contains(out, "[ecomm-prod-db] migrations done\n") { - t.Errorf("missing prefixed db line 2 in:\n%s", out) - } - if !strings.Contains(out, "[ecomm-prod-app] starting app\n") { - t.Errorf("missing prefixed app line in:\n%s", out) - } - if !strings.Contains(out, "[ecomm-prod-app] ready\n") { - t.Errorf("missing prefixed app line 2 in:\n%s", out) + for _, want := range []string{ + "[ecomm-prod-db] applying db schema\n", + "[ecomm-prod-db] migrations done\n", + "[ecomm-prod-app] starting app\n", + "[ecomm-prod-app] ready\n", + } { + if !strings.Contains(out, want) { + t.Errorf("missing expected line %q in output:\n%s", want, out) + } } } -func TestFollowEnvironment_NoOpForEmptyEnv(t *testing.T) { - api := &stubFollowAPI{ - instances: nil, - } +func TestFollowEnvironment_ExitsWhenStreamCloses(t *testing.T) { + api := newStubFollowAPI() + close(api.events) var buf bytes.Buffer - if err := environment.FollowEnvironment(t.Context(), api, "ecomm-prod", &buf); err != nil { + ctx, cancel := context.WithTimeout(t.Context(), 2*time.Second) + defer cancel() + + if err := environment.FollowEnvironment(ctx, api, "ecomm-prod", &buf); err != nil { t.Fatalf("FollowEnvironment: %v", err) } if buf.Len() != 0 { - t.Errorf("expected no output for empty env; got %q", buf.String()) + t.Errorf("expected no output when no deployments fire; got %q", buf.String()) + } +} + +func TestFollowEnvironment_PropagatesSubscribeError(t *testing.T) { + api := errorAPI{err: errors.New("ws handshake failed")} + + err := environment.FollowEnvironment(t.Context(), api, "ecomm-prod", io.Discard) + if err == nil || !strings.Contains(err.Error(), "ws handshake failed") { + 
t.Errorf("expected subscribe error, got %v", err) } } + +type errorAPI struct{ err error } + +func (e errorAPI) StreamEnvironmentEvents(_ context.Context, _ string) (<-chan types.Event, error) { + return nil, e.err +} +func (errorAPI) GetDeployment(_ context.Context, _ string) (*types.Deployment, error) { return nil, nil } +func (errorAPI) TailLogs(_ context.Context, _ string, _ io.Writer) error { return nil } From bab43e3e708e31745856a4bcf5e1ae02a2910971 Mon Sep 17 00:00:00 2001 From: Cory O'Daniel Date: Fri, 15 May 2026 16:54:08 -0700 Subject: [PATCH 5/7] Fix golangci-lint findings in follow - unconvert: drop string() cast around Deployment.Status (it's already string) - reassign: tag the FollowQuietWindow overrides with the same nolint pattern deploy_test.go uses for DeploymentStatusSleep - goimports: reformat the errorAPI stub block --- internal/commands/environment/follow.go | 2 +- internal/commands/environment/follow_test.go | 10 ++++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/internal/commands/environment/follow.go b/internal/commands/environment/follow.go index 6295bbb5..a75d9450 100644 --- a/internal/commands/environment/follow.go +++ b/internal/commands/environment/follow.go @@ -145,7 +145,7 @@ func handleDeploymentEvent( }() } - if deployments.IsTerminal(string(depEv.Deployment.Status)) { + if deployments.IsTerminal(depEv.Deployment.Status) { delete(active, depID) } else { active[depID] = struct{}{} diff --git a/internal/commands/environment/follow_test.go b/internal/commands/environment/follow_test.go index cd5f53a2..0c214457 100644 --- a/internal/commands/environment/follow_test.go +++ b/internal/commands/environment/follow_test.go @@ -68,8 +68,8 @@ func (s *stubFollowAPI) TailLogs(_ context.Context, deploymentID string, w io.Wr } func TestFollowEnvironment_PrefixesLinesWithInstanceID(t *testing.T) { - environment.FollowQuietWindow = 100 * time.Millisecond - t.Cleanup(func() { environment.FollowQuietWindow = 30 * time.Second }) 
+ environment.FollowQuietWindow = 100 * time.Millisecond //nolint:reassign // intentionally shortened in tests + t.Cleanup(func() { environment.FollowQuietWindow = 30 * time.Second }) //nolint:reassign // restore default api := newStubFollowAPI() api.depToInstance["dep-db-1"] = "ecomm-prod-db" @@ -136,5 +136,7 @@ type errorAPI struct{ err error } func (e errorAPI) StreamEnvironmentEvents(_ context.Context, _ string) (<-chan types.Event, error) { return nil, e.err } -func (errorAPI) GetDeployment(_ context.Context, _ string) (*types.Deployment, error) { return nil, nil } -func (errorAPI) TailLogs(_ context.Context, _ string, _ io.Writer) error { return nil } +func (errorAPI) GetDeployment(_ context.Context, _ string) (*types.Deployment, error) { + return nil, nil +} +func (errorAPI) TailLogs(_ context.Context, _ string, _ io.Writer) error { return nil } From c0e0c82d641b6833f43f6b39a4eb480a823ac978 Mon Sep 17 00:00:00 2001 From: Cory O'Daniel Date: Fri, 15 May 2026 16:57:34 -0700 Subject: [PATCH 6/7] Pad instance-id prefixes in --follow output MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Multi-instance tails were jittery because each line's prefix was sized to its own instance id — `[fancy-claude-pg]` and `[fancy-claude-mysql]` gave you two different column offsets for the actual log text. List instances up front, take the longest id as the column width, and right-pad every prefix to that width. New env-deploys create every instance during fork before any deployment fires, so the up-front list is the full set.
[fancy-claude-pg] applying schema [fancy-claude-mysql] starting mysqld [fancy-claude-pg] migrations done --- internal/commands/environment/follow.go | 65 +++++++++++++++++--- internal/commands/environment/follow_test.go | 26 ++++++-- 2 files changed, 78 insertions(+), 13 deletions(-) diff --git a/internal/commands/environment/follow.go b/internal/commands/environment/follow.go index a75d9450..1d996e8b 100644 --- a/internal/commands/environment/follow.go +++ b/internal/commands/environment/follow.go @@ -11,6 +11,7 @@ import ( "github.com/massdriver-cloud/massdriver-sdk-go/massdriver" "github.com/massdriver-cloud/massdriver-sdk-go/massdriver/platform/deployments" + "github.com/massdriver-cloud/massdriver-sdk-go/massdriver/platform/instances" "github.com/massdriver-cloud/massdriver-sdk-go/massdriver/platform/types" ) @@ -26,6 +27,10 @@ var FollowQuietWindow = 30 * time.Second // supply a stub directly; production callers use [NewFollowAPI] to // bind a *massdriver.Client. type FollowAPI interface { + // ListInstances pulls every instance in the environment up front so the + // log-prefix column can be sized to the longest id and stay stable as + // new deployments come online wave by wave. 
+ ListInstances(ctx context.Context, environmentID string) ([]types.Instance, error) StreamEnvironmentEvents(ctx context.Context, environmentID string) (<-chan types.Event, error) GetDeployment(ctx context.Context, id string) (*types.Deployment, error) TailLogs(ctx context.Context, deploymentID string, w io.Writer) error @@ -36,6 +41,10 @@ func NewFollowAPI(c *massdriver.Client) FollowAPI { return sdkFollowAPI{c: c} } type sdkFollowAPI struct{ c *massdriver.Client } +func (s sdkFollowAPI) ListInstances(ctx context.Context, environmentID string) ([]types.Instance, error) { + return s.c.Instances.List(ctx, instances.ListInput{EnvironmentID: environmentID}) +} + func (s sdkFollowAPI) StreamEnvironmentEvents(ctx context.Context, environmentID string) (<-chan types.Event, error) { return s.c.Environments.StreamEvents(ctx, environmentID) } @@ -65,12 +74,22 @@ func FollowEnvironment(ctx context.Context, api FollowAPI, envID string, w io.Wr ctx, cancel := context.WithCancel(ctx) defer cancel() + // List instances first so we know the column width before any logs + // land. New instances created later won't change the padding, but + // for the env-deploy case that's never — fork created every instance + // before the first deployment fires.
+ insts, err := api.ListInstances(ctx, envID) + if err != nil { + return fmt.Errorf("list instances in %s: %w", envID, err) + } + padWidth := maxInstanceIDWidth(insts) + eventCh, err := api.StreamEnvironmentEvents(ctx, envID) if err != nil { return fmt.Errorf("subscribe to environment events for %s: %w", envID, err) } - sink := newPrefixedSink(w) + sink := newPrefixedSink(w, padWidth) seen := map[string]struct{}{} active := map[string]struct{}{} lastActivity := time.Now() @@ -164,24 +183,56 @@ func quietWindowTick(window time.Duration) time.Duration { return tick } +// maxInstanceIDWidth returns the longest instance.ID across the list, used +// to right-pad log prefixes so multi-instance tails stay column-aligned +// (no horizontal jitter as `[fancy-claude-pg]` and `[fancy-claude-mysql]` +// interleave). +func maxInstanceIDWidth(insts []types.Instance) int { + width := 0 + for _, inst := range insts { + if n := len(inst.ID); n > width { + width = n + } + } + return width +} + // prefixedSink serializes interleaved writes from per-instance tail // goroutines and tags each line with the instance id. Writers handed // out by [prefixedSink.For] line-buffer until they see a `\n`, then // emit a single `[id] ` write under the sink's mutex so two // goroutines never scribble on top of each other. +// +// Each prefix is right-padded to padWidth (the longest instance id in +// the environment) so multi-instance tails stay column-aligned. type prefixedSink struct { - mu sync.Mutex - w io.Writer + mu sync.Mutex + w io.Writer + padWidth int } -func newPrefixedSink(w io.Writer) *prefixedSink { - return &prefixedSink{w: w} +func newPrefixedSink(w io.Writer, padWidth int) *prefixedSink { + return &prefixedSink{w: w, padWidth: padWidth} } // For returns an io.Writer that prefixes every line it writes with -// "[] " and forwards to the shared writer. +// "[] " (padded to the sink's column width) and forwards to the +// shared writer. 
func (s *prefixedSink) For(id string) io.Writer { - return &prefixedWriter{sink: s, prefix: []byte("[" + id + "] ")} + padding := s.padWidth - len(id) + if padding < 0 { + padding = 0 + } + // "[" + id + "]" + repeated space + trailing separator + prefix := make([]byte, 0, 3+len(id)+padding+1) + prefix = append(prefix, '[') + prefix = append(prefix, id...) + prefix = append(prefix, ']') + for range padding { + prefix = append(prefix, ' ') + } + prefix = append(prefix, ' ') + return &prefixedWriter{sink: s, prefix: prefix} } type prefixedWriter struct { diff --git a/internal/commands/environment/follow_test.go b/internal/commands/environment/follow_test.go index 0c214457..9218a1d2 100644 --- a/internal/commands/environment/follow_test.go +++ b/internal/commands/environment/follow_test.go @@ -20,6 +20,8 @@ import ( type stubFollowAPI struct { mu sync.Mutex + instances []types.Instance + events chan types.Event // instance ID returned by GetDeployment, keyed by deployment ID. @@ -39,6 +41,10 @@ func newStubFollowAPI() *stubFollowAPI { } } +func (s *stubFollowAPI) ListInstances(_ context.Context, _ string) ([]types.Instance, error) { + return s.instances, nil +} + func (s *stubFollowAPI) StreamEnvironmentEvents(_ context.Context, _ string) (<-chan types.Event, error) { return s.events, nil } @@ -72,10 +78,14 @@ func TestFollowEnvironment_PrefixesLinesWithInstanceID(t *testing.T) { t.Cleanup(func() { environment.FollowQuietWindow = 30 * time.Second }) //nolint:reassign // restore default api := newStubFollowAPI() + api.instances = []types.Instance{ + {ID: "ecomm-prod-db"}, // 13 chars + {ID: "ecomm-prod-mysql"}, // 16 chars — sets pad width + } api.depToInstance["dep-db-1"] = "ecomm-prod-db" - api.depToInstance["dep-app-1"] = "ecomm-prod-app" + api.depToInstance["dep-app-1"] = "ecomm-prod-mysql" api.logs["dep-db-1"] = "applying db schema\nmigrations done\n" - api.logs["dep-app-1"] = "starting app\nready\n" + api.logs["dep-app-1"] = "starting mysql\nready\n" // Fire a
RUNNING then COMPLETED event for each deployment. api.events <- &types.DeploymentEvent{Deployment: types.Deployment{ID: "dep-db-1", Status: "RUNNING"}} @@ -93,12 +103,13 @@ func TestFollowEnvironment_PrefixesLinesWithInstanceID(t *testing.T) { t.Fatalf("FollowEnvironment: %v", err) } + // Pad width = 16 (mysql id). Shorter ids get 3 spaces of padding after `]`. out := buf.String() for _, want := range []string{ - "[ecomm-prod-db] applying db schema\n", - "[ecomm-prod-db] migrations done\n", - "[ecomm-prod-app] starting app\n", - "[ecomm-prod-app] ready\n", + "[ecomm-prod-db] applying db schema\n", + "[ecomm-prod-db] migrations done\n", + "[ecomm-prod-mysql] starting mysql\n", + "[ecomm-prod-mysql] ready\n", } { if !strings.Contains(out, want) { t.Errorf("missing expected line %q in output:\n%s", want, out) @@ -136,6 +147,9 @@ type errorAPI struct{ err error } func (e errorAPI) StreamEnvironmentEvents(_ context.Context, _ string) (<-chan types.Event, error) { return nil, e.err } +func (errorAPI) ListInstances(_ context.Context, _ string) ([]types.Instance, error) { + return nil, nil +} func (errorAPI) GetDeployment(_ context.Context, _ string) (*types.Deployment, error) { return nil, nil } From 43b744d8597bd64f31f7085ea90e888c92e912c0 Mon Sep 17 00:00:00 2001 From: Cory O'Daniel Date: Fri, 15 May 2026 17:00:50 -0700 Subject: [PATCH 7/7] Fix nilnil lint: return zero values from errorAPI stubs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit errorAPI is only invoked through StreamEnvironmentEvents which already errors, so the other methods are dead code in practice — but `nilnil` flags any `return nil, nil` from a non-error path. Return empty `*types.Deployment{}` and empty slice instead.
--- internal/commands/environment/follow_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/commands/environment/follow_test.go b/internal/commands/environment/follow_test.go index 9218a1d2..3696f6c7 100644 --- a/internal/commands/environment/follow_test.go +++ b/internal/commands/environment/follow_test.go @@ -148,9 +148,9 @@ func (e errorAPI) StreamEnvironmentEvents(_ context.Context, _ string) (<-chan t return nil, e.err } func (errorAPI) ListInstances(_ context.Context, _ string) ([]types.Instance, error) { - return nil, nil + return []types.Instance{}, nil } func (errorAPI) GetDeployment(_ context.Context, _ string) (*types.Deployment, error) { - return nil, nil + return &types.Deployment{}, nil } func (errorAPI) TailLogs(_ context.Context, _ string, _ io.Writer) error { return nil }