diff --git a/output/clickhouse.go b/output/clickhouse.go index 91008e07..a3313247 100644 --- a/output/clickhouse.go +++ b/output/clickhouse.go @@ -67,32 +67,24 @@ type ClickHouse struct { distMetricTbls []string distSeriesTbls []string - bmSeries map[int64]int64 - wrSeries int - numFlying int32 - mux sync.Mutex - taskDone *sync.Cond + bmSeries map[int64]int64 + wrSeries int + numFlying int32 + mux sync.Mutex + taskDone *sync.Cond + nextResetQuota time.Time } // NewClickHouse new a clickhouse instance func NewClickHouse(cfg *config.Config, taskCfg *config.TaskConfig) *ClickHouse { ck := &ClickHouse{cfg: cfg, taskCfg: taskCfg} ck.taskDone = sync.NewCond(&ck.mux) + ck.nextResetQuota = time.Now().Add(10 * time.Second) return ck } // Init the clickhouse intance func (c *ClickHouse) Init() (err error) { - // FIXME: goroutine leak? - go func() { - ticker := time.NewTicker(10 * time.Second) - defer ticker.Stop() - for range ticker.C { - c.mux.Lock() - c.wrSeries = 0 - c.mux.Unlock() - } - }() return c.initSchema() } @@ -137,6 +129,15 @@ func (c *ClickHouse) AllowWriteSeries(sid, mid int64) (allowed bool) { if c.wrSeries < wrSeriesQuota { c.wrSeries++ allowed = true + } else { + now := time.Now() + if now.After(c.nextResetQuota) { + c.nextResetQuota = now.Add(10 * time.Second) + c.wrSeries = 1 + allowed = true + } + } + if allowed { statistics.WriteSeriesAllowChanged.WithLabelValues(c.taskCfg.Name).Inc() } else { statistics.WriteSeriesDropQuota.WithLabelValues(c.taskCfg.Name).Inc()