Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
217 changes: 0 additions & 217 deletions internal/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package app

import (
"context"
"encoding/json"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -162,173 +161,6 @@ func (app *App) newDBCluster() error {
return nil
}

func (app *App) writeEmergeFile(msg string) {
app.logger.Warn().Msg("touch emerge file")
err := os.WriteFile(app.config.Emergefile, []byte(msg), 0o644)
if err != nil {
app.logger.Error().Err(err).Msg("failed to write emerge file")
}
}

func (app *App) writeResetupFile() {
app.logger.Warn().Msg("touch resetup file")
err := os.WriteFile(app.config.Resetupfile, []byte{}, 0o644)
if err != nil {
app.logger.Error().Err(err).Msg("failed to write resetup file")
}
}

func (app *App) doesResetupFileExist() bool {
_, err := os.Stat(app.config.Resetupfile)
return err == nil
}

func (app *App) writeMaintenanceFile() {
app.logger.Warn().Msg("touch maintenance file")
err := os.WriteFile(app.config.Maintenancefile, []byte(""), 0o644)
if err != nil {
app.logger.Error().Err(err).Msg("failed to write maintenance file")
}
}

func (app *App) doesMaintenanceFileExist() bool {
_, err := os.Stat(app.config.Maintenancefile)
return err == nil
}

func (app *App) removeMaintenanceFile() {
err := os.Remove(app.config.Maintenancefile)
if err != nil && !os.IsNotExist(err) {
app.logger.Error().Err(err).Msg("failed to remove maintenance file")
}
}

// separate goroutine performing health checks
func (app *App) healthChecker(ctx context.Context) {
ticker := time.NewTicker(app.config.HealthCheckInterval)
var logFile string
var maxLogPos int64
for {
select {
case <-ticker.C:
hc := app.getLocalNodeState()
logFile, maxLogPos = hc.UpdateBinlogStatus(logFile, maxLogPos)
app.logger.Info().Msgf("healthcheck: %v", hc)
err := app.SetHealthState(app.config.Hostname, hc)
if err != nil {
app.logger.Error().Err(err).Msg("healthcheck: failed to set status to dcs")
}
case <-ctx.Done():
return
}
}
}

// separate gorutine performing info file management
func (app *App) stateFileHandler(ctx context.Context) {
ticker := time.NewTicker(app.config.InfoFileHandlerInterval)
for {
select {
case <-ticker.C:

tree, err := app.dcs.GetTree("")
if err != nil {
app.logger.Error().Err(err).Msg("stateFileHandler: failed to get current zk tree")
_ = os.Remove(app.config.InfoFile)
continue
}
data, err := json.Marshal(tree)
if err != nil {
app.logger.Error().Err(err).Msg("stateFileHandler: failed to marshal zk node data")
_ = os.Remove(app.config.InfoFile)
continue
}
err = os.WriteFile(app.config.InfoFile, data, 0o666)
if err != nil {
app.logger.Error().Err(err).Msg("stateFileHandler: failed to write info file")
_ = os.Remove(app.config.InfoFile)
continue
}

case <-ctx.Done():
return
}
}
}

// checks if update of external CA file required
func (app *App) externalCAFileChecker(ctx context.Context) {
ticker := time.NewTicker(app.config.ExternalCAFileCheckInterval)
for {
select {
case <-ticker.C:
localNode := app.cluster.Local()
replicaStatus, err := app.externalReplication.GetReplicaStatus(localNode)
if err != nil {
if !mysql.IsErrorChannelDoesNotExists(err) {
app.logger.Error().Err(err).Msgf("external CA file checker: host %s failed to get external replica status", localNode.Host())
}
continue
}
if replicaStatus == nil {
app.logger.Info().Msgf("external CA file checker: no external replication found on host %v", localNode.Host())
continue
}
err = localNode.UpdateExternalCAFile()
if err != nil {
app.logger.Error().Err(err).Msg("external CA file checker: failed check and update CA file")
}
case <-ctx.Done():
return
}
}
}

func (app *App) replMonWriter(ctx context.Context) {
ticker := time.NewTicker(app.config.ReplMonWriteInterval)
for {
select {
case <-ticker.C:
localNode := app.cluster.Local()
sstatus, err := localNode.GetReplicaStatus()
if err != nil {
app.logger.Error().Err(err).Msg("repl mon writer: error while checking replica status")
time.Sleep(app.config.ReplMonErrorWaitInterval)
continue
}
if sstatus != nil {
app.logger.Info().Msgf("repl mon writer: host is replica")
time.Sleep(app.config.ReplMonSlaveWaitInterval)
continue
}
readOnly, _, err := localNode.IsReadOnly()
if err != nil {
app.logger.Error().Err(err).Msg("repl mon writer: error while checking read only status")
time.Sleep(app.config.ReplMonErrorWaitInterval)
continue
}
if readOnly {
app.logger.Info().Msgf("repl mon writer: host is read only")
time.Sleep(app.config.ReplMonSlaveWaitInterval)
continue
}
err = localNode.UpdateReplMonTable(app.config.ReplMonSchemeName, app.config.ReplMonTableName)
if err != nil {
if mysql.IsErrorTableDoesNotExists(err) {
err = localNode.CreateReplMonTable(app.config.ReplMonSchemeName, app.config.ReplMonTableName)
if err != nil {
app.logger.Error().Err(err).Msg("repl mon writer: error while creating repl mon table")
}
continue
}
app.logger.Error().Err(err).Msg("repl mon writer: error while writing in repl mon table")
}
case <-ctx.Done():
return
}
}
}

func (app *App) checkHAReplicasRunning(local *mysql.Node) (replicasRunning bool, hasUnreachReplicas bool) {
checker := func(host string) error {
node := app.cluster.Get(host)
Expand Down Expand Up @@ -2207,55 +2039,6 @@ func (app *App) findBestStreamFrom(node *mysql.Node, clusterState map[string]*no
}
}

func (app *App) enterMaintenance(maintenance *Maintenance, master string) error {
if app.config.DisableSemiSyncReplicationOnMaintenance {
node := app.cluster.Get(master)
err := node.SemiSyncDisable()
if err != nil {
return err
}
err = app.DeleteActiveNodes()
if err != nil {
return err
}
}
maintenance.MySyncPaused = true
return app.SetMaintenance(maintenance)
}

func (app *App) leaveMaintenance() error {
err := app.cluster.UpdateHostsInfo()
if err != nil {
return err
}
clusterState := app.getClusterStateFromDB()
master, err := app.ensureCurrentMaster(clusterState)
if err != nil {
if errors.Is(err, ErrManyMasters) {
app.writeEmergeFile(err.Error())
}
return err
}
clusterStateDcs, err := app.getClusterStateFromDcs()
if err != nil {
return err
}
app.repairCluster(clusterState, clusterStateDcs, master)
clusterState = app.getClusterStateFromDB()
err = app.updateActiveNodes(clusterState, clusterStateDcs, []string{}, master)
if err != nil {
return err
}
activeNodes, err := app.GetActiveNodes()
if err != nil {
return err
}
if len(activeNodes) == 0 {
return ErrNoActiveNodes
}
return app.DeleteMaintenance()
}

func (app *App) performChangeMaster(host, master string) error {
// Do we need to change master status here
if host == master {
Expand Down
136 changes: 136 additions & 0 deletions internal/app/app_background.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package app

import (
"context"
"encoding/json"
"os"
"time"

"github.com/yandex/mysync/internal/mysql"
)

// separate goroutine performing health checks
func (app *App) healthChecker(ctx context.Context) {
ticker := time.NewTicker(app.config.HealthCheckInterval)
var logFile string
var maxLogPos int64
for {
select {
case <-ticker.C:
hc := app.getLocalNodeState()
logFile, maxLogPos = hc.UpdateBinlogStatus(logFile, maxLogPos)
app.logger.Info().Msgf("healthcheck: %v", hc)
err := app.SetHealthState(app.config.Hostname, hc)
if err != nil {
app.logger.Error().Err(err).Msg("healthcheck: failed to set status to dcs")
}
case <-ctx.Done():
return
}
}
}

// separate gorutine performing info file management
func (app *App) stateFileHandler(ctx context.Context) {
ticker := time.NewTicker(app.config.InfoFileHandlerInterval)
for {
select {
case <-ticker.C:

tree, err := app.dcs.GetTree("")
if err != nil {
app.logger.Error().Err(err).Msg("stateFileHandler: failed to get current zk tree")
_ = os.Remove(app.config.InfoFile)
continue
}
data, err := json.Marshal(tree)
if err != nil {
app.logger.Error().Err(err).Msg("stateFileHandler: failed to marshal zk node data")
_ = os.Remove(app.config.InfoFile)
continue
}
err = os.WriteFile(app.config.InfoFile, data, 0o666)
if err != nil {
app.logger.Error().Err(err).Msg("stateFileHandler: failed to write info file")
_ = os.Remove(app.config.InfoFile)
continue
}

case <-ctx.Done():
return
}
}
}

// checks if update of external CA file required
func (app *App) externalCAFileChecker(ctx context.Context) {
ticker := time.NewTicker(app.config.ExternalCAFileCheckInterval)
for {
select {
case <-ticker.C:
localNode := app.cluster.Local()
replicaStatus, err := app.externalReplication.GetReplicaStatus(localNode)
if err != nil {
if !mysql.IsErrorChannelDoesNotExists(err) {
app.logger.Error().Err(err).Msgf("external CA file checker: host %s failed to get external replica status", localNode.Host())
}
continue
}
if replicaStatus == nil {
app.logger.Info().Msgf("external CA file checker: no external replication found on host %v", localNode.Host())
continue
}
err = localNode.UpdateExternalCAFile()
if err != nil {
app.logger.Error().Err(err).Msg("external CA file checker: failed check and update CA file")
}
case <-ctx.Done():
return
}
}
}

func (app *App) replMonWriter(ctx context.Context) {
ticker := time.NewTicker(app.config.ReplMonWriteInterval)
for {
select {
case <-ticker.C:
localNode := app.cluster.Local()
sstatus, err := localNode.GetReplicaStatus()
if err != nil {
app.logger.Error().Err(err).Msg("repl mon writer: error while checking replica status")
time.Sleep(app.config.ReplMonErrorWaitInterval)
continue
}
if sstatus != nil {
app.logger.Info().Msgf("repl mon writer: host is replica")
time.Sleep(app.config.ReplMonSlaveWaitInterval)
continue
}
readOnly, _, err := localNode.IsReadOnly()
if err != nil {
app.logger.Error().Err(err).Msg("repl mon writer: error while checking read only status")
time.Sleep(app.config.ReplMonErrorWaitInterval)
continue
}
if readOnly {
app.logger.Info().Msgf("repl mon writer: host is read only")
time.Sleep(app.config.ReplMonSlaveWaitInterval)
continue
}
err = localNode.UpdateReplMonTable(app.config.ReplMonSchemeName, app.config.ReplMonTableName)
if err != nil {
if mysql.IsErrorTableDoesNotExists(err) {
err = localNode.CreateReplMonTable(app.config.ReplMonSchemeName, app.config.ReplMonTableName)
if err != nil {
app.logger.Error().Err(err).Msg("repl mon writer: error while creating repl mon table")
}
continue
}
app.logger.Error().Err(err).Msg("repl mon writer: error while writing in repl mon table")
}
case <-ctx.Done():
return
}
}
}
Loading
Loading