Skip to content

Commit

Permalink
add flag for number of Coordinator workers
Browse files Browse the repository at this point in the history
  • Loading branch information
meiji163 committed Oct 25, 2024
1 parent 126c981 commit 641fe92
Show file tree
Hide file tree
Showing 4 changed files with 3 additions and 6 deletions.
2 changes: 1 addition & 1 deletion go/base/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ type MigrationContext struct {
CutOverType CutOver
ReplicaServerId uint

// Number of workers used by the Coordinator
// Number of workers used by the trx coordinator
NumWorkers int

Hostname string
Expand Down
1 change: 1 addition & 0 deletions go/cmd/gh-ost/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ func main() {
defaultRetries := flag.Int64("default-retries", 60, "Default number of retries for various operations before panicking")
cutOverLockTimeoutSeconds := flag.Int64("cut-over-lock-timeout-seconds", 3, "Max number of seconds to hold locks on tables while attempting to cut-over (retry attempted when lock exceeds timeout)")
niceRatio := flag.Float64("nice-ratio", 0, "force being 'nice', imply sleep time per chunk time; range: [0.0..100.0]. Example values: 0 is aggressive. 1: for every 1ms spent copying rows, sleep additional 1ms (effectively doubling runtime); 0.7: for every 10ms spend in a rowcopy chunk, spend 7ms sleeping immediately after")
flag.IntVar(&migrationContext.NumWorkers, "workers", 8, "Number of concurrent workers for applying DML events. Each worker uses one goroutine.")

maxLagMillis := flag.Int64("max-lag-millis", 1500, "replication lag at which to throttle operation")
replicationLagQuery := flag.String("replication-lag-query", "", "Deprecated. gh-ost uses an internal, subsecond resolution query")
Expand Down
2 changes: 0 additions & 2 deletions go/logic/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,8 +276,6 @@ func NewCoordinator(migrationContext *base.MigrationContext, applier *Applier, t
waitingJobs: make(map[int64][]chan struct{}),

events: make(chan *replication.BinlogEvent, 1000),

workerQueue: make(chan *Worker, 16),
}
}

Expand Down
4 changes: 1 addition & 3 deletions go/logic/migrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,6 @@ func (this *Migrator) Migrate() (err error) {
return err
}

// TODO(meiji163): configure workers
this.migrationContext.NumWorkers = 16
this.trxCoordinator = NewCoordinator(this.migrationContext, this.applier, this.throttler, this.onChangelogEvent)

if err := this.initiateStreaming(); err != nil {
Expand Down Expand Up @@ -364,7 +362,7 @@ func (this *Migrator) Migrate() (err error) {
}
}

this.migrationContext.Log.Info("starting applier workers")
this.migrationContext.Log.Infof("starting %d applier workers", this.migrationContext.NumWorkers)
this.trxCoordinator.InitializeWorkers(this.migrationContext.NumWorkers)

initialLag, _ := this.inspector.getReplicationLag()
Expand Down

0 comments on commit 641fe92

Please sign in to comment.