diff --git a/internal/app/app.go b/internal/app/app.go index dfb258f5..d11f7073 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -2,7 +2,6 @@ package app import ( "context" - "encoding/json" "errors" "fmt" "io" @@ -162,173 +161,6 @@ func (app *App) newDBCluster() error { return nil } -func (app *App) writeEmergeFile(msg string) { - app.logger.Warn().Msg("touch emerge file") - err := os.WriteFile(app.config.Emergefile, []byte(msg), 0o644) - if err != nil { - app.logger.Error().Err(err).Msg("failed to write emerge file") - } -} - -func (app *App) writeResetupFile() { - app.logger.Warn().Msg("touch resetup file") - err := os.WriteFile(app.config.Resetupfile, []byte{}, 0o644) - if err != nil { - app.logger.Error().Err(err).Msg("failed to write resetup file") - } -} - -func (app *App) doesResetupFileExist() bool { - _, err := os.Stat(app.config.Resetupfile) - return err == nil -} - -func (app *App) writeMaintenanceFile() { - app.logger.Warn().Msg("touch maintenance file") - err := os.WriteFile(app.config.Maintenancefile, []byte(""), 0o644) - if err != nil { - app.logger.Error().Err(err).Msg("failed to write maintenance file") - } -} - -func (app *App) doesMaintenanceFileExist() bool { - _, err := os.Stat(app.config.Maintenancefile) - return err == nil -} - -func (app *App) removeMaintenanceFile() { - err := os.Remove(app.config.Maintenancefile) - if err != nil && !os.IsNotExist(err) { - app.logger.Error().Err(err).Msg("failed to remove maintenance file") - } -} - -// separate goroutine performing health checks -func (app *App) healthChecker(ctx context.Context) { - ticker := time.NewTicker(app.config.HealthCheckInterval) - var logFile string - var maxLogPos int64 - for { - select { - case <-ticker.C: - hc := app.getLocalNodeState() - logFile, maxLogPos = hc.UpdateBinlogStatus(logFile, maxLogPos) - app.logger.Info().Msgf("healthcheck: %v", hc) - err := app.SetHealthState(app.config.Hostname, hc) - if err != nil { - app.logger.Error().Err(err).Msg("healthcheck: failed to set status to dcs") - } - case <-ctx.Done(): - return - } - } -} - -// separate gorutine performing info file management -func (app *App) stateFileHandler(ctx context.Context) { - ticker := time.NewTicker(app.config.InfoFileHandlerInterval) - for { - select { - case <-ticker.C: - - tree, err := app.dcs.GetTree("") - if err != nil { - app.logger.Error().Err(err).Msg("stateFileHandler: failed to get current zk tree") - _ = os.Remove(app.config.InfoFile) - continue - } - data, err := json.Marshal(tree) - if err != nil { - app.logger.Error().Err(err).Msg("stateFileHandler: failed to marshal zk node data") - _ = os.Remove(app.config.InfoFile) - continue - } - err = os.WriteFile(app.config.InfoFile, data, 0o666) - if err != nil { - app.logger.Error().Err(err).Msg("stateFileHandler: failed to write info file") - _ = os.Remove(app.config.InfoFile) - continue - } - - case <-ctx.Done(): - return - } - } -} - -// checks if update of external CA file required -func (app *App) externalCAFileChecker(ctx context.Context) { - ticker := time.NewTicker(app.config.ExternalCAFileCheckInterval) - for { - select { - case <-ticker.C: - localNode := app.cluster.Local() - replicaStatus, err := app.externalReplication.GetReplicaStatus(localNode) - if err != nil { - if !mysql.IsErrorChannelDoesNotExists(err) { - app.logger.Error().Err(err).Msgf("external CA file checker: host %s failed to get external replica status", localNode.Host()) - } - continue - } - if replicaStatus == nil { - app.logger.Info().Msgf("external CA file checker: no external replication found on host %v", localNode.Host()) - continue - } - err = localNode.UpdateExternalCAFile() - if err != nil { - app.logger.Error().Err(err).Msg("external CA file checker: failed check and update CA file") - } - case <-ctx.Done(): - return - } - } -} - -func (app *App) replMonWriter(ctx context.Context) { - ticker := time.NewTicker(app.config.ReplMonWriteInterval) - for { - select { - case <-ticker.C: - localNode := app.cluster.Local() - sstatus, err := localNode.GetReplicaStatus() - if err != nil { - app.logger.Error().Err(err).Msg("repl mon writer: error while checking replica status") - time.Sleep(app.config.ReplMonErrorWaitInterval) - continue - } - if sstatus != nil { - app.logger.Info().Msgf("repl mon writer: host is replica") - time.Sleep(app.config.ReplMonSlaveWaitInterval) - continue - } - readOnly, _, err := localNode.IsReadOnly() - if err != nil { - app.logger.Error().Err(err).Msg("repl mon writer: error while checking read only status") - time.Sleep(app.config.ReplMonErrorWaitInterval) - continue - } - if readOnly { - app.logger.Info().Msgf("repl mon writer: host is read only") - time.Sleep(app.config.ReplMonSlaveWaitInterval) - continue - } - err = localNode.UpdateReplMonTable(app.config.ReplMonSchemeName, app.config.ReplMonTableName) - if err != nil { - if mysql.IsErrorTableDoesNotExists(err) { - err = localNode.CreateReplMonTable(app.config.ReplMonSchemeName, app.config.ReplMonTableName) - if err != nil { - app.logger.Error().Err(err).Msg("repl mon writer: error while creating repl mon table") - } - continue - } - app.logger.Error().Err(err).Msg("repl mon writer: error while writing in repl mon table") - } - case <-ctx.Done(): - return - } - } -} - func (app *App) checkHAReplicasRunning(local *mysql.Node) (replicasRunning bool, hasUnreachReplicas bool) { checker := func(host string) error { node := app.cluster.Get(host) @@ -2207,55 +2039,6 @@ func (app *App) findBestStreamFrom(node *mysql.Node, clusterState map[string]*no } } -func (app *App) enterMaintenance(maintenance *Maintenance, master string) error { - if app.config.DisableSemiSyncReplicationOnMaintenance { - node := app.cluster.Get(master) - err := node.SemiSyncDisable() - if err != nil { - return err - } - err = app.DeleteActiveNodes() - if err != nil { - return err - } - } - maintenance.MySyncPaused = true - return app.SetMaintenance(maintenance) -} - -func (app *App) leaveMaintenance() error { - err := app.cluster.UpdateHostsInfo() - if err != nil { - return err - } - clusterState := app.getClusterStateFromDB() - master, err := app.ensureCurrentMaster(clusterState) - if err != nil { - if errors.Is(err, ErrManyMasters) { - app.writeEmergeFile(err.Error()) - } - return err - } - clusterStateDcs, err := app.getClusterStateFromDcs() - if err != nil { - return err - } - app.repairCluster(clusterState, clusterStateDcs, master) - clusterState = app.getClusterStateFromDB() - err = app.updateActiveNodes(clusterState, clusterStateDcs, []string{}, master) - if err != nil { - return err - } - activeNodes, err := app.GetActiveNodes() - if err != nil { - return err - } - if len(activeNodes) == 0 { - return ErrNoActiveNodes - } - return app.DeleteMaintenance() -} - func (app *App) performChangeMaster(host, master string) error { // Do we need to change master status here if host == master { diff --git a/internal/app/app_background.go b/internal/app/app_background.go new file mode 100644 index 00000000..56700435 --- /dev/null +++ b/internal/app/app_background.go @@ -0,0 +1,136 @@ +package app + +import ( + "context" + "encoding/json" + "os" + "time" + + "github.com/yandex/mysync/internal/mysql" +) + +// separate goroutine performing health checks +func (app *App) healthChecker(ctx context.Context) { + ticker := time.NewTicker(app.config.HealthCheckInterval) + var logFile string + var maxLogPos int64 + for { + select { + case <-ticker.C: + hc := app.getLocalNodeState() + logFile, maxLogPos = hc.UpdateBinlogStatus(logFile, maxLogPos) + app.logger.Info().Msgf("healthcheck: %v", hc) + err := app.SetHealthState(app.config.Hostname, hc) + if err != nil { + app.logger.Error().Err(err).Msg("healthcheck: failed to set status to dcs") + } + case <-ctx.Done(): + return + } + } +} + +// separate gorutine performing info file management +func (app *App) stateFileHandler(ctx context.Context) { + ticker := time.NewTicker(app.config.InfoFileHandlerInterval) + for { + select { + case <-ticker.C: + + tree, err := app.dcs.GetTree("") + if err != nil { + app.logger.Error().Err(err).Msg("stateFileHandler: failed to get current zk tree") + _ = os.Remove(app.config.InfoFile) + continue + } + data, err := json.Marshal(tree) + if err != nil { + app.logger.Error().Err(err).Msg("stateFileHandler: failed to marshal zk node data") + _ = os.Remove(app.config.InfoFile) + continue + } + err = os.WriteFile(app.config.InfoFile, data, 0o666) + if err != nil { + app.logger.Error().Err(err).Msg("stateFileHandler: failed to write info file") + _ = os.Remove(app.config.InfoFile) + continue + } + + case <-ctx.Done(): + return + } + } +} + +// checks if update of external CA file required +func (app *App) externalCAFileChecker(ctx context.Context) { + ticker := time.NewTicker(app.config.ExternalCAFileCheckInterval) + for { + select { + case <-ticker.C: + localNode := app.cluster.Local() + replicaStatus, err := app.externalReplication.GetReplicaStatus(localNode) + if err != nil { + if !mysql.IsErrorChannelDoesNotExists(err) { + app.logger.Error().Err(err).Msgf("external CA file checker: host %s failed to get external replica status", localNode.Host()) + } + continue + } + if replicaStatus == nil { + app.logger.Info().Msgf("external CA file checker: no external replication found on host %v", localNode.Host()) + continue + } + err = localNode.UpdateExternalCAFile() + if err != nil { + app.logger.Error().Err(err).Msg("external CA file checker: failed check and update CA file") + } + case <-ctx.Done(): + return + } + } +} + +func (app *App) replMonWriter(ctx context.Context) { + ticker := time.NewTicker(app.config.ReplMonWriteInterval) + for { + select { + case <-ticker.C: + localNode := app.cluster.Local() + sstatus, err := localNode.GetReplicaStatus() + if err != nil { + app.logger.Error().Err(err).Msg("repl mon writer: error while checking replica status") + time.Sleep(app.config.ReplMonErrorWaitInterval) + continue + } + if sstatus != nil { + app.logger.Info().Msgf("repl mon writer: host is replica") + time.Sleep(app.config.ReplMonSlaveWaitInterval) + continue + } + readOnly, _, err := localNode.IsReadOnly() + if err != nil { + app.logger.Error().Err(err).Msg("repl mon writer: error while checking read only status") + time.Sleep(app.config.ReplMonErrorWaitInterval) + continue + } + if readOnly { + app.logger.Info().Msgf("repl mon writer: host is read only") + time.Sleep(app.config.ReplMonSlaveWaitInterval) + continue + } + err = localNode.UpdateReplMonTable(app.config.ReplMonSchemeName, app.config.ReplMonTableName) + if err != nil { + if mysql.IsErrorTableDoesNotExists(err) { + err = localNode.CreateReplMonTable(app.config.ReplMonSchemeName, app.config.ReplMonTableName) + if err != nil { + app.logger.Error().Err(err).Msg("repl mon writer: error while creating repl mon table") + } + continue + } + app.logger.Error().Err(err).Msg("repl mon writer: error while writing in repl mon table") + } + case <-ctx.Done(): + return + } + } +} diff --git a/internal/app/app_dcs_paths.go b/internal/app/app_dcs_paths.go new file mode 100644 index 00000000..72cb1913 --- /dev/null +++ b/internal/app/app_dcs_paths.go @@ -0,0 +1,57 @@ +package app + +// ZooKeeper path constants used by mysync business logic. +// All paths are relative to the DCS root configured in config. +const ( + // manager's lock + pathManagerLock = "manager" + + pathMasterNode = "master" + + // def 1: activeNodes are master + alive running HA replicas ??? + // def 2: active nodes is last iteration surely ok ones + // structure: list of hosts(strings) + pathActiveNodes = "active_nodes" + + // structure: pathHealthPrefix/hostname -> NodeState + pathHealthPrefix = "health" + + // structure: single Switchover + pathCurrentSwitch = "switch" + + // structure: single Switchover + pathLastSwitch = "last_switch" + + // structure: single Switchover + pathLastRejectedSwitch = "last_rejected_switch" + + // structure: single Maintenance + pathMaintenance = "maintenance" + + // structure: pathRecovery/hostname -> nil + pathRecovery = "recovery" + + // List of HA nodes. May be modified by external tools (e.g. remove node from HA-cluster) + // list of strings + pathHANodes = "ha_nodes" + + // List of Cascade nodes. May be modified by external tools (e.g. on node removal) + // structure: pathCascadeNodesPrefix/hostname -> CascadeNodeConfiguration + pathCascadeNodesPrefix = "cascade_nodes" + + // low space flag + // structure: single value boolean + pathLowSpace = "low_space" + + // resetup status + // structure: pathResetupStatus/hostname -> ResetupStatus + pathResetupStatus = "resetup_status" + + pathLastShutdownNodeTime = "last_shutdown_node_time" + + // last known timestamp from repl_mon table + pathMasterReplMonTS = "master_repl_mon_ts" + + // timing start timestamps, structure: pathTimings/ -> time.Time + pathTimings = "timing" +) diff --git a/internal/app/app_files.go b/internal/app/app_files.go new file mode 100644 index 00000000..1f7b24e6 --- /dev/null +++ b/internal/app/app_files.go @@ -0,0 +1,44 @@ +package app + +import "os" + +func (app *App) writeEmergeFile(msg string) { + app.logger.Warn().Msg("touch emerge file") + err := os.WriteFile(app.config.Emergefile, []byte(msg), 0o644) + if err != nil { + app.logger.Error().Err(err).Msg("failed to write emerge file") + } +} + +func (app *App) writeResetupFile() { + app.logger.Warn().Msg("touch resetup file") + err := os.WriteFile(app.config.Resetupfile, []byte{}, 0o644) + if err != nil { + app.logger.Error().Err(err).Msg("failed to write resetup file") + } +} + +func (app *App) doesResetupFileExist() bool { + _, err := os.Stat(app.config.Resetupfile) + return err == nil +} + +func (app *App) writeMaintenanceFile() { + app.logger.Warn().Msg("touch maintenance file") + err := os.WriteFile(app.config.Maintenancefile, []byte(""), 0o644) + if err != nil { + app.logger.Error().Err(err).Msg("failed to write maintenance file") + } +} + +func (app *App) doesMaintenanceFileExist() bool { + _, err := os.Stat(app.config.Maintenancefile) + return err == nil +} + +func (app *App) removeMaintenanceFile() { + err := os.Remove(app.config.Maintenancefile) + if err != nil && !os.IsNotExist(err) { + app.logger.Error().Err(err).Msg("failed to remove maintenance file") + } +} diff --git a/internal/app/app_maintenance.go b/internal/app/app_maintenance.go new file mode 100644 index 00000000..06f59b5f --- /dev/null +++ b/internal/app/app_maintenance.go @@ -0,0 +1,52 @@ +package app + +import "errors" + +func (app *App) enterMaintenance(maintenance *Maintenance, master string) error { + if app.config.DisableSemiSyncReplicationOnMaintenance { + node := app.cluster.Get(master) + err := node.SemiSyncDisable() + if err != nil { + return err + } + err = app.DeleteActiveNodes() + if err != nil { + return err + } + } + maintenance.MySyncPaused = true + return app.SetMaintenance(maintenance) +} + +func (app *App) leaveMaintenance() error { + err := app.cluster.UpdateHostsInfo() + if err != nil { + return err + } + clusterState := app.getClusterStateFromDB() + master, err := app.ensureCurrentMaster(clusterState) + if err != nil { + if errors.Is(err, ErrManyMasters) { + app.writeEmergeFile(err.Error()) + } + return err + } + clusterStateDcs, err := app.getClusterStateFromDcs() + if err != nil { + return err + } + app.repairCluster(clusterState, clusterStateDcs, master) + clusterState = app.getClusterStateFromDB() + err = app.updateActiveNodes(clusterState, clusterStateDcs, []string{}, master) + if err != nil { + return err + } + activeNodes, err := app.GetActiveNodes() + if err != nil { + return err + } + if len(activeNodes) == 0 { + return ErrNoActiveNodes + } + return app.DeleteMaintenance() +} diff --git a/internal/app/data.go b/internal/app/data.go index fb6fd6b6..5008e336 100644 --- a/internal/app/data.go +++ b/internal/app/data.go @@ -16,60 +16,6 @@ const ( stateMaintenance = "Maintenance" ) -const ( - // manager's lock - pathManagerLock = "manager" - - pathMasterNode = "master" - - // def 1: activeNodes are master + alive running HA replicas ??? - // def 2: active nodes is last iteration surely ok ones - // structure: list of hosts(strings) - pathActiveNodes = "active_nodes" - - // structure: pathHealthPrefix/hostname -> NodeState - pathHealthPrefix = "health" - - // structure: single Switchover - pathCurrentSwitch = "switch" - - // structure: single Switchover - pathLastSwitch = "last_switch" - - // structure: single Switchover - pathLastRejectedSwitch = "last_rejected_switch" - - // structure: single Maintenance - pathMaintenance = "maintenance" - - // structure: pathRecovery/hostname -> nil - pathRecovery = "recovery" - - // List of HA nodes. May be modified by external tools (e.g. remove node from HA-cluster) - // list of strings - pathHANodes = "ha_nodes" - - // List of Cascade nodes. May be modified by external tools (e.g. on node removal) - // structure: pathCascadeNodesPrefix/hostname -> CascadeNodeConfiguration - pathCascadeNodesPrefix = "cascade_nodes" - - // low space flag - // structure: single value boolean - pathLowSpace = "low_space" - - // resetup status - // structure: pathResetupStatus/hostname -> ResetupStatus - pathResetupStatus = "resetup_status" - - pathLastShutdownNodeTime = "last_shutdown_node_time" - - // last known timestamp from repl_mon table - pathMasterReplMonTS = "master_repl_mon_ts" - - // timing start timestamps, structure: pathTimings/ -> time.Time - pathTimings = "timing" -) - var ( ErrNoMaster = errors.New("no alive master found") ErrManyMasters = errors.New("more than one master found")