diff --git a/pkg/tern/local_apply_grouped.go b/pkg/tern/local_apply_grouped.go index f110bc57..c81a4133 100644 --- a/pkg/tern/local_apply_grouped.go +++ b/pkg/tern/local_apply_grouped.go @@ -19,7 +19,6 @@ func (c *LocalClient) executeGroupedApply(ctx context.Context, apply *storage.Ap ctx, cancelApply := context.WithCancel(ctx) defer cancelApply() defer c.startApplyHeartbeat(ctx, apply, cancelApply)() - creds := c.credentials() mode := groupedApplyMode(apply) modeDescription := groupedApplyModeDescription(apply) @@ -57,6 +56,11 @@ func (c *LocalClient) executeGroupedApply(ctx context.Context, apply *storage.Ap fmt.Sprintf("MySQL applies support one namespace per apply, but plan has %d: %v", len(plan.Namespaces), names)) return } + creds, err := c.credentialsForGroupedApply(plan) + if err != nil { + c.failApplyWithTasks(ctx, apply, tasks, err.Error()) + return + } changes := planNamespacesToChanges(plan.Namespaces) // For Vitess: initialize the VitessApplyData row before the engine starts. diff --git a/pkg/tern/local_apply_sequential.go b/pkg/tern/local_apply_sequential.go index 4d3ad651..e10b2a6e 100644 --- a/pkg/tern/local_apply_sequential.go +++ b/pkg/tern/local_apply_sequential.go @@ -146,6 +146,16 @@ func (c *LocalClient) runEngineTask(ctx context.Context, apply *storage.Apply, t } else if handled { return taskStopped } + taskCreds := creds + if c.config.Type == storage.DatabaseTypeMySQL { + var err error + taskCreds, err = c.credentialsForMySQLNamespace(task.Namespace) + if err != nil { + c.markTaskFailed(ctx, task, err.Error()) + c.logger.Error("task failed to resolve namespace credentials", "error", err, "task_id", task.TaskIdentifier, "namespace", task.Namespace, "table", task.TableName) + return taskFailed + } + } // Sequential mode: one DDL per engine call. Use the task identifier as // MigrationContext so each table's schema change is tracked independently. @@ -157,7 +167,7 @@ func (c *LocalClient) runEngineTask(ctx context.Context, apply *storage.Apply, t }}, Options: options, ResumeState: &engine.ResumeState{MigrationContext: task.TaskIdentifier}, - Credentials: creds, + Credentials: taskCreds, }) if err != nil { @@ -182,7 +192,7 @@ func (c *LocalClient) runEngineTask(ctx context.Context, apply *storage.Apply, t c.logger.Info("task running", "task_id", task.TaskIdentifier, "table", task.TableName) // Poll to completion - pollAction := c.pollTaskToCompletion(ctx, apply, task, creds) + pollAction := c.pollTaskToCompletion(ctx, apply, task, taskCreds) if pollAction == taskAbort || pollAction == taskStopped { return pollAction } diff --git a/pkg/tern/local_client.go b/pkg/tern/local_client.go index 7b0cd65e..2d2bf581 100644 --- a/pkg/tern/local_client.go +++ b/pkg/tern/local_client.go @@ -108,6 +108,7 @@ import ( "github.com/block/schemabot/pkg/mysqlconn" ternv1 "github.com/block/schemabot/pkg/proto/ternv1" "github.com/block/schemabot/pkg/psclient" + "github.com/block/schemabot/pkg/schema" "github.com/block/schemabot/pkg/state" "github.com/block/schemabot/pkg/storage" ) @@ -253,6 +254,82 @@ func (c *LocalClient) credentials() *engine.Credentials { } } +func (c *LocalClient) credentialsForMySQLNamespace(namespace string) (*engine.Credentials, error) { + if c.config.Type != storage.DatabaseTypeMySQL { + return c.credentials(), nil + } + hasDatabase, err := mysqlDSNHasDatabase(c.config.TargetDSN) + if err != nil { + return nil, fmt.Errorf("inspect MySQL target DSN for namespace injection: %w", err) + } + // Transitional: a target DSN that already names a database is used as-is. + // The data-plane model is a namespace-free DSN with the schema injected per + // operation (below); existing static/local configs still carry the database + // in the DSN, and those keep working until they migrate to namespace-free. + if hasDatabase { + return c.credentials(), nil + } + // A namespace-free target DSN is the inventory/data-plane shape: the concrete + // namespace is the connection schema and must be injected per operation. + if namespace == "" { + return nil, fmt.Errorf("MySQL namespace is required for a namespace-free target DSN") + } + dsn, err := mysqlDSNWithDatabase(c.config.TargetDSN, namespace) + if err != nil { + return nil, err + } + return &engine.Credentials{ + DSN: dsn, + Metadata: c.config.Metadata, + }, nil +} + +func (c *LocalClient) credentialsForTask(task *storage.Task) (*engine.Credentials, error) { + if c.config.Type != storage.DatabaseTypeMySQL { + return c.credentials(), nil + } + if task == nil { + return nil, fmt.Errorf("task is required for MySQL credentials") + } + return c.credentialsForMySQLNamespace(task.Namespace) +} + +// credentialsForGroupedApply resolves the single-namespace credentials for a +// grouped/atomic MySQL apply. A grouped apply runs one Spirit execution against +// one schema, so the plan must carry exactly one namespace. Fail closed rather +// than pick a namespace by map iteration order (or silently use a namespace-free +// DSN) if that invariant is ever violated. +func (c *LocalClient) credentialsForGroupedApply(plan *storage.Plan) (*engine.Credentials, error) { + if c.config.Type != storage.DatabaseTypeMySQL { + return c.credentials(), nil + } + if len(plan.Namespaces) != 1 { + return nil, fmt.Errorf("grouped MySQL apply requires exactly one namespace, plan has %d", len(plan.Namespaces)) + } + var namespace string + for ns := range plan.Namespaces { + namespace = ns + } + return c.credentialsForMySQLNamespace(namespace) +} + +func mysqlDSNWithDatabase(dsn, database string) (string, error) { + cfg, err := mysql.ParseDSN(dsn) + if err != nil { + return "", fmt.Errorf("parse MySQL DSN: %w", err) + } + cfg.DBName = database + return cfg.FormatDSN(), nil +} + +func mysqlDSNHasDatabase(dsn string) (bool, error) { + cfg, err := mysql.ParseDSN(dsn) + if err != nil { + return false, fmt.Errorf("parse MySQL DSN: %w", err) + } + return cfg.DBName != "", nil +} + func (c *LocalClient) deferredCutoverSignalExists(ctx context.Context, apply *storage.Apply) (bool, bool, error) { if apply == nil { return false, false, fmt.Errorf("apply is required for deferred cutover signal lookup") @@ -286,11 +363,20 @@ func (c *LocalClient) PullSchema(ctx context.Context, req *ternv1.PullSchemaRequ return nil, fmt.Errorf("pull schema for database %s: request type %q does not match client type %q: %w", c.config.Database, req.Type, c.config.Type, ErrPullSchemaInvalidRequest) } + targetDSN := c.config.TargetDSN + if c.config.Type == storage.DatabaseTypeMySQL { + creds, err := c.credentialsForMySQLNamespace(c.config.Database) + if err != nil { + return nil, fmt.Errorf("resolve database %s credentials for schema pull: %w", c.config.Database, err) + } + targetDSN = creds.DSN + } + attrs := []any{"database", c.config.Database} - attrs = append(attrs, dsnLogAttrs(c.config.TargetDSN)...) + attrs = append(attrs, dsnLogAttrs(targetDSN)...) c.logger.Info("LocalClient.PullSchema: loading live schema", attrs...) - db, err := mysqlconn.Open(c.config.TargetDSN) + db, err := mysqlconn.Open(targetDSN) if err != nil { return nil, fmt.Errorf("open database %s for schema pull: %w", c.config.Database, err) } @@ -348,29 +434,15 @@ func (c *LocalClient) Plan(ctx context.Context, req *ternv1.PlanRequest) (*ternv return nil, fmt.Errorf("type must be %q or %q", storage.DatabaseTypeMySQL, storage.DatabaseTypeVitess) } - eng := c.getEngine() - if eng == nil { - return nil, fmt.Errorf("no engine configured for type: %s", c.config.Type) - } - // Convert schema files from proto to engine type schemaFiles := protoToSchemaFiles(req.SchemaFiles) - creds := c.credentials() - planLogAttrs := []any{"database", c.config.Database} planLogAttrs = append(planLogAttrs, dsnLogAttrs(c.config.TargetDSN)...) planLogAttrs = append(planLogAttrs, "schema_file_count", len(schemaFiles)) c.logger.Info("LocalClient.Plan: calling engine", planLogAttrs...) - result, err := eng.Plan(ctx, &engine.PlanRequest{ - Database: c.config.Database, - DatabaseType: c.config.Type, - SchemaFiles: schemaFiles, - Repository: req.Repository, - PullRequest: int(req.PullRequest), - Credentials: creds, - }) + result, err := c.planWithEngine(ctx, req, c.config.Database, schemaFiles) if err != nil { c.logger.Error("plan failed", "error", err, "database", c.config.Database) return nil, err // Error already has clear prefix (SQL syntax/usage error) @@ -537,6 +609,73 @@ func (c *LocalClient) Plan(ctx context.Context, req *ternv1.PlanRequest) (*ternv }, nil } +func (c *LocalClient) planWithEngine(ctx context.Context, req *ternv1.PlanRequest, database string, schemaFiles schema.SchemaFiles) (*engine.PlanResult, error) { + eng := c.getEngine() + if eng == nil { + return nil, fmt.Errorf("no engine configured for type: %s", c.config.Type) + } + if c.config.Type != storage.DatabaseTypeMySQL { + return c.planNamespaceWithEngine(ctx, eng, req, database, schemaFiles, c.credentials()) + } + hasDatabase, err := mysqlDSNHasDatabase(c.config.TargetDSN) + if err != nil { + return nil, err + } + if hasDatabase { + return c.planNamespaceWithEngine(ctx, eng, req, database, schemaFiles, c.credentials()) + } + if len(schemaFiles) == 0 { + return nil, fmt.Errorf("schema files are required for namespace-free MySQL target DSN") + } + if len(schemaFiles) == 1 { + for namespace := range schemaFiles { + creds, err := c.credentialsForMySQLNamespace(namespace) + if err != nil { + return nil, err + } + return c.planNamespaceWithEngine(ctx, eng, req, namespace, schemaFiles, creds) + } + } + return c.planMySQLNamespacesWithEngine(ctx, eng, req, schemaFiles) +} + +func (c *LocalClient) planNamespaceWithEngine(ctx context.Context, eng engine.Engine, req *ternv1.PlanRequest, database string, schemaFiles schema.SchemaFiles, creds *engine.Credentials) (*engine.PlanResult, error) { + return eng.Plan(ctx, &engine.PlanRequest{ + Database: database, + DatabaseType: c.config.Type, + SchemaFiles: schemaFiles, + Repository: req.Repository, + PullRequest: int(req.PullRequest), + Credentials: creds, + }) +} + +func (c *LocalClient) planMySQLNamespacesWithEngine(ctx context.Context, eng engine.Engine, req *ternv1.PlanRequest, schemaFiles schema.SchemaFiles) (*engine.PlanResult, error) { + namespaces := make([]string, 0, len(schemaFiles)) + for namespace := range schemaFiles { + namespaces = append(namespaces, namespace) + } + sort.Strings(namespaces) + + result := &engine.PlanResult{PlanID: fmt.Sprintf("plan-%d", time.Now().UnixNano()), NoChanges: true} + for _, namespace := range namespaces { + creds, err := c.credentialsForMySQLNamespace(namespace) + if err != nil { + return nil, err + } + nsResult, err := c.planNamespaceWithEngine(ctx, eng, req, namespace, schema.SchemaFiles{namespace: schemaFiles[namespace]}, creds) + if err != nil { + return nil, fmt.Errorf("plan MySQL namespace %q: %w", namespace, err) + } + result.Changes = append(result.Changes, nsResult.Changes...) + result.LintViolations = append(result.LintViolations, nsResult.LintViolations...) + if !nsResult.NoChanges || len(nsResult.Changes) > 0 { + result.NoChanges = false + } + } + return result, nil +} + // Apply executes a previously generated plan. // In local mode, Apply has additional conflict checking and polls for completion. // diff --git a/pkg/tern/local_client_test.go b/pkg/tern/local_client_test.go index e60fa890..060e73f7 100644 --- a/pkg/tern/local_client_test.go +++ b/pkg/tern/local_client_test.go @@ -492,6 +492,7 @@ func TestLocalClient_ProcessPendingStopControlRequest(t *testing.T) { ApplyID: apply.ID, TaskIdentifier: "task-stop-local", Database: "testdb", + Namespace: "testdb", DatabaseType: storage.DatabaseTypeMySQL, TableName: "users", State: state.Task.Running, @@ -601,6 +602,7 @@ func TestLocalClient_ProcessPendingStartControlRequestStartsDeferredDeploy(t *te ApplyID: apply.ID, TaskIdentifier: "task-deferred-start", Database: "testdb", + Namespace: "testdb", DatabaseType: storage.DatabaseTypeMySQL, TableName: "users", State: state.Task.WaitingForDeploy, @@ -716,6 +718,7 @@ func TestLocalClient_ProcessPendingCutoverControlRequest(t *testing.T) { ApplyID: apply.ID, TaskIdentifier: "task-cutover-local", Database: "testdb", + Namespace: "testdb", DatabaseType: storage.DatabaseTypeMySQL, TableName: "users", State: state.Task.WaitingForCutover, @@ -766,6 +769,7 @@ func TestLocalClient_ProcessPendingCutoverControlRequestUsesCompletedTaskForCuto ApplyID: apply.ID, TaskIdentifier: "task-cutover-completed-task-local", Database: "testdb", + Namespace: "testdb", DatabaseType: storage.DatabaseTypeMySQL, TableName: "users", State: state.Task.Completed, @@ -912,6 +916,7 @@ func TestLocalClient_ProcessPendingCutoverControlRequestFailsRejectedRequest(t * ApplyID: apply.ID, TaskIdentifier: "task-cutover-rejected-local", Database: "testdb", + Namespace: "testdb", DatabaseType: storage.DatabaseTypeMySQL, TableName: "users", State: state.Task.WaitingForCutover, @@ -1582,3 +1587,102 @@ func TestLocalClient_RevertWindowDuration(t *testing.T) { }) } } + +// MySQL targets come in two shapes. A target DSN that already names a database +// is the local-DSN shape: the namespace is an organizational label and the +// configured DSN is used unchanged. A namespace-free target DSN is the +// inventory/data-plane shape: the concrete namespace is the connection schema +// and must be injected per operation, so a missing namespace is an error. +// Vitess credentials carry engine metadata and never inject a MySQL schema. +func TestLocalClient_CredentialsNamespaceResolution(t *testing.T) { + t.Run("local DSN with database keeps configured DSN regardless of namespace", func(t *testing.T) { + c := &LocalClient{config: LocalConfig{ + Type: storage.DatabaseTypeMySQL, + TargetDSN: "root@tcp(localhost:3306)/orders", + }, logger: slog.Default()} + + creds, err := c.credentialsForMySQLNamespace("ignored-label") + require.NoError(t, err) + assert.Equal(t, "root@tcp(localhost:3306)/orders", creds.DSN) + + creds, err = c.credentialsForMySQLNamespace("") + require.NoError(t, err) + assert.Equal(t, "root@tcp(localhost:3306)/orders", creds.DSN) + }) + + t.Run("namespace-free DSN injects the namespace as the connection schema", func(t *testing.T) { + c := &LocalClient{config: LocalConfig{ + Type: storage.DatabaseTypeMySQL, + TargetDSN: "root@tcp(localhost:3306)/", + }, logger: slog.Default()} + + creds, err := c.credentialsForMySQLNamespace("orders_schema") + require.NoError(t, err) + assert.Equal(t, "root@tcp(localhost:3306)/orders_schema", creds.DSN) + }) + + t.Run("namespace-free DSN without a namespace is an error", func(t *testing.T) { + c := &LocalClient{config: LocalConfig{ + Type: storage.DatabaseTypeMySQL, + TargetDSN: "root@tcp(localhost:3306)/", + }, logger: slog.Default()} + + _, err := c.credentialsForMySQLNamespace("") + require.Error(t, err) + assert.Contains(t, err.Error(), "namespace is required") + }) + + t.Run("credentialsForTask uses the task namespace for a namespace-free DSN", func(t *testing.T) { + c := &LocalClient{config: LocalConfig{ + Type: storage.DatabaseTypeMySQL, + TargetDSN: "root@tcp(localhost:3306)/", + }, logger: slog.Default()} + + creds, err := c.credentialsForTask(&storage.Task{Namespace: "orders_schema"}) + require.NoError(t, err) + assert.Equal(t, "root@tcp(localhost:3306)/orders_schema", creds.DSN) + }) + + t.Run("credentialsForGroupedApply injects the plan namespace", func(t *testing.T) { + c := &LocalClient{config: LocalConfig{ + Type: storage.DatabaseTypeMySQL, + TargetDSN: "root@tcp(localhost:3306)/", + }, logger: slog.Default()} + + creds, err := c.credentialsForGroupedApply(&storage.Plan{ + Namespaces: map[string]*storage.NamespacePlanData{"orders_schema": {}}, + }) + require.NoError(t, err) + assert.Equal(t, "root@tcp(localhost:3306)/orders_schema", creds.DSN) + }) + + t.Run("credentialsForGroupedApply fails closed unless exactly one namespace", func(t *testing.T) { + c := &LocalClient{config: LocalConfig{ + Type: storage.DatabaseTypeMySQL, + TargetDSN: "root@tcp(localhost:3306)/", + }, logger: slog.Default()} + + _, err := c.credentialsForGroupedApply(&storage.Plan{Namespaces: nil}) + require.Error(t, err) + assert.Contains(t, err.Error(), "exactly one namespace") + + _, err = c.credentialsForGroupedApply(&storage.Plan{ + Namespaces: map[string]*storage.NamespacePlanData{"a": {}, "b": {}}, + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "exactly one namespace") + }) + + t.Run("Vitess credentials carry metadata and never inject a schema", func(t *testing.T) { + c := &LocalClient{config: LocalConfig{ + Type: storage.DatabaseTypeVitess, + TargetDSN: "vtgate-dsn", + Metadata: map[string]string{"organization": "acme"}, + }, logger: slog.Default()} + + creds, err := c.credentialsForTask(&storage.Task{Namespace: "ignored"}) + require.NoError(t, err) + assert.Equal(t, "vtgate-dsn", creds.DSN) + assert.Equal(t, "acme", creds.Metadata["organization"]) + }) +} diff --git a/pkg/tern/local_control.go b/pkg/tern/local_control.go index 2b092ab3..7f2f7fe3 100644 --- a/pkg/tern/local_control.go +++ b/pkg/tern/local_control.go @@ -88,7 +88,10 @@ func (c *LocalClient) cutover(ctx context.Context, req *ternv1.CutoverRequest, c return nil, fmt.Errorf("schema change has a pending stop request; cutover is blocked until stop is processed") } - creds := c.credentials() + creds, err := c.credentialsForTask(task) + if err != nil { + return nil, fmt.Errorf("resolve credentials for cutover task %s: %w", task.TaskIdentifier, err) + } eng := c.getEngine() if eng == nil { return nil, fmt.Errorf("no engine configured for type: %s", c.config.Type) @@ -262,14 +265,14 @@ func (c *LocalClient) stop(ctx context.Context, req *ternv1.StopRequest, caller return nil, errors.New(revertWindowStopRejectionMessage(applyIdentifier)) } - creds := c.credentials() eng := c.getEngine() applyCancel := c.currentApplyCancel() // Stop the engine first, THEN snapshot progress. // eng.Stop() blocks until Spirit's goroutine exits, so by the time it // returns the progress data reflects the true final state of each table. - if err := c.stopEngineForTasks(ctx, eng, creds, tasks, targetApplyID); err != nil { + stopCreds, err := c.stopEngineForTasks(ctx, eng, tasks, targetApplyID) + if err != nil { return nil, fmt.Errorf("engine stop failed: %w", err) } @@ -286,7 +289,7 @@ func (c *LocalClient) stop(ctx context.Context, req *ternv1.StopRequest, caller terminalState = state.Task.Cancelled } else { // Snapshot progress AFTER Spirit has fully stopped to preserve row copy progress. - engineTableProgress = c.snapshotEngineProgress(ctx, eng, creds) + engineTableProgress = c.snapshotEngineProgress(ctx, eng, stopCreds) } stoppedCount, skippedCount, applyID := c.markTasksWithState(ctx, tasks, targetApplyID, engineTableProgress, terminalState) @@ -528,10 +531,10 @@ func hasLiveEngineWork(taskState string) bool { // a single engine operation (one Spirit runner or one PlanetScale deploy // request) whose stop terminates the whole operation, so one eng.Stop covers the // targeted apply. -func (c *LocalClient) stopEngineForTasks(ctx context.Context, eng engine.Engine, creds *engine.Credentials, tasks []*storage.Task, targetApplyID int64) error { +func (c *LocalClient) stopEngineForTasks(ctx context.Context, eng engine.Engine, tasks []*storage.Task, targetApplyID int64) (*engine.Credentials, error) { if eng == nil { c.logger.Error("stopEngineForTasks: engine is nil") - return fmt.Errorf("no engine configured for type: %s", c.config.Type) + return nil, fmt.Errorf("no engine configured for type: %s", c.config.Type) } for _, task := range tasks { if targetApplyID > 0 && task.ApplyID != targetApplyID { @@ -546,20 +549,25 @@ func (c *LocalClient) stopEngineForTasks(ctx context.Context, eng engine.Engine, "task_id", task.TaskIdentifier, "state", task.State) continue } + creds, err := c.credentialsForTask(task) + if err != nil { + return nil, fmt.Errorf("resolve credentials for stop task %s: %w", task.TaskIdentifier, err) + } req, err := c.buildControlRequest(ctx, task, creds, eng, engine.ControlStop) if err != nil { - return fmt.Errorf("build stop request for task %s: %w", task.TaskIdentifier, err) + return nil, fmt.Errorf("build stop request for task %s: %w", task.TaskIdentifier, err) } if _, err := eng.Stop(ctx, req); err != nil { if c.config.Type == storage.DatabaseTypeVitess { - return fmt.Errorf("cancel deploy request for task %s: %w", task.TaskIdentifier, err) + return nil, fmt.Errorf("cancel deploy request for task %s: %w", task.TaskIdentifier, err) } c.logger.Warn("engine stop returned error (runner may have already exited)", "task_id", task.TaskIdentifier, "error", err) } - return nil + return creds, nil } - return nil + c.logger.Debug("no targeted task has live engine work to stop", "database", c.config.Database, "type", c.config.Type, "target_apply_id", targetApplyID) + return nil, nil } // snapshotEngineProgress captures per-table progress from the engine after stopping. @@ -568,6 +576,10 @@ func (c *LocalClient) snapshotEngineProgress(ctx context.Context, eng engine.Eng c.logger.Error("snapshotEngineProgress: engine is nil") return nil } + if creds == nil { + c.logger.Debug("skipping engine progress snapshot because no live engine work was stopped", "database", c.config.Database, "type", c.config.Type) + return nil + } progress, err := eng.Progress(ctx, &engine.ProgressRequest{ Database: c.config.Database, Credentials: creds, @@ -749,7 +761,11 @@ func (c *LocalClient) controlSetup(ctx context.Context) (*storage.Task, *engine. if eng == nil { return nil, nil, nil, fmt.Errorf("no engine configured for type: %s", c.config.Type) } - return task, c.credentials(), eng, nil + creds, err := c.credentialsForTask(task) + if err != nil { + return nil, nil, nil, err + } + return task, creds, eng, nil } // buildControlRequest creates a ControlRequest with persisted engine resume data. diff --git a/pkg/tern/local_control_resume.go b/pkg/tern/local_control_resume.go index 3ae1daeb..91b0857c 100644 --- a/pkg/tern/local_control_resume.go +++ b/pkg/tern/local_control_resume.go @@ -206,11 +206,14 @@ func (c *LocalClient) startDeferredDeploy(ctx context.Context, apply *storage.Ap if len(applyTasks) == 0 { return nil, fmt.Errorf("no tasks found for apply %s", apply.ApplyIdentifier) } - creds := c.credentials() eng := c.getEngine() if eng == nil { return nil, fmt.Errorf("no engine configured for type: %s", c.config.Type) } + creds, err := c.credentialsForTask(applyTasks[0]) + if err != nil { + return nil, fmt.Errorf("resolve credentials for deferred deploy task %s: %w", applyTasks[0].TaskIdentifier, err) + } controlReq, err := c.buildControlRequest(ctx, applyTasks[0], creds, eng, engine.ControlStart) if err != nil { return nil, fmt.Errorf("build deferred deploy request for task %s: %w", applyTasks[0].TaskIdentifier, err) @@ -393,18 +396,7 @@ func (c *LocalClient) resumeApplySequential(ctx context.Context, apply *storage. // still needs schema changes. Returns false if the table already has the // desired schema (e.g., Spirit's cutover completed during the stop sequence). func (c *LocalClient) tableStillNeedsChange(ctx context.Context, apply *storage.Apply, plan *storage.Plan, tableName string) (bool, error) { - creds := c.credentials() - eng := c.getEngine() - if eng == nil { - return false, fmt.Errorf("no engine available") - } - - result, err := eng.Plan(ctx, &engine.PlanRequest{ - Database: apply.Database, - DatabaseType: c.config.Type, - SchemaFiles: plan.SchemaFiles, - Credentials: creds, - }) + result, err := c.planWithEngine(ctx, &ternv1.PlanRequest{}, apply.Database, plan.SchemaFiles) if err != nil { return false, fmt.Errorf("re-plan check failed: %w", err) } @@ -431,18 +423,7 @@ type replanResult struct { // Used by both Start() and ResumeApply() to handle tables that completed before // stop or crash. func (c *LocalClient) replanAndFilterTasks(ctx context.Context, apply *storage.Apply, tasks []*storage.Task, plan *storage.Plan) (*replanResult, error) { - creds := c.credentials() - eng := c.getEngine() - if eng == nil { - return nil, fmt.Errorf("no engine available") - } - - replanOut, err := eng.Plan(ctx, &engine.PlanRequest{ - Database: apply.Database, - DatabaseType: c.config.Type, - SchemaFiles: plan.SchemaFiles, - Credentials: creds, - }) + replanOut, err := c.planWithEngine(ctx, &ternv1.PlanRequest{}, apply.Database, plan.SchemaFiles) if err != nil { return nil, fmt.Errorf("re-plan failed: %w", err) } @@ -568,11 +549,14 @@ func (c *LocalClient) launchAtomicResume(ctx context.Context, apply *storage.App tasks []*storage.Task, plan *storage.Plan, options map[string]string, logMessage string, block bool, startRequested bool) error { allTasks := tasks - creds := c.credentials() eng := c.getEngine() if eng == nil { return fmt.Errorf("no engine available for grouped resume apply %s", apply.ApplyIdentifier) } + creds, err := c.credentialsForGroupedApply(plan) + if err != nil { + return fmt.Errorf("resolve credentials for grouped resume apply %s: %w", apply.ApplyIdentifier, err) + } if drainer, ok := eng.(engine.Drainer); ok { drainer.Drain() diff --git a/pkg/tern/local_control_test.go b/pkg/tern/local_control_test.go index 94fbc788..0cf3f167 100644 --- a/pkg/tern/local_control_test.go +++ b/pkg/tern/local_control_test.go @@ -127,9 +127,10 @@ func (s *controlTestStorage) ControlRequests() storage.ControlRequestStore { type controlCaptureEngine struct { engine.Engine - cutoverReq *engine.ControlRequest - stopReq *engine.ControlRequest - stopErr error + cutoverReq *engine.ControlRequest + stopReq *engine.ControlRequest + progressReq *engine.ProgressRequest + stopErr error // onStop runs when Stop is invoked, before it returns. Used to observe // storage state at the moment of the engine stop (e.g. to assert the engine // is stopped before tasks are marked stopped/cancelled). @@ -156,15 +157,17 @@ func (e *controlCaptureEngine) Stop(_ context.Context, req *engine.ControlReques return &engine.ControlResult{Accepted: true}, nil } -func (e *controlCaptureEngine) Progress(context.Context, *engine.ProgressRequest) (*engine.ProgressResult, error) { +func (e *controlCaptureEngine) Progress(_ context.Context, req *engine.ProgressRequest) (*engine.ProgressResult, error) { + e.progressReq = req return &engine.ProgressResult{}, nil } func newMySQLControlTestClient(apply *storage.Apply, tasks []*storage.Task, eng *controlCaptureEngine) *LocalClient { return &LocalClient{ config: LocalConfig{ - Database: "testdb", - Type: storage.DatabaseTypeMySQL, + Database: "testdb", + Type: storage.DatabaseTypeMySQL, + TargetDSN: "root@tcp(localhost:3306)/", }, storage: &controlTestStorage{ applies: &controlTestApplyStore{apply: apply}, @@ -233,6 +236,7 @@ func TestLocalClient_StopMarksMySQLApplyStopped(t *testing.T) { ApplyID: apply.ID, TaskIdentifier: "task-mysql-stop", Database: "testdb", + Namespace: "testdb", State: state.Task.Running, } eng := &controlCaptureEngine{} @@ -249,6 +253,50 @@ func TestLocalClient_StopMarksMySQLApplyStopped(t *testing.T) { require.NotNil(t, eng.stopReq, "stop should call the engine") } +// Sequential MySQL applies can contain tasks from multiple namespaces, but only +// one Spirit operation is live at a time. Stop and the post-stop progress +// snapshot must use the namespace for the task that had live engine work, not a +// different targeted task that happened to appear first in storage order. +func TestLocalClient_StopSnapshotsProgressWithStoppedTaskNamespace(t *testing.T) { + apply := &storage.Apply{ + ID: 42, + ApplyIdentifier: "apply-mysql-multi-namespace-stop", + State: state.Apply.Running, + Database: "testdb", + DatabaseType: storage.DatabaseTypeMySQL, + Environment: "staging", + } + pendingTask := &storage.Task{ + ID: 7, + ApplyID: apply.ID, + TaskIdentifier: "task-pending", + Database: "testdb", + Namespace: "pending_schema", + State: state.Task.Pending, + } + liveTask := &storage.Task{ + ID: 8, + ApplyID: apply.ID, + TaskIdentifier: "task-live", + Database: "testdb", + Namespace: "live_schema", + State: state.Task.Running, + } + eng := &controlCaptureEngine{} + client := newMySQLControlTestClient(apply, []*storage.Task{pendingTask, liveTask}, eng) + + resp, err := client.Stop(t.Context(), &ternv1.StopRequest{ApplyId: apply.ApplyIdentifier}) + + require.NoError(t, err) + assert.True(t, resp.Accepted) + require.NotNil(t, eng.stopReq, "stop should call the engine for the live task") + require.NotNil(t, eng.stopReq.Credentials) + assert.Equal(t, "root@tcp(localhost:3306)/live_schema", eng.stopReq.Credentials.DSN) + require.NotNil(t, eng.progressReq, "stop should snapshot progress with the stopped task credentials") + require.NotNil(t, eng.progressReq.Credentials) + assert.Equal(t, "root@tcp(localhost:3306)/live_schema", eng.progressReq.Credentials.DSN) +} + // A stop request can race with a worker that finalized all task rows but // exited before finalizing the apply row. When every targeted task is already // terminal, stop derives the apply's final state from its tasks: an apply @@ -319,6 +367,7 @@ func TestLocalClient_StopAllTasksTerminalDerivesApplyState(t *testing.T) { TaskIdentifier: fmt.Sprintf("task-mysql-stop-terminal-%d", i+1), TableName: fmt.Sprintf("t%d", i+1), Database: "testdb", + Namespace: "testdb", State: taskState, } if i < len(tc.taskErrors) { @@ -521,6 +570,7 @@ func TestLocalClient_StopRecoveringMySQLStopsEngineBeforeStorage(t *testing.T) { ApplyID: apply.ID, TaskIdentifier: "task-mysql-recovering", Database: "testdb", + Namespace: "testdb", State: state.Task.Recovering, } eng := &controlCaptureEngine{}