From e1acdff05319ccbc73c77f8e870ec0ad8e2bb4af Mon Sep 17 00:00:00 2001 From: PJ Date: Fri, 11 Aug 2023 12:23:05 +0200 Subject: [PATCH] all: add state routes and build info --- api/autopilot.go | 9 +++ api/bus.go | 6 ++ api/state.go | 14 ++++ api/worker.go | 7 ++ autopilot/autopilot.go | 56 ++++++++++--- autopilot/client.go | 6 ++ build/{env.go => env_default.go} | 0 build/env_testnet.go | 2 +- build/gen.go | 134 +++++++++++++++++++++++++++++++ build/meta.go | 28 +++++++ bus/bus.go | 42 ++++++++++ bus/client.go | 6 ++ bus/client_test.go | 13 +-- cmd/renterd/main.go | 10 ++- internal/node/node.go | 37 +++++---- internal/testing/cluster.go | 6 +- worker/client.go | 6 ++ worker/worker.go | 44 ++++++++++ 18 files changed, 388 insertions(+), 38 deletions(-) create mode 100644 api/state.go rename build/{env.go => env_default.go} (100%) create mode 100644 build/gen.go create mode 100644 build/meta.go diff --git a/api/autopilot.go b/api/autopilot.go index 4dc0aecbc..2b8b4a0d1 100644 --- a/api/autopilot.go +++ b/api/autopilot.go @@ -99,6 +99,15 @@ type ( UptimeMS ParamDuration `json:"uptimeMS"` } + // AutopilotStateResponse is the response type for the /autopilot/state + // endpoint. + AutopilotStateResponse struct { + AutopilotStatusResponse // TODO: deprecate /autopilot/status + + StartTime time.Time `json:"startTime"` + BuildState + } + HostHandlerResponseChecks struct { Gouging bool `json:"gouging"` GougingBreakdown HostGougingBreakdown `json:"gougingBreakdown"` diff --git a/api/bus.go b/api/bus.go index 1b8bb86b7..2dcd9b7d5 100644 --- a/api/bus.go +++ b/api/bus.go @@ -71,6 +71,12 @@ type AccountHandlerPOST struct { HostKey types.PublicKey `json:"hostKey"` } +// BusStateResponse is the response type for the /bus/state endpoint. +type BusStateResponse struct { + StartTime time.Time `json:"startTime"` + BuildState +} + // ConsensusState holds the current blockheight and whether we are synced or not. type ConsensusState struct { BlockHeight uint64 `json:"blockHeight"` diff --git a/api/state.go b/api/state.go new file mode 100644 index 000000000..6bb58250c --- /dev/null +++ b/api/state.go @@ -0,0 +1,14 @@ +package api + +import "time" + +type ( + // BuildState contains static information about the build. + BuildState struct { + Network string `json:"network"` + Version string `json:"version"` + Commit string `json:"commit"` + OS string `json:"OS"` + BuildTime time.Time `json:"buildTime"` + } +) diff --git a/api/worker.go b/api/worker.go index c67a33c5a..aeddf7d38 100644 --- a/api/worker.go +++ b/api/worker.go @@ -172,6 +172,13 @@ type UploaderStats struct { AvgSectorUploadSpeedMBPS float64 `json:"avgSectorUploadSpeedMBPS"` } +// WorkerStateResponse is the response type for the /worker/state endpoint. +type WorkerStateResponse struct { + ID string `json:"id"` + StartTime time.Time `json:"startTime"` + BuildState +} + // An UploadOption overrides an option on the upload and migrate endpoints in // the worker. type UploadOption func(url.Values) diff --git a/autopilot/autopilot.go b/autopilot/autopilot.go index 2df11eeee..64b294da6 100644 --- a/autopilot/autopilot.go +++ b/autopilot/autopilot.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net/http" + "runtime" "strings" "sync" "time" @@ -16,6 +17,7 @@ import ( "go.sia.tech/jape" "go.sia.tech/renterd/alerts" "go.sia.tech/renterd/api" + "go.sia.tech/renterd/build" "go.sia.tech/renterd/hostdb" "go.sia.tech/renterd/object" "go.sia.tech/renterd/tracing" @@ -108,11 +110,11 @@ type Autopilot struct { tickerDuration time.Duration wg sync.WaitGroup - startStopMu sync.Mutex - runningSince time.Time - stopChan chan struct{} - ticker *time.Ticker - triggerChan chan bool + startStopMu sync.Mutex + startTime time.Time + stopChan chan struct{} + ticker *time.Ticker + triggerChan chan bool } // state holds a bunch of variables that are used by the autopilot and updated @@ -169,6 +171,7 @@ func (ap *Autopilot) Handler() http.Handler { "POST /hosts": ap.hostsHandlerPOST, "GET /host/:hostKey": ap.hostHandlerGET, "GET /status": ap.statusHandlerGET, + "GET /state": ap.stateHandlerGET, })) } @@ -178,7 +181,7 @@ func (ap *Autopilot) Run() error { ap.startStopMu.Unlock() return errors.New("already running") } - ap.runningSince = time.Now() + ap.startTime = time.Now() ap.stopChan = make(chan struct{}) ap.triggerChan = make(chan bool) ap.ticker = time.NewTicker(ap.tickerDuration) @@ -294,7 +297,7 @@ func (ap *Autopilot) Shutdown(_ context.Context) error { close(ap.stopChan) close(ap.triggerChan) ap.wg.Wait() - ap.runningSince = time.Time{} + ap.startTime = time.Time{} } return nil } @@ -317,11 +320,17 @@ func (ap *Autopilot) Trigger(forceScan bool) bool { } } +func (ap *Autopilot) StartTime() time.Time { + ap.startStopMu.Lock() + defer ap.startStopMu.Unlock() + return ap.startTime +} + func (ap *Autopilot) Uptime() (dur time.Duration) { ap.startStopMu.Lock() defer ap.startStopMu.Unlock() if ap.isRunning() { - dur = time.Since(ap.runningSince) + dur = time.Since(ap.startTime) } return } @@ -387,7 +396,7 @@ func (ap *Autopilot) blockUntilSynced(interrupt <-chan time.Time) bool { } func (ap *Autopilot) isRunning() bool { - return !ap.runningSince.IsZero() + return !ap.startTime.IsZero() } func (ap *Autopilot) updateState(ctx context.Context) error { @@ -599,6 +608,35 @@ func (ap *Autopilot) statusHandlerGET(jc jape.Context) { }) } +func (ap *Autopilot) stateHandlerGET(jc jape.Context) { + migrating, mLastStart := ap.m.Status() + scanning, sLastStart := ap.s.Status() + _, err := ap.bus.Autopilot(jc.Request.Context(), ap.id) + if err != nil && !strings.Contains(err.Error(), api.ErrAutopilotNotFound.Error()) { + jc.Error(err, http.StatusInternalServerError) + return + } + + jc.Encode(api.AutopilotStateResponse{ + AutopilotStatusResponse: api.AutopilotStatusResponse{ + Configured: err == nil, + Migrating: migrating, + MigratingLastStart: api.ParamTime(mLastStart), + Scanning: scanning, + ScanningLastStart: api.ParamTime(sLastStart), + UptimeMS: api.ParamDuration(ap.Uptime()), + }, + StartTime: ap.StartTime(), + BuildState: api.BuildState{ + Network: build.ConsensusNetworkName, + Version: build.Version(), + Commit: build.Commit(), + OS: runtime.GOOS, + BuildTime: build.BuildTime(), + }, + }) +} + func (ap *Autopilot) hostsHandlerPOST(jc jape.Context) { var req api.SearchHostsRequest if jc.Decode(&req) != nil { diff --git a/autopilot/client.go b/autopilot/client.go index b47d3d836..b190c21a1 100644 --- a/autopilot/client.go +++ b/autopilot/client.go @@ -61,6 +61,12 @@ func (c *Client) HostInfos(ctx context.Context, filterMode, usabilityMode string return } +// State returns the current state of the autopilot. +func (c *Client) State() (state api.AutopilotStateResponse, err error) { + err = c.c.GET("/state", &state) + return +} + func (c *Client) Status() (resp api.AutopilotStatusResponse, err error) { err = c.c.GET("/status", &resp) return diff --git a/build/env.go b/build/env_default.go similarity index 100% rename from build/env.go rename to build/env_default.go diff --git a/build/env_testnet.go b/build/env_testnet.go index 0588c2ee8..c94181952 100644 --- a/build/env_testnet.go +++ b/build/env_testnet.go @@ -11,7 +11,7 @@ import ( ) const ( - ConsensusNetworkName = "Testnet-Zen" + ConsensusNetworkName = "Zen Testnet" DefaultAPIAddress = "localhost:9880" DefaultGatewayAddress = ":9881" ) diff --git a/build/gen.go b/build/gen.go new file mode 100644 index 000000000..0d253bb13 --- /dev/null +++ b/build/gen.go @@ -0,0 +1,134 @@ +//go:build ignore + +// This script generates meta.go which contains version info for the hostd binary. It can be run with `go generate`. +// +//go:generate -command go run gen.go +package main + +import ( + "encoding/json" + "errors" + "fmt" + "log" + "os" + "os/exec" + "strings" + "text/template" + "time" +) + +const logFormat = `{%n "commit": "%H",%n "shortCommit": "%h",%n "timestamp": "%cD",%n "tag": "%(describe:tags=true)"%n}` + +type ( + gitTime time.Time + + gitMeta struct { + Commit string `json:"commit"` + ShortCommit string `json:"shortCommit"` + Timestamp gitTime `json:"timestamp"` + Tag string `json:"tag"` + } +) + +var buildTemplate = template.Must(template.New("").Parse(`// Code generated by go generate; DO NOT EDIT. +// This file was generated by go generate at {{ .RunTime }}. +package build + +import ( + "time" +) + +const ( + commit = "{{ .Commit }}" + version = "{{ .Version }}" + buildTime = {{ .UnixTimestamp }} +) + +// Commit returns the commit hash of hostd +func Commit() string { + return commit +} + +// Version returns the version of hostd +func Version() string { + return version +} + +// BuildTime returns the time at which the binary was built. +func BuildTime() time.Time { + return time.Unix(buildTime, 0) +}`)) + +// UnmarshalJSON implements the json.Unmarshaler interface. +func (t *gitTime) UnmarshalJSON(buf []byte) error { + timeFormats := []string{ + time.RFC1123Z, + "Mon, 2 Jan 2006 15:04:05 -0700", + "2006-01-02 15:04:05 -0700", + time.UnixDate, + time.ANSIC, + time.RFC3339, + time.RFC1123, + } + + for _, format := range timeFormats { + parsed, err := time.Parse(format, strings.Trim(string(buf), `"`)) + if err == nil { + *t = gitTime(parsed) + return nil + } + } + return errors.New("failed to parse time") +} + +func getGitMeta() (meta gitMeta, _ error) { + cmd := exec.Command("git", "log", "-1", "--pretty=format:"+logFormat+"") + buf, err := cmd.Output() + if err != nil { + if err, ok := err.(*exec.ExitError); ok && len(err.Stderr) > 0 { + return gitMeta{}, fmt.Errorf("command failed: %w", errors.New(string(err.Stderr))) + } + return gitMeta{}, fmt.Errorf("failed to execute command: %w", err) + } else if err := json.Unmarshal(buf, &meta); err != nil { + return gitMeta{}, fmt.Errorf("failed to unmarshal json: %w", err) + } + return +} + +func main() { + meta, err := getGitMeta() + if err != nil { + log.Fatalln(err) + } + + commit := meta.ShortCommit + version := meta.Tag + if len(version) == 0 { + // no version, use commit and current time for development + version = commit + meta.Timestamp = gitTime(time.Now()) + } + + f, err := os.Create("meta.go") + if err != nil { + log.Fatalln(err) + } + defer f.Close() + + err = buildTemplate.Execute(f, struct { + Commit string + Version string + UnixTimestamp int64 + + RunTime string + }{ + Commit: commit, + Version: version, + UnixTimestamp: time.Time(meta.Timestamp).Unix(), + + RunTime: time.Now().Format(time.RFC3339), + }) + if err != nil { + log.Fatalln(err) + } +} diff --git a/build/meta.go b/build/meta.go new file mode 100644 index 000000000..d72fa96c7 --- /dev/null +++ b/build/meta.go @@ -0,0 +1,28 @@ +// Code generated by go generate; DO NOT EDIT. +// This file was generated by go generate at 2023-08-11T11:44:04+02:00. +package build + +import ( + "time" +) + +const ( + commit = "?" + version = "?" + buildTime = 0 +) + +// Commit returns the commit hash of hostd +func Commit() string { + return commit +} + +// Version returns the version of hostd +func Version() string { + return version +} + +// BuildTime returns the time at which the binary was built. +func BuildTime() time.Time { + return time.Unix(buildTime, 0) +} \ No newline at end of file diff --git a/bus/bus.go b/bus/bus.go index 1c0a997b9..a0440f4e0 100644 --- a/bus/bus.go +++ b/bus/bus.go @@ -7,7 +7,9 @@ import ( "fmt" "math" "net/http" + "runtime" "strings" + "sync" "time" "go.sia.tech/core/consensus" @@ -159,6 +161,9 @@ type bus struct { logger *zap.SugaredLogger accounts *accounts contractLocks *contractLocks + + mu sync.Mutex + startTime time.Time } func (b *bus) consensusAcceptBlock(jc jape.Context) { @@ -1346,6 +1351,19 @@ func (b *bus) contractTaxHandlerGET(jc jape.Context) { jc.Encode(cs.FileContractTax(types.FileContract{Payout: payout})) } +func (b *bus) stateHandlerGET(jc jape.Context) { + jc.Encode(api.BusStateResponse{ + StartTime: b.StartTime(), + BuildState: api.BuildState{ + Network: build.ConsensusNetworkName, + Version: build.Version(), + Commit: build.Commit(), + OS: runtime.GOOS, + BuildTime: build.BuildTime(), + }, + }) +} + // New returns a new Bus. func New(s Syncer, cm ChainManager, tp TransactionPool, w Wallet, hdb HostDB, as AutopilotStore, ms MetadataStore, ss SettingStore, eas EphemeralAccountStore, l *zap.Logger) (*bus, error) { b := &bus{ @@ -1537,16 +1555,40 @@ func (b *bus) Handler() http.Handler { "PUT /setting/:key": b.settingKeyHandlerPUT, "DELETE /setting/:key": b.settingKeyHandlerDELETE, + "GET /state": b.stateHandlerGET, + "GET /params/upload": b.paramsHandlerUploadGET, "GET /params/gouging": b.paramsHandlerGougingGET, })) } +// Run starts the bus. +func (b *bus) Run() error { + b.mu.Lock() + defer b.mu.Unlock() + if !b.startTime.IsZero() { + return errors.New("bus already running") + } + b.startTime = time.Now() + return nil +} + // Shutdown shuts down the bus. func (b *bus) Shutdown(ctx context.Context) error { + // Reset start time. + b.mu.Lock() + b.startTime = time.Time{} + b.mu.Unlock() + return b.eas.SaveAccounts(ctx, b.accounts.ToPersist()) } +func (b *bus) StartTime() time.Time { + b.mu.Lock() + defer b.mu.Unlock() + return b.startTime +} + func (b *bus) fetchSetting(ctx context.Context, key string, value interface{}) error { if val, err := b.ss.Setting(ctx, key); err != nil { return fmt.Errorf("could not get contract set settings: %w", err) diff --git a/bus/client.go b/bus/client.go index cbad5d127..b637844eb 100644 --- a/bus/client.go +++ b/bus/client.go @@ -733,6 +733,12 @@ func (c *Client) ObjectsStats() (osr api.ObjectsStatsResponse, err error) { return } +// State returns the current state of the bus. +func (c *Client) State() (state api.BusStateResponse, err error) { + err = c.c.GET("/state", &state) + return +} + // RenameObject renames a single object. func (c *Client) RenameObject(ctx context.Context, from, to string) (err error) { return c.renameObjects(ctx, from, to, api.ObjectsRenameModeSingle) diff --git a/bus/client_test.go b/bus/client_test.go index 9267a57c1..c47394412 100644 --- a/bus/client_test.go +++ b/bus/client_test.go @@ -23,7 +23,7 @@ func TestClient(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - c, serveFn, shutdownFn, err := newTestClient(t.TempDir()) + c, serveFn, runFn, shutdownFn, err := newTestClient(t.TempDir()) if err != nil { t.Fatal(err) } @@ -32,6 +32,7 @@ func TestClient(t *testing.T) { t.Error(err) } }() + go runFn() go serveFn() // assert setting 'foo' is not found @@ -60,23 +61,23 @@ func TestClient(t *testing.T) { } } -func newTestClient(dir string) (*bus.Client, func() error, func(context.Context) error, error) { +func newTestClient(dir string) (*bus.Client, func() error, func() error, func(context.Context) error, error) { // create listener l, err := net.Listen("tcp", "127.0.0.1:0") if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } // create client client := bus.NewClient("http://"+l.Addr().String(), "test") - b, cleanup, err := node.NewBus(node.BusConfig{ + b, runFn, cleanup, err := node.NewBus(node.BusConfig{ Bootstrap: false, GatewayAddr: "127.0.0.1:0", Miner: node.NewMiner(client), UsedUTXOExpiry: time.Minute, }, filepath.Join(dir, "bus"), types.GeneratePrivateKey(), zap.New(zapcore.NewNopCore())) if err != nil { - return nil, nil, nil, err + return nil, nil, nil, nil, err } // create server @@ -94,5 +95,5 @@ func newTestClient(dir string) (*bus.Client, func() error, func(context.Context) server.Shutdown(ctx) return cleanup(ctx) } - return client, serveFn, shutdownFn, nil + return client, serveFn, runFn, shutdownFn, nil } diff --git a/cmd/renterd/main.go b/cmd/renterd/main.go index 6c80aa17a..840e747d4 100644 --- a/cmd/renterd/main.go +++ b/cmd/renterd/main.go @@ -309,9 +309,11 @@ func main() { busAddr, busPassword := busCfg.remoteAddr, busCfg.apiPassword if busAddr == "" { - b, shutdownFn, err := node.NewBus(busCfg.BusConfig, *dir, getSeed(), logger) + b, runFn, shutdownFn, err := node.NewBus(busCfg.BusConfig, *dir, getSeed(), logger) if err != nil { log.Fatal("failed to create bus, err: ", err) + } else if err := runFn(); err != nil { + log.Fatal("failed to start the bus", err) } shutdownFns = append(shutdownFns, shutdownFn) @@ -327,9 +329,11 @@ func main() { workerAddrs, workerPassword := workerCfg.remoteAddrs, workerCfg.apiPassword if workerAddrs == "" { if workerCfg.enabled { - w, shutdownFn, err := node.NewWorker(workerCfg.WorkerConfig, bc, getSeed(), logger) + w, runFn, shutdownFn, err := node.NewWorker(workerCfg.WorkerConfig, bc, getSeed(), logger) if err != nil { log.Fatal("failed to create worker", err) + } else if err := runFn(); err != nil { + log.Fatal("failed to start the worker", err) } shutdownFns = append(shutdownFns, shutdownFn) @@ -388,7 +392,7 @@ func main() { log.Println("Shutting down...") shutdownFns = append(shutdownFns, srv.Shutdown) case err := <-autopilotErr: - log.Fatalln("Fatal autopilot error:", err) + log.Fatal("Fatal autopilot error:", err) } // Shut down the autopilot first, then the rest of the services in reverse order. diff --git a/internal/node/node.go b/internal/node/node.go index cac5a879a..3f04a0240 100644 --- a/internal/node/node.go +++ b/internal/node/node.go @@ -68,7 +68,10 @@ type AutopilotConfig struct { MigratorParallelSlabsPerWorker uint64 } -type ShutdownFn = func(context.Context) error +type ( + RunFn = func() error + ShutdownFn = func(context.Context) error +) func convertToSiad(core types.EncoderTo, siad encoding.SiaUnmarshaler) { var buf bytes.Buffer @@ -201,24 +204,24 @@ func (tp txpool) UnconfirmedParents(txn types.Transaction) ([]types.Transaction, return parents, nil } -func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (http.Handler, ShutdownFn, error) { +func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (http.Handler, RunFn, ShutdownFn, error) { gatewayDir := filepath.Join(dir, "gateway") if err := os.MkdirAll(gatewayDir, 0700); err != nil { - return nil, nil, err + return nil, nil, nil, err } g, err := gateway.New(cfg.GatewayAddr, cfg.Bootstrap, gatewayDir) if err != nil { - return nil, nil, err + return nil, nil, nil, err } consensusDir := filepath.Join(dir, "consensus") if err := os.MkdirAll(consensusDir, 0700); err != nil { - return nil, nil, err + return nil, nil, nil, err } cs, errCh := mconsensus.New(g, cfg.Bootstrap, consensusDir) select { case err := <-errCh: if err != nil { - return nil, nil, err + return nil, nil, nil, err } default: go func() { @@ -229,11 +232,11 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht } tpoolDir := filepath.Join(dir, "transactionpool") if err := os.MkdirAll(tpoolDir, 0700); err != nil { - return nil, nil, err + return nil, nil, nil, err } tp, err := transactionpool.New(cs, g, tpoolDir) if err != nil { - return nil, nil, err + return nil, nil, nil, err } // If no DB dialector was provided, use SQLite. @@ -241,7 +244,7 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht if dbConn == nil { dbDir := filepath.Join(dir, "db") if err := os.MkdirAll(dbDir, 0700); err != nil { - return nil, nil, err + return nil, nil, nil, err } dbConn = stores.NewSQLiteConnection(filepath.Join(dbDir, "db.sqlite")) } @@ -251,7 +254,7 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht sqlStoreDir := filepath.Join(dir, "partial_slabs") sqlStore, ccid, err := stores.NewSQLStore(dbConn, sqlStoreDir, true, cfg.PersistInterval, walletAddr, sqlLogger) if err != nil { - return nil, nil, err + return nil, nil, nil, err } cancelSubscribe := make(chan struct{}) @@ -277,14 +280,14 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht if m := cfg.Miner; m != nil { if err := cs.ConsensusSetSubscribe(m, ccid, nil); err != nil { - return nil, nil, err + return nil, nil, nil, err } tp.TransactionPoolSubscribe(m) } b, err := bus.New(syncer{g, tp}, chainManager{cs: cs, network: cfg.Network}, txpool{tp}, w, sqlStore, sqlStore, sqlStore, sqlStore, sqlStore, l) if err != nil { - return nil, nil, err + return nil, nil, nil, err } shutdownFn := func(ctx context.Context) error { @@ -300,20 +303,20 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht sqlStore.Close(), }) } - return b.Handler(), shutdownFn, nil + return b.Handler(), b.Run, shutdownFn, nil } -func NewWorker(cfg WorkerConfig, b worker.Bus, seed types.PrivateKey, l *zap.Logger) (http.Handler, ShutdownFn, error) { +func NewWorker(cfg WorkerConfig, b worker.Bus, seed types.PrivateKey, l *zap.Logger) (http.Handler, RunFn, ShutdownFn, error) { workerKey := blake2b.Sum256(append([]byte("worker"), seed...)) w, err := worker.New(workerKey, cfg.ID, b, cfg.ContractLockTimeout, cfg.BusFlushInterval, cfg.DownloadOverdriveTimeout, cfg.UploadOverdriveTimeout, cfg.DownloadMaxOverdrive, cfg.UploadMaxOverdrive, cfg.AllowPrivateIPs, l) if err != nil { - return nil, nil, err + return nil, nil, nil, err } - return w.Handler(), w.Shutdown, nil + return w.Handler(), w.Run, w.Shutdown, nil } -func NewAutopilot(cfg AutopilotConfig, b autopilot.Bus, workers []autopilot.Worker, l *zap.Logger) (http.Handler, func() error, ShutdownFn, error) { +func NewAutopilot(cfg AutopilotConfig, b autopilot.Bus, workers []autopilot.Worker, l *zap.Logger) (http.Handler, RunFn, ShutdownFn, error) { ap, err := autopilot.New(cfg.ID, b, workers, l, cfg.Heartbeat, cfg.ScannerInterval, cfg.ScannerBatchSize, cfg.ScannerMinRecentFailures, cfg.ScannerNumThreads, cfg.MigrationHealthCutoff, cfg.AccountsRefillInterval, cfg.RevisionSubmissionBuffer, cfg.MigratorParallelSlabsPerWorker) if err != nil { return nil, nil, nil, err diff --git a/internal/testing/cluster.go b/internal/testing/cluster.go index 95e6eac5f..22f893e4c 100644 --- a/internal/testing/cluster.go +++ b/internal/testing/cluster.go @@ -254,7 +254,7 @@ func newTestClusterCustom(dir, dbName string, funding bool, wk types.PrivateKey, busCfg.Miner = node.NewMiner(busClient) // Create bus. - b, bStopFn, err := node.NewBus(busCfg, busDir, wk, logger) + b, bStartFn, bStopFn, err := node.NewBus(busCfg, busDir, wk, logger) if err != nil { return nil, err } @@ -268,7 +268,7 @@ func newTestClusterCustom(dir, dbName string, funding bool, wk types.PrivateKey, busShutdownFns = append(busShutdownFns, bStopFn) // Create worker. - w, wStopFn, err := node.NewWorker(workerCfg, busClient, wk, logger) + w, wStartFn, wStopFn, err := node.NewWorker(workerCfg, busClient, wk, logger) if err != nil { return nil, err } @@ -331,6 +331,8 @@ func newTestClusterCustom(dir, dbName string, funding bool, wk types.PrivateKey, cluster.wg.Add(1) go func() { _ = aStartFn() + _ = bStartFn + _ = wStartFn() cluster.wg.Done() }() diff --git a/worker/client.go b/worker/client.go index 6b466e3d2..f893aa1d0 100644 --- a/worker/client.go +++ b/worker/client.go @@ -155,6 +155,12 @@ func (c *Client) RHPUpdateRegistry(ctx context.Context, hostKey types.PublicKey, return } +// State returns the current state of the worker. +func (c *Client) State() (state api.WorkerStateResponse, err error) { + err = c.c.GET("/state", &state) + return +} + // MigrateSlab migrates the specified slab. func (c *Client) MigrateSlab(ctx context.Context, slab object.Slab, set string) error { values := make(url.Values) diff --git a/worker/worker.go b/worker/worker.go index 45d3d766f..02799b8d4 100644 --- a/worker/worker.go +++ b/worker/worker.go @@ -11,6 +11,7 @@ import ( "net" "net/http" "path/filepath" + "runtime" "sort" "strconv" "strings" @@ -25,6 +26,7 @@ import ( "go.sia.tech/jape" "go.sia.tech/renterd/alerts" "go.sia.tech/renterd/api" + "go.sia.tech/renterd/build" "go.sia.tech/renterd/hostdb" "go.sia.tech/renterd/metrics" "go.sia.tech/renterd/object" @@ -247,6 +249,9 @@ type worker struct { transportPoolV3 *transportPoolV3 logger *zap.SugaredLogger + + mu sync.Mutex + startTime time.Time } func dial(ctx context.Context, hostIP string) (net.Conn, error) { @@ -1165,6 +1170,20 @@ func (w *worker) accountHandlerGET(jc jape.Context) { jc.Encode(account) } +func (w *worker) stateHandlerGET(jc jape.Context) { + jc.Encode(api.WorkerStateResponse{ + ID: w.id, + StartTime: w.StartTime(), + BuildState: api.BuildState{ + Network: build.ConsensusNetworkName, + Version: build.Version(), + Commit: build.Commit(), + OS: runtime.GOOS, + BuildTime: build.BuildTime(), + }, + }) +} + // New returns an HTTP handler that serves the worker API. func New(masterKey [32]byte, id string, b Bus, contractLockingDuration, busFlushInterval, downloadOverdriveTimeout, uploadOverdriveTimeout time.Duration, downloadMaxOverdrive, uploadMaxOverdrive uint64, allowPrivateIPs bool, l *zap.Logger) (*worker, error) { if contractLockingDuration == 0 { @@ -1226,9 +1245,22 @@ func (w *worker) Handler() http.Handler { "GET /objects/*path": w.objectsHandlerGET, "PUT /objects/*path": w.objectsHandlerPUT, "DELETE /objects/*path": w.objectsHandlerDELETE, + + "GET /state": w.stateHandlerGET, })) } +func (w *worker) Run() error { + w.mu.Lock() + defer w.mu.Unlock() + + if !w.startTime.IsZero() { + return errors.New("worker already running") + } + w.startTime = time.Now() + return nil +} + // Shutdown shuts down the worker. func (w *worker) Shutdown(_ context.Context) error { w.interactionsMu.Lock() @@ -1246,9 +1278,21 @@ func (w *worker) Shutdown(_ context.Context) error { // Stop the uploader. w.uploadManager.Stop() + + // Reset start time. + w.mu.Lock() + w.startTime = time.Time{} + w.mu.Unlock() return nil } +// StartTime returns the time at which the worker was started. +func (w *worker) StartTime() time.Time { + w.mu.Lock() + defer w.mu.Unlock() + return w.startTime +} + type contractLock struct { lockID uint64 fcid types.FileContractID