diff --git a/.gitignore b/.gitignore index 11bc57c6d..4bf5c7d69 100644 --- a/.gitignore +++ b/.gitignore @@ -4,3 +4,19 @@ out/ *.swo *.tar.gz docs/_site +vendor/ + +# binary file +falcon-agent +falcon-aggregator +falcon-alarm +falcon-api +falcon-gateway +falcon-graph +falcon-hbs +falcon-judge +falcon-nodata +falcon-transfer +open-falcon + +git.go diff --git a/README.md b/README.md index 460f80216..00b468334 100644 --- a/README.md +++ b/README.md @@ -59,6 +59,12 @@ mysql -h 127.0.0.1 -u root -p < 4_graph-db-schema.sql mysql -h 127.0.0.1 -u root -p < 5_alarms-db-schema.sql ``` +Optional feature: matched string persistence in MySQL DB + + mysql -h 127.0.0.1 -u root -p < scripts/mysql/db_schema/6_history-db-schema.sql + + + **NOTE: if you are upgrading from v0.1 to current version v0.2.0,then**. [More upgrading instruction](http://www.jianshu.com/p/6fb2c2b4d030) mysql -h 127.0.0.1 -u root -p < 5_alarms-db-schema.sql diff --git a/common/model/eexp.go b/common/model/eexp.go new file mode 100644 index 000000000..0c2b2fa86 --- /dev/null +++ b/common/model/eexp.go @@ -0,0 +1,125 @@ +package model + +import ( + "encoding/json" + "log" + "math" + "strconv" +) + +type Filter struct { + Func string `json:"func"` + Key string `json:"key"` + Ago uint64 `json:"ago"` // for func count + Hits uint64 `json:"hits"` // for func count + Limit uint64 `json:"limit"` // for func all + Operator string `json:"operator"` + RightValue interface{} `json:"rightValue"` +} + +type EExpResponse struct { + EExps []EExp `json:"eexps"` +} + +type EExp struct { + ID int `json:"id"` + Key string `json:"key"` // join(sorted(conditionKeys), ",") + Filters map[string]Filter `json:"filters"` + Priority int `json:"priority"` + MaxStep int `json:"maxStep"` + Note string `json:"note"` +} + +func (c *Filter) String() string { + out, _ := json.Marshal(c) + return string(out) +} + +func (ee *EExp) String() string { + out, _ := json.Marshal(ee) + return string(out) +} + +func opResultFloat64(leftValue float64, operator string, rightValue float64) (isTriggered bool) { + switch operator { + case "=", "==": + isTriggered = math.Abs(leftValue-rightValue) < 0.0001 + case "!=": + isTriggered = math.Abs(leftValue-rightValue) > 0.0001 + case "<": + isTriggered = leftValue < rightValue + case "<=": + isTriggered = leftValue <= rightValue + case ">": + isTriggered = leftValue > rightValue + case ">=": + isTriggered = leftValue >= rightValue + } + return +} + +func opResultString(leftValue string, operator string, rightValue string) (isTriggered bool) { + switch operator { + case "=", "==": + isTriggered = leftValue == rightValue + case "!=": + isTriggered = leftValue != rightValue + } + return +} + +func (ee *EExp) HitFilters(m *map[string]interface{}) bool { + var err error + for k, filter := range ee.Filters { + valueI, ok := (*m)[k] + if !ok { + return false + } + + switch filter.RightValue.(type) { + case float64: + { + + var leftValue float64 + leftValueStr, ok := valueI.(string) + if ok { + leftValue, err = strconv.ParseFloat(leftValueStr, 64) + if err != nil { + log.Println("strconv.ParseFloat failed", err) + return false + } + } else { + leftValue, ok = valueI.(float64) + if !ok { + log.Println("parse in float64 failed") + return false + } + } + + rightValue := filter.RightValue.(float64) + + if !opResultFloat64(leftValue, filter.Operator, rightValue) { + return false + } + + } + case string: + { + leftValue, ok := valueI.(string) + if !ok { + log.Println("parse in string failed") + return false + } + + rightValue := filter.RightValue.(string) + + if !opResultString(leftValue, filter.Operator, rightValue) { + return false + } + + } + } + + } + return true +} diff --git a/common/model/ejudge.go b/common/model/ejudge.go new file mode 100644 index 000000000..ed1a7d7f8 --- /dev/null +++ b/common/model/ejudge.go @@ -0,0 +1,6 @@ +package model + +type EHistoryData struct { + Timestamp int64 `json:"timestamp"` + Filters map[string]interface{} `json:"filters"` +} diff --git a/common/model/emetric.go b/common/model/emetric.go new file mode 100644 index 000000000..67a7d06a6 --- /dev/null +++ b/common/model/emetric.go @@ -0,0 +1,18 @@ +package model + +type EMetric struct { + Endpoint string `json:"endpoint"` + Key string `json:"key"` + Filters map[string]interface{} `json:"values"` + Timestamp int64 `json:"timestamp"` +} + +func NewEMetric() *EMetric { + e := EMetric{} + e.Filters = map[string]interface{}{} + return &e +} + +func (e *EMetric) PK() string { + return e.Key +} diff --git a/common/model/event.go b/common/model/event.go index 05a533b6c..03fab4da0 100644 --- a/common/model/event.go +++ b/common/model/event.go @@ -15,6 +15,7 @@ package model import ( + "encoding/json" "fmt" "github.com/open-falcon/falcon-plus/common/utils" @@ -25,9 +26,10 @@ type Event struct { Id string `json:"id"` Strategy *Strategy `json:"strategy"` Expression *Expression `json:"expression"` + EExp *EExp `json:"eexp"` Status string `json:"status"` // OK or PROBLEM Endpoint string `json:"endpoint"` - LeftValue float64 `json:"leftValue"` + LeftValue interface{} `json:"leftValue"` CurrentStep int `json:"currentStep"` EventTime int64 `json:"eventTime"` PushedTags map[string]string `json:"pushedTags"` @@ -38,13 +40,23 @@ func (this *Event) FormattedTime() string { } func (this *Event) String() string { + var leftValue interface{} + switch this.LeftValue.(type) { + case float64: + leftValue = utils.ReadableFloat(this.LeftValue.(float64)) + default: + leftValue = this.LeftValue + + } + return fmt.Sprintf( - "", + "", this.Endpoint, this.Status, this.Strategy, this.Expression, - utils.ReadableFloat(this.LeftValue), + this.EExp, + leftValue, this.CurrentStep, this.PushedTags, this.FormattedTime(), @@ -59,6 +71,13 @@ func (this *Event) ExpressionId() int { return 0 } +func (this *Event) EExpID() int { + if this.Expression != nil { + return this.EExp.ID + } + return 0 +} + func (this *Event) StrategyId() int { if this.Strategy != nil { return this.Strategy.Id @@ -88,56 +107,107 @@ func (this *Event) ActionId() int { return this.Expression.ActionId } - return this.Strategy.Tpl.ActionId + if this.Strategy != nil { + return this.Strategy.Tpl.ActionId + } + + return -1 + } func (this *Event) Priority() int { if this.Strategy != nil { return this.Strategy.Priority } - return this.Expression.Priority + if this.Expression != nil { + return this.Expression.Priority + } + + if this.EExp != nil { + return this.EExp.Priority + } + return -1 } func (this *Event) Note() string { if this.Strategy != nil { return this.Strategy.Note } - return this.Expression.Note + + if this.Expression != nil { + return this.Expression.Note + + } + if this.EExp != nil { + return this.EExp.Note + } + return "" } func (this *Event) Metric() string { if this.Strategy != nil { return this.Strategy.Metric } - return this.Expression.Metric + if this.Expression != nil { + return this.Expression.Metric + } + if this.EExp != nil { + return this.EExp.Key + } + return "" } func (this *Event) RightValue() float64 { if this.Strategy != nil { return this.Strategy.RightValue } - return this.Expression.RightValue + + if this.Expression != nil { + return this.Expression.RightValue + } + + return 0.0 } func (this *Event) Operator() string { if this.Strategy != nil { return this.Strategy.Operator } - return this.Expression.Operator + + if this.Expression != nil { + return this.Expression.Operator + } + return "" } func (this *Event) Func() string { if this.Strategy != nil { return this.Strategy.Func } - return this.Expression.Func + + if this.Expression != nil { + return this.Expression.Func + } + + if this.EExp != nil { + out, _ := json.Marshal(this.EExp.Filters) + return string(out) + } + return "" } func (this *Event) MaxStep() int { if this.Strategy != nil { return this.Strategy.MaxStep } - return this.Expression.MaxStep + + if this.Expression != nil { + return this.Expression.MaxStep + } + if this.EExp != nil { + return this.EExp.MaxStep + } + return 1 } func (this *Event) Counter() string { diff --git a/common/model/judge.go b/common/model/judge.go index aa78d3ee3..0b0c8d114 100644 --- a/common/model/judge.go +++ b/common/model/judge.go @@ -24,16 +24,18 @@ type JudgeItem struct { Endpoint string `json:"endpoint"` Metric string `json:"metric"` Value float64 `json:"value"` + ValueRaw string `json:"valueRaw"` Timestamp int64 `json:"timestamp"` JudgeType string `json:"judgeType"` Tags map[string]string `json:"tags"` } func (this *JudgeItem) String() string { - return fmt.Sprintf("", + return fmt.Sprintf("", this.Endpoint, this.Metric, this.Value, + this.ValueRaw, this.Timestamp, this.JudgeType, this.Tags) @@ -46,4 +48,5 @@ func (this *JudgeItem) PrimaryKey() string { type HistoryData struct { Timestamp int64 `json:"timestamp"` Value float64 `json:"value"` + ValueRaw string `json:"valueRaw"` } diff --git a/common/model/metric.go b/common/model/metric.go index 7cbf5547a..81c11237d 100644 --- a/common/model/metric.go +++ b/common/model/metric.go @@ -65,13 +65,14 @@ type MetaData struct { Timestamp int64 `json:"timestamp"` Step int64 `json:"step"` Value float64 `json:"value"` + ValueRaw string `json:"valueRaw"` CounterType string `json:"counterType"` Tags map[string]string `json:"tags"` } func (t *MetaData) String() string { - return fmt.Sprintf("", - t.Endpoint, t.Metric, t.Timestamp, t.Step, t.Value, t.Tags) + return fmt.Sprintf("", + t.Endpoint, t.Metric, t.Timestamp, t.Step, t.ValueRaw, t.Tags) } func (t *MetaData) PK() string { diff --git a/config/agent.json b/config/agent.json index b1b02dc2d..b99763a10 100644 --- a/config/agent.json +++ b/config/agent.json @@ -10,7 +10,9 @@ }, "heartbeat": { "enabled": true, - "addr": "%%HBS_RPC%%", + "addrs": [ + "%%HBS_RPC%%" + ], "interval": 60, "timeout": 1000 }, diff --git a/modules/agent/cfg.example.json b/modules/agent/cfg.example.json index 113b3d231..1af565a87 100644 --- a/modules/agent/cfg.example.json +++ b/modules/agent/cfg.example.json @@ -10,14 +10,15 @@ }, "heartbeat": { "enabled": true, - "addr": "127.0.0.1:6030", + "addrs": [ + "127.0.0.1:6030" + ], "interval": 60, "timeout": 1000 }, "transfer": { "enabled": true, "addrs": [ - "127.0.0.1:8433", "127.0.0.1:8433" ], "interval": 60, diff --git a/modules/agent/cron/builtin.go b/modules/agent/cron/builtin.go index 23b882edf..1ddae24e2 100644 --- a/modules/agent/cron/builtin.go +++ b/modules/agent/cron/builtin.go @@ -15,16 +15,17 @@ package cron import ( - "github.com/open-falcon/falcon-plus/common/model" - "github.com/open-falcon/falcon-plus/modules/agent/g" "log" "strconv" "strings" "time" + + "github.com/open-falcon/falcon-plus/common/model" + "github.com/open-falcon/falcon-plus/modules/agent/g" ) func SyncBuiltinMetrics() { - if g.Config().Heartbeat.Enabled && g.Config().Heartbeat.Addr != "" { + if g.Config().Heartbeat.Enabled && len(g.Config().Heartbeat.Addrs) > 0 { go syncBuiltinMetrics() } } diff --git a/modules/agent/cron/ips.go b/modules/agent/cron/ips.go index c76fcdfa3..e5f8fe283 100644 --- a/modules/agent/cron/ips.go +++ b/modules/agent/cron/ips.go @@ -15,14 +15,15 @@ package cron import ( - "github.com/open-falcon/falcon-plus/common/model" - "github.com/open-falcon/falcon-plus/modules/agent/g" "log" "time" + + "github.com/open-falcon/falcon-plus/common/model" + "github.com/open-falcon/falcon-plus/modules/agent/g" ) func SyncTrustableIps() { - if g.Config().Heartbeat.Enabled && g.Config().Heartbeat.Addr != "" { + if g.Config().Heartbeat.Enabled && len(g.Config().Heartbeat.Addrs) != 0 { go syncTrustableIps() } } diff --git a/modules/agent/cron/plugin.go b/modules/agent/cron/plugin.go index 973e84821..24ccc466c 100644 --- a/modules/agent/cron/plugin.go +++ b/modules/agent/cron/plugin.go @@ -15,12 +15,13 @@ package cron import ( - "github.com/open-falcon/falcon-plus/common/model" - "github.com/open-falcon/falcon-plus/modules/agent/g" - "github.com/open-falcon/falcon-plus/modules/agent/plugins" "log" "strings" "time" + + "github.com/open-falcon/falcon-plus/common/model" + "github.com/open-falcon/falcon-plus/modules/agent/g" + "github.com/open-falcon/falcon-plus/modules/agent/plugins" ) func SyncMinePlugins() { @@ -32,7 +33,7 @@ func SyncMinePlugins() { return } - if g.Config().Heartbeat.Addr == "" { + if len(g.Config().Heartbeat.Addrs) == 0 { return } diff --git a/modules/agent/cron/reporter.go b/modules/agent/cron/reporter.go index 3593dad6f..b2754e2e0 100644 --- a/modules/agent/cron/reporter.go +++ b/modules/agent/cron/reporter.go @@ -16,14 +16,15 @@ package cron import ( "fmt" - "github.com/open-falcon/falcon-plus/common/model" - "github.com/open-falcon/falcon-plus/modules/agent/g" "log" "time" + + "github.com/open-falcon/falcon-plus/common/model" + "github.com/open-falcon/falcon-plus/modules/agent/g" ) func ReportAgentStatus() { - if g.Config().Heartbeat.Enabled && g.Config().Heartbeat.Addr != "" { + if g.Config().Heartbeat.Enabled && len(g.Config().Heartbeat.Addrs) != 0 { go reportAgentStatus(time.Duration(g.Config().Heartbeat.Interval) * time.Second) } } diff --git a/modules/agent/g/cfg.go b/modules/agent/g/cfg.go index 1b0823ddb..e2429411d 100644 --- a/modules/agent/g/cfg.go +++ b/modules/agent/g/cfg.go @@ -31,10 +31,10 @@ type PluginConfig struct { } type HeartbeatConfig struct { - Enabled bool `json:"enabled"` - Addr string `json:"addr"` - Interval int `json:"interval"` - Timeout int `json:"timeout"` + Enabled bool `json:"enabled"` + Addrs []string `json:"addrs"` + Interval int `json:"interval"` + Timeout int `json:"timeout"` } type TransferConfig struct { diff --git a/modules/agent/g/rpc.go b/modules/agent/g/rpc.go index ede4d006e..eb9291e91 100644 --- a/modules/agent/g/rpc.go +++ b/modules/agent/g/rpc.go @@ -26,9 +26,9 @@ import ( type SingleConnRpcClient struct { sync.Mutex - rpcClient *rpc.Client - RpcServer string - Timeout time.Duration + rpcClient *rpc.Client + RpcServers []string + Timeout time.Duration } func (this *SingleConnRpcClient) close() { @@ -44,25 +44,29 @@ func (this *SingleConnRpcClient) serverConn() error { } var err error - var retry int = 1 + var retry int - for { - if this.rpcClient != nil { - return nil - } + for _, addr := range this.RpcServers { + retry = 1 - this.rpcClient, err = net.JsonRpcClient("tcp", this.RpcServer, this.Timeout) + RETRY: + this.rpcClient, err = net.JsonRpcClient("tcp", addr, this.Timeout) if err != nil { - log.Printf("dial %s fail: %v", this.RpcServer, err) + log.Println("net.JsonRpcClient failed", err) if retry > 3 { - return err + continue } + time.Sleep(time.Duration(math.Pow(2.0, float64(retry))) * time.Second) retry++ - continue + goto RETRY } - return err + log.Println("connected RPC server", addr) + + return nil } + + return errors.New("connect to RPC servers failed") } func (this *SingleConnRpcClient) Call(method string, args interface{}, reply interface{}) error { @@ -85,9 +89,9 @@ func (this *SingleConnRpcClient) Call(method string, args interface{}, reply int select { case <-time.After(timeout): - log.Printf("[WARN] rpc call timeout %v => %v", this.rpcClient, this.RpcServer) + log.Printf("[WARN] rpc call timeout %v => %v", this.rpcClient, this.RpcServers) this.close() - return errors.New(this.RpcServer + " rpc call timeout") + return errors.New("rpc call timeout") case err := <-done: if err != nil { this.close() diff --git a/modules/agent/g/transfer.go b/modules/agent/g/transfer.go index 38500af8a..a8f96ca95 100644 --- a/modules/agent/g/transfer.go +++ b/modules/agent/g/transfer.go @@ -45,9 +45,12 @@ func SendMetrics(metrics []*model.MetricValue, resp *model.TransferResponse) { } func initTransferClient(addr string) *SingleConnRpcClient { + addrs := []string{ + addr, + } var c *SingleConnRpcClient = &SingleConnRpcClient{ - RpcServer: addr, - Timeout: time.Duration(Config().Transfer.Timeout) * time.Millisecond, + RpcServers: addrs, + Timeout: time.Duration(Config().Transfer.Timeout) * time.Millisecond, } TransferClientsLock.Lock() defer TransferClientsLock.Unlock() diff --git a/modules/agent/g/var.go b/modules/agent/g/var.go index bb9f68b35..2eb81f4ff 100644 --- a/modules/agent/g/var.go +++ b/modules/agent/g/var.go @@ -16,14 +16,16 @@ package g import ( "bytes" - "github.com/open-falcon/falcon-plus/common/model" - "github.com/toolkits/slice" + "fmt" "log" "net" "os" "strings" "sync" "time" + + "github.com/open-falcon/falcon-plus/common/model" + "github.com/toolkits/slice" ) var Root string @@ -40,12 +42,15 @@ var LocalIp string func InitLocalIp() { if Config().Heartbeat.Enabled { - conn, err := net.DialTimeout("tcp", Config().Heartbeat.Addr, time.Second*10) - if err != nil { - log.Println("get local addr failed !") - } else { - LocalIp = strings.Split(conn.LocalAddr().String(), ":")[0] - conn.Close() + for _, addr := range Config().Heartbeat.Addrs { + conn, err := net.DialTimeout("tcp", addr, time.Second*10) + if err != nil { + log.Println(fmt.Sprintf("connect to heartbeat server %s failed", addr)) + } else { + defer conn.Close() + LocalIp = strings.Split(conn.LocalAddr().String(), ":")[0] + break + } } } else { log.Println("hearbeat is not enabled, can't get localip") @@ -57,12 +62,13 @@ var ( ) func InitRpcClients() { - if Config().Heartbeat.Enabled { + if Config().Heartbeat.Enabled && len(Config().Heartbeat.Addrs) != 0 { HbsClient = &SingleConnRpcClient{ - RpcServer: Config().Heartbeat.Addr, - Timeout: time.Duration(Config().Heartbeat.Timeout) * time.Millisecond, + RpcServers: Config().Heartbeat.Addrs, + Timeout: time.Duration(Config().Heartbeat.Timeout) * time.Millisecond, } } + } func SendToTransfer(metrics []*model.MetricValue) { diff --git a/modules/agent/scripts/example.push.strmatch.py b/modules/agent/scripts/example.push.strmatch.py new file mode 100644 index 000000000..e7897dedd --- /dev/null +++ b/modules/agent/scripts/example.push.strmatch.py @@ -0,0 +1,45 @@ +#coding:utf8 +import sys +import random +import time +import json +import socket + +import requests + + +args= sys.argv[1:] +if args: + host = args[0] +else: + host = "127.0.0.1:1988" + +errors = ( + "warn out of memeory", + "Error 1086", + ) + +ts = int(time.time()) +payload = [ + { + "endpoint": socket.gethostname(), + "metric": "str.match", + "timestamp": ts, + "step": 30, + "value": random.choice(errors), + "counterType": "STRMATCH", + "tags": "project=test,platform=linux", + }, + ] + +url = "http://{host}/v1/push".format(host=host) + +r = requests.post(url=url, data=json.dumps(payload)) +resp_code = r.status_code +body = r.content + +print "host", host +print "payload", payload +print "resp_code", resp_code +print "body", body + diff --git a/modules/aggregator/cfg.example.json b/modules/aggregator/cfg.example.json index 13553fbf6..946a5e9af 100644 --- a/modules/aggregator/cfg.example.json +++ b/modules/aggregator/cfg.example.json @@ -5,7 +5,7 @@ "listen": "0.0.0.0:6055" }, "database": { - "addr": "root:@tcp(127.0.0.1:3306)/falcon_portal?loc=Local&parseTime=true", + "addr": "falcon:falcon@tcp(127.0.0.1:3306)/falcon_portal?loc=Local&parseTime=true", "idle": 10, "ids": [1, -1], "interval": 55 diff --git a/modules/alarm/README.md b/modules/alarm/README.md index f53579d11..353857c06 100644 --- a/modules/alarm/README.md +++ b/modules/alarm/README.md @@ -25,3 +25,12 @@ go get ./... - api: 其他各个组件的地址, 注意plus_api_token要和falcon-plus api组件配置文件中的default_token一致 - api im: 增加针对im的支持,如果采用wechat企业号,配置可参考 https://github.com/yanjunhui/chat +## Upgrade + +Support Multiple-Metrics Extend Expression version + +Change `event_cases.metric` column from 200 to 1024 in MySQL table schema: + + use alarms; + alter table event_cases change column metric metric VARCHAR(1024) NOT NULL; + diff --git a/modules/alarm/cfg.example.json b/modules/alarm/cfg.example.json index eb62d25c2..2fd5eccd3 100644 --- a/modules/alarm/cfg.example.json +++ b/modules/alarm/cfg.example.json @@ -6,6 +6,7 @@ }, "redis": { "addr": "127.0.0.1:6379", + "password": "", "maxIdle": 5, "highQueues": [ "event:p0", @@ -31,7 +32,7 @@ "plus_api_token": "default-token-used-in-server-side" }, "falcon_portal": { - "addr": "root:@tcp(127.0.0.1:3306)/alarms?charset=utf8&loc=Asia%2FChongqing", + "addr": "falcon:falcon@tcp(127.0.0.1:3306)/falcon?charset=utf8&loc=Asia%2FChongqing", "idle": 10, "max": 100 }, diff --git a/modules/alarm/cron/builder.go b/modules/alarm/cron/builder.go index 2f623f166..3e67ea1b4 100644 --- a/modules/alarm/cron/builder.go +++ b/modules/alarm/cron/builder.go @@ -23,24 +23,69 @@ import ( ) func BuildCommonSMSContent(event *model.Event) string { + var leftValue string + var rightValue string + pushedTags := map[string]string{} + + for k, v := range event.PushedTags { + pushedTags[k] = fmt.Sprintf("%v", v) + } + + switch event.LeftValue.(type) { + case float64: + { + leftValue = utils.ReadableFloat(event.LeftValue.(float64)) + rightValue = utils.ReadableFloat(event.RightValue()) + } + case string: + { + leftValue = event.LeftValue.(string) + rightValue = fmt.Sprintf("%v", event.RightValue()) + + } + + } return fmt.Sprintf( - "[P%d][%s][%s][][%s %s %s %s %s%s%s][O%d %s]", + "[P%d][%s][%s][][%s %s %s %s %v%s%v][O%d %s]", event.Priority(), event.Status, event.Endpoint, event.Note(), event.Func(), event.Metric(), - utils.SortedTags(event.PushedTags), - utils.ReadableFloat(event.LeftValue), + pushedTags, + leftValue, event.Operator(), - utils.ReadableFloat(event.RightValue()), + rightValue, event.CurrentStep, event.FormattedTime(), ) } func BuildCommonIMContent(event *model.Event) string { + var leftValue string + var rightValue string + pushedTags := map[string]string{} + + for k, v := range event.PushedTags { + pushedTags[k] = fmt.Sprintf("%v", v) + } + + switch event.LeftValue.(type) { + case float64: + { + leftValue = utils.ReadableFloat(event.LeftValue.(float64)) + rightValue = utils.ReadableFloat(event.RightValue()) + } + case string: + { + leftValue = event.LeftValue.(string) + rightValue = fmt.Sprintf("%v", event.RightValue()) + + } + + } + return fmt.Sprintf( "[P%d][%s][%s][][%s %s %s %s %s%s%s][O%d %s]", event.Priority(), @@ -49,16 +94,39 @@ func BuildCommonIMContent(event *model.Event) string { event.Note(), event.Func(), event.Metric(), - utils.SortedTags(event.PushedTags), - utils.ReadableFloat(event.LeftValue), + pushedTags, + leftValue, event.Operator(), - utils.ReadableFloat(event.RightValue()), + rightValue, event.CurrentStep, event.FormattedTime(), ) } func BuildCommonMailContent(event *model.Event) string { + var leftValue string + var rightValue string + pushedTags := map[string]string{} + + for k, v := range event.PushedTags { + pushedTags[k] = fmt.Sprintf("%v", v) + } + + switch event.LeftValue.(type) { + case float64: + { + leftValue = utils.ReadableFloat(event.LeftValue.(float64)) + rightValue = utils.ReadableFloat(event.RightValue()) + } + case string: + { + leftValue = event.LeftValue.(string) + rightValue = fmt.Sprintf("%v", event.RightValue()) + + } + + } + link := g.Link(event) return fmt.Sprintf( "%s\r\nP%d\r\nEndpoint:%s\r\nMetric:%s\r\nTags:%s\r\n%s: %s%s%s\r\nNote:%s\r\nMax:%d, Current:%d\r\nTimestamp:%s\r\n%s\r\n", @@ -66,11 +134,11 @@ func BuildCommonMailContent(event *model.Event) string { event.Priority(), event.Endpoint, event.Metric(), - utils.SortedTags(event.PushedTags), + pushedTags, event.Func(), - utils.ReadableFloat(event.LeftValue), + leftValue, event.Operator(), - utils.ReadableFloat(event.RightValue()), + rightValue, event.Note(), event.MaxStep(), event.CurrentStep, diff --git a/modules/alarm/cron/callback.go b/modules/alarm/cron/callback.go index 759a45b44..eb48425d1 100644 --- a/modules/alarm/cron/callback.go +++ b/modules/alarm/cron/callback.go @@ -93,7 +93,21 @@ func Callback(event *model.Event, action *api.Action) string { req.Param("tpl_id", fmt.Sprintf("%d", event.TplId())) req.Param("exp_id", fmt.Sprintf("%d", event.ExpressionId())) req.Param("stra_id", fmt.Sprintf("%d", event.StrategyId())) - req.Param("left_value", utils.ReadableFloat(event.LeftValue)) + + var leftValue string + switch event.LeftValue.(type) { + case float64: + { + leftValue = utils.ReadableFloat(event.LeftValue.(float64)) + } + case string: + { + leftValue = event.LeftValue.(string) + + } + } + + req.Param("left_value", leftValue) req.Param("tags", tags) resp, e := req.String() diff --git a/modules/alarm/g/cfg.go b/modules/alarm/g/cfg.go index 789525cb2..9d3db41c1 100644 --- a/modules/alarm/g/cfg.go +++ b/modules/alarm/g/cfg.go @@ -29,6 +29,7 @@ type HttpConfig struct { type RedisConfig struct { Addr string `json:"addr"` + Password string `json:"password"` MaxIdle int `json:"maxIdle"` HighQueues []string `json:"highQueues"` LowQueues []string `json:"lowQueues"` diff --git a/modules/alarm/g/redis.go b/modules/alarm/g/redis.go index 11bfe5cff..67edb2654 100644 --- a/modules/alarm/g/redis.go +++ b/modules/alarm/g/redis.go @@ -25,6 +25,8 @@ var RedisConnPool *redis.Pool func InitRedisConnPool() { redisConfig := Config().Redis + password := redisConfig.Password + RedisConnPool = &redis.Pool{ MaxIdle: redisConfig.MaxIdle, IdleTimeout: 240 * time.Second, @@ -33,6 +35,13 @@ func InitRedisConnPool() { if err != nil { return nil, err } + if password != "" { + if _, err := c.Do("AUTH", password); err != nil { + c.Close() + return nil, err + } + + } return c, err }, TestOnBorrow: PingRedis, diff --git a/modules/api/cfg.example.json b/modules/api/cfg.example.json index a68edaedf..2511728fc 100644 --- a/modules/api/cfg.example.json +++ b/modules/api/cfg.example.json @@ -1,11 +1,11 @@ { "log_level": "debug", "db": { - "falcon_portal": "root:@tcp(127.0.0.1:3306)/falcon_portal?charset=utf8&parseTime=True&loc=Local", - "graph": "root:@tcp(127.0.0.1:3306)/graph?charset=utf8&parseTime=True&loc=Local", - "uic": "root:@tcp(127.0.0.1:3306)/uic?charset=utf8&parseTime=True&loc=Local", - "dashboard": "root:@tcp(127.0.0.1:3306)/dashboard?charset=utf8&parseTime=True&loc=Local", - "alarms": "root:@tcp(127.0.0.1:3306)/alarms?charset=utf8&parseTime=True&loc=Local", + "falcon_portal": "falcon:falcon@tcp(127.0.0.1:3306)/falcon_portal?charset=utf8&parseTime=True&loc=Local", + "graph": "falcon:falcon@tcp(127.0.0.1:3306)/graph?charset=utf8&parseTime=True&loc=Local", + "uic": "falcon:falcon@tcp(127.0.0.1:3306)/uic?charset=utf8&parseTime=True&loc=Local", + "dashboard": "falcon:falcon@tcp(127.0.0.1:3306)/dashboard?charset=utf8&parseTime=True&loc=Local", + "alarms": "falcon:falcon@tcp(127.0.0.1:3306)/alarms?charset=utf8&parseTime=True&loc=Local", "db_bug": true }, "graphs": { diff --git a/modules/api/data/metric b/modules/api/data/metric index 0cbe647db..b3d7306d2 100644 --- a/modules/api/data/metric +++ b/modules/api/data/metric @@ -74,3 +74,4 @@ net.if.total.errors net.if.total.packets net.port.listen proc.num +str.match diff --git a/modules/gateway/g/g.go b/modules/gateway/g/g.go index 96f7a94d1..9e7306bd1 100644 --- a/modules/gateway/g/g.go +++ b/modules/gateway/g/g.go @@ -36,6 +36,7 @@ const ( GAUGE = "GAUGE" COUNTER = "COUNTER" DERIVE = "DERIVE" + STRMATCH = "STRMATCH" DEFAULT_STEP = 60 MIN_STEP = 30 ) diff --git a/modules/gateway/receiver/rpc/rpc_transfer.go b/modules/gateway/receiver/rpc/rpc_transfer.go index 7a3f049ed..3c09b425a 100644 --- a/modules/gateway/receiver/rpc/rpc_transfer.go +++ b/modules/gateway/receiver/rpc/rpc_transfer.go @@ -60,7 +60,7 @@ func RecvMetricValues(args []*cmodel.MetricValue, reply *cmodel.TransferResponse continue } - if v.Type != g.COUNTER && v.Type != g.GAUGE && v.Type != g.DERIVE { + if v.Type != g.COUNTER && v.Type != g.GAUGE && v.Type != g.DERIVE && v.Type != g.STRMATCH { reply.Invalid += 1 continue } @@ -86,7 +86,6 @@ func RecvMetricValues(args []*cmodel.MetricValue, reply *cmodel.TransferResponse continue } - // TODO 呵呵,这里需要再优雅一点 now := start.Unix() if v.Timestamp <= 0 || v.Timestamp > now*2 { v.Timestamp = now @@ -105,18 +104,28 @@ func RecvMetricValues(args []*cmodel.MetricValue, reply *cmodel.TransferResponse var vv float64 var err error - switch cv := v.Value.(type) { - case string: - vv, err = strconv.ParseFloat(cv, 64) - if err != nil { + if v.Type != g.STRMATCH { + switch cv := v.Value.(type) { + case string: + vv, err = strconv.ParseFloat(cv, 64) + if err != nil { + valid = false + } + case float64: + vv = cv + case int64: + vv = float64(cv) + default: + valid = false + } + } else { + switch v.Value.(type) { + case string: + fv.ValueRaw = v.Value.(string) + vv = float64(1.0) + default: valid = false } - case float64: - vv = cv - case int64: - vv = float64(cv) - default: - valid = false } if !valid { diff --git a/modules/graph/cfg.example.json b/modules/graph/cfg.example.json index 8333a1bd5..6703c392f 100644 --- a/modules/graph/cfg.example.json +++ b/modules/graph/cfg.example.json @@ -12,7 +12,7 @@ "storage": "./data/6070" }, "db": { - "dsn": "root:@tcp(127.0.0.1:3306)/graph?loc=Local&parseTime=true", + "dsn": "falcon:falcon@tcp(127.0.0.1:3306)/graph?loc=Local&parseTime=true", "maxIdle": 4 }, "callTimeout": 5000, diff --git a/modules/hbs/cache/agents.go b/modules/hbs/cache/agents.go index 25d10bbf1..967212ee2 100644 --- a/modules/hbs/cache/agents.go +++ b/modules/hbs/cache/agents.go @@ -19,10 +19,11 @@ package cache // 提供http接口查询机器信息,排查重名机器的时候比较有用 import ( - "github.com/open-falcon/falcon-plus/common/model" - "github.com/open-falcon/falcon-plus/modules/hbs/db" "sync" "time" + + "github.com/open-falcon/falcon-plus/common/model" + "github.com/open-falcon/falcon-plus/modules/hbs/db" ) type SafeAgents struct { diff --git a/modules/hbs/cache/cache.go b/modules/hbs/cache/cache.go index 37b76773e..0ebf90a06 100644 --- a/modules/hbs/cache/cache.go +++ b/modules/hbs/cache/cache.go @@ -49,6 +49,9 @@ func Init() { log.Println("#9 MonitoredHosts...") MonitoredHosts.Init() + log.Println("#10 EExpCache...") + EExpCache.Init() + log.Println("cache done") go LoopInit() @@ -67,5 +70,6 @@ func LoopInit() { HostTemplateIds.Init() ExpressionCache.Init() MonitoredHosts.Init() + EExpCache.Init() } } diff --git a/modules/hbs/cache/eexps.go b/modules/hbs/cache/eexps.go new file mode 100644 index 000000000..0cfa01c7a --- /dev/null +++ b/modules/hbs/cache/eexps.go @@ -0,0 +1,32 @@ +package cache + +import ( + "sync" + + "github.com/open-falcon/falcon-plus/common/model" + "github.com/open-falcon/falcon-plus/modules/hbs/db" +) + +type SafeEExpCache struct { + sync.RWMutex + L []model.EExp +} + +var EExpCache = &SafeEExpCache{} + +func (this *SafeEExpCache) Get() []model.EExp { + this.RLock() + defer this.RUnlock() + return this.L +} + +func (this *SafeEExpCache) Init() { + es, err := db.QueryEExps() + if err != nil { + return + } + + this.Lock() + defer this.Unlock() + this.L = es +} diff --git a/modules/hbs/cfg.example.json b/modules/hbs/cfg.example.json index 2325d6c84..bbc07746a 100644 --- a/modules/hbs/cfg.example.json +++ b/modules/hbs/cfg.example.json @@ -1,6 +1,6 @@ { "debug": true, - "database": "root:@tcp(127.0.0.1:3306)/falcon_portal?loc=Local&parseTime=true", + "database": "falcon:falcon@tcp(127.0.0.1:3306)/falcon?loc=Local&parseTime=true", "hosts": "", "maxConns": 20, "maxIdle": 100, diff --git a/modules/hbs/db/eexp.go b/modules/hbs/db/eexp.go new file mode 100644 index 000000000..035de5c96 --- /dev/null +++ b/modules/hbs/db/eexp.go @@ -0,0 +1,220 @@ +package db + +import ( + "bytes" + "errors" + "fmt" + "log" + "sort" + "strconv" + "strings" + + "github.com/open-falcon/falcon-plus/common/model" +) + +func QueryEExps() (ret []model.EExp, err error) { + sql := "select id, exp, priority, max_step, note from eexp where pause=0" + rows, err := DB.Query(sql) + if err != nil { + log.Println("DB.Query failed", err) + return ret, err + } + + defer rows.Close() + for rows.Next() { + var ID int + var exp string + var priority int + var max_step int + var note string + + err = rows.Scan( + &ID, + &exp, + &priority, + &max_step, + ¬e, + ) + + if err != nil { + log.Println("parse result failed", err) + continue + } + + ee, err := parseEExp(exp) + if err != nil { + log.Println("parseEExp failed", err) + continue + } + ee.ID = ID + ee.Priority = priority + ee.MaxStep = max_step + ee.Note = note + + ret = append(ret, *ee) + } + + return ret, nil +} + +func parseFilter(s string) (*model.Filter, error) { + filter := model.Filter{} + var err error + + idxLeft := strings.Index(s, "(") + idxRight := strings.Index(s, ")") + if idxLeft == -1 || idxRight == -1 { + err = errors.New("parse branket failed") + return nil, err + } + + filter.Func = strings.TrimSpace(s[:idxLeft]) + p := s[idxLeft+1 : idxRight] + parts := strings.Split(p, ",") + + if filter.Func == "all" { + if len(parts) != 2 { + errmsg := fmt.Sprintf("func all parameter -%s- is invalid", p) + err = errors.New(errmsg) + return nil, err + } + + filter.Key = strings.TrimSpace(parts[0]) + buf := strings.TrimSpace(parts[1]) + + splits := strings.Split(buf, "#") + if len(splits) != 2 { + errmsg := fmt.Sprintf("func all parameter -%s- is invalid", p) + err = errors.New(errmsg) + return nil, err + } + + filter.Limit, err = strconv.ParseUint(splits[1], 10, 64) + if err != nil { + errmsg := fmt.Sprintf("func all parameter -%s- is invalid", p) + err = errors.New(errmsg) + return nil, err + } + + } else if filter.Func == "count" { + if len(parts) != 3 { + errmsg := fmt.Sprintf("func count parameter -%s- is invalid", p) + err = errors.New(errmsg) + return nil, err + } + + filter.Key = strings.TrimSpace(parts[0]) + if filter.Key == "" { + errmsg := fmt.Sprintf("func ago key -%s- is invalid", p) + err = errors.New(errmsg) + return nil, err + } + + filter.Ago, err = strconv.ParseUint(strings.TrimSpace(parts[1]), 10, 64) + if err != nil { + errmsg := fmt.Sprintf("func ago parameter ago -%s- is invalid", p) + err = errors.New(errmsg) + return nil, err + } + + filter.Hits, err = strconv.ParseUint(strings.TrimSpace(parts[2]), 10, 64) + if err != nil { + errmsg := fmt.Sprintf("func ago parameter hits -%s- is invalid", p) + err = errors.New(errmsg) + return nil, err + } + } else { + err = errors.New(fmt.Sprintf("func -%s- not support", filter.Func)) + return nil, err + } + + remain := strings.TrimSpace(s[idxRight+1:]) + + var buffer bytes.Buffer + var chr rune + for _, chr = range remain { + switch chr { + case '=': + { + buffer.WriteRune(chr) + } + case '>': + { + + buffer.WriteRune(chr) + } + + case '<': + { + + buffer.WriteRune(chr) + } + case ' ': + { + + } + default: + { + break + } + } + } + + filter.Operator = buffer.String() + + remain = remain[strings.Index(remain, filter.Operator)+len(filter.Operator):] + + valueS := strings.TrimSpace(remain) + if valueS == "" { + err = errors.New("exp is invalid, value is empty") + return nil, err + } + + if valueS[0] == '"' { + filter.RightValue = strings.Trim(valueS, `"`) + } else { + + filter.RightValue, err = strconv.ParseFloat(valueS, 64) + if err != nil { + return nil, err + } + + } + + return &filter, nil +} + +func parseEExp(s string) (*model.EExp, error) { + var err error + ee := model.EExp{} + ee.Filters = map[string]model.Filter{} + + s = strings.Trim(s, ";") + s = strings.TrimSpace(s) + if s == "" { + err = errors.New("eexp is empty") + return nil, err + } + + keys := []string{} + for _, filterS := range strings.Split(s, ";") { + filterS = strings.TrimSpace(filterS) + filter, err := parseFilter(filterS) + if err != nil { + log.Println("parseFilter failed", err) + } else { + keys = append(keys, filter.Key) + ee.Filters[filter.Key] = *filter + } + } + + if len(keys) == 0 { + err = errors.New("filters are invalid") + return nil, err + } + + sort.Sort(sort.StringSlice(keys)) + ee.Key = strings.Join(keys, ",") + + return &ee, nil +} diff --git a/modules/transfer/g/git.go b/modules/hbs/rpc/judge.go similarity index 69% rename from modules/transfer/g/git.go rename to modules/hbs/rpc/judge.go index 54b332e88..ffce98b77 100644 --- a/modules/transfer/g/git.go +++ b/modules/hbs/rpc/judge.go @@ -12,8 +12,14 @@ // See the License for the specific language governing permissions and // limitations under the License. -package g +package rpc -const ( - COMMIT = "e249d8a" +import ( + "github.com/open-falcon/falcon-plus/common/model" + "github.com/open-falcon/falcon-plus/modules/hbs/cache" ) + +func (j *Judge) GetEExps(req model.NullRpcRequest, reply *model.EExpResponse) error { + reply.EExps = cache.EExpCache.Get() + return nil +} diff --git a/modules/hbs/rpc/rpc.go b/modules/hbs/rpc/rpc.go index f99500e38..71b55f3e5 100644 --- a/modules/hbs/rpc/rpc.go +++ b/modules/hbs/rpc/rpc.go @@ -26,6 +26,7 @@ import ( type Hbs int type Agent int +type Judge int func Start() { addr := g.Config().Listen @@ -34,6 +35,7 @@ func Start() { // server.Register(new(filter.Filter)) server.Register(new(Agent)) server.Register(new(Hbs)) + server.Register(new(Judge)) l, e := net.Listen("tcp", addr) if e != nil { diff --git a/modules/judge/cfg.example.json b/modules/judge/cfg.example.json index 598a25db9..e730bf254 100644 --- a/modules/judge/cfg.example.json +++ b/modules/judge/cfg.example.json @@ -1,30 +1,46 @@ { - "debug": true, - "debugHost": "nil", - "remain": 11, - "http": { - "enabled": true, - "listen": "0.0.0.0:6081" - }, - "rpc": { - "enabled": true, - "listen": "0.0.0.0:6080" - }, - "hbs": { - "servers": ["127.0.0.1:6030"], - "timeout": 300, - "interval": 60 - }, "alarm": { "enabled": true, "minInterval": 300, "queuePattern": "event:p%v", "redis": { + "password": "", + "connTimeout": 5000, "dsn": "127.0.0.1:6379", "maxIdle": 5, - "connTimeout": 5000, "readTimeout": 5000, "writeTimeout": 5000 } + }, + "debug": true, + "debugHost": "nil", + "hbs": { + "interval": 60, + "servers": [ + "127.0.0.1:6030" + ], + "timeout": 300 + }, + "http": { + "enabled": true, + "listen": "0.0.0.0:6081" + }, + "remain": 11, + "listMaxLen": 10240, + "rpc": { + "enabled": true, + "listen": "0.0.0.0:6080" + }, + "stringMatcher": { + "batch": 200, + "callTimeout": 5000, + "connTimeout": 1000, + "dsn": { + "history": "falcon:falcon@tcp(127.0.0.1:3306)/falcon?loc=Local&parseTime=true" + }, + "enabled": false, + "maxConns": 32, + "maxIdle": 32, + "retry": 3 } } diff --git a/modules/judge/cron/eexp.go b/modules/judge/cron/eexp.go new file mode 100644 index 000000000..2197d3c51 --- /dev/null +++ b/modules/judge/cron/eexp.go @@ -0,0 +1,56 @@ +package cron + +import ( + "log" + "time" + + "github.com/open-falcon/falcon-plus/common/model" + "github.com/open-falcon/falcon-plus/modules/judge/g" +) + +func SyncEExps() { + duration := time.Duration(g.Config().Hbs.Interval) * time.Second + for { + syncEExps() + syncEFilter() + time.Sleep(duration) + } +} + +func syncEExps() { + var resp model.EExpResponse + err := g.HbsClient.Call("Judge.GetEExps", model.NullRpcRequest{}, &resp) + if err != nil { + log.Println("[ERROR] Judge.GetEExps:", err) + return + } + + rebuildEExpMap(&resp) +} + +func rebuildEExpMap(resp *model.EExpResponse) { + m := make(map[string][]model.EExp) + for _, exp := range resp.EExps { + if _, exists := m[exp.Key]; exists { + m[exp.Key] = append(m[exp.Key], exp) + } else { + m[exp.Key] = []model.EExp{exp} + } + } + + g.EExpMap.ReInit(m) +} + +func syncEFilter() { + m := make(map[string]string) + + //M map[string][]*model.EExp + eeMap := g.EExpMap.Get() + for _, ees := range eeMap { + for _, eexp := range ees { + m[eexp.Key] = eexp.Key + } + } + + g.EFilterMap.ReInit(m) +} diff --git a/modules/judge/cron/strategy.go b/modules/judge/cron/strategy.go index 133002bac..711531047 100644 --- a/modules/judge/cron/strategy.go +++ b/modules/judge/cron/strategy.go @@ -18,6 +18,8 @@ import ( "encoding/json" "fmt" "log" + "regexp" + "strings" "time" "github.com/open-falcon/falcon-plus/common/model" @@ -30,6 +32,8 @@ func SyncStrategies() { syncStrategies() syncExpression() syncFilter() + rebuildStrMatcherMap() + rebuildStrMatcherExpMap() time.Sleep(duration) } } @@ -51,7 +55,6 @@ func rebuildStrategyMap(strategiesResponse *model.StrategiesResponse) { for _, hs := range strategiesResponse.HostStrategies { hostname := hs.Hostname if g.Config().Debug && hostname == g.Config().DebugHost { - log.Println(hostname, "strategies:") bs, _ := json.Marshal(hs.Strategies) fmt.Println(string(bs)) } @@ -116,3 +119,97 @@ func syncFilter() { g.FilterMap.ReInit(m) } + +func parsePatternFromFunc(s string) (pattern string) { + NOT_FOUND := -1 + idxMatchBracket := strings.Index(s, "match(") + idxComma := strings.LastIndex(s, ",") + if idxMatchBracket != NOT_FOUND && idxComma != NOT_FOUND { + pattern = s[len("match("):idxComma] + } + return pattern +} + +func rebuildStrMatcherMap() { + m := make(map[string]map[string]*regexp.Regexp) + + strategyMap := g.StrategyMap.Get() + for endpointSlashMetric, strategies := range strategyMap { + parts := strings.Split(endpointSlashMetric, "/") + if len(parts) < 1 { + continue + } + endpoint := parts[0] + + for _, strategy := range strategies { + if strategy.Func == "" || strategy.Func[0:1] != "m" { + continue + } + + pattern := parsePatternFromFunc(strategy.Func) + if pattern == "" { + log.Println(`WARN: pattern is empty or parse pattern failed`, strategy.Func) + continue + } + + // auto append prefix to ignore case + re, err := regexp.Compile(`(?i)` + pattern) + if err != nil { + log.Println(`WARN: compiling pattern failed`, pattern) + continue + } + + if _, ok := m[endpoint]; !ok { + subM := make(map[string]*regexp.Regexp) + m[endpoint] = subM + + } + m[endpoint][pattern] = re + } + } + + g.StrMatcherMap.ReInit(m) +} + +func rebuildStrMatcherExpMap() { + m := make(map[string]map[string]*regexp.Regexp) + + exps := g.ExpressionMap.Get() + + for metricSlashTag, exps := range exps { + parts := strings.Split(metricSlashTag, "/") + if len(parts) < 2 { + log.Println("WARN: parse metric from g.ExpressionMap failed", metricSlashTag) + continue + } + //metric := parts[0] + tag := parts[1] + + for _, exp := range exps { + if exp.Func == "" || exp.Func[0:1] != "m" { + continue + } + + pattern := parsePatternFromFunc(exp.Func) + if pattern == "" { + log.Println(`WARN: pattern is empty or parse pattern failed`, exp.Func) + continue + } + + // auto append prefix to ignore case + re, err := regexp.Compile(`(?i)` + pattern) + if err != nil { + log.Println(`WARN: compiling pattern failed`, pattern) + continue + } + + if _, ok := m[tag]; !ok { + subM := make(map[string]*regexp.Regexp) + m[tag] = subM + } + m[tag][pattern] = re + } + } + + g.StrMatcherExpMap.ReInit(m) +} diff --git a/modules/judge/cron/string_matcher.go b/modules/judge/cron/string_matcher.go new file mode 100644 index 000000000..d1deaf6ca --- /dev/null +++ b/modules/judge/cron/string_matcher.go @@ -0,0 +1,22 @@ +package cron + +import ( + "github.com/open-falcon/falcon-plus/modules/judge/string_matcher" + "log" + "time" +) + +func CleanStringMatcherHistory() { + for { + cleanStringMatcherHistory() + time.Sleep(time.Hour * 12) + } +} + +func cleanStringMatcherHistory() { + ago := time.Now().Unix() - 3600*24*7 + err := string_matcher.Consumer.BatchDeleteHistory(ago) + if err != nil { + log.Println("ERROR: BatchDeleteHistory failed", err) + } +} diff --git a/modules/judge/g/cfg.go b/modules/judge/g/cfg.go index d18a2ab0b..5d0d7e45d 100644 --- a/modules/judge/g/cfg.go +++ b/modules/judge/g/cfg.go @@ -16,9 +16,10 @@ package g import ( "encoding/json" - "github.com/toolkits/file" "log" "sync" + + "github.com/toolkits/file" ) type HttpConfig struct { @@ -39,6 +40,7 @@ type HbsConfig struct { type RedisConfig struct { Dsn string `json:"dsn"` + Password string `json:"Password"` MaxIdle int `json:"maxIdle"` ConnTimeout int `json:"connTimeout"` ReadTimeout int `json:"readTimeout"` @@ -52,14 +54,27 @@ type AlarmConfig struct { Redis *RedisConfig `json:"redis"` } +type StringMatcherConfig struct { + Enabled bool `json:"enabled"` + Batch int `json:"batch"` + ConnTimeout int `json:"connTimeout"` + CallTimeout int `json:"callTimeout"` + MaxConns int `json:"maxConns"` + MaxIdle int `json:"maxIdle"` + MaxRetry int `json:"retry"` + DSN map[string]string `json:"dsn"` +} + type GlobalConfig struct { - Debug bool `json:"debug"` - DebugHost string `json:"debugHost"` - Remain int `json:"remain"` - Http *HttpConfig `json:"http"` - Rpc *RpcConfig `json:"rpc"` - Hbs *HbsConfig `json:"hbs"` - Alarm *AlarmConfig `json:"alarm"` + Debug bool `json:"debug"` + DebugHost string `json:"debugHost"` + Remain int `json:"remain"` + ListMaxLen uint32 `json:"listMaxLen"` + Http *HttpConfig `json:"http"` + Rpc *RpcConfig `json:"rpc"` + Hbs *HbsConfig `json:"hbs"` + Alarm *AlarmConfig `json:"alarm"` + StringMatcher *StringMatcherConfig `json:"stringMatcher"` } var ( diff --git a/modules/judge/g/eexp.go b/modules/judge/g/eexp.go new file mode 100644 index 000000000..6ef9ba17a --- /dev/null +++ b/modules/judge/g/eexp.go @@ -0,0 +1,49 @@ +package g + +import ( + "sync" + + "github.com/open-falcon/falcon-plus/common/model" +) + +type SafeEExpMap struct { + sync.RWMutex + M map[string][]model.EExp +} + +type SafeEFilterMap struct { + sync.RWMutex + M map[string]string +} + +var ( + EExpMap = &SafeEExpMap{M: make(map[string][]model.EExp)} + EFilterMap = &SafeEFilterMap{M: make(map[string]string)} +) + +func (this *SafeEExpMap) ReInit(m map[string][]model.EExp) { + this.Lock() + defer this.Unlock() + this.M = m +} + +func (this *SafeEExpMap) Get() map[string][]model.EExp { + this.RLock() + defer this.RUnlock() + return this.M +} + +func (this *SafeEFilterMap) ReInit(m map[string]string) { + this.Lock() + defer this.Unlock() + this.M = m +} + +func (this *SafeEFilterMap) Exists(key string) bool { + this.RLock() + defer this.RUnlock() + if _, ok := this.M[key]; ok { + return true + } + return false +} diff --git a/modules/judge/g/g.go b/modules/judge/g/g.go index f024877bd..e4ded1be7 100644 --- a/modules/judge/g/g.go +++ b/modules/judge/g/g.go @@ -23,7 +23,11 @@ import ( // 2.0.1: bugfix HistoryData limit // 2.0.2: clean stale data const ( - VERSION = "2.0.2" + VERSION = "2.0.2" + GAUGE = "GAUGE" + COUNTER = "COUNTER" + DERIVE = "DERIVE" + STRMATCH = "STRMATCH" ) func init() { diff --git a/modules/judge/g/redis.go b/modules/judge/g/redis.go index 92c4806d4..9e5ed7fb9 100644 --- a/modules/judge/g/redis.go +++ b/modules/judge/g/redis.go @@ -28,6 +28,7 @@ func InitRedisConnPool() { } dsn := Config().Alarm.Redis.Dsn + password := Config().Alarm.Redis.Password maxIdle := Config().Alarm.Redis.MaxIdle idleTimeout := 240 * time.Second @@ -43,6 +44,13 @@ func InitRedisConnPool() { if err != nil { return nil, err } + if password != "" { + if _, err := c.Do("AUTH", password); err != nil { + c.Close() + return nil, err + } + } + return c, err }, TestOnBorrow: PingRedis, diff --git a/modules/judge/g/var.go b/modules/judge/g/var.go index 61a0cec34..2fe035e31 100644 --- a/modules/judge/g/var.go +++ b/modules/judge/g/var.go @@ -15,6 +15,8 @@ package g import ( + "fmt" + "regexp" "sync" "time" @@ -44,12 +46,29 @@ type SafeFilterMap struct { M map[string]string } +// SafeStrMatcherMap Generate this map from strategies +type SafeStrMatcherMap struct { + sync.RWMutex + // endpoint1 => map[pattern => RegexpObject, ...] + // endpoint2 => map[pattern => RegexpObject, ...] + M map[string]map[string]*regexp.Regexp +} + +// SafeStrMatcherExpMap Generate this map from expressions +// tag1 => map[pattern => RegexpObject, ...] +// tag2 => map[pattern => RegexpObject, ...] +type SafeStrMatcherExpMap struct { + SafeStrMatcherMap +} + var ( - HbsClient *SingleConnRpcClient - StrategyMap = &SafeStrategyMap{M: make(map[string][]model.Strategy)} - ExpressionMap = &SafeExpressionMap{M: make(map[string][]*model.Expression)} - LastEvents = &SafeEventMap{M: make(map[string]*model.Event)} - FilterMap = &SafeFilterMap{M: make(map[string]string)} + HbsClient *SingleConnRpcClient + StrategyMap = &SafeStrategyMap{M: make(map[string][]model.Strategy)} + ExpressionMap = &SafeExpressionMap{M: make(map[string][]*model.Expression)} + LastEvents = &SafeEventMap{M: make(map[string]*model.Event)} + FilterMap = &SafeFilterMap{M: make(map[string]string)} + StrMatcherMap = &SafeStrMatcherMap{M: make(map[string]map[string]*regexp.Regexp)} + StrMatcherExpMap = &SafeStrMatcherExpMap{SafeStrMatcherMap{M: make(map[string]map[string]*regexp.Regexp)}} ) func InitHbsClient() { @@ -110,3 +129,78 @@ func (this *SafeFilterMap) Exists(key string) bool { } return false } + +func (t *SafeStrMatcherMap) ReInit(m map[string]map[string]*regexp.Regexp) { + t.Lock() + defer t.Unlock() + t.M = m +} + +func (t *SafeStrMatcherMap) Get(key string) (map[string]*regexp.Regexp, bool) { + t.Lock() + defer t.Unlock() + m, ok := t.M[key] + return m, ok +} + +func (t *SafeStrMatcherMap) GetAll() map[string]map[string]*regexp.Regexp { + t.Lock() + defer t.Unlock() + return t.M +} + +func (t *SafeStrMatcherMap) Append(key string, pattern string, re *regexp.Regexp) { + t.Lock() + defer t.Unlock() + _, ok := t.M[key] + if !ok { + m := map[string]*regexp.Regexp{} + t.M[key] = m + } + t.M[key][pattern] = re +} + +func (t *SafeStrMatcherMap) Exists(key string) bool { + t.Lock() + defer t.Unlock() + + if _, ok := t.M[key]; ok { + return true + } + + return false +} + +func (t *SafeStrMatcherMap) Match(key string, value string) bool { + t.Lock() + defer t.Unlock() + + m, ok := t.M[key] + if ok { + for _, re := range m { + if re.MatchString(value) { + return true + } + + } + } + return false +} + +func (t *SafeStrMatcherExpMap) Match(m map[string]string, value string) bool { + t.Lock() + defer t.Unlock() + + for k, v := range m { + key := fmt.Sprintf("%s=%s", k, v) + subM, ok := t.M[key] + if ok { + for _, re := range subM { + if re.MatchString(value) { + return true + } + } + } + } + return false +} diff --git a/modules/judge/http/info.go b/modules/judge/http/info.go index a0d943af7..1ca2ce35e 100644 --- a/modules/judge/http/info.go +++ b/modules/judge/http/info.go @@ -16,14 +16,25 @@ package http import ( "fmt" + "net/http" + "strings" + "github.com/open-falcon/falcon-plus/common/utils" "github.com/open-falcon/falcon-plus/modules/judge/g" "github.com/open-falcon/falcon-plus/modules/judge/store" - "net/http" - "strings" ) func configInfoRoutes() { + http.HandleFunc("/eexps/", func(w http.ResponseWriter, r *http.Request) { + m := g.EExpMap.Get() + RenderDataJson(w, m) + }) + + http.HandleFunc("/strategies/", func(w http.ResponseWriter, r *http.Request) { + m := g.StrategyMap.Get() + RenderDataJson(w, m) + }) + // e.g. /strategy/lg-dinp-docker01.bj/cpu.idle http.HandleFunc("/strategy/", func(w http.ResponseWriter, r *http.Request) { urlParam := r.URL.Path[len("/strategy/"):] @@ -31,6 +42,11 @@ func configInfoRoutes() { RenderDataJson(w, m[urlParam]) }) + http.HandleFunc("/expressions/", func(w http.ResponseWriter, r *http.Request) { + m := g.ExpressionMap.Get() + RenderDataJson(w, m) + }) + // e.g. /expression/net.port.listen/port=22 http.HandleFunc("/expression/", func(w http.ResponseWriter, r *http.Request) { urlParam := r.URL.Path[len("/expression/"):] @@ -38,6 +54,16 @@ func configInfoRoutes() { RenderDataJson(w, m[urlParam]) }) + http.HandleFunc("/str_matchers/", func(w http.ResponseWriter, r *http.Request) { + m := g.StrMatcherMap.GetAll() + RenderDataJson(w, m) + }) + + http.HandleFunc("/str_matcher_exps/", func(w http.ResponseWriter, r *http.Request) { + m := g.StrMatcherExpMap.GetAll() + RenderDataJson(w, m) + }) + http.HandleFunc("/count", func(w http.ResponseWriter, r *http.Request) { sum := 0 arr := []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"} diff --git a/modules/judge/main.go b/modules/judge/main.go index 5ec3fa558..e4a12f8fd 100644 --- a/modules/judge/main.go +++ b/modules/judge/main.go @@ -17,12 +17,16 @@ package main import ( "flag" "fmt" + "log" + "os" + "github.com/open-falcon/falcon-plus/modules/judge/cron" "github.com/open-falcon/falcon-plus/modules/judge/g" "github.com/open-falcon/falcon-plus/modules/judge/http" + "github.com/open-falcon/falcon-plus/modules/judge/model" "github.com/open-falcon/falcon-plus/modules/judge/rpc" "github.com/open-falcon/falcon-plus/modules/judge/store" - "os" + "github.com/open-falcon/falcon-plus/modules/judge/string_matcher" ) func main() { @@ -36,17 +40,36 @@ func main() { } g.ParseConfig(*cfg) + _cfg := g.Config() + + if _cfg.Debug { + log.SetFlags(log.Ldate | log.Ltime | log.Llongfile) + } g.InitRedisConnPool() g.InitHbsClient() + // string matcher history persistence + if _cfg.StringMatcher != nil && _cfg.StringMatcher.Enabled { + string_matcher.InitStringMatcher() + batch := _cfg.StringMatcher.Batch + retry := _cfg.StringMatcher.MaxRetry + go string_matcher.Consumer.Start(batch, retry) + } + store.InitHistoryBigMap() + model.InitEHistoryBigMap() go http.Start() go rpc.Start() go cron.SyncStrategies() + go cron.SyncEExps() go cron.CleanStale() + if _cfg.StringMatcher != nil && _cfg.StringMatcher.Enabled { + go cron.CleanStringMatcherHistory() + } + select {} } diff --git a/modules/judge/model/efunc.go b/modules/judge/model/efunc.go new file mode 100644 index 000000000..97a61776b --- /dev/null +++ b/modules/judge/model/efunc.go @@ -0,0 +1,113 @@ +package model + +import ( + "math" + + "github.com/open-falcon/falcon-plus/common/model" +) + +type Function interface { + Compute(L *SafeELinkedList) (vs []*model.EHistoryData, leftValue interface{}, isTriggered bool, isEnough bool) +} + +type AllFunction struct { + Function + Key string + Limit uint64 + Operator string + RightValue interface{} +} + +func (this AllFunction) Compute(L *SafeELinkedList) (vs []*model.EHistoryData, leftValue interface{}, isTriggered bool, isEnough bool) { + vs, isEnough = L.HistoryData(this.Limit) + if !isEnough { + return + } + + isTriggered = true + var i uint64 + for i = 0; i < this.Limit; i++ { + value, ok := vs[i].Filters[this.Key] + if !ok { + break + } + isTriggered = checkIsTriggered(value, this.Operator, this.RightValue) + if !isTriggered { + break + } + } + + leftValue = vs[0].Filters[this.Key] + return +} + +type CountFunction struct { + Function + Key string + Ago uint64 + Hits uint64 + Operator string + RightValue interface{} + Now int64 +} + +func (this CountFunction) Compute(L *SafeELinkedList) (vs []*model.EHistoryData, leftValue interface{}, isTriggered bool, isEnough bool) { + vs, isEnough = L.HistoryDataByTime(this.Ago, this.Hits, uint64(this.Now)) + if !isEnough { + return + } + + isTriggered = true + var i uint64 + for i = 0; i < this.Hits; i++ { + value, ok := vs[i].Filters[this.Key] + if !ok { + break + } + isTriggered = checkIsTriggered(value, this.Operator, this.RightValue) + if !isTriggered { + break + } + } + + leftValue = vs[0].Filters[this.Key] + return +} + +func checkIsTriggered(leftValueI interface{}, operator string, rightValueI interface{}) (isTriggered bool) { + switch rightValueI.(type) { + case string: + { + leftValue := leftValueI.(string) + rightValue := rightValueI.(string) + switch operator { + case "=", "==": + isTriggered = leftValue == rightValue + case "!=": + isTriggered = leftValue != rightValue + } + + } + case float64: + { + leftValue := leftValueI.(float64) + rightValue := rightValueI.(float64) + switch operator { + case "=", "==": + isTriggered = math.Abs(leftValue-rightValue) < 0.0001 + case "!=": + isTriggered = math.Abs(leftValue-rightValue) > 0.0001 + case "<": + isTriggered = leftValue < rightValue + case "<=": + isTriggered = leftValue <= rightValue + case ">": + isTriggered = leftValue > rightValue + case ">=": + isTriggered = leftValue >= rightValue + } + } + } + + return +} diff --git a/modules/judge/model/ehistory.go b/modules/judge/model/ehistory.go new file mode 100644 index 000000000..10033c1ef --- /dev/null +++ b/modules/judge/model/ehistory.go @@ -0,0 +1,101 @@ +package model + +import ( + "container/list" + "sync" + + cmodel "github.com/open-falcon/falcon-plus/common/model" +) + +type EJudgeItemMap struct { + sync.RWMutex + M map[string]*SafeELinkedList +} + +func NewEJudgeItemMap() *EJudgeItemMap { + return &EJudgeItemMap{M: make(map[string]*SafeELinkedList)} +} + +func (this *EJudgeItemMap) Get(key string) (*SafeELinkedList, bool) { + this.RLock() + defer this.RUnlock() + val, ok := this.M[key] + return val, ok +} + +func (this *EJudgeItemMap) Set(key string, val *SafeELinkedList) { + this.Lock() + defer this.Unlock() + this.M[key] = val +} + +func (this *EJudgeItemMap) Len() int { + this.RLock() + defer this.RUnlock() + return len(this.M) +} + +func (this *EJudgeItemMap) Delete(key string) { + this.Lock() + defer this.Unlock() + delete(this.M, key) +} + +func (this *EJudgeItemMap) BatchDelete(keys []string) { + count := len(keys) + if count == 0 { + return + } + + this.Lock() + defer this.Unlock() + for i := 0; i < count; i++ { + delete(this.M, keys[i]) + } +} + +func (this *EJudgeItemMap) CleanStale(before int64) { + keys := []string{} + + this.RLock() + for key, L := range this.M { + front := L.Front() + if front == nil { + continue + } + + if front.Value.(*cmodel.EMetric).Timestamp < before { + keys = append(keys, key) + } + } + this.RUnlock() + + this.BatchDelete(keys) +} + +func (this *EJudgeItemMap) PushFrontAndMaintain(key string, val *cmodel.EMetric, maxCount uint32, now int64) { + if linkedList, exists := this.Get(key); exists { + needJudge := linkedList.PushFrontAndMaintain(val, maxCount) + if needJudge { + EJudge(linkedList, val, now) + } + } else { + NL := list.New() + NL.PushFront(val) + safeList := &SafeELinkedList{L: NL} + this.Set(key, safeList) + EJudge(safeList, val, now) + } +} + +// 这是个线程不安全的大Map,需要提前初始化好 +var EHistoryBigMap = make(map[string]*EJudgeItemMap) + +func InitEHistoryBigMap() { + arr := []string{"0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f"} + for i := 0; i < 16; i++ { + for j := 0; j < 16; j++ { + EHistoryBigMap[arr[i]+arr[j]] = NewEJudgeItemMap() + } + } +} diff --git a/modules/judge/model/ejudge.go b/modules/judge/model/ejudge.go new file mode 100644 index 000000000..608b977d8 --- /dev/null +++ b/modules/judge/model/ejudge.go @@ -0,0 +1,158 @@ +package model + +import ( + "encoding/json" + "fmt" + "log" + + cmodel "github.com/open-falcon/falcon-plus/common/model" + "github.com/open-falcon/falcon-plus/modules/judge/g" +) + +func EJudge(L *SafeELinkedList, firstItem *cmodel.EMetric, now int64) { + CheckEExp(L, firstItem, now) +} + +func sendEvent(event *cmodel.Event) { + // update last event + g.LastEvents.Set(event.Id, event) + + bs, err := json.Marshal(event) + if err != nil { + log.Printf("json marshal event %v fail: %v", event, err) + return + } + + // send to redis + cfg := g.Config() + if cfg.Alarm == nil { + log.Println("alarm configuration not found") + return + } + + redisKey := fmt.Sprintf(cfg.Alarm.QueuePattern, event.Priority()) + rc := g.RedisConnPool.Get() + defer rc.Close() + rc.Do("LPUSH", redisKey, string(bs)) +} + +func CheckEExp(L *SafeELinkedList, firstItem *cmodel.EMetric, now int64) { + // expression可能会被多次重复处理,用此数据结构保证只被处理一次 + handledExpression := make(map[int]bool) + + m := g.EExpMap.Get() + + for _, eexps := range m { + for _, eexp := range eexps { + _, ok := handledExpression[eexp.ID] + + if eexp.Key == firstItem.Key && !ok { + if judgeItemWithExpression(L, &eexp, firstItem, now) { + handledExpression[eexp.ID] = true + } + } + } + } + +} + +func judgeItemWithExpression(L *SafeELinkedList, eexp *cmodel.EExp, firstItem *cmodel.EMetric, now int64) (hit bool) { + var fn Function + var historyData []*cmodel.EHistoryData + var leftValue interface{} + var isTriggered bool + var isEnough bool + for _, filter := range eexp.Filters { + switch filter.Func { + case "all": + fn = &AllFunction{Limit: filter.Limit, Operator: filter.Operator, Key: filter.Key, RightValue: filter.RightValue} + case "count": + fn = &CountFunction{Ago: filter.Ago, Hits: filter.Hits, Operator: filter.Operator, Key: filter.Key, RightValue: filter.RightValue, Now: now} + default: + { + log.Println(fmt.Sprintf("not support function -%#v-", filter.Func)) + return false + } + } + + historyData, leftValue, isTriggered, isEnough = fn.Compute(L) + if !isEnough { + break + } + if !isTriggered { + break + } + } + + if !isEnough { + return false + } + if !isTriggered { + return false + } + + pushTags := map[string]string{} + for k, v := range firstItem.Filters { + pushTags[k] = fmt.Sprintf("%v", v) + } + + event := &cmodel.Event{ + Id: fmt.Sprintf("e_%d_%s", eexp.ID, firstItem.PK()), + EExp: eexp, + Endpoint: firstItem.Endpoint, + LeftValue: leftValue, + EventTime: firstItem.Timestamp, + PushedTags: pushTags, + } + + sendEventIfNeed(historyData, isTriggered, now, event, eexp.MaxStep) + + return true + +} + +func sendEventIfNeed(historyData []*cmodel.EHistoryData, isTriggered bool, now int64, event *cmodel.Event, maxStep int) { + lastEvent, exists := g.LastEvents.Get(event.Id) + if isTriggered { + event.Status = "PROBLEM" + if !exists || lastEvent.Status[0] == 'O' { + // 本次触发了阈值,之前又没报过警,得产生一个报警Event + event.CurrentStep = 1 + + // 但是有些用户把最大报警次数配置成了0,相当于屏蔽了,要检查一下 + if maxStep == 0 { + return + } + + sendEvent(event) + return + } + + // 逻辑走到这里,说明之前Event是PROBLEM状态 + if lastEvent.CurrentStep >= maxStep { + // 报警次数已经足够多,到达了最多报警次数了,不再报警 + return + } + + if historyData[len(historyData)-1].Timestamp <= lastEvent.EventTime { + // 产生过报警的点,就不能再使用来判断了,否则容易出现一分钟报一次的情况 + // 只需要拿最后一个historyData来做判断即可,因为它的时间最老 + return + } + + if now-lastEvent.EventTime < g.Config().Alarm.MinInterval { + // 报警不能太频繁,两次报警之间至少要间隔MinInterval秒,否则就不能报警 + return + } + + event.CurrentStep = lastEvent.CurrentStep + 1 + sendEvent(event) + } else { + // 如果LastEvent是Problem,报OK,否则啥都不做 + if exists && lastEvent.Status[0] == 'P' { + event.Status = "OK" + event.CurrentStep = 1 + sendEvent(event) + } + } +} diff --git a/modules/judge/model/elinkedlist.go b/modules/judge/model/elinkedlist.go new file mode 100644 index 000000000..e045e0712 --- /dev/null +++ b/modules/judge/model/elinkedlist.go @@ -0,0 +1,148 @@ +package model + +import ( + "container/list" + "sync" + + cmodel "github.com/open-falcon/falcon-plus/common/model" +) + +type SafeELinkedList struct { + sync.RWMutex + L *list.List +} + +func (this *SafeELinkedList) ToSlice() []*cmodel.EMetric { + this.RLock() + defer this.RUnlock() + sz := this.L.Len() + if sz == 0 { + return []*cmodel.EMetric{} + } + + ret := make([]*cmodel.EMetric, 0, sz) + for e := this.L.Front(); e != nil; e = e.Next() { + ret = append(ret, e.Value.(*cmodel.EMetric)) + } + return ret +} + +// @param limit 至多返回这些,如果不够,有多少返回多少 +// @return bool isEnough +func (this *SafeELinkedList) HistoryData(limit uint64) ([]*cmodel.EHistoryData, bool) { + if limit < 1 { + // 其实limit不合法,此处也返回false吧,上层代码要注意 + // 因为false通常使上层代码进入异常分支,这样就统一了 + return []*cmodel.EHistoryData{}, false + } + + size := uint64(this.Len()) + if size == 0 { + return []*cmodel.EHistoryData{}, false + } + + firstElement := this.Front() + firstItem := firstElement.Value.(*cmodel.EMetric) + + var vs []*cmodel.EHistoryData + isEnough := true + + if size < limit { + // 有多少获取多少 + limit = size + isEnough = false + } + vs = make([]*cmodel.EHistoryData, limit) + vs[0] = &cmodel.EHistoryData{Timestamp: firstItem.Timestamp, Filters: firstItem.Filters} + var i uint64 + currentElement := firstElement + for i = 1; i < limit; { + nextElement := currentElement.Next() + vs[i] = &cmodel.EHistoryData{ + Timestamp: nextElement.Value.(*cmodel.EMetric).Timestamp, + Filters: nextElement.Value.(*cmodel.EMetric).Filters, + } + i++ + currentElement = nextElement + } + + return vs, isEnough +} + +// @param limit 至多返回这些,如果不够,有多少返回多少 +// @return bool isEnough +func (this *SafeELinkedList) HistoryDataByTime(ago uint64, hits uint64, now uint64) ([]*cmodel.EHistoryData, bool) { + var isEnough bool + var vs []*cmodel.EHistoryData + + if hits < 1 { + // 其实limit不合法,此处也返回false吧,上层代码要注意 + // 因为false通常使上层代码进入异常分支,这样就统一了 + return vs, isEnough + } + + size := uint64(this.Len()) + if size == 0 { + return vs, isEnough + } + + if size < hits { + return vs, isEnough + } + + t := int64(now - ago) + + matched := uint64(0) + for e := this.Front(); e != nil && matched < hits; e = e.Next() { + item := e.Value.(*cmodel.EMetric) + if item.Timestamp >= t { + item := &cmodel.EHistoryData{Timestamp: item.Timestamp, Filters: item.Filters} + vs = append(vs, item) + matched++ + } + } + + isEnough = matched >= hits + + return vs, isEnough +} + +func (this *SafeELinkedList) PushFront(v interface{}) *list.Element { + this.Lock() + defer this.Unlock() + return this.L.PushFront(v) +} + +// @return needJudge 如果是false不需要做judge,因为新上来的数据不合法 +func (this *SafeELinkedList) PushFrontAndMaintain(v *cmodel.EMetric, maxCount uint32) bool { + this.Lock() + defer this.Unlock() + + sz := uint32(this.L.Len()) + this.L.PushFront(v) + + sz++ + if sz <= maxCount { + return true + } + + del := sz - maxCount + var i uint32 + for i = 0; i < del; i++ { + this.L.Remove(this.L.Back()) + } + + return true +} + +func (this *SafeELinkedList) Front() *list.Element { + this.RLock() + defer this.RUnlock() + return this.L.Front() +} + +func (this *SafeELinkedList) Len() int { + this.RLock() + defer this.RUnlock() + return this.L.Len() +} diff --git a/modules/judge/rpc/receiver.go b/modules/judge/rpc/receiver.go index 6d65e56d7..5a2e9f527 100644 --- a/modules/judge/rpc/receiver.go +++ b/modules/judge/rpc/receiver.go @@ -15,30 +15,77 @@ package rpc import ( + "log" "time" - "github.com/open-falcon/falcon-plus/common/model" + cmodel "github.com/open-falcon/falcon-plus/common/model" + "github.com/open-falcon/falcon-plus/common/utils" "github.com/open-falcon/falcon-plus/modules/judge/g" + "github.com/open-falcon/falcon-plus/modules/judge/model" "github.com/open-falcon/falcon-plus/modules/judge/store" + "github.com/open-falcon/falcon-plus/modules/judge/string_matcher" ) type Judge int -func (this *Judge) Ping(req model.NullRpcRequest, resp *model.SimpleRpcResponse) error { +func (this *Judge) Ping(req cmodel.NullRpcRequest, resp *cmodel.SimpleRpcResponse) error { return nil } -func (this *Judge) Send(items []*model.JudgeItem, resp *model.SimpleRpcResponse) error { - remain := g.Config().Remain +func (this *Judge) Send(items []*cmodel.JudgeItem, resp *cmodel.SimpleRpcResponse) error { + cfg := g.Config() + remain := cfg.Remain // 把当前时间的计算放在最外层,是为了减少获取时间时的系统调用开销 now := time.Now().Unix() + for _, item := range items { exists := g.FilterMap.Exists(item.Metric) if !exists { continue } - pk := item.PrimaryKey() - store.HistoryBigMap[pk[0:2]].PushFrontAndMaintain(pk, item, remain, now) + + if item.JudgeType != g.STRMATCH { + pk := item.PrimaryKey() + store.HistoryBigMap[pk[0:2]].PushFrontAndMaintain(pk, item, remain, now) + + } else if item.JudgeType == g.STRMATCH && item.ValueRaw != "" { + yesEndpoint := g.StrMatcherMap.Match(item.Endpoint, item.ValueRaw) + yesTag := g.StrMatcherExpMap.Match(item.Tags, item.ValueRaw) + + if yesEndpoint || yesTag { + pk := item.PrimaryKey() + store.HistoryBigMap[pk[0:2]].PushFrontAndMaintain(pk, item, remain, now) + + // save matched string into SQL DB + if cfg.StringMatcher.Enabled { + success := string_matcher.Producer.Append(item) + if !success { + log.Println("string_matcher.Producer failed") + } + } + } + } + + } + return nil +} + +func (this *Judge) SendE(items []*cmodel.EMetric, resp *cmodel.SimpleRpcResponse) error { + cfg := g.Config() + + listMaxLen := cfg.ListMaxLen + // 把当前时间的计算放在最外层,是为了减少获取时间时的系统调用开销 + now := time.Now().Unix() + + for _, item := range items { + exists := g.EFilterMap.Exists(item.Key) + if !exists { + continue + } + + pksum := utils.Md5(item.PK()) + model.EHistoryBigMap[pksum[0:2]].PushFrontAndMaintain(pksum, item, listMaxLen, now) + } return nil } diff --git a/modules/judge/store/func.go b/modules/judge/store/func.go index 48bb8f13c..0c9d9b6e8 100644 --- a/modules/judge/store/func.go +++ b/modules/judge/store/func.go @@ -130,6 +130,27 @@ func (this LookupFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, return } +type MatchFunction struct { + Function + Pattern string + // There are matched records where history.timesatmp > (UNIX_NOW - period) limit n. + Period int + Operator string + RightValue float64 +} + +func (this MatchFunction) Compute(L *SafeLinkedList) (vs []*model.HistoryData, leftValue float64, isTriggered bool, isEnough bool) { + vs, isEnough = L.HistoryDataString(this.Pattern, this.Period) + if !isEnough { + return + } + + leftValue = float64(len(vs)) + isTriggered = checkIsTriggered(leftValue, this.Operator, this.RightValue) + return + +} + type SumFunction struct { Function Limit int @@ -261,15 +282,35 @@ func atois(s string) (ret []int, err error) { return } -// @str: e.g. all(#3) sum(#3) avg(#10) diff(#10) +// @str: e.g. all(#3) sum(#3) avg(#10) diff(#10), match(^error|^warn,60) func ParseFuncFromString(str string, operator string, rightValue float64) (fn Function, err error) { if str == "" { return nil, fmt.Errorf("func can not be null!") } - idx := strings.Index(str, "#") - args, err := atois(str[idx+1 : len(str)-1]) - if err != nil { - return nil, err + + NOT_FOUND := -1 + + idxMatchBracket := strings.Index(str, "match(") + idxComma := strings.LastIndex(str, ",") + + var pattern string + var period int + var idx int + var args []int + + if idxMatchBracket == NOT_FOUND { + idx = strings.Index(str, "#") + args, err = atois(str[idx+1 : len(str)-1]) + if err != nil { + return nil, err + } + } else if idxMatchBracket != NOT_FOUND && idxComma != NOT_FOUND { + pattern = str[len("match("):idxComma] + period, err = strconv.Atoi(strings.TrimSpace(str[idxComma+1 : len(str)-1])) + if err != nil { + return nil, err + } + idx = strings.Index(str, "(") + 1 } switch str[:idx-1] { @@ -289,6 +330,8 @@ func ParseFuncFromString(str string, operator string, rightValue float64) (fn Fu fn = &PDiffFunction{Limit: args[0], Operator: operator, RightValue: rightValue} case "lookup": fn = &LookupFunction{Num: args[0], Limit: args[1], Operator: operator, RightValue: rightValue} + case "match": + fn = &MatchFunction{Pattern: pattern, Period: period, Operator: operator, RightValue: rightValue} default: err = fmt.Errorf("not_supported_method") } diff --git a/modules/judge/store/judge.go b/modules/judge/store/judge.go index e621a05a8..b03b559ff 100644 --- a/modules/judge/store/judge.go +++ b/modules/judge/store/judge.go @@ -39,6 +39,8 @@ func CheckStrategy(L *SafeLinkedList, firstItem *model.JudgeItem, now int64) { // 因为key仅仅是endpoint和metric,所以得到的strategies并不一定是与当前judgeItem相关的 // 比如lg-dinp-docker01.bj配置了两个proc.num的策略,一个name=docker,一个name=agent // 所以此处要排除掉一部分 + + // NOTICE: 这个逻辑以为着,strategies 里面的 tag 如果多余一个,则全部匹配才会触发调用 judgeItemWithStrategy. feature 还是 bugs? related := true for tagKey, tagVal := range s.Tags { if myVal, exists := firstItem.Tags[tagKey]; !exists || myVal != tagVal { @@ -46,7 +48,6 @@ func CheckStrategy(L *SafeLinkedList, firstItem *model.JudgeItem, now int64) { break } } - if !related { continue } @@ -63,6 +64,7 @@ func judgeItemWithStrategy(L *SafeLinkedList, strategy model.Strategy, firstItem } historyData, leftValue, isTriggered, isEnough := fn.Compute(L) + if !isEnough { return } diff --git a/modules/judge/store/linkedlist.go b/modules/judge/store/linkedlist.go index 809e96389..9bb97c720 100644 --- a/modules/judge/store/linkedlist.go +++ b/modules/judge/store/linkedlist.go @@ -17,7 +17,10 @@ package store import ( "container/list" "github.com/open-falcon/falcon-plus/common/model" + "github.com/open-falcon/falcon-plus/modules/judge/g" + "sync" + "time" ) type SafeLinkedList struct { @@ -106,6 +109,32 @@ func (this *SafeLinkedList) HistoryData(limit int) ([]*model.HistoryData, bool) return vs, isEnough } +// @param limit 至多返回这些,如果不够,有多少返回多少 +// @return bool isEnough +func (this *SafeLinkedList) HistoryDataString(pattern string, period int) ([]*model.HistoryData, bool) { + size := this.Len() + if size == 0 { + return []*model.HistoryData{}, false + } + + now := time.Now().Unix() + then := now - int64(period) + + maxItems := 512 + var vs []*model.HistoryData + hits := 0 + + for e := this.Front(); e != nil && hits < maxItems; e = e.Next() { + item := e.Value.(*model.JudgeItem) + + if item.Timestamp >= then { + vs = append(vs, &model.HistoryData{Timestamp: item.Timestamp, Value: item.Value, ValueRaw: item.ValueRaw}) + hits += 1 + } + } + return vs, hits > 0 +} + func (this *SafeLinkedList) PushFront(v interface{}) *list.Element { this.Lock() defer this.Unlock() @@ -120,7 +149,8 @@ func (this *SafeLinkedList) PushFrontAndMaintain(v *model.JudgeItem, maxCount in sz := this.L.Len() if sz > 0 { // 新push上来的数据有可能重复了,或者timestamp不对,这种数据要丢掉 - if v.Timestamp <= this.L.Front().Value.(*model.JudgeItem).Timestamp || v.Timestamp <= 0 { + // JudgeType=g.STRMATCH 没有这个限制,因为 数据来自 API PUSH 接口,让第三方应用自己实现重复过滤。 + if v.JudgeType != g.STRMATCH && (v.Timestamp <= this.L.Front().Value.(*model.JudgeItem).Timestamp || v.Timestamp <= 0) { return false } } diff --git a/modules/judge/string_matcher/string_matcher.go b/modules/judge/string_matcher/string_matcher.go new file mode 100644 index 000000000..7b9121038 --- /dev/null +++ b/modules/judge/string_matcher/string_matcher.go @@ -0,0 +1,188 @@ +package string_matcher + +import ( + "errors" + "log" + "time" + + _ "github.com/go-sql-driver/mysql" + + "github.com/jmoiron/sqlx" + "github.com/open-falcon/falcon-plus/common/model" + "github.com/open-falcon/falcon-plus/common/utils" + "github.com/open-falcon/falcon-plus/modules/judge/g" + nsema "github.com/toolkits/concurrent/semaphore" + nlist "github.com/toolkits/container/list" +) + +const ( + DefaultSendQueueMaxSize = 1024000 // 102.4w + DefaultSendTaskSleepInterval = time.Millisecond * 50 // 50ms +) + +type HistoryMgr struct { + dbConnPool map[string]*sqlx.DB + dbConcurrent int + Queue *nlist.SafeListLimited +} + +type HistoryProducer struct { + Mgr *HistoryMgr +} + +type HistoryConsumer struct { + Mgr *HistoryMgr +} + +var ( + Producer *HistoryProducer + Consumer *HistoryConsumer +) + +func NewHistoryMgr() *HistoryMgr { + mgr := new(HistoryMgr) + + cfg := g.Config() + dbConnPool := make(map[string]*sqlx.DB) + + for name, dsn := range cfg.StringMatcher.DSN { + conn, err := sqlx.Connect("mysql", dsn) + if err != nil { + panic(err) + } + dbConnPool[name] = conn + } + + mgr.dbConnPool = dbConnPool + + dbConcurrent := cfg.StringMatcher.MaxConns + if dbConcurrent < 1 { + dbConcurrent = 1 + } + mgr.dbConcurrent = dbConcurrent + mgr.Queue = nlist.NewSafeListLimited(DefaultSendQueueMaxSize) + + return mgr +} + +func truncateString(str string, num int) string { + bnoden := str + if len(str) > num { + if num > 3 { + num -= 3 + } + bnoden = str[0:num] + "..." + } + return bnoden +} + +func (mgr *HistoryMgr) Append(items []interface{}) (err error) { + conn, ok := mgr.dbConnPool["history"] + if !ok { + return errors.New("get Sql Connection failed") + } + + err = conn.Ping() + if err != nil { + return err + } + + tx, err := conn.Beginx() + if err != nil { + return err + } + + for _, item := range items { + meta := item.(*model.JudgeItem) + s := "INSERT INTO history (endpoint, metric, value, counter_type, tags, Timestamp) values (?, ?, ?, ?, ?, ?); " + tags := utils.SortedTags(meta.Tags) + _, err = tx.Exec(s, + meta.Endpoint, + meta.Metric, + truncateString(meta.ValueRaw, 1024), + meta.JudgeType, + tags, + meta.Timestamp) + + if err != nil { + tx.Rollback() + return err + } + } + + err = tx.Commit() + if err != nil { + tx.Rollback() + return err + } + + return nil +} + +func InitStringMatcher() { + mgr := NewHistoryMgr() + Producer = new(HistoryProducer) + Producer.Mgr = mgr + Consumer = new(HistoryConsumer) + Consumer.Mgr = mgr +} + +func (c *HistoryConsumer) BatchDeleteHistory(before int64) error { + conn, ok := c.Mgr.dbConnPool["history"] + if !ok { + return errors.New("get Sql Connection failed") + } + + err := conn.Ping() + if err != nil { + return err + } + + s := "DELETE FROM history where Timestamp < ?" + _, err = conn.Exec(s, before) + if err != nil { + return err + } + return nil + +} + +func (c *HistoryConsumer) Start(batch, retry int) { + sema := nsema.NewSemaphore(c.Mgr.dbConcurrent) + + for { + items := c.Mgr.Queue.PopBackBy(batch) + if len(items) == 0 { + time.Sleep(DefaultSendTaskSleepInterval) + continue + } + // 同步Call + 有限并发 进行发送 + sema.Acquire() + go func(itemList []interface{}) { + defer sema.Release() + + var err error + + for i := 0; i < retry; i++ { + err = c.Mgr.Append(itemList) + if err != nil { + log.Println("SqlDbInsert failed", err) + } else { + //proc.SendToSqlDbCnt.IncrBy(int64(len(itemList))) + break + } + + time.Sleep(100 * time.Millisecond) + } + + if err != nil { + //proc.SendToSqlDbFailCnt.IncrBy(int64(len(itemList))) + return + } + }(items) + } +} + +func (p *HistoryProducer) Append(item *model.JudgeItem) bool { + return p.Mgr.Queue.PushFront(item) +} diff --git a/modules/nodata/cfg.example.json b/modules/nodata/cfg.example.json index 63baf9fe9..ef208c797 100644 --- a/modules/nodata/cfg.example.json +++ b/modules/nodata/cfg.example.json @@ -12,7 +12,7 @@ }, "config": { "enabled": true, - "dsn": "root:@tcp(127.0.0.1:3306)/falcon_portal?loc=Local&parseTime=true&wait_timeout=604800", + "dsn": "falcon:falcon@tcp(127.0.0.1:3306)/falcon_portal?loc=Local&parseTime=true&wait_timeout=604800", "maxIdle": 4 }, "collector":{ diff --git a/modules/transfer/cfg.example.json b/modules/transfer/cfg.example.json index 57e98958c..9f09ab45a 100644 --- a/modules/transfer/cfg.example.json +++ b/modules/transfer/cfg.example.json @@ -1,6 +1,6 @@ { "debug": true, - "minStep": 30, + "minStep": 2, "http": { "enabled": true, "listen": "0.0.0.0:6060" @@ -27,7 +27,7 @@ } }, "graph": { - "enabled": true, + "enabled": false, "batch": 200, "connTimeout": 1000, "callTimeout": 5000, diff --git a/modules/transfer/g/g.go b/modules/transfer/g/g.go index c21dc15ae..66f6f135a 100644 --- a/modules/transfer/g/g.go +++ b/modules/transfer/g/g.go @@ -36,6 +36,7 @@ const ( GAUGE = "GAUGE" COUNTER = "COUNTER" DERIVE = "DERIVE" + STRMATCH = "STRMATCH" DEFAULT_STEP = 60 ) diff --git a/modules/transfer/main.go b/modules/transfer/main.go index 8839c07db..374bd09c6 100644 --- a/modules/transfer/main.go +++ b/modules/transfer/main.go @@ -22,6 +22,7 @@ import ( "github.com/open-falcon/falcon-plus/modules/transfer/proc" "github.com/open-falcon/falcon-plus/modules/transfer/receiver" "github.com/open-falcon/falcon-plus/modules/transfer/sender" + "log" "os" ) @@ -42,6 +43,11 @@ func main() { // global config g.ParseConfig(*cfg) + + if g.Config().Debug { + log.SetFlags(log.Ldate | log.Ltime | log.Llongfile) + } + // proc proc.Start() diff --git a/modules/transfer/proc/proc.go b/modules/transfer/proc/proc.go index bfbffcd9e..d691b243d 100644 --- a/modules/transfer/proc/proc.go +++ b/modules/transfer/proc/proc.go @@ -15,8 +15,9 @@ package proc import ( - nproc "github.com/toolkits/proc" "log" + + nproc "github.com/toolkits/proc" ) // trace @@ -37,22 +38,26 @@ var ( HttpRecvCnt = nproc.NewSCounterQps("HttpRecvCnt") SocketRecvCnt = nproc.NewSCounterQps("SocketRecvCnt") - SendToJudgeCnt = nproc.NewSCounterQps("SendToJudgeCnt") - SendToTsdbCnt = nproc.NewSCounterQps("SendToTsdbCnt") - SendToGraphCnt = nproc.NewSCounterQps("SendToGraphCnt") + SendToJudgeCnt = nproc.NewSCounterQps("SendToJudgeCnt") + SendToEJudgeCnt = nproc.NewSCounterQps("SendToEJudgeCnt") + SendToTsdbCnt = nproc.NewSCounterQps("SendToTsdbCnt") + SendToGraphCnt = nproc.NewSCounterQps("SendToGraphCnt") - SendToJudgeDropCnt = nproc.NewSCounterQps("SendToJudgeDropCnt") - SendToTsdbDropCnt = nproc.NewSCounterQps("SendToTsdbDropCnt") - SendToGraphDropCnt = nproc.NewSCounterQps("SendToGraphDropCnt") + SendToJudgeDropCnt = nproc.NewSCounterQps("SendToJudgeDropCnt") + SendToEJudgeDropCnt = nproc.NewSCounterQps("SendToEJudgeDropCnt") + SendToTsdbDropCnt = nproc.NewSCounterQps("SendToTsdbDropCnt") + SendToGraphDropCnt = nproc.NewSCounterQps("SendToGraphDropCnt") - SendToJudgeFailCnt = nproc.NewSCounterQps("SendToJudgeFailCnt") - SendToTsdbFailCnt = nproc.NewSCounterQps("SendToTsdbFailCnt") - SendToGraphFailCnt = nproc.NewSCounterQps("SendToGraphFailCnt") + SendToJudgeFailCnt = nproc.NewSCounterQps("SendToJudgeFailCnt") + SendToEJudgeFailCnt = nproc.NewSCounterQps("SendToEJudgeFailCnt") + SendToTsdbFailCnt = nproc.NewSCounterQps("SendToTsdbFailCnt") + SendToGraphFailCnt = nproc.NewSCounterQps("SendToGraphFailCnt") // 发送缓存大小 - JudgeQueuesCnt = nproc.NewSCounterBase("JudgeSendCacheCnt") - TsdbQueuesCnt = nproc.NewSCounterBase("TsdbSendCacheCnt") - GraphQueuesCnt = nproc.NewSCounterBase("GraphSendCacheCnt") + JudgeQueuesCnt = nproc.NewSCounterBase("JudgeSendCacheCnt") + EJudgeQueuesCnt = nproc.NewSCounterBase("EJudgeSendCacheCnt") + TsdbQueuesCnt = nproc.NewSCounterBase("TsdbSendCacheCnt") + GraphQueuesCnt = nproc.NewSCounterBase("GraphSendCacheCnt") // http请求次数 HistoryRequestCnt = nproc.NewSCounterQps("HistoryRequestCnt") @@ -82,21 +87,25 @@ func GetAll() []interface{} { // send cnt ret = append(ret, SendToJudgeCnt.Get()) + ret = append(ret, SendToEJudgeCnt.Get()) ret = append(ret, SendToTsdbCnt.Get()) ret = append(ret, SendToGraphCnt.Get()) // drop cnt ret = append(ret, SendToJudgeDropCnt.Get()) + ret = append(ret, SendToEJudgeDropCnt.Get()) ret = append(ret, SendToTsdbDropCnt.Get()) ret = append(ret, SendToGraphDropCnt.Get()) // send fail cnt ret = append(ret, SendToJudgeFailCnt.Get()) + ret = append(ret, SendToEJudgeFailCnt.Get()) ret = append(ret, SendToTsdbFailCnt.Get()) ret = append(ret, SendToGraphFailCnt.Get()) // cache cnt ret = append(ret, JudgeQueuesCnt.Get()) + ret = append(ret, EJudgeQueuesCnt.Get()) ret = append(ret, TsdbQueuesCnt.Get()) ret = append(ret, GraphQueuesCnt.Get()) diff --git a/modules/transfer/receiver/rpc/rpc_transfer.go b/modules/transfer/receiver/rpc/rpc_transfer.go index 76c633150..340cce78f 100644 --- a/modules/transfer/receiver/rpc/rpc_transfer.go +++ b/modules/transfer/receiver/rpc/rpc_transfer.go @@ -16,13 +16,20 @@ package rpc import ( "fmt" + "log" + "strconv" + "strings" + "time" + cmodel "github.com/open-falcon/falcon-plus/common/model" cutils "github.com/open-falcon/falcon-plus/common/utils" "github.com/open-falcon/falcon-plus/modules/transfer/g" "github.com/open-falcon/falcon-plus/modules/transfer/proc" "github.com/open-falcon/falcon-plus/modules/transfer/sender" - "strconv" - "time" +) + +var ( + NOT_FOUND = -1 ) type Transfer int @@ -53,12 +60,22 @@ func (t *Transfer) Update(args []*cmodel.MetricValue, reply *cmodel.TransferResp // process new metric values func RecvMetricValues(args []*cmodel.MetricValue, reply *cmodel.TransferResponse, from string) error { + cfg := g.Config() + start := time.Now() reply.Invalid = 0 + errmsg := []string{} + items := []*cmodel.MetaData{} + for _, v := range args { + if cfg.Debug { + log.Println("metric", v) + } + if v == nil { + errmsg = append(errmsg, "metric empty") reply.Invalid += 1 continue } @@ -66,38 +83,44 @@ func RecvMetricValues(args []*cmodel.MetricValue, reply *cmodel.TransferResponse // 历史遗留问题. // 老版本agent上报的metric=kernel.hostname的数据,其取值为string类型,现在已经不支持了;所以,这里硬编码过滤掉 if v.Metric == "kernel.hostname" { + errmsg = append(errmsg, "skip metric=kernel.hostname") reply.Invalid += 1 continue } if v.Metric == "" || v.Endpoint == "" { + errmsg = append(errmsg, "metric or endpoint is empty") reply.Invalid += 1 continue } - if v.Type != g.COUNTER && v.Type != g.GAUGE && v.Type != g.DERIVE { + if v.Type != g.COUNTER && v.Type != g.GAUGE && v.Type != g.DERIVE && v.Type != g.STRMATCH { reply.Invalid += 1 + errmsg = append(errmsg, "got unexpected counterType"+v.Type) continue } if v.Value == "" { + errmsg = append(errmsg, v.Metric+" value is empty") reply.Invalid += 1 continue } if v.Step <= 0 { + errmsg = append(errmsg, "Step <= 0") reply.Invalid += 1 continue } if len(v.Metric)+len(v.Tags) > 510 { + errmsg = append(errmsg, " Metric+Tags too long") reply.Invalid += 1 continue } - // TODO 呵呵,这里需要再优雅一点 now := start.Unix() if v.Timestamp <= 0 || v.Timestamp > now*2 { + errmsg = append(errmsg, v.Metric+" Timestamp invalid") v.Timestamp = now } @@ -114,26 +137,39 @@ func RecvMetricValues(args []*cmodel.MetricValue, reply *cmodel.TransferResponse var vv float64 var err error - switch cv := v.Value.(type) { - case string: - vv, err = strconv.ParseFloat(cv, 64) - if err != nil { + if v.Type != g.STRMATCH { + switch cv := v.Value.(type) { + case string: + vv, err = strconv.ParseFloat(cv, 64) + if err != nil { + valid = false + } + + case float64: + vv = cv + case int64: + vv = float64(cv) + default: + valid = false + } + } else { + switch v.Value.(type) { + case string: + fv.ValueRaw = v.Value.(string) + vv = float64(1.0) + default: valid = false } - case float64: - vv = cv - case int64: - vv = float64(cv) - default: - valid = false } if !valid { + errmsg = append(errmsg, "parse value into float64/string failed") reply.Invalid += 1 continue } fv.Value = vv + items = append(items, fv) } @@ -146,8 +182,6 @@ func RecvMetricValues(args []*cmodel.MetricValue, reply *cmodel.TransferResponse proc.HttpRecvCnt.IncrBy(cnt) } - cfg := g.Config() - if cfg.Graph.Enabled { sender.Push2GraphSendQueue(items) } @@ -159,8 +193,11 @@ func RecvMetricValues(args []*cmodel.MetricValue, reply *cmodel.TransferResponse if cfg.Tsdb.Enabled { sender.Push2TsdbSendQueue(items) } - - reply.Message = "ok" + if reply.Invalid == 0 { + reply.Message = "ok" + } else { + reply.Message = strings.Join(errmsg, ";\n") + } reply.Total = len(args) reply.Latency = (time.Now().UnixNano() - start.UnixNano()) / 1000000 diff --git a/modules/transfer/sender/conn_pools.go b/modules/transfer/sender/conn_pools.go index 7df9a5076..db5610737 100644 --- a/modules/transfer/sender/conn_pools.go +++ b/modules/transfer/sender/conn_pools.go @@ -31,6 +31,13 @@ func initConnPools() { JudgeConnPools = backend.CreateSafeRpcConnPools(cfg.Judge.MaxConns, cfg.Judge.MaxIdle, cfg.Judge.ConnTimeout, cfg.Judge.CallTimeout, judgeInstances.ToSlice()) + ejudgeInstances := nset.NewStringSet() + for _, instance := range cfg.Judge.Cluster { + ejudgeInstances.Add(instance) + } + EJudgeConnPools = backend.CreateSafeRpcConnPools(cfg.Judge.MaxConns, cfg.Judge.MaxIdle, + cfg.Judge.ConnTimeout, cfg.Judge.CallTimeout, ejudgeInstances.ToSlice()) + // tsdb if cfg.Tsdb.Enabled { TsdbConnPoolHelper = backend.NewTsdbConnPoolHelper(cfg.Tsdb.Address, cfg.Tsdb.MaxConns, cfg.Tsdb.MaxIdle, cfg.Tsdb.ConnTimeout, cfg.Tsdb.CallTimeout) diff --git a/modules/transfer/sender/node_rings.go b/modules/transfer/sender/node_rings.go index 713144c40..6a9f7d426 100644 --- a/modules/transfer/sender/node_rings.go +++ b/modules/transfer/sender/node_rings.go @@ -24,5 +24,6 @@ func initNodeRings() { cfg := g.Config() JudgeNodeRing = rings.NewConsistentHashNodesRing(int32(cfg.Judge.Replicas), cutils.KeysOfMap(cfg.Judge.Cluster)) + EJudgeNodeRing = rings.NewConsistentHashNodesRing(int32(cfg.Judge.Replicas), cutils.KeysOfMap(cfg.Judge.Cluster)) GraphNodeRing = rings.NewConsistentHashNodesRing(int32(cfg.Graph.Replicas), cutils.KeysOfMap(cfg.Graph.Cluster)) } diff --git a/modules/transfer/sender/send_queues.go b/modules/transfer/sender/send_queues.go index 3b9bd0323..455588dbc 100644 --- a/modules/transfer/sender/send_queues.go +++ b/modules/transfer/sender/send_queues.go @@ -26,6 +26,11 @@ func initSendQueues() { JudgeQueues[node] = Q } + for node := range cfg.Judge.Cluster { + Q := nlist.NewSafeListLimited(DefaultSendQueueMaxSize) + EJudgeQueues[node] = Q + } + for node, nitem := range cfg.Graph.ClusterList { for _, addr := range nitem.Addrs { Q := nlist.NewSafeListLimited(DefaultSendQueueMaxSize) diff --git a/modules/transfer/sender/send_tasks.go b/modules/transfer/sender/send_tasks.go index d45070132..6ad5730ca 100644 --- a/modules/transfer/sender/send_tasks.go +++ b/modules/transfer/sender/send_tasks.go @@ -16,13 +16,14 @@ package sender import ( "bytes" + "log" + "time" + cmodel "github.com/open-falcon/falcon-plus/common/model" "github.com/open-falcon/falcon-plus/modules/transfer/g" "github.com/open-falcon/falcon-plus/modules/transfer/proc" nsema "github.com/toolkits/concurrent/semaphore" "github.com/toolkits/container/list" - "log" - "time" ) // send @@ -56,6 +57,11 @@ func startSendTasks() { go forward2JudgeTask(queue, node, judgeConcurrent) } + for node := range cfg.Judge.Cluster { + queue := EJudgeQueues[node] + go forward2EJudgeTask(queue, node, judgeConcurrent) + } + for node, nitem := range cfg.Graph.ClusterList { for _, addr := range nitem.Addrs { queue := GraphQueues[node+addr] @@ -70,8 +76,9 @@ func startSendTasks() { // Judge定时任务, 将 Judge发送缓存中的数据 通过rpc连接池 发送到Judge func forward2JudgeTask(Q *list.SafeListLimited, node string, concurrent int) { - batch := g.Config().Judge.Batch // 一次发送,最多batch条数据 - addr := g.Config().Judge.Cluster[node] + cfg := g.Config() + batch := cfg.Judge.Batch // 一次发送,最多batch条数据 + addr := cfg.Judge.Cluster[node] sema := nsema.NewSemaphore(concurrent) for { @@ -115,6 +122,53 @@ func forward2JudgeTask(Q *list.SafeListLimited, node string, concurrent int) { } } +// Judge定时任务, 将 Judge发送缓存中的数据 通过rpc连接池 发送到Judge +func forward2EJudgeTask(Q *list.SafeListLimited, node string, concurrent int) { + batch := g.Config().Judge.Batch // 一次发送,最多batch条数据 + addr := g.Config().Judge.Cluster[node] + sema := nsema.NewSemaphore(concurrent) + + for { + items := Q.PopBackBy(batch) + count := len(items) + if count == 0 { + time.Sleep(DefaultSendTaskSleepInterval) + continue + } + + judgeItems := make([]*cmodel.EMetric, count) + for i := 0; i < count; i++ { + judgeItems[i] = items[i].(*cmodel.EMetric) + } + + // 同步Call + 有限并发 进行发送 + sema.Acquire() + go func(addr string, judgeItems []*cmodel.EMetric, count int) { + defer sema.Release() + + resp := &cmodel.SimpleRpcResponse{} + var err error + sendOk := false + for i := 0; i < 3; i++ { //最多重试3次 + err = EJudgeConnPools.Call(addr, "Judge.SendE", judgeItems, resp) + if err == nil { + sendOk = true + break + } + time.Sleep(time.Millisecond * 10) + } + + // statistics + if !sendOk { + log.Printf("send judge %s:%s fail: %v", node, addr, err) + proc.SendToEJudgeFailCnt.IncrBy(int64(count)) + } else { + proc.SendToEJudgeCnt.IncrBy(int64(count)) + } + }(addr, judgeItems, count) + } +} + // Graph定时任务, 将 Graph发送缓存中的数据 通过rpc连接池 发送到Graph func forward2GraphTask(Q *list.SafeListLimited, node string, addr string, concurrent int) { batch := g.Config().Graph.Batch // 一次发送,最多batch条数据 diff --git a/modules/transfer/sender/sender.go b/modules/transfer/sender/sender.go index ef07ae84e..9fd84a240 100644 --- a/modules/transfer/sender/sender.go +++ b/modules/transfer/sender/sender.go @@ -16,13 +16,14 @@ package sender import ( "fmt" + "log" + backend "github.com/open-falcon/falcon-plus/common/backend_pool" cmodel "github.com/open-falcon/falcon-plus/common/model" "github.com/open-falcon/falcon-plus/modules/transfer/g" "github.com/open-falcon/falcon-plus/modules/transfer/proc" rings "github.com/toolkits/consistent/rings" nlist "github.com/toolkits/container/list" - "log" ) const ( @@ -37,22 +38,25 @@ var ( // 服务节点的一致性哈希环 // pk -> node var ( - JudgeNodeRing *rings.ConsistentHashNodeRing - GraphNodeRing *rings.ConsistentHashNodeRing + JudgeNodeRing *rings.ConsistentHashNodeRing + EJudgeNodeRing *rings.ConsistentHashNodeRing + GraphNodeRing *rings.ConsistentHashNodeRing ) // 发送缓存队列 // node -> queue_of_data var ( - TsdbQueue *nlist.SafeListLimited - JudgeQueues = make(map[string]*nlist.SafeListLimited) - GraphQueues = make(map[string]*nlist.SafeListLimited) + TsdbQueue *nlist.SafeListLimited + JudgeQueues = make(map[string]*nlist.SafeListLimited) + EJudgeQueues = make(map[string]*nlist.SafeListLimited) + GraphQueues = make(map[string]*nlist.SafeListLimited) ) // 连接池 // node_address -> connection_pool var ( JudgeConnPools *backend.SafeRpcConnPools + EJudgeConnPools *backend.SafeRpcConnPools TsdbConnPoolHelper *backend.TsdbConnPoolHelper GraphConnPools *backend.SafeRpcConnPools ) @@ -85,16 +89,19 @@ func Push2JudgeSendQueue(items []*cmodel.MetaData) { } // align ts - step := int(item.Step) - if step < MinStep { - step = MinStep - } - ts := alignTs(item.Timestamp, int64(step)) + //step := int(item.Step) + //if step < MinStep { + // step = MinStep + //} + //ts := alignTs(item.Timestamp, int64(step)) + + ts := item.Timestamp judgeItem := &cmodel.JudgeItem{ Endpoint: item.Endpoint, Metric: item.Metric, Value: item.Value, + ValueRaw: item.ValueRaw, Timestamp: ts, JudgeType: item.CounterType, Tags: item.Tags, @@ -109,6 +116,26 @@ func Push2JudgeSendQueue(items []*cmodel.MetaData) { } } +// 将数据 打入 某个Judge的发送缓存队列, 具体是哪一个Judge 由一致性哈希 决定 +func Push2EJudgeSendQueue(items []*cmodel.EMetric) { + for _, item := range items { + pk := item.PK() + node, err := EJudgeNodeRing.GetNode(pk) + if err != nil { + log.Println("EJudgeNodeRing.GetNode failed", err) + continue + } + + Q := EJudgeQueues[node] + isSuccess := Q.PushFront(item) + + // statistics + if !isSuccess { + proc.SendToEJudgeDropCnt.Incr() + } + } +} + // 将数据 打入 某个Graph的发送缓存队列, 具体是哪一个Graph 由一致性哈希 决定 func Push2GraphSendQueue(items []*cmodel.MetaData) { cfg := g.Config().Graph @@ -174,6 +201,11 @@ func convert2GraphItem(d *cmodel.MetaData) (*cmodel.GraphItem, error) { item.DsType = g.DERIVE item.Min = "0" item.Max = "U" + } else if d.CounterType == g.STRMATCH { + item.DsType = g.GAUGE + item.Min = "U" + item.Max = "U" + } else { return item, fmt.Errorf("not_supported_counter_type") } diff --git a/scripts/mysql/db_schema/2_portal-db-schema.sql b/scripts/mysql/db_schema/2_portal-db-schema.sql index 0ca6713e0..fbc9b7d75 100644 --- a/scripts/mysql/db_schema/2_portal-db-schema.sql +++ b/scripts/mysql/db_schema/2_portal-db-schema.sql @@ -88,7 +88,7 @@ CREATE TABLE `strategy` ( `tags` VARCHAR(256) NOT NULL DEFAULT '', `max_step` INT(11) NOT NULL DEFAULT '1', `priority` TINYINT(4) NOT NULL DEFAULT '0', - `func` VARCHAR(16) NOT NULL DEFAULT 'all(#1)', + `func` VARCHAR(256) NOT NULL DEFAULT 'all(#1)', `op` VARCHAR(8) NOT NULL DEFAULT '', `right_value` VARCHAR(64) NOT NULL, `note` VARCHAR(128) NOT NULL DEFAULT '', @@ -107,7 +107,7 @@ DROP TABLE IF EXISTS expression; CREATE TABLE `expression` ( `id` INT(10) UNSIGNED NOT NULL AUTO_INCREMENT, `expression` VARCHAR(1024) NOT NULL, - `func` VARCHAR(16) NOT NULL DEFAULT 'all(#1)', + `func` VARCHAR(128) NOT NULL DEFAULT 'all(#1)', `op` VARCHAR(8) NOT NULL DEFAULT '', `right_value` VARCHAR(16) NOT NULL DEFAULT '', `max_step` INT(11) NOT NULL DEFAULT '1', diff --git a/scripts/mysql/db_schema/5_alarms-db-schema.sql b/scripts/mysql/db_schema/5_alarms-db-schema.sql index 5314da472..dbb08a256 100644 --- a/scripts/mysql/db_schema/5_alarms-db-schema.sql +++ b/scripts/mysql/db_schema/5_alarms-db-schema.sql @@ -10,8 +10,8 @@ SET NAMES utf8; DROP TABLE IF EXISTS event_cases; CREATE TABLE IF NOT EXISTS event_cases( id VARCHAR(50), - endpoint VARCHAR(100) NOT NULL, - metric VARCHAR(200) NOT NULL, + endpoint VARCHAR(512) NOT NULL, + metric VARCHAR(1024) NOT NULL, func VARCHAR(50), cond VARCHAR(200) NOT NULL, note VARCHAR(500), diff --git a/scripts/mysql/db_schema/6_history-db-schema.sql b/scripts/mysql/db_schema/6_history-db-schema.sql new file mode 100644 index 000000000..3e9ab5157 --- /dev/null +++ b/scripts/mysql/db_schema/6_history-db-schema.sql @@ -0,0 +1,21 @@ +CREATE DATABASE history + DEFAULT CHARACTER SET utf8mb4 + DEFAULT COLLATE utf8mb4_unicode_ci; +USE history; +SET NAMES utf8mb4; + +CREATE TABLE IF NOT EXISTS history ( + `id` int(10) unsigned NOT NULL AUTO_INCREMENT, + `endpoint` varchar(255) NOT NULL DEFAULT '', + `step` int(11) not null default 60 comment 'in second', + `counter_type` varchar(16) not null comment 'GAUGE|COUNTER|DERIVE', + `metric` VARCHAR(128) NOT NULL DEFAULT '', + `value` VARCHAR(1024) NOT NULL DEFAULT '', + `tags` VARCHAR(1024) NOT NULL DEFAULT '', + `timestamp` int(11) DEFAULT NULL, + INDEX(`endpoint`), + INDEX(`metric`), + INDEX(`timestamp`), + PRIMARY KEY (`id`) +) ENGINE =InnoDB DEFAULT CHARSET =utf8mb4; + diff --git a/scripts/mysql/db_schema/mmee.alarms.sql b/scripts/mysql/db_schema/mmee.alarms.sql new file mode 100644 index 000000000..787f67397 --- /dev/null +++ b/scripts/mysql/db_schema/mmee.alarms.sql @@ -0,0 +1,52 @@ +/* +* 建立告警归档资料表, 主要存储各个告警的最后触发状况 +*/ +CREATE TABLE IF NOT EXISTS event_cases( + id VARCHAR(50), + endpoint VARCHAR(512) NOT NULL, + metric VARCHAR(1024) NOT NULL, + func VARCHAR(512), + cond VARCHAR(200) NOT NULL, + note VARCHAR(500), + max_step int(10) unsigned, + current_step int(10) unsigned, + priority INT(6) NOT NULL, + status VARCHAR(20) NOT NULL, + timestamp Timestamp NOT NULL, + update_at Timestamp NULL DEFAULT NULL, + closed_at Timestamp NULL DEFAULT NULL, + closed_note VARCHAR(250), + user_modified int(10) unsigned, + tpl_creator VARCHAR(64), + expression_id int(10) unsigned, + strategy_id int(10) unsigned, + template_id int(10) unsigned, + process_note MEDIUMINT, + process_status VARCHAR(20) DEFAULT 'unresolved', + PRIMARY KEY (id), + INDEX (endpoint, strategy_id, template_id) +) + ENGINE =InnoDB + DEFAULT CHARSET =utf8; + + +/* +* 建立告警归档资料表, 存储各个告警触发状况的历史状态 +*/ +CREATE TABLE IF NOT EXISTS events ( + id int(10) NOT NULL AUTO_INCREMENT, + event_caseId VARCHAR(50), + step int(10) unsigned, + cond VARCHAR(200) NOT NULL, + status int(3) unsigned DEFAULT 0, + timestamp Timestamp, + PRIMARY KEY (id), + INDEX(event_caseId), + FOREIGN KEY (event_caseId) REFERENCES event_cases(id) + ON DELETE CASCADE + ON UPDATE CASCADE +) + ENGINE =InnoDB + DEFAULT CHARSET =utf8; + + diff --git a/scripts/mysql/db_schema/mmee.eexp.sql b/scripts/mysql/db_schema/mmee.eexp.sql new file mode 100644 index 000000000..461fd8c0e --- /dev/null +++ b/scripts/mysql/db_schema/mmee.eexp.sql @@ -0,0 +1,81 @@ +-- +-- CREATE DATABASE alarms +-- DEFAULT CHARACTER SET utf8 +-- DEFAULT COLLATE utf8_general_ci; +-- USE alarms; +-- SET NAMES utf8; +-- + +/* +* 建立告警归档资料表, 主要存储各个告警的最后触发状况 +*/ +DROP TABLE IF EXISTS event_cases; +CREATE TABLE IF NOT EXISTS event_cases( + id VARCHAR(50), + endpoint VARCHAR(512) NOT NULL, + metric VARCHAR(1024) NOT NULL, + func VARCHAR(50), + cond VARCHAR(200) NOT NULL, + note VARCHAR(500), + max_step int(10) unsigned, + current_step int(10) unsigned, + priority INT(6) NOT NULL, + status VARCHAR(20) NOT NULL, + timestamp Timestamp NOT NULL, + update_at Timestamp NULL DEFAULT NULL, + closed_at Timestamp NULL DEFAULT NULL, + closed_note VARCHAR(250), + user_modified int(10) unsigned, + tpl_creator VARCHAR(64), + expression_id int(10) unsigned, + strategy_id int(10) unsigned, + template_id int(10) unsigned, + process_note MEDIUMINT, + process_status VARCHAR(20) DEFAULT 'unresolved', + PRIMARY KEY (id), + INDEX (endpoint, strategy_id, template_id) +) + ENGINE =InnoDB + DEFAULT CHARSET =utf8; + + +/* +* 建立告警归档资料表, 存储各个告警触发状况的历史状态 +*/ +DROP TABLE IF EXISTS events; +CREATE TABLE IF NOT EXISTS events ( + id int(10) NOT NULL AUTO_INCREMENT, + event_caseId VARCHAR(50), + step int(10) unsigned, + cond VARCHAR(200) NOT NULL, + status int(3) unsigned DEFAULT 0, + timestamp Timestamp, + PRIMARY KEY (id), + INDEX(event_caseId), + FOREIGN KEY (event_caseId) REFERENCES event_cases(id) + ON DELETE CASCADE + ON UPDATE CASCADE +) + ENGINE =InnoDB + DEFAULT CHARSET =utf8; + +/* +* 告警留言表 +*/ +CREATE TABLE IF NOT EXISTS event_note ( + id MEDIUMINT NOT NULL AUTO_INCREMENT, + event_caseId VARCHAR(50), + note VARCHAR(300), + case_id VARCHAR(20), + status VARCHAR(15), + timestamp Timestamp, + user_id int(10) unsigned, + PRIMARY KEY (id), + INDEX (event_caseId), + FOREIGN KEY (event_caseId) REFERENCES event_cases(id) + ON DELETE CASCADE + ON UPDATE CASCADE, + FOREIGN KEY (user_id) REFERENCES user(id) + ON DELETE CASCADE + ON UPDATE CASCADE +); diff --git a/vendor/github.com/chyeh/viper/util.go b/vendor/github.com/chyeh/viper/util.go index ed3bcd486..9983da7a6 100644 --- a/vendor/github.com/chyeh/viper/util.go +++ b/vendor/github.com/chyeh/viper/util.go @@ -22,9 +22,9 @@ import ( "unicode" "github.com/BurntSushi/toml" + "github.com/chyeh/cast" "github.com/hashicorp/hcl" "github.com/magiconair/properties" - "github.com/chyeh/cast" jww "github.com/spf13/jwalterweatherman" "gopkg.in/yaml.v2" ) diff --git a/vendor/github.com/chyeh/viper/viper.go b/vendor/github.com/chyeh/viper/viper.go index 3644a89fb..12cdfbb11 100644 --- a/vendor/github.com/chyeh/viper/viper.go +++ b/vendor/github.com/chyeh/viper/viper.go @@ -31,9 +31,9 @@ import ( "strings" "time" + "github.com/chyeh/cast" "github.com/fsnotify/fsnotify" "github.com/mitchellh/mapstructure" - "github.com/chyeh/cast" jww "github.com/spf13/jwalterweatherman" "github.com/spf13/pflag" ) diff --git a/vendor/github.com/emirpasic/gods/maps/hashmap/hashmap.go b/vendor/github.com/emirpasic/gods/maps/hashmap/hashmap.go index 3f42ffc10..77ae819a5 100644 --- a/vendor/github.com/emirpasic/gods/maps/hashmap/hashmap.go +++ b/vendor/github.com/emirpasic/gods/maps/hashmap/hashmap.go @@ -13,6 +13,7 @@ package hashmap import ( "fmt" + "github.com/emirpasic/gods/maps" ) diff --git a/vendor/github.com/emirpasic/gods/maps/hashmap/serialization.go b/vendor/github.com/emirpasic/gods/maps/hashmap/serialization.go index b8e9026e8..24f745410 100644 --- a/vendor/github.com/emirpasic/gods/maps/hashmap/serialization.go +++ b/vendor/github.com/emirpasic/gods/maps/hashmap/serialization.go @@ -6,6 +6,7 @@ package hashmap import ( "encoding/json" + "github.com/emirpasic/gods/containers" "github.com/emirpasic/gods/utils" ) diff --git a/vendor/github.com/gin-gonic/gin/mode.go b/vendor/github.com/gin-gonic/gin/mode.go index c600b7b5b..e24dbdc2c 100644 --- a/vendor/github.com/gin-gonic/gin/mode.go +++ b/vendor/github.com/gin-gonic/gin/mode.go @@ -19,9 +19,9 @@ const ( TestMode string = "test" ) const ( - debugCode = iota - releaseCode - testCode + debugCode = iota + releaseCode + testCode ) // DefaultWriter is the default io.Writer used the Gin for debug output and diff --git a/vendor/github.com/pelletier/go-toml/tomltree_write.go b/vendor/github.com/pelletier/go-toml/tomltree_write.go index 6a7fa1745..4df87eba0 100644 --- a/vendor/github.com/pelletier/go-toml/tomltree_write.go +++ b/vendor/github.com/pelletier/go-toml/tomltree_write.go @@ -4,11 +4,11 @@ import ( "bytes" "fmt" "io" + "reflect" "sort" "strconv" "strings" "time" - "reflect" ) // encodes a string to a TOML-compliant string value diff --git a/vendor/github.com/smartystreets/assertions/messages.go b/vendor/github.com/smartystreets/assertions/messages.go index f03eb43e0..ae64db3f9 100644 --- a/vendor/github.com/smartystreets/assertions/messages.go +++ b/vendor/github.com/smartystreets/assertions/messages.go @@ -78,8 +78,8 @@ const ( // type checking shouldCompareWithInterfacePointer = "The expected value must be a pointer to an interface type (eg. *fmt.Stringer)" shouldNotBeNilActual = "The actual value was 'nil' and should be a value or a pointer to a value!" - shouldBeError = "Expected error to occur!" - shouldBeErrorExpectedMessage = "Expected error '%v' to occur (but got error: '%v')!" + shouldBeError = "Expected error to occur!" + shouldBeErrorExpectedMessage = "Expected error '%v' to occur (but got error: '%v')!" ) const ( // time comparisons diff --git a/vendor/github.com/toolkits/concurrent/semaphore/semaphore.go b/vendor/github.com/toolkits/concurrent/semaphore/semaphore.go index a760bc37b..cc6b728d3 100644 --- a/vendor/github.com/toolkits/concurrent/semaphore/semaphore.go +++ b/vendor/github.com/toolkits/concurrent/semaphore/semaphore.go @@ -1,7 +1,5 @@ package semaphore -import () - type Semaphore struct { bufSize int channel chan int8 diff --git a/vendor/github.com/toolkits/cron/semaphore.go b/vendor/github.com/toolkits/cron/semaphore.go index e6981447d..a45381cff 100644 --- a/vendor/github.com/toolkits/cron/semaphore.go +++ b/vendor/github.com/toolkits/cron/semaphore.go @@ -1,7 +1,5 @@ package cron -import () - type Semaphore struct { bufSize int channel chan int8 diff --git a/vendor/github.com/toolkits/file/downloader.go b/vendor/github.com/toolkits/file/downloader.go index 4d6e17c6f..7aa76b8af 100644 --- a/vendor/github.com/toolkits/file/downloader.go +++ b/vendor/github.com/toolkits/file/downloader.go @@ -1,9 +1,9 @@ package file import ( - "io" - "net/http" - "os" + "io" + "net/http" + "os" ) func Download(toFile, url string) error { diff --git a/vendor/github.com/toolkits/http/httpclient/httpclientpool.go b/vendor/github.com/toolkits/http/httpclient/httpclientpool.go index 9d7bc61d8..6da951bf4 100644 --- a/vendor/github.com/toolkits/http/httpclient/httpclientpool.go +++ b/vendor/github.com/toolkits/http/httpclient/httpclientpool.go @@ -1,9 +1,10 @@ package httpclient import ( - "github.com/toolkits/container/nmap" "net/http" "time" + + "github.com/toolkits/container/nmap" ) // diff --git a/vendor/github.com/toolkits/nux/cpuinfo.go b/vendor/github.com/toolkits/nux/cpuinfo.go index 66ff4ccbc..d79d55ef3 100644 --- a/vendor/github.com/toolkits/nux/cpuinfo.go +++ b/vendor/github.com/toolkits/nux/cpuinfo.go @@ -4,11 +4,12 @@ import ( "bufio" "bytes" "fmt" - "github.com/toolkits/file" "io" "io/ioutil" "runtime" "strings" + + "github.com/toolkits/file" ) func NumCpu() int { diff --git a/vendor/github.com/toolkits/nux/cpustat.go b/vendor/github.com/toolkits/nux/cpustat.go index 6651462be..14b01c380 100644 --- a/vendor/github.com/toolkits/nux/cpustat.go +++ b/vendor/github.com/toolkits/nux/cpustat.go @@ -4,11 +4,12 @@ import ( "bufio" "bytes" "fmt" - "github.com/toolkits/file" "io" "io/ioutil" "strconv" "strings" + + "github.com/toolkits/file" ) type CpuUsage struct { diff --git a/vendor/github.com/toolkits/nux/dfstat_linux.go b/vendor/github.com/toolkits/nux/dfstat_linux.go index 9f8e46dce..bcff5803c 100644 --- a/vendor/github.com/toolkits/nux/dfstat_linux.go +++ b/vendor/github.com/toolkits/nux/dfstat_linux.go @@ -3,11 +3,12 @@ package nux import ( "bufio" "bytes" - "github.com/toolkits/file" "io" "io/ioutil" "strings" "syscall" + + "github.com/toolkits/file" ) // return: [][$fs_spec, $fs_file, $fs_vfstype] diff --git a/vendor/github.com/toolkits/nux/ifstat.go b/vendor/github.com/toolkits/nux/ifstat.go index 24e4d200f..c00ed60fb 100644 --- a/vendor/github.com/toolkits/nux/ifstat.go +++ b/vendor/github.com/toolkits/nux/ifstat.go @@ -4,11 +4,12 @@ import ( "bufio" "bytes" "fmt" - "github.com/toolkits/file" "io" "io/ioutil" "strconv" "strings" + + "github.com/toolkits/file" ) const ( diff --git a/vendor/github.com/toolkits/nux/iostat.go b/vendor/github.com/toolkits/nux/iostat.go index 7ebb67a56..61b56ebb1 100644 --- a/vendor/github.com/toolkits/nux/iostat.go +++ b/vendor/github.com/toolkits/nux/iostat.go @@ -4,12 +4,13 @@ import ( "bufio" "bytes" "fmt" - "github.com/toolkits/file" "io" "io/ioutil" "strconv" "strings" "time" + + "github.com/toolkits/file" ) type DiskStats struct { diff --git a/vendor/github.com/toolkits/nux/kernel.go b/vendor/github.com/toolkits/nux/kernel.go index d56ba8fd1..018bd591e 100644 --- a/vendor/github.com/toolkits/nux/kernel.go +++ b/vendor/github.com/toolkits/nux/kernel.go @@ -2,10 +2,11 @@ package nux import ( "fmt" - "github.com/toolkits/file" "os" "strconv" "strings" + + "github.com/toolkits/file" ) func KernelMaxFiles() (uint64, error) { diff --git a/vendor/github.com/toolkits/nux/loadavg.go b/vendor/github.com/toolkits/nux/loadavg.go index 4388f6cc5..539c028a0 100644 --- a/vendor/github.com/toolkits/nux/loadavg.go +++ b/vendor/github.com/toolkits/nux/loadavg.go @@ -2,9 +2,10 @@ package nux import ( "fmt" - "github.com/toolkits/file" "strconv" "strings" + + "github.com/toolkits/file" ) type Loadavg struct { diff --git a/vendor/github.com/toolkits/nux/meminfo.go b/vendor/github.com/toolkits/nux/meminfo.go index 25903498d..88942ea6b 100644 --- a/vendor/github.com/toolkits/nux/meminfo.go +++ b/vendor/github.com/toolkits/nux/meminfo.go @@ -4,11 +4,12 @@ import ( "bufio" "bytes" "fmt" - "github.com/toolkits/file" "io" "io/ioutil" "strconv" "strings" + + "github.com/toolkits/file" ) type Mem struct { diff --git a/vendor/github.com/toolkits/nux/netstat.go b/vendor/github.com/toolkits/nux/netstat.go index 4bf37290d..ec74b01c2 100644 --- a/vendor/github.com/toolkits/nux/netstat.go +++ b/vendor/github.com/toolkits/nux/netstat.go @@ -3,11 +3,12 @@ package nux import ( "bufio" "bytes" - "github.com/toolkits/file" "io" "io/ioutil" "strconv" "strings" + + "github.com/toolkits/file" ) // @param ext e.g. TcpExt or IpExt diff --git a/vendor/github.com/toolkits/nux/proc.go b/vendor/github.com/toolkits/nux/proc.go index 3c2defe54..d9ecbaea8 100644 --- a/vendor/github.com/toolkits/nux/proc.go +++ b/vendor/github.com/toolkits/nux/proc.go @@ -4,11 +4,12 @@ import ( "bufio" "bytes" "fmt" - "github.com/toolkits/file" "io" "io/ioutil" "strconv" "strings" + + "github.com/toolkits/file" ) type Proc struct { diff --git a/vendor/github.com/toolkits/nux/snmp.go b/vendor/github.com/toolkits/nux/snmp.go index b45a4d1ec..5a3278198 100644 --- a/vendor/github.com/toolkits/nux/snmp.go +++ b/vendor/github.com/toolkits/nux/snmp.go @@ -3,11 +3,12 @@ package nux import ( "bufio" "bytes" - "github.com/toolkits/file" "io" "io/ioutil" "strconv" "strings" + + "github.com/toolkits/file" ) func Snmp(title string) (ret map[string]int64, err error) { diff --git a/vendor/github.com/toolkits/nux/ss_s.go b/vendor/github.com/toolkits/nux/ss_s.go index 404115e57..e1391ff7a 100644 --- a/vendor/github.com/toolkits/nux/ss_s.go +++ b/vendor/github.com/toolkits/nux/ss_s.go @@ -3,10 +3,11 @@ package nux import ( "bufio" "bytes" - "github.com/toolkits/file" - "github.com/toolkits/sys" "strconv" "strings" + + "github.com/toolkits/file" + "github.com/toolkits/sys" ) func SocketStatSummary() (m map[string]uint64, err error) { diff --git a/vendor/github.com/toolkits/nux/system.go b/vendor/github.com/toolkits/nux/system.go index b0b6cead7..9954b7491 100644 --- a/vendor/github.com/toolkits/nux/system.go +++ b/vendor/github.com/toolkits/nux/system.go @@ -2,9 +2,10 @@ package nux import ( "fmt" - "github.com/toolkits/file" "strconv" "strings" + + "github.com/toolkits/file" ) func SystemUptime() (days, hours, mins int64, err error) { diff --git a/vendor/github.com/toolkits/proc/counter.go b/vendor/github.com/toolkits/proc/counter.go index 2cf4725ce..9e42a61cc 100644 --- a/vendor/github.com/toolkits/proc/counter.go +++ b/vendor/github.com/toolkits/proc/counter.go @@ -1,9 +1,10 @@ package proc import ( - ntime "github.com/toolkits/time" "sync" "time" + + ntime "github.com/toolkits/time" ) const ( diff --git a/vendor/vendor.json b/vendor/vendor.json index a47506ae9..2bd015bbd 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -192,6 +192,24 @@ "revision": "1c35d901db3da928c72a72d8458480cc9ade058f", "revisionTime": "2017-01-02T12:52:26Z" }, + { + "checksumSHA1": "htwRqmZAo4/R9oNE1TGIECyTj4w=", + "path": "github.com/jmoiron/sqlx", + "revision": "cf35089a197953c69420c8d0cecda90809764b1d", + "revisionTime": "2018-02-28T18:46:24Z" + }, + { + "checksumSHA1": "bXFrevmVL5Q2EwYlRHlPihxvAJA=", + "path": "github.com/jmoiron/sqlx/reflectx", + "revision": "cf35089a197953c69420c8d0cecda90809764b1d", + "revisionTime": "2018-02-28T18:46:24Z" + }, + { + "checksumSHA1": "q+dertYFOBviMOPcBLeMtXPHh6U=", + "path": "github.com/jmoiron/sqlx/types", + "revision": "cf35089a197953c69420c8d0cecda90809764b1d", + "revisionTime": "2018-02-28T18:46:24Z" + }, { "checksumSHA1": "Js/yx9fZ3+wH1wZpHNIxSTMIaCg=", "path": "github.com/jtolds/gls",