Skip to content

Commit

Permalink
Add logic to ensure replication has started instead of assuming it ha…
Browse files Browse the repository at this point in the history
…s, with timeout
  • Loading branch information
andyedison committed Oct 14, 2024
1 parent ef238ee commit 1e25e71
Showing 1 changed file with 41 additions and 2 deletions.
43 changes: 41 additions & 2 deletions go/logic/inspect.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import (
"github.com/openark/golib/sqlutils"
)

const startSlavePostWaitMilliseconds = 1000 * time.Millisecond
const startReplicationPostWait = 250 * time.Millisecond
const startReplicationMaxWait = 2 * time.Second

// Inspector reads data from the read-MySQL-server (typically a replica, but can be the master)
// It is used for gaining initial status and structure, and later also follow up on progress and changelog
Expand Down Expand Up @@ -302,12 +303,50 @@ func (this *Inspector) restartReplication() error {
if startError != nil {
return startError
}
time.Sleep(startSlavePostWaitMilliseconds)

// loop until replication is running unless we hit a max timeout.
startTime := time.Now()
for {
replicationRunning, err := this.validateReplicationRestarted()
if err != nil {
return fmt.Errorf("Failed to validate if replication had been restarted: %w", err)
}
if replicationRunning {
break
}
if time.Since(startTime) > startReplicationMaxWait {
return fmt.Errorf("Replication did not restart within the maximum wait time of %s", startReplicationMaxWait)
}
this.migrationContext.Log.Debugf("Replication not yet restarted, waiting...")
time.Sleep(startReplicationPostWait)
}

this.migrationContext.Log.Debugf("Replication restarted")
return nil
}

// validateReplicationRestarted checks that the Slave_IO_Running and Slave_SQL_Running are both 'Yes'
// returns true if both are 'Yes', false otherwise
func (this *Inspector) validateReplicationRestarted() (bool, error) {
errNotRunning := fmt.Errorf("Replication not running on %s", this.connectionConfig.Key.String())
query := `show /* gh-ost */ slave status`
err := sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error {
if rowMap.GetString("Slave_IO_Running") != "Yes" || rowMap.GetString("Slave_SQL_Running") != "Yes" {
return errNotRunning
}
return nil
})

if err != nil {
// If the error is that replication is not running, return that and not an error
if errors.Is(err, errNotRunning) {
return false, nil
}
return false, err
}
return true, nil
}

// applyBinlogFormat sets ROW binlog format and restarts replication to make
// the replication thread apply it.
func (this *Inspector) applyBinlogFormat() error {
Expand Down

0 comments on commit 1e25e71

Please sign in to comment.