diff --git a/go/logic/inspect.go b/go/logic/inspect.go
index 12052e313..b1c3f4740 100644
--- a/go/logic/inspect.go
+++ b/go/logic/inspect.go
@@ -22,7 +22,11 @@ import (
 	"github.com/openark/golib/sqlutils"
 )
 
-const startSlavePostWaitMilliseconds = 1000 * time.Millisecond
+// startReplicationPostWait is the pause between successive replication status checks after a restart.
+const startReplicationPostWait = 250 * time.Millisecond
+
+// startReplicationMaxWait bounds the total time spent waiting for replication to report as running.
+const startReplicationMaxWait = 2 * time.Second
 
 // Inspector reads data from the read-MySQL-server (typically a replica, but can be the master)
 // It is used for gaining initial status and structure, and later also follow up on progress and changelog
@@ -302,12 +306,50 @@ func (this *Inspector) restartReplication() error {
 	if startError != nil {
 		return startError
 	}
-	time.Sleep(startSlavePostWaitMilliseconds)
+
+	// Poll until replication is running, giving up once startReplicationMaxWait has elapsed.
+	startTime := time.Now()
+	for {
+		replicationRunning, err := this.validateReplicationRestarted()
+		if err != nil {
+			return fmt.Errorf("Failed to validate if replication had been restarted: %w", err)
+		}
+		if replicationRunning {
+			break
+		}
+		if time.Since(startTime) > startReplicationMaxWait {
+			return fmt.Errorf("Replication did not restart within the maximum wait time of %s", startReplicationMaxWait)
+		}
+		this.migrationContext.Log.Debugf("Replication not yet restarted, waiting...")
+		time.Sleep(startReplicationPostWait)
+	}
 
 	this.migrationContext.Log.Debugf("Replication restarted")
 	return nil
 }
 
+// validateReplicationRestarted reports whether replication is running, based on
+// the output of `show slave status`. It returns true only when at least one row
+// is returned and every row has both Slave_IO_Running and Slave_SQL_Running set
+// to 'Yes'. An empty result is treated as not running. The returned error is
+// non-nil only when sqlutils.QueryRowsMap itself fails.
+func (this *Inspector) validateReplicationRestarted() (bool, error) {
+	channelsSeen := 0
+	allRunning := true
+	query := `show /* gh-ost */ slave status`
+	err := sqlutils.QueryRowsMap(this.db, query, func(rowMap sqlutils.RowMap) error {
+		channelsSeen++
+		if rowMap.GetString("Slave_IO_Running") != "Yes" || rowMap.GetString("Slave_SQL_Running") != "Yes" {
+			allRunning = false
+		}
+		return nil
+	})
+	if err != nil {
+		return false, err
+	}
+	return channelsSeen > 0 && allRunning, nil
+}
+
 // applyBinlogFormat sets ROW binlog format and restarts replication to make
 // the replication thread apply it.
 func (this *Inspector) applyBinlogFormat() error {