diff --git a/cmd/clickhouse_sinker/main.go b/cmd/clickhouse_sinker/main.go index 456c1717..d74d72c6 100644 --- a/cmd/clickhouse_sinker/main.go +++ b/cmd/clickhouse_sinker/main.go @@ -134,10 +134,27 @@ func init() { func main() { util.Run("clickhouse_sinker", func() error { - // Initialize http server for metrics and debug - mux := http.NewServeMux() - mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { - _, _ = w.Write([]byte(` + httpHost := cmdOps.HTTPHost + if httpHost == "" { + ip, err := util.GetOutboundIP() + if err != nil { + return fmt.Errorf("failed to determine outbound ip: %w", err) + } + httpHost = ip.String() + } + + httpPort := cmdOps.HTTPPort + if httpPort == 0 { + httpPort = util.HttpPortBase + } + httpPort = util.GetSpareTCPPort(httpPort) + + // cmdOps.HTTPPort=0: disable the http server + if cmdOps.HTTPPort > 0 { + // Initialize http server for metrics and debug + mux := http.NewServeMux() + mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(` ClickHouse Sinker

ClickHouse Sinker

@@ -149,80 +166,65 @@ func main() {

Live Full

pprof

`)) - }) + }) - mux.HandleFunc("/state", func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - if runner != nil && runner.GetCurrentConfig() != nil { - var stateLags map[string]cm.StateLag - var bs []byte - var err error - if stateLags, err = cm.GetTaskStateAndLags(runner.GetCurrentConfig()); err == nil { - if bs, err = json.Marshal(stateLags); err == nil { - _, _ = w.Write(bs) + mux.HandleFunc("/state", func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + if runner != nil && runner.GetCurrentConfig() != nil { + var stateLags map[string]cm.StateLag + var bs []byte + var err error + if stateLags, err = cm.GetTaskStateAndLags(runner.GetCurrentConfig()); err == nil { + if bs, err = json.Marshal(stateLags); err == nil { + _, _ = w.Write(bs) + } } } - } - }) - health.Health.AddLivenessCheck("task", func() error { - var err error - if runner != nil && runner.GetCurrentConfig() != nil { - var stateLags map[string]cm.StateLag - var count int - if stateLags, err = cm.GetTaskStateAndLags(runner.GetCurrentConfig()); err == nil { - for _, value := range stateLags { - if value.State == "Dead" { - count++ + }) + health.Health.AddLivenessCheck("task", func() error { + var err error + if runner != nil && runner.GetCurrentConfig() != nil { + var stateLags map[string]cm.StateLag + var count int + if stateLags, err = cm.GetTaskStateAndLags(runner.GetCurrentConfig()); err == nil { + for _, value := range stateLags { + if value.State == "Dead" { + count++ + } } + if count == len(stateLags) { + return fmt.Errorf("All task is Dead.") + } + } else { + return err } - if count == len(stateLags) { - return fmt.Errorf("All task is Dead.") - } - } else { - return err } - } - return nil - }) - mux.Handle("/metrics", httpMetrics) - mux.HandleFunc("/ready", health.Health.ReadyEndpoint) // GET /ready?full=1 - mux.HandleFunc("/live", health.Health.LiveEndpoint) // GET /live?full=1 - mux.HandleFunc("/debug/pprof/", pprof.Index) - mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) - mux.HandleFunc("/debug/pprof/profile", pprof.Profile) - mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) - mux.HandleFunc("/debug/pprof/trace", pprof.Trace) - mux.Handle("/debug/vars", http.DefaultServeMux) - - // cmdOps.HTTPPort=0: let OS choose the listen port, and record the exact metrics URL to log. - httpPort := cmdOps.HTTPPort - if httpPort == 0 { - httpPort = util.HttpPortBase - } - httpPort = util.GetSpareTCPPort(httpPort) + return nil + }) + mux.Handle("/metrics", httpMetrics) + mux.HandleFunc("/ready", health.Health.ReadyEndpoint) // GET /ready?full=1 + mux.HandleFunc("/live", health.Health.LiveEndpoint) // GET /live?full=1 + mux.HandleFunc("/debug/pprof/", pprof.Index) + mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) + mux.HandleFunc("/debug/pprof/profile", pprof.Profile) + mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) + mux.HandleFunc("/debug/pprof/trace", pprof.Trace) + mux.Handle("/debug/vars", http.DefaultServeMux) - httpHost := cmdOps.HTTPHost - if httpHost == "" { - ip, err := util.GetOutboundIP() + httpAddr = fmt.Sprintf("%s:%d", httpHost, httpPort) + listener, err := net.Listen("tcp", httpAddr) if err != nil { - return fmt.Errorf("failed to determine outbound ip: %w", err) + return fmt.Errorf("failed to listen on %q: %w", httpAddr, err) } - httpHost = ip.String() - } - httpAddr = fmt.Sprintf("%s:%d", httpHost, httpPort) - listener, err := net.Listen("tcp", httpAddr) - if err != nil { - return fmt.Errorf("failed to listen on %q: %w", httpAddr, err) - } + util.Logger.Info(fmt.Sprintf("Run http server at http://%s/", httpAddr)) - util.Logger.Info(fmt.Sprintf("Run http server at http://%s/", httpAddr)) - - go func() { - if err := http.Serve(listener, mux); err != nil { - util.Logger.Error("http.ListenAndServe failed", zap.Error(err)) - } - }() + go func() { + if err := http.Serve(listener, mux); err != nil { + util.Logger.Error("http.ListenAndServe failed", zap.Error(err)) + } + }() + } var rcm cm.RemoteConfManager var properties map[string]interface{}