Skip to content

Commit

Permalink
fix ClickHouse.Init goroutine leak
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzhichang committed Dec 9, 2022
1 parent e12a7b4 commit 827b20b
Showing 1 changed file with 16 additions and 15 deletions.
31 changes: 16 additions & 15 deletions output/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,32 +67,24 @@ type ClickHouse struct {
distMetricTbls []string
distSeriesTbls []string

bmSeries map[int64]int64
wrSeries int
numFlying int32
mux sync.Mutex
taskDone *sync.Cond
bmSeries map[int64]int64
wrSeries int
numFlying int32
mux sync.Mutex
taskDone *sync.Cond
nextResetQuota time.Time
}

// NewClickHouse new a clickhouse instance
func NewClickHouse(cfg *config.Config, taskCfg *config.TaskConfig) *ClickHouse {
ck := &ClickHouse{cfg: cfg, taskCfg: taskCfg}
ck.taskDone = sync.NewCond(&ck.mux)
ck.nextResetQuota = time.Now().Add(10 * time.Second)
return ck
}

// Init the clickhouse intance
func (c *ClickHouse) Init() (err error) {
// FIXME: goroutine leak?
go func() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for range ticker.C {
c.mux.Lock()
c.wrSeries = 0
c.mux.Unlock()
}
}()
return c.initSchema()
}

Expand Down Expand Up @@ -137,6 +129,15 @@ func (c *ClickHouse) AllowWriteSeries(sid, mid int64) (allowed bool) {
if c.wrSeries < wrSeriesQuota {
c.wrSeries++
allowed = true
} else {
now := time.Now()
if now.After(c.nextResetQuota) {
c.nextResetQuota = now.Add(10 * time.Second)
c.wrSeries = 1
allowed = true
}
}
if allowed {
statistics.WriteSeriesAllowChanged.WithLabelValues(c.taskCfg.Name).Inc()
} else {
statistics.WriteSeriesDropQuota.WithLabelValues(c.taskCfg.Name).Inc()
Expand Down

0 comments on commit 827b20b

Please sign in to comment.