From a2caec038e6a6e61c5d5f056c98124a017e9a976 Mon Sep 17 00:00:00 2001 From: Tanghui Lin Date: Tue, 2 Jul 2019 21:37:11 +0800 Subject: [PATCH 1/4] use kratos --- cmd/discovery/main.go | 9 ++- conf/conf.go | 15 ++-- discovery/discovery.go | 18 +++-- discovery/register.go | 26 +++---- discovery/register_test.go | 57 +++++++-------- discovery/syncup.go | 31 ++++---- errors/errors.go | 55 --------------- go.mod | 14 ++-- go.sum | 71 +++++++++++++------ http/discovery.go | 93 ++++++++++++------------ http/http.go | 116 +++++++++--------------------- lib/http/client.go | 140 ------------------------------------- lib/time/time.go | 17 ----- model/instance.go | 21 +++--- model/param.go | 4 ++ naming/client.go | 46 ++++++------ naming/client_test.go | 13 ++-- naming/grpc/resolver.go | 4 +- registry/guard.go | 4 +- registry/node.go | 27 ++++--- registry/node_test.go | 64 +++++++++-------- registry/registry.go | 34 +++++---- registry/registry_test.go | 32 ++++----- registry/scheduler.go | 4 +- 24 files changed, 348 insertions(+), 567 deletions(-) delete mode 100644 errors/errors.go delete mode 100644 lib/http/client.go delete mode 100644 lib/time/time.go diff --git a/cmd/discovery/main.go b/cmd/discovery/main.go index 78a4d57..0e3ae03 100644 --- a/cmd/discovery/main.go +++ b/cmd/discovery/main.go @@ -10,16 +10,16 @@ import ( "github.com/bilibili/discovery/conf" "github.com/bilibili/discovery/discovery" "github.com/bilibili/discovery/http" - - log "github.com/golang/glog" + log "github.com/bilibili/kratos/pkg/log" ) func main() { flag.Parse() if err := conf.Init(); err != nil { - log.Errorf("conf.Init() error(%v)", err) + log.Error("conf.Init() error(%v)", err) panic(err) } + log.Init(conf.Conf.Log) dis, cancel := discovery.New(conf.Conf) http.Init(conf.Conf, dis) // init signal @@ -27,13 +27,12 @@ func main() { signal.Notify(c, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT) for { s := <-c - log.Infof("discovery get a signal %s", s.String()) + log.Info("discovery get a signal %s", s.String()) switch s { case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT: cancel() time.Sleep(time.Second) log.Info("discovery quit !!!") - log.Flush() return case syscall.SIGHUP: default: diff --git a/conf/conf.go b/conf/conf.go index c193e94..2776274 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -5,9 +5,9 @@ import ( "io/ioutil" "os" - "github.com/bilibili/discovery/lib/http" - "github.com/BurntSushi/toml" + log "github.com/bilibili/kratos/pkg/log" + http "github.com/bilibili/kratos/pkg/net/http/blademaster" ) var ( @@ -27,9 +27,6 @@ func init() { hostname = os.Getenv("HOSTNAME") } flag.StringVar(&confPath, "conf", "discovery-example.toml", "config path") - flag.StringVar(®ion, "region", os.Getenv("REGION"), "avaliable region. or use REGION env variable, value: sh etc.") - flag.StringVar(&zone, "zone", os.Getenv("ZONE"), "avaliable zone. or use ZONE env variable, value: sh001/sh002 etc.") - flag.StringVar(&deployEnv, "deploy.env", os.Getenv("DEPLOY_ENV"), "deploy env. or use DEPLOY_ENV env variable, value: dev/fat1/uat/pre/prod etc.") flag.StringVar(&hostname, "hostname", hostname, "machine hostname") flag.StringVar(&schedulerPath, "scheduler", "scheduler.json", "scheduler info") } @@ -38,9 +35,10 @@ func init() { type Config struct { Nodes []string Zones map[string][]string - HTTPServer *ServerConfig + HTTPServer *http.ServerConfig HTTPClient *http.ClientConfig Env *Env + Log *log.Config Scheduler []byte } @@ -72,11 +70,6 @@ type Env struct { DeployEnv string } -// ServerConfig Http Servers conf. -type ServerConfig struct { - Addr string -} - // Init init conf func Init() (err error) { if _, err = toml.DecodeFile(confPath, &Conf); err != nil { diff --git a/discovery/discovery.go b/discovery/discovery.go index 34be0a0..1041190 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -3,18 +3,20 @@ package discovery import ( "context" "sync/atomic" + "time" "github.com/bilibili/discovery/conf" - "github.com/bilibili/discovery/lib/http" "github.com/bilibili/discovery/registry" + http "github.com/bilibili/kratos/pkg/net/http/blademaster" ) // Discovery discovery. type Discovery struct { - c *conf.Config - client *http.Client - registry *registry.Registry - nodes atomic.Value + c *conf.Config + protected bool + client *http.Client + registry *registry.Registry + nodes atomic.Value } // New get a discovery. @@ -28,5 +30,11 @@ func New(c *conf.Config) (d *Discovery, cancel context.CancelFunc) { d.syncUp() cancel = d.regSelf() go d.nodesproc() + go d.exitProtect() return } + +func (d *Discovery) exitProtect() { + time.Sleep(time.Second * 90) + d.protected = false +} diff --git a/discovery/register.go b/discovery/register.go index 59d2683..e9e9f3d 100644 --- a/discovery/register.go +++ b/discovery/register.go @@ -3,18 +3,18 @@ package discovery import ( "context" - "github.com/bilibili/discovery/errors" "github.com/bilibili/discovery/model" "github.com/bilibili/discovery/registry" + "github.com/bilibili/kratos/pkg/ecode" - log "github.com/golang/glog" + log "github.com/bilibili/kratos/pkg/log" ) // Register a new instance. -func (d *Discovery) Register(c context.Context, ins *model.Instance, latestTimestamp int64, replication bool) { +func (d *Discovery) Register(c context.Context, ins *model.Instance, latestTimestamp int64, replication bool, fromzone bool) { _ = d.registry.Register(ins, latestTimestamp) if !replication { - _ = d.nodes.Load().(*registry.Nodes).Replicate(c, model.Register, ins, ins.Zone != d.c.Env.Zone) + _ = d.nodes.Load().(*registry.Nodes).Replicate(c, model.Register, ins, fromzone) } } @@ -22,8 +22,8 @@ func (d *Discovery) Register(c context.Context, ins *model.Instance, latestTimes func (d *Discovery) Renew(c context.Context, arg *model.ArgRenew) (i *model.Instance, err error) { i, ok := d.registry.Renew(arg) if !ok { - err = errors.NothingFound - log.Errorf("renew appid(%s) hostname(%s) zone(%s) env(%s) error", arg.AppID, arg.Hostname, arg.Zone, arg.Env) + err = ecode.NothingFound + log.Error("renew appid(%s) hostname(%s) zone(%s) env(%s) error", arg.AppID, arg.Hostname, arg.Zone, arg.Env) return } if !arg.Replication { @@ -31,9 +31,9 @@ func (d *Discovery) Renew(c context.Context, arg *model.ArgRenew) (i *model.Inst return } if arg.DirtyTimestamp > i.DirtyTimestamp { - err = errors.NothingFound + err = ecode.NothingFound } else if arg.DirtyTimestamp < i.DirtyTimestamp { - err = errors.Conflict + err = ecode.Conflict } return } @@ -42,8 +42,8 @@ func (d *Discovery) Renew(c context.Context, arg *model.ArgRenew) (i *model.Inst func (d *Discovery) Cancel(c context.Context, arg *model.ArgCancel) (err error) { i, ok := d.registry.Cancel(arg) if !ok { - err = errors.NothingFound - log.Errorf("cancel appid(%s) hostname(%s) error", arg.AppID, arg.Hostname) + err = ecode.NothingFound + log.Error("cancel appid(%s) hostname(%s) error", arg.AppID, arg.Hostname) return } if !arg.Replication { @@ -68,7 +68,7 @@ func (d *Discovery) Fetchs(c context.Context, arg *model.ArgFetchs) (is map[stri for _, appid := range arg.AppID { i, err := d.registry.Fetch(arg.Zone, arg.Env, appid, 0, arg.Status) if err != nil { - log.Errorf("Fetchs fetch appid(%v) err", err) + log.Error("Fetchs fetch appid(%v) err", err) continue } is[appid] = i @@ -77,7 +77,7 @@ func (d *Discovery) Fetchs(c context.Context, arg *model.ArgFetchs) (is map[stri } // Polls hangs request and then write instances when that has changes, or return NotModified. -func (d *Discovery) Polls(c context.Context, arg *model.ArgPolls) (ch chan map[string]*model.InstanceInfo, new bool, err error) { +func (d *Discovery) Polls(c context.Context, arg *model.ArgPolls) (ch chan map[string]*model.InstanceInfo, new bool, miss string, err error) { return d.registry.Polls(arg) } @@ -94,7 +94,7 @@ func (d *Discovery) Nodes(c context.Context) (nsi []*model.Node) { // Set set metadata,color,status of instance. func (d *Discovery) Set(c context.Context, arg *model.ArgSet) (err error) { if !d.registry.Set(arg) { - err = errors.ParamsErr + err = ecode.RequestErr } return } diff --git a/discovery/register_test.go b/discovery/register_test.go index 4fe1143..7201196 100644 --- a/discovery/register_test.go +++ b/discovery/register_test.go @@ -8,11 +8,11 @@ import ( "time" dc "github.com/bilibili/discovery/conf" - "github.com/bilibili/discovery/errors" - "github.com/bilibili/discovery/lib/http" - xtime "github.com/bilibili/discovery/lib/time" "github.com/bilibili/discovery/model" + "github.com/bilibili/kratos/pkg/ecode" + http "github.com/bilibili/kratos/pkg/net/http/blademaster" + xtime "github.com/bilibili/kratos/pkg/time" . "github.com/smartystreets/goconvey/convey" gock "gopkg.in/h2non/gock.v1" ) @@ -70,10 +70,11 @@ var config = newConfig() func newConfig() *dc.Config { c := &dc.Config{ HTTPClient: &http.ClientConfig{ + Timeout: xtime.Duration(time.Second * 30), Dial: xtime.Duration(time.Second), KeepAlive: xtime.Duration(time.Second * 30), }, - HTTPServer: &dc.ServerConfig{Addr: "127.0.0.1:7171"}, + HTTPServer: &http.ServerConfig{Addr: "127.0.0.1:7171"}, Nodes: []string{"127.0.0.1:7171", "127.0.0.1:7172"}, Env: &dc.Env{ Zone: "sh001", @@ -105,7 +106,7 @@ func TestRegister(t *testing.T) { svr.client.SetTransport(gock.DefaultTransport) svr.syncUp() i := model.NewInstance(reg) - svr.Register(context.TODO(), i, reg.LatestTimestamp, reg.Replication) + svr.Register(context.TODO(), i, reg.LatestTimestamp, reg.Replication, true) ins, err := svr.Fetch(context.TODO(), fet) So(err, ShouldBeNil) So(len(ins.Instances), ShouldResemble, 1) @@ -141,9 +142,9 @@ func TestDiscovery(t *testing.T) { reg2.Hostname = "test2" i1 := model.NewInstance(reg) i2 := model.NewInstance(reg2) - svr.Register(context.TODO(), i1, reg.LatestTimestamp, reg.Replication) - svr.Register(context.TODO(), i2, reg2.LatestTimestamp, reg.Replication) - ch, new, err := svr.Polls(context.TODO(), pollArg) + svr.Register(context.TODO(), i1, reg.LatestTimestamp, reg.Replication, reg.FromZone) + svr.Register(context.TODO(), i2, reg2.LatestTimestamp, reg.Replication, reg.FromZone) + ch, new, _, err := svr.Polls(context.TODO(), pollArg) So(err, ShouldBeNil) So(new, ShouldBeTrue) ins := <-ch @@ -152,7 +153,7 @@ func TestDiscovery(t *testing.T) { time.Sleep(time.Second) err = svr.Cancel(context.TODO(), cancel) So(err, ShouldBeNil) - ch, new, err = svr.Polls(context.TODO(), pollArg) + ch, new, _, err = svr.Polls(context.TODO(), pollArg) So(err, ShouldBeNil) So(new, ShouldBeTrue) ins = <-ch @@ -168,8 +169,8 @@ func TestFetchs(t *testing.T) { reg2.AppID = "appid2" i1 := model.NewInstance(reg) i2 := model.NewInstance(reg2) - svr.Register(context.TODO(), i1, reg.LatestTimestamp, reg.Replication) - svr.Register(context.TODO(), i2, reg2.LatestTimestamp, reg.Replication) + svr.Register(context.TODO(), i1, reg.LatestTimestamp, reg.Replication, reg.FromZone) + svr.Register(context.TODO(), i2, reg2.LatestTimestamp, reg.Replication, reg.FromZone) fetchs := newFetchArg() fetchs.AppID = append(fetchs.AppID, "appid2") is, err := svr.Fetchs(ctx, fetchs) @@ -186,15 +187,15 @@ func TestZones(t *testing.T) { reg2.Zone = "sh002" i1 := model.NewInstance(reg) i2 := model.NewInstance(reg2) - svr.Register(context.TODO(), i1, reg.LatestTimestamp, reg.Replication) - svr.Register(context.TODO(), i2, reg2.LatestTimestamp, reg2.Replication) - ch, new, err := svr.Polls(context.TODO(), newPoll()) + svr.Register(context.TODO(), i1, reg.LatestTimestamp, reg.Replication, reg.FromZone) + svr.Register(context.TODO(), i2, reg2.LatestTimestamp, reg2.Replication, reg2.FromZone) + ch, new, _, err := svr.Polls(context.TODO(), newPoll()) So(err, ShouldBeNil) So(new, ShouldBeTrue) ins := <-ch So(len(ins["main.arch.test"].Instances), ShouldEqual, 2) pollArg.Zone = "sh002" - ch, new, err = svr.Polls(context.TODO(), newPoll()) + ch, new, _, err = svr.Polls(context.TODO(), newPoll()) So(err, ShouldBeNil) So(new, ShouldBeTrue) ins = <-ch @@ -206,16 +207,16 @@ func TestZones(t *testing.T) { reg3.Zone = "sh002" reg3.Hostname = "test03" i3 := model.NewInstance(reg3) - svr.Register(context.TODO(), i3, reg3.LatestTimestamp, reg3.Replication) - ch, _, err = svr.Polls(context.TODO(), pollArg) + svr.Register(context.TODO(), i3, reg3.LatestTimestamp, reg3.Replication, reg3.FromZone) + ch, _, _, err = svr.Polls(context.TODO(), pollArg) So(err, ShouldBeNil) ins = <-ch So(len(ins["main.arch.test"].Instances), ShouldResemble, 2) So(len(ins["main.arch.test"].Instances["sh002"]), ShouldResemble, 2) So(len(ins["main.arch.test"].Instances["sh001"]), ShouldResemble, 1) pollArg.LatestTimestamp = []int64{ins["main.arch.test"].LatestTimestamp} - _, _, err = svr.Polls(context.TODO(), pollArg) - So(err, ShouldResemble, errors.NotModified) + _, _, _, err = svr.Polls(context.TODO(), pollArg) + So(err, ShouldResemble, ecode.NotModified) }) }) } @@ -225,20 +226,20 @@ func TestRenew(t *testing.T) { defer cancel() svr.client.SetTransport(gock.DefaultTransport) i := model.NewInstance(reg) - svr.Register(context.TODO(), i, reg.LatestTimestamp, reg.Replication) + svr.Register(context.TODO(), i, reg.LatestTimestamp, reg.Replication, reg.FromZone) _, err := svr.Renew(context.TODO(), rew) So(err, ShouldBeNil) rew2.AppID = "main.arch.noexist" _, err = svr.Renew(context.TODO(), rew2) - So(err, ShouldResemble, errors.NothingFound) + So(err, ShouldResemble, ecode.NothingFound) rew2.AppID = "main.arch.test" rew2.DirtyTimestamp = 1 rew2.Replication = true _, err = svr.Renew(context.TODO(), rew2) - So(err, ShouldResemble, errors.Conflict) + So(err, ShouldResemble, ecode.Conflict) rew2.DirtyTimestamp = time.Now().UnixNano() _, err = svr.Renew(context.TODO(), rew2) - So(err, ShouldResemble, errors.NothingFound) + So(err, ShouldResemble, ecode.NothingFound) }) } @@ -248,13 +249,13 @@ func TestCancel(t *testing.T) { defer disCancel() svr.client.SetTransport(gock.DefaultTransport) i := model.NewInstance(reg) - svr.Register(context.TODO(), i, reg.LatestTimestamp, reg.Replication) + svr.Register(context.TODO(), i, reg.LatestTimestamp, reg.Replication, reg.FromZone) err := svr.Cancel(context.TODO(), cancel) So(err, ShouldBeNil) err = svr.Cancel(context.TODO(), cancel) - So(err, ShouldResemble, errors.NothingFound) + So(err, ShouldResemble, ecode.NothingFound) _, err = svr.Fetch(context.TODO(), fet) - So(err, ShouldResemble, errors.NothingFound) + So(err, ShouldResemble, ecode.NothingFound) }) } @@ -264,7 +265,7 @@ func TestFetchAll(t *testing.T) { defer cancel() svr.client.SetTransport(gock.DefaultTransport) i := model.NewInstance(reg) - svr.Register(context.TODO(), i, reg.LatestTimestamp, reg.Replication) + svr.Register(context.TODO(), i, reg.LatestTimestamp, reg.Replication, reg.FromZone) fs := svr.FetchAll(context.TODO())[i.AppID] So(len(fs), ShouldResemble, 1) }) @@ -275,7 +276,7 @@ func TestNodes(t *testing.T) { svr, cancel := New(config) defer cancel() svr.client.SetTransport(gock.DefaultTransport) - svr.Register(context.Background(), defRegDiscovery(), time.Now().UnixNano(), false) + svr.Register(context.Background(), defRegDiscovery(), time.Now().UnixNano(), false, true) time.Sleep(time.Second) ns := svr.Nodes(context.TODO()) So(len(ns), ShouldResemble, 2) diff --git a/discovery/syncup.go b/discovery/syncup.go index be89e5b..b44808b 100644 --- a/discovery/syncup.go +++ b/discovery/syncup.go @@ -7,17 +7,24 @@ import ( "time" "github.com/bilibili/discovery/conf" - "github.com/bilibili/discovery/errors" "github.com/bilibili/discovery/model" "github.com/bilibili/discovery/registry" + "github.com/bilibili/kratos/pkg/ecode" - log "github.com/golang/glog" + log "github.com/bilibili/kratos/pkg/log" ) var ( _fetchAllURL = "http://%s/discovery/fetch/all" ) +// Protected return if service in init protect mode. +// if service in init protect mode,only support write, +// read operator isn't supported. +func (d *Discovery) Protected() bool { + return d.protected +} + // syncUp populates the registry information from a peer eureka node. func (d *Discovery) syncUp() { nodes := d.nodes.Load().(*registry.Nodes) @@ -31,11 +38,11 @@ func (d *Discovery) syncUp() { Data map[string][]*model.Instance `json:"data"` } if err := d.client.Get(context.TODO(), uri, "", nil, &res); err != nil { - log.Errorf("d.client.Get(%v) error(%v)", uri, err) + log.Error("d.client.Get(%v) error(%v)", uri, err) continue } if res.Code != 0 { - log.Errorf("service syncup from(%s) failed ", uri) + log.Error("service syncup from(%s) failed ", uri) continue } for _, is := range res.Data { @@ -67,7 +74,7 @@ func (d *Discovery) regSelf() context.CancelFunc { RenewTimestamp: now, DirtyTimestamp: now, } - d.Register(ctx, ins, now, false) + d.Register(ctx, ins, now, false, false) go func() { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() @@ -80,8 +87,8 @@ func (d *Discovery) regSelf() context.CancelFunc { Env: d.c.Env.DeployEnv, Hostname: d.c.Env.Host, } - if _, err := d.Renew(ctx, arg); err != nil && err == errors.NothingFound { - d.Register(ctx, ins, now, false) + if _, err := d.Renew(ctx, arg); err != nil && err == ecode.NothingFound { + d.Register(ctx, ins, now, false, false) } case <-ctx.Done(): arg := &model.ArgCancel{ @@ -91,7 +98,7 @@ func (d *Discovery) regSelf() context.CancelFunc { Hostname: d.c.Env.Host, } if err := d.Cancel(context.Background(), arg); err != nil { - log.Errorf("d.Cancel(%+v) error(%v)", arg, err) + log.Error("d.Cancel(%+v) error(%v)", arg, err) } return } @@ -111,9 +118,9 @@ func (d *Discovery) nodesproc() { Hostname: d.c.Env.Host, LatestTimestamp: []int64{lastTs}, } - ch, _, err := d.registry.Polls(arg) - if err != nil && err != errors.NotModified { - log.Errorf("d.registry(%v) error(%v)", arg, err) + ch, _, _, err := d.registry.Polls(arg) + if err != nil && err != ecode.NotModified { + log.Error("d.registry(%v) error(%v)", arg, err) time.Sleep(time.Second) continue } @@ -148,6 +155,6 @@ func (d *Discovery) nodesproc() { ns := registry.NewNodes(c) ns.UP() d.nodes.Store(ns) - log.Infof("discovery changed nodes:%v zones:%v", nodes, zones) + log.Info("discovery changed nodes:%v zones:%v", nodes, zones) } } diff --git a/errors/errors.go b/errors/errors.go deleted file mode 100644 index 63c8692..0000000 --- a/errors/errors.go +++ /dev/null @@ -1,55 +0,0 @@ -package errors - -import "strconv" - -// Error error. -type Error interface { - error - // Code get error code. - Code() int - Equal(error) bool -} - -type ecode int - -// ecode error. -var ( - OK ecode - NotModified ecode = -304 - ParamsErr ecode = -400 - NothingFound ecode = -404 - Conflict ecode = -409 - ServerErr ecode = -500 -) - -func (e ecode) Error() string { - return strconv.FormatInt(int64(e), 10) -} - -func (e ecode) Code() int { - return int(e) -} - -func (e ecode) Equal(err error) bool { - cd := Code(err) - return e.Code() == cd.Code() -} - -// Code converts error to ecode. -func Code(e error) (ie Error) { - if e == nil { - ie = OK - return - } - i, err := strconv.Atoi(e.Error()) - if err != nil { - i = -500 - } - ie = ecode(i) - return -} - -// Int converts int to ecode. -func Int(i int) (ie Error) { - return ecode(i) -} diff --git a/go.mod b/go.mod index f1e8e49..504f1db 100644 --- a/go.mod +++ b/go.mod @@ -4,19 +4,17 @@ go 1.12 require ( github.com/BurntSushi/toml v0.3.1 - github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3 // indirect - github.com/gin-gonic/gin v0.0.0-20180512030042-bf7803815b0b - github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b + github.com/bilibili/kratos v0.1.0 + github.com/golang/protobuf v1.3.1 // indirect + github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c // indirect github.com/jtolds/gls v4.20.0+incompatible // indirect - github.com/mattn/go-isatty v0.0.7 // indirect + github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32 // indirect github.com/smartystreets/assertions v0.0.0-20190401211740-f487f9de1cd3 // indirect github.com/smartystreets/goconvey v0.0.0-20180222194500-ef6db91d284a - github.com/ugorji/go v1.1.4 // indirect - golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f + golang.org/x/sync v0.0.0-20181108010431-42b317875d0f + golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223 // indirect google.golang.org/grpc v1.20.1 - gopkg.in/go-playground/validator.v8 v8.18.2 // indirect gopkg.in/h2non/gock.v1 v1.0.8 - gopkg.in/yaml.v2 v2.2.2 // indirect ) replace ( diff --git a/go.sum b/go.sum index 232bb9d..c6c6825 100644 --- a/go.sum +++ b/go.sum @@ -1,13 +1,29 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/aristanetworks/goarista v0.0.0-20190409234242-46f4bc7b73ef/go.mod h1:D/tb0zPVXnP7fmsLZjtdUhSsumbK/ij54UXjjVgMGxQ= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= +github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/bilibili/kratos v0.1.0 h1:oTmlKDDDvmxRplYovXyRhAbVTNa/qvVJCvUBUcTAQPA= +github.com/bilibili/kratos v0.1.0/go.mod h1:MTvV7hwt9510/km0UVbfFvgm9KAE3j0GWIhQeCZeDzA= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cznic/b v0.0.0-20181122101859-a26611c4d92d/go.mod h1:URriBxXwVq5ijiJ12C7iIZqlA69nTlI+LgI6/pwftG8= +github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548/go.mod h1:e6NPNENfs9mPDVNRekM7lKScauxd5kXTr1Mfyig6TDM= +github.com/cznic/strutil v0.0.0-20181122101858-275e90344537/go.mod h1:AHHPPPXTw0h6pVabbcbyGRK1DckRn7r/STdZEeIDzZc= github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3 h1:t8FVkw33L+wilf2QiWkw0UV77qRpcH/JHPKGpKa2E8g= -github.com/gin-contrib/sse v0.0.0-20190301062529-5545eab6dad3/go.mod h1:VJ0WA2NBN22VlZ2dKZQPAPnyWw5XTlK1KymzLKsr59s= -github.com/gin-gonic/gin v0.0.0-20180512030042-bf7803815b0b h1:499x87Cnm/smy5wEx5DhVM0zq1jHRJgyC1cNdnjHc5o= -github.com/gin-gonic/gin v0.0.0-20180512030042-bf7803815b0b/go.mod h1:7cKuhb5qV2ggCFctp2fJQ+ErvciLZrIeoOSOm6mUr7Y= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-farm v0.0.0-20190323231341-8198c7b169ec/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= +github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/go-playground/locales v0.12.1 h1:2FITxuFt/xuCNP1Acdhv62OzaCiviiE4kotfhkmOqEc= +github.com/go-playground/locales v0.12.1/go.mod h1:IUMDtCfWo/w/mtMfIE/IG2K+Ey3ygWanZIBtBW0W2TM= +github.com/go-playground/universal-translator v0.16.0 h1:X++omBR/4cE2MNg91AoC3rmGrCjJ8eAeUP/K/EKx4DM= +github.com/go-playground/universal-translator v0.16.0/go.mod h1:1AnU7NaIRDWWzGEKwgtJRd2xk99HeFyHw3yid4rvQIY= +github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= +github.com/gogo/protobuf v1.2.0 h1:xU6/SpYbvkNYiptHJYEDRseDLvYE7wSqhYYNy0QSUzI= +github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= @@ -25,45 +41,60 @@ github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c h1:7lF+Vz0LqiRid github.com/gopherjs/gopherjs v0.0.0-20190430165422-3e4dfb77656c/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/grpc/grpc-go v1.20.1 h1:pk72GtSPpOdZDTkPneppDMGW10HYPC7RqNJT/JvUpV0= github.com/grpc/grpc-go v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= -github.com/json-iterator/go v1.1.6 h1:MrUvLMLTMxbqFJ9kzlvat/rYZqZnW3u4wkLzWTaFwKs= -github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU= -github.com/mattn/go-isatty v0.0.7 h1:UvyT9uN+3r7yLEYSlJsbQGdsaB/a0DlgWP3pql6iwOc= -github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= -github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= -github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= -github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= -github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/leodido/go-urn v1.1.0 h1:Sm1gr51B1kKyfD2BlRcLSiEkffoG96g6TPv6eRoEiB8= +github.com/leodido/go-urn v1.1.0/go.mod h1:+cyI34gQWZcE1eQU7NVgKkkzdXDQHr1dBMtdAPozLkw= +github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU= +github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4= +github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU= +github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= +github.com/montanaflynn/stats v0.5.0/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32 h1:W6apQkHrMkS0Muv8G/TipAy/FJl/rCYT0+EuS8+Z0z4= github.com/nbio/st v0.0.0-20140626010706-e9e8d9816f32/go.mod h1:9wM+0iRr9ahx58uYLpLIr5fm8diHn0JbqRycJi6w0Ms= +github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740= +github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910 h1:idejC8f05m9MGOsuEi1ATq9shN03HrxNkD/luQvxCv8= +github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 h1:PnBWHBf+6L0jOqq0gIVUe6Yk0/QMZ640k6NvkxcBf+8= +github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= +github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a h1:9a8MnZMP0X2nLJdBg+pBmGgkJlSaKC2KaQmTCk1XDtE= +github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/remyoudompheng/bigfft v0.0.0-20190321074620-2f0d2b0e0001/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= +github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= +github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw= +github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k= +github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/smartystreets/assertions v0.0.0-20190401211740-f487f9de1cd3 h1:hBSHahWMEgzwRyS6dRpxY0XyjZsHyQ61s084wo5PJe0= github.com/smartystreets/assertions v0.0.0-20190401211740-f487f9de1cd3/go.mod h1:OnSkiWE9lh6wB0YB77sQom3nweQdgAjqCqsofrRNTgc= github.com/smartystreets/goconvey v0.0.0-20180222194500-ef6db91d284a h1:JSvGDIbmil4Ui/dDdFBExb7/cmkNjyX5F97oglmvCDo= github.com/smartystreets/goconvey v0.0.0-20180222194500-ef6db91d284a/go.mod h1:XDJAKZRPZ1CvBcN2aX5YOUTYGHki24fSF0Iv48Ibg0s= github.com/stretchr/objx v0.1.0 h1:4G4v2dO3VZwixGIRoQ5Lfboy6nUhCyYzaqnIAPPhYs4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= -github.com/ugorji/go v1.1.4 h1:j4s+tAvLfL3bZyefP2SEWmhBzmuIlH/eqNuPdFPgngw= -github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= +github.com/tsuna/gohbase v0.0.0-20190201102810-d3184c1526df/go.mod h1:3HfLQly3YNLGxNv/2YOfmz30vcjG9hbuME1GpxoLlGs= +github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/go-playground/assert.v1 v1.2.1 h1:xoYuJVE7KT85PYWrN730RguIQO0ePzVRfFMXadIrXTM= gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE= -gopkg.in/go-playground/validator.v8 v8.18.2 h1:lFB4DoMU6B626w8ny76MV7VX6W2VHct2GVOI3xgiMrQ= -gopkg.in/go-playground/validator.v8 v8.18.2/go.mod h1:RX2a/7Ha8BgOhfk7j780h4/u/RRjR0eouCJSH80/M2Y= +gopkg.in/go-playground/validator.v9 v9.26.0 h1:2NPPsBpD0ZoxshmLWewQru8rWmbT5JqSzz9D1ZrAjYQ= +gopkg.in/go-playground/validator.v9 v9.26.0/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWdVbfP1avr/N00E2vyQ= gopkg.in/h2non/gock.v1 v1.0.8 h1:P8Ul3tXxL84suEhp+a7Uu6f9rBszP+gLkae2D6U1gS0= gopkg.in/h2non/gock.v1 v1.0.8/go.mod h1:KHI4Z1sxDW6P4N3DfTWSEza07YpkQP7KJBfglRMEjKY= -gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= -gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/http/discovery.go b/http/discovery.go index 8327ab9..9c6b386 100644 --- a/http/discovery.go +++ b/http/discovery.go @@ -4,34 +4,32 @@ import ( "encoding/json" "time" - "github.com/bilibili/discovery/errors" "github.com/bilibili/discovery/model" - gin "github.com/gin-gonic/gin" - log "github.com/golang/glog" + "github.com/bilibili/kratos/pkg/ecode" + log "github.com/bilibili/kratos/pkg/log" + bm "github.com/bilibili/kratos/pkg/net/http/blademaster" ) const ( _pollWaitSecond = 30 * time.Second ) -func register(c *gin.Context) { +func register(c *bm.Context) { arg := new(model.ArgRegister) if err := c.Bind(arg); err != nil { - result(c, nil, errors.ParamsErr) return } i := model.NewInstance(arg) if i.Status == 0 || i.Status > 2 { - result(c, nil, errors.ParamsErr) - log.Errorf("register params status invalid") + log.Error("register params status invalid") return } if arg.Metadata != "" { // check the metadata type is json if !json.Valid([]byte(arg.Metadata)) { - result(c, nil, errors.ParamsErr) - log.Errorf("register params() metadata(%v) invalid json", arg.Metadata) + c.JSON(nil, ecode.RequestErr) + log.Error("register params() metadata(%v) invalid json", arg.Metadata) return } } @@ -39,101 +37,97 @@ func register(c *gin.Context) { if arg.DirtyTimestamp > 0 { i.DirtyTimestamp = arg.DirtyTimestamp } - dis.Register(c, i, arg.LatestTimestamp, arg.Replication) - result(c, nil, nil) + dis.Register(c, i, arg.LatestTimestamp, arg.Replication, arg.FromZone) + c.JSON(nil, nil) } -func renew(c *gin.Context) { +func renew(c *bm.Context) { arg := new(model.ArgRenew) if err := c.Bind(arg); err != nil { - result(c, nil, errors.ParamsErr) return } // renew - instance, err := dis.Renew(c, arg) - result(c, instance, err) + c.JSON(dis.Renew(c, arg)) } -func cancel(c *gin.Context) { +func cancel(c *bm.Context) { arg := new(model.ArgCancel) if err := c.Bind(arg); err != nil { - result(c, nil, errors.ParamsErr) return } // cancel - result(c, nil, dis.Cancel(c, arg)) + c.JSON(nil, dis.Cancel(c, arg)) } -func fetchAll(c *gin.Context) { - result(c, dis.FetchAll(c), nil) +func fetchAll(c *bm.Context) { + c.JSON(dis.FetchAll(c), nil) } -func fetch(c *gin.Context) { +func fetch(c *bm.Context) { arg := new(model.ArgFetch) if err := c.Bind(arg); err != nil { - result(c, nil, errors.ParamsErr) return } - insInfo, err := dis.Fetch(c, arg) - result(c, insInfo, err) + c.JSON(dis.Fetch(c, arg)) } -func fetchs(c *gin.Context) { +func fetchs(c *bm.Context) { arg := new(model.ArgFetchs) if err := c.Bind(arg); err != nil { - result(c, nil, errors.ParamsErr) + return } - ins, err := dis.Fetchs(c, arg) - result(c, ins, err) + c.JSON(dis.Fetchs(c, arg)) } -func poll(c *gin.Context) { +func poll(c *bm.Context) { arg := new(model.ArgPolls) if err := c.Bind(arg); err != nil { - result(c, nil, errors.ParamsErr) return } - ch, new, err := dis.Polls(c, arg) - if err != nil && err != errors.NotModified { - result(c, nil, err) + ch, new, miss, err := dis.Polls(c, arg) + if err != nil && err != ecode.NotModified { + c.JSON(map[string]interface{}{ + miss: map[string]string{"err": "not found"}, + }, err) return } // wait for instance change select { case e := <-ch: - result(c, resp{Data: e[arg.AppID[0]]}, nil) + c.JSON(e[arg.AppID[0]], nil) if !new { dis.DelConns(arg) // broadcast will delete all connections of appid } return case <-time.After(_pollWaitSecond): - result(c, nil, errors.NotModified) + c.JSON(nil, ecode.NotModified) case <-c.Done(): } - result(c, nil, errors.NotModified) + c.JSON(nil, ecode.NotModified) dis.DelConns(arg) } -func polls(c *gin.Context) { +func polls(c *bm.Context) { arg := new(model.ArgPolls) if err := c.Bind(arg); err != nil { - result(c, nil, errors.ParamsErr) return } if len(arg.AppID) != len(arg.LatestTimestamp) { - result(c, nil, errors.ParamsErr) + c.JSON(nil, ecode.RequestErr) return } - ch, new, err := dis.Polls(c, arg) - if err != nil && err != errors.NotModified { - result(c, nil, err) + ch, new, miss, err := dis.Polls(c, arg) + if err != nil && err != ecode.NotModified { + c.JSON(map[string]interface{}{ + miss: map[string]string{"err": "not found"}, + }, err) return } // wait for instance change select { case e := <-ch: - result(c, e, nil) + c.JSON(e, nil) if !new { dis.DelConns(arg) // broadcast will delete all connections of appid } @@ -141,25 +135,24 @@ func polls(c *gin.Context) { case <-time.After(_pollWaitSecond): case <-c.Done(): } - result(c, nil, errors.NotModified) + c.JSON(nil, ecode.NotModified) dis.DelConns(arg) } -func set(c *gin.Context) { +func set(c *bm.Context) { arg := new(model.ArgSet) if err := c.Bind(arg); err != nil { - result(c, nil, errors.ParamsErr) return } // len of color,status,metadata must equal to len of hostname or be zero if (len(arg.Hostname) != len(arg.Status) && len(arg.Status) != 0) || (len(arg.Hostname) != len(arg.Metadata) && len(arg.Metadata) != 0) { - result(c, nil, errors.ParamsErr) + c.JSON(nil, ecode.RequestErr) return } - result(c, nil, dis.Set(c, arg)) + c.JSON(nil, dis.Set(c, arg)) } -func nodes(c *gin.Context) { - result(c, dis.Nodes(c), nil) +func nodes(c *bm.Context) { + c.JSON(dis.Nodes(c), nil) } diff --git a/http/http.go b/http/http.go index b0ae2c9..318c4ff 100644 --- a/http/http.go +++ b/http/http.go @@ -1,105 +1,53 @@ package http import ( - "fmt" - "net/http/httputil" - "os" - "runtime" - "time" + "errors" "github.com/bilibili/discovery/conf" "github.com/bilibili/discovery/discovery" - "github.com/bilibili/discovery/errors" - "github.com/gin-gonic/gin" - log "github.com/golang/glog" -) - -const ( - contextErrCode = "context/err/code" + log "github.com/bilibili/kratos/pkg/log" + bm "github.com/bilibili/kratos/pkg/net/http/blademaster" ) var ( - dis *discovery.Discovery + dis *discovery.Discovery + protected = true + errProtected = errors.New("discovery in protect mode,only support register") ) // Init init http -func Init(c *conf.Config, d *discovery.Discovery) { - dis = d - gin.SetMode(gin.ReleaseMode) - engine := gin.New() - engine.Use(loggerHandler, recoverHandler) - innerRouter(engine) - go func() { - if err := engine.Run(c.HTTPServer.Addr); err != nil { - panic(err) - } - }() +func Init(c *conf.Config, s *discovery.Discovery) { + dis = s + engineInner := bm.DefaultServer(c.HTTPServer) + innerRouter(engineInner) + if err := engineInner.Start(); err != nil { + log.Error("bm.DefaultServer error(%v)", err) + panic(err) + } } // innerRouter init local router api path. -func innerRouter(e *gin.Engine) { +func innerRouter(e *bm.Engine) { group := e.Group("/discovery") - group.POST("/register", register) - group.POST("/renew", renew) - group.POST("/cancel", cancel) - group.GET("/fetch/all", fetchAll) - group.GET("/fetch", fetch) - group.GET("/fetchs", fetchs) - group.GET("/poll", poll) - group.GET("/polls", polls) - group.GET("/nodes", nodes) - group.POST("/set", set) -} - -func loggerHandler(c *gin.Context) { - // Start timer - start := time.Now() - path := c.Request.URL.Path - raw := c.Request.URL.RawQuery - method := c.Request.Method - - // Process request - c.Next() - - // Stop timer - end := time.Now() - latency := end.Sub(start) - statusCode := c.Writer.Status() - ecode := c.GetInt(contextErrCode) - clientIP := c.ClientIP() - if raw != "" { - path = path + "?" + raw + { + group.POST("/register", register) + group.POST("/renew", renew) + group.POST("/cancel", cancel) + group.GET("/fetch/all", initProtect, fetchAll) + group.GET("/fetch", initProtect, fetch) + group.GET("/fetchs", initProtect, fetchs) + group.GET("/poll", initProtect, poll) + group.GET("/polls", initProtect, polls) + //manager + group.POST("/set", set) + group.GET("/nodes", initProtect, nodes) } - log.Infof("METHOD:%s | PATH:%s | CODE:%d | IP:%s | TIME:%d | ECODE:%d", method, path, statusCode, clientIP, latency/time.Millisecond, ecode) -} - -func recoverHandler(c *gin.Context) { - defer func() { - if err := recover(); err != nil { - const size = 64 << 10 - buf := make([]byte, size) - buf = buf[:runtime.Stack(buf, false)] - httprequest, _ := httputil.DumpRequest(c.Request, false) - pnc := fmt.Sprintf("[Recovery] %s panic recovered:\n%s\n%s\n%s", time.Now().Format("2006-01-02 15:04:05"), string(httprequest), err, buf) - fmt.Fprintf(os.Stderr, pnc) - log.Error(pnc) - c.AbortWithStatus(500) - } - }() - c.Next() } -type resp struct { - Code int `json:"code"` - Data interface{} `json:"data"` -} - -func result(c *gin.Context, data interface{}, err error) { - ee := errors.Code(err) - c.Set(contextErrCode, ee.Code()) - c.JSON(200, resp{ - Code: ee.Code(), - Data: data, - }) +func initProtect(ctx *bm.Context) { + if dis.Protected() { + ctx.JSON(nil, errProtected) + ctx.AbortWithStatus(503) + } } diff --git a/lib/http/client.go b/lib/http/client.go deleted file mode 100644 index b216e87..0000000 --- a/lib/http/client.go +++ /dev/null @@ -1,140 +0,0 @@ -package http - -import ( - "bytes" - "context" - "crypto/tls" - "encoding/json" - "io" - "net" - xhttp "net/http" - "net/url" - "strings" - xtime "time" - - "github.com/bilibili/discovery/lib/time" -) - -var ( - _minRead int64 = 16 * 1024 // 16kb -) - -// ClientConfig is http client conf. -type ClientConfig struct { - Dial time.Duration - KeepAlive time.Duration -} - -// Client is http client. -type Client struct { - client *xhttp.Client - transport xhttp.RoundTripper -} - -// NewClient new a http client. -func NewClient(c *ClientConfig) *Client { - client := new(Client) - dialer := &net.Dialer{ - Timeout: xtime.Duration(c.Dial), - KeepAlive: xtime.Duration(c.KeepAlive), - } - client.transport = &xhttp.Transport{ - DialContext: dialer.DialContext, - TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, - } - client.client = &xhttp.Client{ - Transport: client.transport, - } - return client -} - -// NewRequest new http request with method, uri, ip, values and headers. -func (client *Client) NewRequest(method, uri, realIP string, params url.Values) (req *xhttp.Request, err error) { - if method == xhttp.MethodGet { - req, err = xhttp.NewRequest(xhttp.MethodGet, uri+"?"+params.Encode(), nil) - } else { - req, err = xhttp.NewRequest(xhttp.MethodPost, uri, strings.NewReader(params.Encode())) - } - if err != nil { - return - } - const ( - _contentType = "Content-Type" - _urlencoded = "application/x-www-form-urlencoded" - ) - if method == xhttp.MethodPost { - req.Header.Set(_contentType, _urlencoded) - } - return -} - -// Get issues a GET to the specified URL. -func (client *Client) Get(c context.Context, uri, ip string, params url.Values, res interface{}) (err error) { - req, err := client.NewRequest(xhttp.MethodGet, uri, ip, params) - if err != nil { - return - } - return client.Do(c, req, res) -} - -// Post issues a Post to the specified URL. -func (client *Client) Post(c context.Context, uri, ip string, params url.Values, res interface{}) (err error) { - req, err := client.NewRequest(xhttp.MethodPost, uri, ip, params) - if err != nil { - return - } - return client.Do(c, req, res) -} - -// Raw sends an HTTP request and returns bytes response -func (client *Client) Raw(c context.Context, req *xhttp.Request, v ...string) (bs []byte, err error) { - resp, err := client.client.Do(req.WithContext(c)) - if err != nil { - return - } - defer resp.Body.Close() - if resp.StatusCode >= xhttp.StatusBadRequest { - return - } - bs, err = readAll(resp.Body, _minRead) - return -} - -// SetTransport set client transport -func (client *Client) SetTransport(t xhttp.RoundTripper) { - client.transport = t - client.client.Transport = t -} - -// Do sends an HTTP request and returns an HTTP json response. -func (client *Client) Do(c context.Context, req *xhttp.Request, res interface{}, v ...string) (err error) { - var bs []byte - if bs, err = client.Raw(c, req, v...); err != nil { - return - } - if res != nil { - err = json.Unmarshal(bs, res) - } - return -} - -// readAll reads from r until an error or EOF and returns the data it read -// from the internal buffer allocated with a specified capacity. -func readAll(r io.Reader, capacity int64) (b []byte, err error) { - buf := bytes.NewBuffer(make([]byte, 0, capacity)) - // If the buffer overflows, we will get bytes.ErrTooLarge. - // Return that as an error. Any other panic remains. - defer func() { - e := recover() - if e == nil { - return - } - if panicErr, ok := e.(error); ok && panicErr == bytes.ErrTooLarge { - err = panicErr - } else { - panic(e) - } - }() - _, err = buf.ReadFrom(r) - return buf.Bytes(), err -} diff --git a/lib/time/time.go b/lib/time/time.go deleted file mode 100644 index 4f193c5..0000000 --- a/lib/time/time.go +++ /dev/null @@ -1,17 +0,0 @@ -package time - -import ( - xtime "time" -) - -// Duration be used toml unmarshal string time, like 1s, 500ms. -type Duration xtime.Duration - -// UnmarshalText unmarshal text to duration. -func (d *Duration) UnmarshalText(text []byte) error { - tmp, err := xtime.ParseDuration(string(text)) - if err == nil { - *d = Duration(tmp) - } - return err -} diff --git a/model/instance.go b/model/instance.go index cfb2156..6e80c70 100644 --- a/model/instance.go +++ b/model/instance.go @@ -5,9 +5,8 @@ import ( "sync" "time" - "github.com/bilibili/discovery/errors" - - log "github.com/golang/glog" + "github.com/bilibili/kratos/pkg/ecode" + log "github.com/bilibili/kratos/pkg/log" ) // InstanceStatus Status of instance @@ -85,7 +84,7 @@ func NewInstance(arg *ArgRegister) (i *Instance) { } if arg.Metadata != "" { if err := json.Unmarshal([]byte(arg.Metadata), &i.Metadata); err != nil { - log.Errorf("json unmarshal metadata err %v", err) + log.Error("json unmarshal metadata err %v", err) } } return @@ -161,7 +160,7 @@ func (p *Apps) InstanceInfo(zone string, latestTime int64, status uint32) (ci *I p.lock.RLock() defer p.lock.RUnlock() if latestTime >= p.latestTimestamp { - err = errors.NotModified + err = ecode.NotModified return } ci = &InstanceInfo{ @@ -186,9 +185,9 @@ func (p *Apps) InstanceInfo(zone string, latestTime int64, status uint32) (ci *I } } if !ok { - err = errors.NothingFound + err = ecode.NothingFound } else if len(ci.Instances) == 0 { - err = errors.NotModified + err = ecode.NotModified } return } @@ -245,7 +244,7 @@ func (a *App) NewInstance(ni *Instance, latestTime int64) (i *Instance, ok bool) if ok { ni.UpTimestamp = oi.UpTimestamp if ni.DirtyTimestamp < oi.DirtyTimestamp { - log.Warningf("register exist(%v) dirty timestamp over than caller(%v)", oi, ni) + log.Warn("register exist(%v) dirty timestamp over than caller(%v)", oi, ni) ni = oi } } @@ -317,12 +316,12 @@ func (a *App) Set(changes *ArgSet) (ok bool) { } for i, hostname := range changes.Hostname { if dst, ok = a.instances[hostname]; !ok { - log.Errorf("SetWeight hostname(%s) not found", hostname) + log.Error("SetWeight hostname(%s) not found", hostname) return } if len(changes.Status) != 0 { if changes.Status[i] != InstanceStatusUP && changes.Status[i] != InstancestatusWating { - log.Errorf("SetWeight change status(%d) is error", changes.Status[i]) + log.Error("SetWeight change status(%d) is error", changes.Status[i]) ok = false return } @@ -333,7 +332,7 @@ func (a *App) Set(changes *ArgSet) (ok bool) { } if len(changes.Metadata) != 0 { if err := json.Unmarshal([]byte(changes.Metadata[i]), &dst.Metadata); err != nil { - log.Errorf("set change metadata err %s", changes.Metadata[i]) + log.Error("set change metadata err %s", changes.Metadata[i]) ok = false return } diff --git a/model/param.go b/model/param.go index 1febfae..b367b51 100644 --- a/model/param.go +++ b/model/param.go @@ -14,6 +14,7 @@ type ArgRegister struct { Replication bool `form:"replication"` LatestTimestamp int64 `form:"latest_timestamp"` DirtyTimestamp int64 `form:"dirty_timestamp"` + FromZone bool `form:"from_zone"` } // ArgRenew define renew params. @@ -24,6 +25,7 @@ type ArgRenew struct { Hostname string `form:"hostname" validate:"required"` Replication bool `form:"replication"` DirtyTimestamp int64 `form:"dirty_timestamp"` + FromZone bool `form:"from_zone"` } // ArgCancel define cancel params. @@ -32,6 +34,7 @@ type ArgCancel struct { Env string `form:"env" validate:"required"` AppID string `form:"appid" validate:"required"` Hostname string `form:"hostname" validate:"required"` + FromZone bool `form:"from_zone"` Replication bool `form:"replication"` LatestTimestamp int64 `form:"latest_timestamp"` } @@ -79,5 +82,6 @@ type ArgSet struct { Status []uint32 `form:"status" validate:"gte=0"` Metadata []string `form:"metadata" validate:"gte=0"` Replication bool `form:"replication"` + FromZone bool `form:"from_zone"` SetTimestamp int64 `form:"set_timestamp"` } diff --git a/naming/client.go b/naming/client.go index b3154d6..c19d011 100644 --- a/naming/client.go +++ b/naming/client.go @@ -14,11 +14,10 @@ import ( "sync/atomic" "time" - ecode "github.com/bilibili/discovery/errors" - "github.com/bilibili/discovery/lib/http" - xtime "github.com/bilibili/discovery/lib/time" - - log "github.com/golang/glog" + ecode "github.com/bilibili/kratos/pkg/ecode" + log "github.com/bilibili/kratos/pkg/log" + http "github.com/bilibili/kratos/pkg/net/http/blademaster" + xtime "github.com/bilibili/kratos/pkg/time" ) const ( @@ -117,6 +116,7 @@ func New(c *Config) (d *Discovery) { cfg := &http.ClientConfig{ Dial: xtime.Duration(3 * time.Second), KeepAlive: xtime.Duration(40 * time.Second), + Timeout: xtime.Duration(40 * time.Second), } d.httpClient = http.NewClient(cfg) // discovery self @@ -210,7 +210,7 @@ func (d *Discovery) Build(appid string) Resolver { default: } } - log.Infof("disocvery: AddWatch(%s) already watch(%v)", appid, ok) + log.Info("disocvery: AddWatch(%s) already watch(%v)", appid, ok) d.once.Do(func() { go d.serverproc() }) @@ -325,7 +325,7 @@ func (d *Discovery) register(ctx context.Context, ins *Instance) (err error) { var metadata []byte if ins.Metadata != nil { if metadata, err = json.Marshal(ins.Metadata); err != nil { - log.Errorf("discovery:register instance Marshal metadata(%v) failed!error(%v)", ins.Metadata, err) + log.Error("discovery:register instance Marshal metadata(%v) failed!error(%v)", ins.Metadata, err) } } res := new(struct { @@ -341,16 +341,16 @@ func (d *Discovery) register(ctx context.Context, ins *Instance) (err error) { params.Set("metadata", string(metadata)) if err = d.httpClient.Post(ctx, uri, "", params, &res); err != nil { d.switchNode() - log.Errorf("discovery: register client.Get(%v) zone(%s) env(%s) appid(%s) addrs(%v) error(%v)", + log.Error("discovery: register client.Get(%v) zone(%s) env(%s) appid(%s) addrs(%v) error(%v)", uri, c.Zone, c.Env, ins.AppID, ins.Addrs, err) return } if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) { - log.Warningf("discovery: register client.Get(%v) env(%s) appid(%s) addrs(%v) code(%v)", uri, c.Env, ins.AppID, ins.Addrs, res.Code) + log.Warn("discovery: register client.Get(%v) env(%s) appid(%s) addrs(%v) code(%v)", uri, c.Env, ins.AppID, ins.Addrs, res.Code) err = ec return } - log.Infof("discovery: register client.Get(%v) env(%s) appid(%s) addrs(%s) success", uri, c.Env, ins.AppID, ins.Addrs) + log.Info("discovery: register client.Get(%v) env(%s) appid(%s) addrs(%s) success", uri, c.Env, ins.AppID, ins.Addrs) return } @@ -369,7 +369,7 @@ func (d *Discovery) renew(ctx context.Context, ins *Instance) (err error) { params.Set("appid", ins.AppID) if err = d.httpClient.Post(ctx, uri, "", params, &res); err != nil { d.switchNode() - log.Errorf("discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) error(%v)", + log.Error("discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) error(%v)", uri, c.Env, ins.AppID, c.Host, err) return } @@ -378,7 +378,7 @@ func (d *Discovery) renew(ctx context.Context, ins *Instance) (err error) { if ec.Equal(ecode.NothingFound) { return } - log.Errorf("discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)", + log.Error("discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)", uri, c.Env, ins.AppID, c.Host, res.Code) return } @@ -401,17 +401,17 @@ func (d *Discovery) cancel(ins *Instance) (err error) { // request if err = d.httpClient.Post(context.TODO(), uri, "", params, &res); err != nil { d.switchNode() - log.Errorf("discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) error(%v)", + log.Error("discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) error(%v)", uri, c.Env, ins.AppID, c.Host, err) return } if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) { - log.Warningf("discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)", + log.Warn("discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)", uri, c.Env, ins.AppID, c.Host, res.Code) err = ec return } - log.Infof("discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) success", + log.Info("discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) success", uri, c.Env, ins.AppID, c.Host) return } @@ -438,24 +438,24 @@ func (d *Discovery) set(ctx context.Context, ins *Instance) (err error) { if ins.Metadata != nil { var metadata []byte if metadata, err = json.Marshal(ins.Metadata); err != nil { - log.Errorf("discovery:set instance Marshal metadata(%v) failed!error(%v)", ins.Metadata, err) + log.Error("discovery:set instance Marshal metadata(%v) failed!error(%v)", ins.Metadata, err) return } params.Set("metadata", string(metadata)) } if err = d.httpClient.Post(ctx, uri, "", params, &res); err != nil { d.switchNode() - log.Errorf("discovery: set client.Get(%v) zone(%s) env(%s) appid(%s) addrs(%v) error(%v)", + log.Error("discovery: set client.Get(%v) zone(%s) env(%s) appid(%s) addrs(%v) error(%v)", uri, conf.Zone, conf.Env, ins.AppID, ins.Addrs, err) return } if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) { - log.Warningf("discovery: set client.Get(%v) env(%s) appid(%s) addrs(%v) code(%v)", + log.Warn("discovery: set client.Get(%v) env(%s) appid(%s) addrs(%v) code(%v)", uri, conf.Env, ins.AppID, ins.Addrs, res.Code) err = ec return } - log.Infof("discovery: set client.Get(%v) env(%s) appid(%s) addrs(%s) success", uri+"?"+params.Encode(), conf.Env, ins.AppID, ins.Addrs) + log.Info("discovery: set client.Get(%v) env(%s) appid(%s) addrs(%s) success", uri+"?"+params.Encode(), conf.Env, ins.AppID, ins.Addrs) return } @@ -549,12 +549,12 @@ func (d *Discovery) polls(ctx context.Context) (apps map[string]*InstancesInfo, } if err = d.httpClient.Get(ctx, uri, "", params, res); err != nil { d.switchNode() - log.Errorf("discovery: client.Get(%s) error(%+v)", uri+"?"+params.Encode(), err) + log.Error("discovery: client.Get(%s) error(%+v)", uri+"?"+params.Encode(), err) return } if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) { if !ec.Equal(ecode.NotModified) { - log.Errorf("discovery: client.Get(%s) get error code(%d)", uri+"?"+params.Encode(), res.Code) + log.Error("discovery: client.Get(%s) get error code(%d)", uri+"?"+params.Encode(), res.Code) err = ec } return @@ -563,11 +563,11 @@ func (d *Discovery) polls(ctx context.Context) (apps map[string]*InstancesInfo, for _, app := range res.Data { if app == nil || app.LastTs == 0 { err = ecode.ServerErr - log.Errorf("discovery: client.Get(%s) latest_timestamp is 0,instances:(%s)", uri+"?"+params.Encode(), info) + log.Error("discovery: client.Get(%s) latest_timestamp is 0,instances:(%s)", uri+"?"+params.Encode(), info) return } } - log.Infof("discovery: successfully polls(%s) instances (%s)", uri+"?"+params.Encode(), info) + log.Info("discovery: successfully polls(%s) instances (%s)", uri+"?"+params.Encode(), info) apps = res.Data return } diff --git a/naming/client_test.go b/naming/client_test.go index ec7f0f4..801f611 100644 --- a/naming/client_test.go +++ b/naming/client_test.go @@ -10,12 +10,12 @@ import ( "testing" "time" - xhttp "github.com/bilibili/discovery/lib/http" - xtime "github.com/bilibili/discovery/lib/time" "github.com/bilibili/discovery/conf" "github.com/bilibili/discovery/discovery" "github.com/bilibili/discovery/http" + xhttp "github.com/bilibili/kratos/pkg/net/http/blademaster" + xtime "github.com/bilibili/kratos/pkg/time" . "github.com/smartystreets/goconvey/convey" ) @@ -35,10 +35,12 @@ func mockDiscoverySvr() { Host: "test_server", }, Nodes: []string{"127.0.0.1:7171"}, - HTTPServer: &conf.ServerConfig{ - Addr: "127.0.0.1:7171", + HTTPServer: &xhttp.ServerConfig{ + Addr: "127.0.0.1:7171", + Timeout: xtime.Duration(time.Second * 1), }, HTTPClient: &xhttp.ClientConfig{ + Timeout: xtime.Duration(time.Second * 40), Dial: xtime.Duration(time.Second), KeepAlive: xtime.Duration(time.Second * 30), }, @@ -65,6 +67,7 @@ func TestDiscovery(t *testing.T) { Zone: "test", Env: "test", AppID: appid, + Addrs: []string{"http://127.0.0.1:8000"}, Hostname: "test-host", } _, err := dis.Register(instance) @@ -115,6 +118,7 @@ func TestDiscovery(t *testing.T) { Zone: "test", Env: "test", AppID: appid, + Addrs: []string{"http://127.0.0.1:8000"}, Hostname: "test-host2", } err := addNewInstance(instance2) @@ -136,6 +140,7 @@ func TestDiscovery(t *testing.T) { func addNewInstance(ins *Instance) error { cli := xhttp.NewClient(&xhttp.ClientConfig{ + Timeout: xtime.Duration(time.Second * 30), Dial: xtime.Duration(time.Second), KeepAlive: xtime.Duration(time.Second * 30), }) diff --git a/naming/grpc/resolver.go b/naming/grpc/resolver.go index cd471be..79ea001 100644 --- a/naming/grpc/resolver.go +++ b/naming/grpc/resolver.go @@ -9,7 +9,7 @@ import ( "github.com/bilibili/discovery/naming" - log "github.com/golang/glog" + log "github.com/bilibili/kratos/pkg/log" "google.golang.org/grpc/resolver" ) @@ -134,7 +134,7 @@ func (r *Resolver) newAddress(instances []*naming.Instance) { } rpcAddr, color, weight := extractAddrs(ins) if rpcAddr == "" { - log.Warningf("grpc resolver: invalid rpc address(%s,%s,%v) found!", ins.AppID, ins.Hostname, ins.Addrs) + log.Warn("grpc resolver: invalid rpc address(%s,%s,%v) found!", ins.AppID, ins.Hostname, ins.Addrs) continue } if weight <= 0 { diff --git a/registry/guard.go b/registry/guard.go index 62f81fe..390cf34 100644 --- a/registry/guard.go +++ b/registry/guard.go @@ -4,7 +4,7 @@ import ( "sync" "sync/atomic" - log "github.com/golang/glog" + log "github.com/bilibili/kratos/pkg/log" ) const ( @@ -54,7 +54,7 @@ func (g *Guard) incrFac() { func (g *Guard) ok() (is bool) { is = atomic.LoadInt64(&g.facLastMin) < atomic.LoadInt64(&g.expThreshold) if is { - log.Warningf("discovery is protected, the factual renews(%d) less than expected renews(%d)", atomic.LoadInt64(&g.facLastMin), atomic.LoadInt64(&g.expThreshold)) + log.Warn("discovery is protected, the factual renews(%d) less than expected renews(%d)", atomic.LoadInt64(&g.facLastMin), atomic.LoadInt64(&g.expThreshold)) } return } diff --git a/registry/node.go b/registry/node.go index 506ea04..ae81ff3 100644 --- a/registry/node.go +++ b/registry/node.go @@ -9,11 +9,10 @@ import ( "strings" "github.com/bilibili/discovery/conf" - "github.com/bilibili/discovery/errors" - "github.com/bilibili/discovery/lib/http" "github.com/bilibili/discovery/model" - - log "github.com/golang/glog" + "github.com/bilibili/kratos/pkg/ecode" + log "github.com/bilibili/kratos/pkg/log" + http "github.com/bilibili/kratos/pkg/net/http/blademaster" ) const ( @@ -65,7 +64,7 @@ func newNode(c *conf.Config, addr string) (n *Node) { func (n *Node) Register(c context.Context, i *model.Instance) (err error) { err = n.call(c, model.Register, i, n.registerURL, nil) if err != nil { - log.Warningf("node be called(%s) register instance(%v) error(%v)", n.registerURL, i, err) + log.Warn("node be called(%s) register instance(%v) error(%v)", n.registerURL, i, err) } return } @@ -74,7 +73,7 @@ func (n *Node) Register(c context.Context, i *model.Instance) (err error) { func (n *Node) Cancel(c context.Context, i *model.Instance) (err error) { err = n.call(c, model.Cancel, i, n.cancelURL, nil) if err != nil { - log.Warningf("node be called(%s) instance(%v) already canceled", n.cancelURL, i) + log.Warn("node be called(%s) instance(%v) already canceled", n.cancelURL, i) } return } @@ -84,19 +83,19 @@ func (n *Node) Cancel(c context.Context, i *model.Instance) (err error) { func (n *Node) Renew(c context.Context, i *model.Instance) (err error) { var res *model.Instance err = n.call(c, model.Renew, i, n.renewURL, &res) - if err == errors.ServerErr { - log.Warningf("node be called(%s) instance(%v) error(%v)", n.renewURL, i, err) + if err == ecode.ServerErr { + log.Warn("node be called(%s) instance(%v) error(%v)", n.renewURL, i, err) n.status = model.NodeStatusLost return } n.status = model.NodeStatusUP - if err == errors.NothingFound { - log.Warningf("node be called(%s) instance(%v) error(%v)", n.renewURL, i, err) + if err == ecode.NothingFound { + log.Warn("node be called(%s) instance(%v) error(%v)", n.renewURL, i, err) err = n.call(c, model.Register, i, n.registerURL, nil) return } // NOTE: register response instance whitch in conflict with peer node - if err == errors.Conflict && res != nil { + if err == ecode.Conflict && res != nil { err = n.call(c, model.Register, res, n.pRegisterURL, nil) } return @@ -133,12 +132,12 @@ func (n *Node) call(c context.Context, action model.Action, i *model.Instance, u Data json.RawMessage `json:"data"` } if err = n.client.Post(c, uri, "", params, &res); err != nil { - log.Errorf("node be called(%s) instance(%v) error(%v)", uri, i, err) + log.Error("node be called(%s) instance(%v) error(%v)", uri, i, err) return } if res.Code != 0 { - log.Errorf("node be called(%s) instance(%v) response code(%v)", uri, i, res.Code) - if err = errors.Int(res.Code); err == errors.Conflict { + log.Error("node be called(%s) instance(%v) response code(%v)", uri, i, res.Code) + if err = ecode.Int(res.Code); err == ecode.Conflict { _ = json.Unmarshal([]byte(res.Data), data) } } diff --git a/registry/node_test.go b/registry/node_test.go index 8c8c336..af2d454 100644 --- a/registry/node_test.go +++ b/registry/node_test.go @@ -4,24 +4,42 @@ import ( "context" "strings" "testing" + "time" dc "github.com/bilibili/discovery/conf" - "github.com/bilibili/discovery/errors" - "github.com/bilibili/discovery/lib/http" "github.com/bilibili/discovery/model" + "github.com/bilibili/kratos/pkg/ecode" + http "github.com/bilibili/kratos/pkg/net/http/blademaster" + xtime "github.com/bilibili/kratos/pkg/time" . "github.com/smartystreets/goconvey/convey" gock "gopkg.in/h2non/gock.v1" ) +var config = newConfig() + +func newConfig() *dc.Config { + c := &dc.Config{ + HTTPClient: &http.ClientConfig{ + Timeout: xtime.Duration(time.Second * 30), + Dial: xtime.Duration(time.Second), + KeepAlive: xtime.Duration(time.Second * 30), + }, + HTTPServer: &http.ServerConfig{Addr: "127.0.0.1:7171"}, + Nodes: []string{"127.0.0.1:7172", "127.0.0.1:7173", "127.0.0.1:7171"}, + Zones: map[string][]string{"zone": []string{"127.0.0.1:7172"}}, + Env: &dc.Env{ + Zone: "sh001", + DeployEnv: "pre", + Host: "test_server", + }, + } + return c +} func TestReplicate(t *testing.T) { Convey("test replicate", t, func() { i := model.NewInstance(reg) - nodes := NewNodes(&dc.Config{HTTPClient: &http.ClientConfig{}, - Env: &dc.Env{}, - HTTPServer: &dc.ServerConfig{Addr: "127.0.0.1:7171"}, - Nodes: []string{"127.0.0.1:7172", "127.0.0.1:7173", "127.0.0.1:7171"}, - }) + nodes := NewNodes(config) nodes.nodes[0].client.SetTransport(gock.DefaultTransport) nodes.nodes[1].client.SetTransport(gock.DefaultTransport) httpMock("POST", "http://127.0.0.1:7172/discovery/register").Reply(200).JSON(`{"code":0}`) @@ -37,22 +55,12 @@ func TestReplicate(t *testing.T) { func TestNodes(t *testing.T) { Convey("test nodes", t, func() { - nodes := NewNodes(&dc.Config{HTTPClient: &http.ClientConfig{}, - Env: &dc.Env{}, - HTTPServer: &dc.ServerConfig{Addr: "127.0.0.1:7171"}, - Nodes: []string{"127.0.0.1:7172", "127.0.0.1:7173", "127.0.0.1:7171"}, - Zones: map[string][]string{"zone": []string{"127.0.0.1:7172"}}, - }) + nodes := NewNodes(config) res := nodes.Nodes() So(len(res), ShouldResemble, 3) }) Convey("test all nodes", t, func() { - nodes := NewNodes(&dc.Config{HTTPClient: &http.ClientConfig{}, - Env: &dc.Env{}, - HTTPServer: &dc.ServerConfig{Addr: "127.0.0.1:7171"}, - Nodes: []string{"127.0.0.1:7172", "127.0.0.1:7173", "127.0.0.1:7171"}, - Zones: map[string][]string{"zone": []string{"127.0.0.1:7172"}}, - }) + nodes := NewNodes(config) res := nodes.AllNodes() So(len(res), ShouldResemble, 4) }) @@ -60,11 +68,7 @@ func TestNodes(t *testing.T) { func TestUp(t *testing.T) { Convey("test up", t, func() { - nodes := NewNodes(&dc.Config{HTTPClient: &http.ClientConfig{}, - Env: &dc.Env{}, - HTTPServer: &dc.ServerConfig{Addr: "127.0.0.1:7171"}, - Nodes: []string{"127.0.0.1:7172", "127.0.0.1:7173", "127.0.0.1:7171"}, - }) + nodes := NewNodes(config) nodes.UP() for _, nd := range nodes.nodes { if nd.addr == "127.0.0.1:7171" { @@ -77,14 +81,12 @@ func TestUp(t *testing.T) { func TestCall(t *testing.T) { Convey("test call", t, func() { var res *model.Instance - node := newNode(&dc.Config{HTTPClient: &http.ClientConfig{}, - HTTPServer: &dc.ServerConfig{Addr: "127.0.0.1:7171"}, - Nodes: []string{"127.0.0.1:7171"}}, "127.0.0.1:7172") + node := newNode(config, "127.0.0.1:7172") node.client.SetTransport(gock.DefaultTransport) httpMock("POST", "http://127.0.0.1:7172/discovery/register").Reply(200).JSON(`{"ts":1514341945,"code":-409,"data":{"region":"shsb","zone":"fuck","appid":"main.arch.account-service","env":"pre","hostname":"cs4sq","http":"","rpc":"0.0.0.0:18888","weight":2}}`) i := model.NewInstance(reg) err := node.call(context.TODO(), model.Register, i, "http://127.0.0.1:7172/discovery/register", &res) - So(err, ShouldResemble, errors.Conflict) + So(err, ShouldResemble, ecode.Conflict) So(res.AppID, ShouldResemble, "main.arch.account-service") }) } @@ -92,7 +94,7 @@ func TestCall(t *testing.T) { func TestNodeCancel(t *testing.T) { Convey("test node renew 409 error", t, func() { i := model.NewInstance(reg) - node := newNode(&dc.Config{HTTPClient: &http.ClientConfig{}, Nodes: []string{"127.0.0.1:7171", "127.0.0.1:7172"}}, "127.0.0.1:7172") + node := newNode(config, "127.0.0.1:7172") node.pRegisterURL = "http://127.0.0.1:7171/discovery/register" node.client.SetTransport(gock.DefaultTransport) httpMock("POST", "http://127.0.0.1:7172/discovery/cancel").Reply(200).JSON(`{"code":0}`) @@ -104,7 +106,7 @@ func TestNodeCancel(t *testing.T) { func TestNodeRenew(t *testing.T) { Convey("test node renew 409 error", t, func() { i := model.NewInstance(reg) - node := newNode(&dc.Config{HTTPClient: &http.ClientConfig{}}, "127.0.0.1:7172") + node := newNode(config, "127.0.0.1:7172") node.pRegisterURL = "http://127.0.0.1:7171/discovery/register" node.client.SetTransport(gock.DefaultTransport) httpMock("POST", "http://127.0.0.1:7172/discovery/renew").Reply(200).JSON(`{"code":-409,"data":{"region":"shsb","zone":"fuck","appid":"main.arch.account-service","env":"pre","hostname":"cs4sq","http":"","rpc":"0.0.0.0:18888","weight":2}}`) @@ -117,7 +119,7 @@ func TestNodeRenew(t *testing.T) { func TestNodeRenew2(t *testing.T) { Convey("test node renew 404 error", t, func() { i := model.NewInstance(reg) - node := newNode(&dc.Config{HTTPClient: &http.ClientConfig{}, HTTPServer: &dc.ServerConfig{Addr: "127.0.0.1:7171"}, Nodes: []string{"127.0.0.1:7171"}}, "127.0.0.1:7172") + node := newNode(config, "127.0.0.1:7172") node.client.SetTransport(gock.DefaultTransport) httpMock("POST", "http://127.0.0.1:7172/discovery/renew").Reply(200).JSON(`{"code":-404}`) httpMock("POST", "http://127.0.0.1:7172/discovery/register").Reply(200).JSON(`{"code":0}`) diff --git a/registry/registry.go b/registry/registry.go index 0ac96c2..eada66a 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -7,10 +7,10 @@ import ( "time" "github.com/bilibili/discovery/conf" - "github.com/bilibili/discovery/errors" "github.com/bilibili/discovery/model" - log "github.com/golang/glog" + "github.com/bilibili/kratos/pkg/ecode" + log "github.com/bilibili/kratos/pkg/log" ) const ( @@ -164,7 +164,7 @@ func (r *Registry) Fetch(zone, env, appid string, latestTime int64, status uint3 a, ok := r.appm[key] r.aLock.RUnlock() if !ok { - err = errors.NothingFound + err = ecode.NothingFound return } info, err = a.InstanceInfo(zone, latestTime, status) @@ -179,7 +179,7 @@ func (r *Registry) Fetch(zone, env, appid string, latestTime int64, status uint3 } // Polls hangs request and then write instances when that has changes, or return NotModified. -func (r *Registry) Polls(arg *model.ArgPolls) (ch chan map[string]*model.InstanceInfo, new bool, err error) { +func (r *Registry) Polls(arg *model.ArgPolls) (ch chan map[string]*model.InstanceInfo, new bool, miss string, err error) { var ( ins = make(map[string]*model.InstanceInfo, len(arg.AppID)) in *model.InstanceInfo @@ -189,8 +189,9 @@ func (r *Registry) Polls(arg *model.ArgPolls) (ch chan map[string]*model.Instanc } for i := range arg.AppID { in, err = r.Fetch(arg.Zone, arg.Env, arg.AppID[i], arg.LatestTimestamp[i], model.InstanceStatusUP) - if err == errors.NothingFound { - log.Errorf("Polls zone(%s) env(%s) appid(%s) error(%v)", arg.Zone, arg.Env, arg.AppID[i], err) + if err == ecode.NothingFound { + miss = arg.AppID[i] + log.Error("Polls zone(%s) env(%s) appid(%s) error(%v)", arg.Zone, arg.Env, arg.AppID[i], err) return } if err == nil { @@ -215,13 +216,13 @@ func (r *Registry) Polls(arg *model.ArgPolls) (ch chan map[string]*model.Instanc ch = make(chan map[string]*model.InstanceInfo, 5) // NOTE: there maybe have more than one connection on the same hostname!!! } connection = newConn(ch, arg.LatestTimestamp[i], arg) - log.Infof("Polls from(%s) new connection(%d)", arg.Hostname, connection.count) + log.Info("Polls from(%s) new connection(%d)", arg.Hostname, connection.count) } else { connection.count++ // NOTE: there maybe have more than one connection on the same hostname!!! if ch == nil { ch = connection.ch } - log.Infof("Polls from(%s) reuse connection(%d)", arg.Hostname, connection.count) + log.Info("Polls from(%s) reuse connection(%d)", arg.Hostname, connection.count) } r.conns[k][arg.Hostname] = connection } @@ -241,13 +242,18 @@ func (r *Registry) broadcast(env, appid string) { } delete(r.conns, key) for _, conn := range conns { - ii, _ := r.Fetch(conn.arg.Zone, env, appid, 0, model.InstanceStatusUP) // TODO(felix): latesttime!=0 increase + ii, err := r.Fetch(conn.arg.Zone, env, appid, 0, model.InstanceStatusUP) // TODO(felix): latesttime!=0 increase + if err != nil { + // may be not found ,just continue until next poll return err. + log.Error("get appid:%s env:%s zone:%s err:%v", appid, env, conn.arg.Zone, err) + continue + } for i := 0; i < conn.count; i++ { select { case conn.ch <- map[string]*model.InstanceInfo{appid: ii}: // NOTE: if chan is full, means no poller. - log.Infof("broadcast to(%s) success(%d)", conn.arg.Hostname, i+1) + log.Info("broadcast to(%s) success(%d)", conn.arg.Hostname, i+1) case <-time.After(time.Millisecond * 500): - log.Infof("broadcast to(%s) failed(%d) maybe chan full", conn.arg.Hostname, i+1) + log.Info("broadcast to(%s) failed(%d) maybe chan full", conn.arg.Hostname, i+1) } } } @@ -353,15 +359,15 @@ func (r *Registry) DelConns(arg *model.ArgPolls) { k := pollKey(arg.Env, arg.AppID[i]) conns, ok := r.conns[k] if !ok { - log.Warningf("DelConn key(%s) not found", k) + log.Warn("DelConn key(%s) not found", k) continue } if connection, ok := conns[arg.Hostname]; ok { if connection.count > 1 { - log.Infof("DelConns from(%s) count decr(%d)", arg.Hostname, connection.count) + log.Info("DelConns from(%s) count decr(%d)", arg.Hostname, connection.count) connection.count-- } else { - log.Infof("DelConns from(%s) delete(%d)", arg.Hostname, connection.count) + log.Info("DelConns from(%s) delete(%d)", arg.Hostname, connection.count) delete(conns, arg.Hostname) } } diff --git a/registry/registry_test.go b/registry/registry_test.go index 135f518..b119421 100644 --- a/registry/registry_test.go +++ b/registry/registry_test.go @@ -7,8 +7,8 @@ import ( "time" "github.com/bilibili/discovery/conf" - "github.com/bilibili/discovery/errors" "github.com/bilibili/discovery/model" + "github.com/bilibili/kratos/pkg/ecode" . "github.com/smartystreets/goconvey/convey" ) @@ -37,14 +37,14 @@ func TestDiscovery(t *testing.T) { info, err := r.Fetch(fetchArg.Zone, fetchArg.Env, fetchArg.AppID, 0, fetchArg.Status) So(err, ShouldBeNil) So(len(info.Instances["sh0001"]), ShouldEqual, 2) - ch, _, err := r.Polls(pollArg) + ch, _, _, err := r.Polls(pollArg) So(err, ShouldBeNil) apps := <-ch So(len(apps["main.arch.test"].Instances["sh0001"]), ShouldEqual, 2) pollArg.LatestTimestamp[0] = apps["main.arch.test"].LatestTimestamp fmt.Println(apps["main.arch.test"]) r.Cancel(cancel) - ch, _, err = r.Polls(pollArg) + ch, _, _, err = r.Polls(pollArg) So(err, ShouldBeNil) apps = <-ch So(len(apps["main.arch.test"].Instances), ShouldEqual, 1) @@ -88,7 +88,7 @@ func TestCancel(t *testing.T) { So(i, ShouldResemble, src) fetchArg := &model.ArgFetch{Zone: "sh0001", Env: "pre", AppID: "main.arch.test", Status: 3} _, err := r.Fetch(fetchArg.Zone, fetchArg.Env, fetchArg.AppID, 0, fetchArg.Status) - So(err, ShouldResemble, errors.NothingFound) + So(err, ShouldResemble, ecode.NothingFound) }) } @@ -106,7 +106,7 @@ func BenchmarkCancel(b *testing.B) { } benchCompareInstance(b, src, i) fetchArg := &model.ArgFetch{Zone: "sh0001", Env: "pre", AppID: "main.arch.test", Status: 3} - if _, err = r.Fetch(fetchArg.Zone, fetchArg.Env, fetchArg.AppID, 0, fetchArg.Status); err != errors.NothingFound { + if _, err = r.Fetch(fetchArg.Zone, fetchArg.Env, fetchArg.AppID, 0, fetchArg.Status); err != ecode.NothingFound { b.Errorf("Fetch(%v) error(%v)", src.AppID, err) } } @@ -170,7 +170,7 @@ func TestPoll(t *testing.T) { r := register(t, i) Convey("test poll", t, func() { pollArg := &model.ArgPolls{Zone: "sh0001", Env: "pre", AppID: []string{"main.arch.test"}, Hostname: "csq"} - ch, _, err := r.Polls(pollArg) + ch, _, _, err := r.Polls(pollArg) So(err, ShouldBeNil) c := <-ch So(len(c[pollArg.AppID[0]].Instances), ShouldEqual, 1) @@ -183,7 +183,7 @@ func TestPolls(t *testing.T) { r := register(t, i1, i2) Convey("test polls", t, func() { pollArg := &model.ArgPolls{Zone: "sh0001", Env: "pre", LatestTimestamp: []int64{0, 0}, AppID: []string{"main.arch.test", "main.arch.test2"}, Hostname: "csq"} - ch, new, err := r.Polls(pollArg) + ch, new, _, err := r.Polls(pollArg) So(err, ShouldBeNil) So(new, ShouldBeTrue) c := <-ch @@ -204,12 +204,12 @@ func TestPollsChan(t *testing.T) { err error ) pollArg := &model.ArgPolls{Zone: "sh0001", Env: "pre", LatestTimestamp: []int64{time.Now().UnixNano(), time.Now().UnixNano()}, AppID: []string{"main.arch.test", "main.arch.test2"}, Hostname: "csq"} - ch1, new, err = r.Polls(pollArg) - c.So(err, ShouldEqual, errors.NotModified) + ch1, new, _, err = r.Polls(pollArg) + c.So(err, ShouldEqual, ecode.NotModified) c.So(new, ShouldBeFalse) c.So(ch1, ShouldNotBeNil) - ch2, new, err = r.Polls(pollArg) - c.So(err, ShouldEqual, errors.NotModified) + ch2, new, _, err = r.Polls(pollArg) + c.So(err, ShouldEqual, ecode.NotModified) c.So(new, ShouldBeFalse) c.So(ch2, ShouldNotBeNil) // wait group @@ -243,7 +243,7 @@ func BenchmarkPoll(b *testing.B) { ) r, _ := benchRegister(b) pollArg := &model.ArgPolls{Zone: "sh0001", Env: "pre", AppID: []string{"main.arch.test"}, Hostname: "csq"} - if ch, _, err = r.Polls(pollArg); err != nil { + if ch, _, _, err = r.Polls(pollArg); err != nil { b.Errorf("Poll(%v) error(%v)", arg.AppID, err) } if c = <-ch; len(c[pollArg.AppID[0]].Instances) != 1 { @@ -267,8 +267,8 @@ func TestBroadcast(t *testing.T) { }) }() pollArg := &model.ArgPolls{Zone: "sh0001", Env: "pre", AppID: []string{"main.arch.test"}, LatestTimestamp: []int64{time.Now().UnixNano()}} - ch, _, err := r.Polls(pollArg) - So(err, ShouldResemble, errors.NotModified) + ch, _, _, err := r.Polls(pollArg) + So(err, ShouldResemble, ecode.NotModified) c := <-ch So(len(c[pollArg.AppID[0]].Instances["sh0001"]), ShouldResemble, 2) So(c[pollArg.AppID[0]].Instances, ShouldNotBeNil) @@ -294,7 +294,7 @@ func BenchmarkBroadcast(b *testing.B) { } }() pollArg := &model.ArgPolls{Zone: "sh0001", Env: "pre", AppID: []string{"main.arch.test"}, LatestTimestamp: []int64{time.Now().UnixNano()}} - if ch, _, err = r.Polls(pollArg); err != nil && err != errors.NotModified { + if ch, _, _, err = r.Polls(pollArg); err != nil && err != ecode.NotModified { b.Errorf("Poll(%v) error(%v)", pollArg.AppID, err) } c = <-ch @@ -463,6 +463,6 @@ func TestEvict2(t *testing.T) { r.evict() fetchArg := &model.ArgFetch{Zone: "sh0001", Env: "pre", AppID: "main.arch.test", Status: 1} _, err = r.Fetch(fetchArg.Zone, fetchArg.Env, fetchArg.AppID, 0, fetchArg.Status) - So(err, ShouldResemble, errors.NothingFound) + So(err, ShouldResemble, ecode.NothingFound) }) } diff --git a/registry/scheduler.go b/registry/scheduler.go index 2feb53c..353c8c0 100644 --- a/registry/scheduler.go +++ b/registry/scheduler.go @@ -6,7 +6,7 @@ import ( "github.com/bilibili/discovery/model" - log "github.com/golang/glog" + log "github.com/bilibili/kratos/pkg/log" ) // Scheduler info. @@ -31,7 +31,7 @@ func (s *scheduler) Load(conf []byte) { schs := make([]*model.Scheduler, 0) err := json.Unmarshal(conf, &schs) if err != nil { - log.Errorf("load scheduler info err %v", err) + log.Error("load scheduler info err %v", err) } for _, sch := range schs { s.schedulers[appsKey(sch.AppID, sch.Env)] = sch From e713b767d65c6157fe894d921bf70498d592fb20 Mon Sep 17 00:00:00 2001 From: Tanghui Lin Date: Wed, 3 Jul 2019 09:49:54 +0800 Subject: [PATCH 2/4] add replicateset test --- CHANGELOG.md | 5 +++ discovery/register.go | 3 ++ discovery/register_test.go | 2 +- model/instance.go | 4 +- model/param.go | 5 ++- registry/node.go | 38 +++++++++++++++++++ registry/node_test.go | 76 +++++++++++++++++++++++++++++++------- registry/nodes.go | 27 +++++++++++++- registry/registry_test.go | 5 ++- 9 files changed, 144 insertions(+), 21 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b6b5da8..41f4907 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,4 +1,9 @@ #### Discovery +##### Version 1.1.0 +> 1. use kratos pkg +> 2. replace gin by kratos/bm +> 3. fix poll return nil when been canceled. + ##### Version 1.0.2 > 1.fix nodesproc. get all zone nodes. diff --git a/discovery/register.go b/discovery/register.go index e9e9f3d..c087737 100644 --- a/discovery/register.go +++ b/discovery/register.go @@ -96,5 +96,8 @@ func (d *Discovery) Set(c context.Context, arg *model.ArgSet) (err error) { if !d.registry.Set(arg) { err = ecode.RequestErr } + if !arg.Replication { + d.nodes.Load().(*registry.Nodes).ReplicateSet(c, arg, arg.FromZone) + } return } diff --git a/discovery/register_test.go b/discovery/register_test.go index 7201196..0e87e8f 100644 --- a/discovery/register_test.go +++ b/discovery/register_test.go @@ -27,7 +27,7 @@ var ( set = &model.ArgSet{AppID: "main.arch.test", Zone: "sh001", Env: "pre", Hostname: []string{"test1"}, - Status: []uint32{1}, + Status: []int64{1}, } pollArg = newPoll() ) diff --git a/model/instance.go b/model/instance.go index 6e80c70..ddeae24 100644 --- a/model/instance.go +++ b/model/instance.go @@ -320,12 +320,12 @@ func (a *App) Set(changes *ArgSet) (ok bool) { return } if len(changes.Status) != 0 { - if changes.Status[i] != InstanceStatusUP && changes.Status[i] != InstancestatusWating { + if uint32(changes.Status[i]) != InstanceStatusUP && uint32(changes.Status[i]) != InstancestatusWating { log.Error("SetWeight change status(%d) is error", changes.Status[i]) ok = false return } - dst.Status = changes.Status[i] + dst.Status = uint32(changes.Status[i]) if dst.Status == InstanceStatusUP { dst.UpTimestamp = setTime } diff --git a/model/param.go b/model/param.go index b367b51..bccf388 100644 --- a/model/param.go +++ b/model/param.go @@ -75,11 +75,12 @@ type ArgPolls struct { // ArgSet define set param. type ArgSet struct { + Region string `form:"region"` Zone string `form:"zone" validate:"required"` Env string `form:"env" validate:"required"` AppID string `form:"appid" validate:"required"` - Hostname []string `form:"hostname" validate:"gte=0"` - Status []uint32 `form:"status" validate:"gte=0"` + Hostname []string `form:"hostname,split" validate:"gte=0"` + Status []int64 `form:"status,split" validate:"gte=0"` Metadata []string `form:"metadata" validate:"gte=0"` Replication bool `form:"replication"` FromZone bool `form:"from_zone"` diff --git a/registry/node.go b/registry/node.go index ae81ff3..92fe07b 100644 --- a/registry/node.go +++ b/registry/node.go @@ -13,6 +13,7 @@ import ( "github.com/bilibili/kratos/pkg/ecode" log "github.com/bilibili/kratos/pkg/log" http "github.com/bilibili/kratos/pkg/net/http/blademaster" + xstr "github.com/bilibili/kratos/pkg/str" ) const ( @@ -101,6 +102,11 @@ func (n *Node) Renew(c context.Context, i *model.Instance) (err error) { return } +// Set the infomation of instance by this node to the peer node represented +func (n *Node) Set(c context.Context, arg *model.ArgSet) (err error) { + err = n.setCall(c, arg, n.setURL) + return +} func (n *Node) call(c context.Context, action model.Action, i *model.Instance, uri string, data interface{}) (err error) { params := url.Values{} params.Set("zone", i.Zone) @@ -135,6 +141,7 @@ func (n *Node) call(c context.Context, action model.Action, i *model.Instance, u log.Error("node be called(%s) instance(%v) error(%v)", uri, i, err) return } + fmt.Println(res) if res.Code != 0 { log.Error("node be called(%s) instance(%v) response code(%v)", uri, i, res.Code) if err = ecode.Int(res.Code); err == ecode.Conflict { @@ -143,3 +150,34 @@ func (n *Node) call(c context.Context, action model.Action, i *model.Instance, u } return } + +func (n *Node) setCall(c context.Context, arg *model.ArgSet, uri string) (err error) { + params := url.Values{} + params.Set("region", arg.Region) + params.Set("zone", arg.Zone) + params.Set("env", arg.Env) + params.Set("appid", arg.AppID) + params.Set("hostname", strings.Join(arg.Hostname, ",")) + params.Set("set_timestamp", strconv.FormatInt(arg.SetTimestamp, 10)) + params.Set("replication", "true") + if len(arg.Status) != 0 { + params.Set("status", xstr.JoinInts(arg.Status)) + } + if len(arg.Metadata) != 0 { + for _, metadata := range arg.Metadata { + params.Add("metadata", metadata) + } + } + var res struct { + Code int `json:"code"` + } + if err = n.client.Post(c, uri, "", params, &res); err != nil { + log.Error("node be setCalled(%s) appid(%s) env (%s) error(%v)", uri, arg.AppID, arg.Env, err) + return + } + if res.Code != 0 { + log.Error("node be setCalled(%s) appid(%s) env (%s) responce code(%v)", uri, arg.AppID, arg.Env, res.Code) + err = ecode.Int(res.Code) + } + return +} diff --git a/registry/node_test.go b/registry/node_test.go index af2d454..5978fef 100644 --- a/registry/node_test.go +++ b/registry/node_test.go @@ -2,6 +2,7 @@ package registry import ( "context" + "net/http" "strings" "testing" "time" @@ -9,7 +10,8 @@ import ( dc "github.com/bilibili/discovery/conf" "github.com/bilibili/discovery/model" "github.com/bilibili/kratos/pkg/ecode" - http "github.com/bilibili/kratos/pkg/net/http/blademaster" + bm "github.com/bilibili/kratos/pkg/net/http/blademaster" + "github.com/bilibili/kratos/pkg/net/http/blademaster/binding" xtime "github.com/bilibili/kratos/pkg/time" . "github.com/smartystreets/goconvey/convey" @@ -20,14 +22,13 @@ var config = newConfig() func newConfig() *dc.Config { c := &dc.Config{ - HTTPClient: &http.ClientConfig{ + HTTPClient: &bm.ClientConfig{ Timeout: xtime.Duration(time.Second * 30), Dial: xtime.Duration(time.Second), KeepAlive: xtime.Duration(time.Second * 30), }, - HTTPServer: &http.ServerConfig{Addr: "127.0.0.1:7171"}, - Nodes: []string{"127.0.0.1:7172", "127.0.0.1:7173", "127.0.0.1:7171"}, - Zones: map[string][]string{"zone": []string{"127.0.0.1:7172"}}, + HTTPServer: &bm.ServerConfig{Addr: "127.0.0.1:7171"}, + Nodes: []string{"127.0.0.1:7172"}, Env: &dc.Env{ Zone: "sh001", DeployEnv: "pre", @@ -41,9 +42,7 @@ func TestReplicate(t *testing.T) { i := model.NewInstance(reg) nodes := NewNodes(config) nodes.nodes[0].client.SetTransport(gock.DefaultTransport) - nodes.nodes[1].client.SetTransport(gock.DefaultTransport) httpMock("POST", "http://127.0.0.1:7172/discovery/register").Reply(200).JSON(`{"code":0}`) - httpMock("POST", "http://127.0.0.1:7173/discovery/register").Reply(200).JSON(`{"code":0}`) err := nodes.Replicate(context.TODO(), model.Register, i, false) So(err, ShouldBeNil) err = nodes.Replicate(context.TODO(), model.Renew, i, false) @@ -52,17 +51,68 @@ func TestReplicate(t *testing.T) { So(err, ShouldBeNil) }) } +func match(h *http.Request, mock *gock.Request) (ok bool, err error) { + ok = true + err = nil + var arg = new(model.ArgSet) + err = binding.Form.Bind(h, arg) + + if h.URL.Path == "/discovery/set" { + if err != nil { + mock.Reply(200).JSON(`{"ts":1514341945,"code":-400}`) + return + } + if len(arg.Hostname) != len(arg.Status) || len(arg.Hostname) != len(arg.Metadata) { + mock.Reply(200).JSON(`{"ts":1514341945,"code":-400}`) + return + } + mock.Reply(200).JSON(`{"ts":1514341945,"code":0}`) + } + return +} + +func TestReplicateSet(t *testing.T) { + Convey("test replicate set", t, func(c C) { + nodes := NewNodes(newConfig()) + nodes.nodes[0].client.SetTransport(gock.DefaultTransport) + httpMock("POST", "http://127.0.0.1:7172/discovery/set").AddMatcher(match) + set := &model.ArgSet{ + Region: "shsb", + Zone: "sh001", + Env: "pre", + AppID: "main.arch.account-service", + Hostname: []string{"test1", "test2"}, + Status: []int64{1, 1}, + Metadata: []string{`{"aa":1,"bb:2"}`, `{"aa":1,"bb:3"}`}, + } + err := nodes.ReplicateSet(context.TODO(), set, false) + c.So(err, ShouldBeNil) + set = &model.ArgSet{ + Region: "shsb", + Zone: "sh001", + Env: "pre", + AppID: "main.arch.account-service", + Hostname: []string{"test1", "test2"}, + Status: []int64{1, 1}, + Metadata: []string{`{"aa":1,"bb:2"}`}, + } + err = nodes.ReplicateSet(context.TODO(), set, false) + c.So(err, ShouldNotBeNil) + }) +} func TestNodes(t *testing.T) { Convey("test nodes", t, func() { nodes := NewNodes(config) res := nodes.Nodes() - So(len(res), ShouldResemble, 3) + So(len(res), ShouldResemble, 1) }) Convey("test all nodes", t, func() { - nodes := NewNodes(config) + cfg := newConfig() + cfg.Zones = map[string][]string{"zone": []string{"127.0.0.1:7172"}} + nodes := NewNodes(cfg) res := nodes.AllNodes() - So(len(res), ShouldResemble, 4) + So(len(res), ShouldResemble, 2) }) } @@ -81,11 +131,11 @@ func TestUp(t *testing.T) { func TestCall(t *testing.T) { Convey("test call", t, func() { var res *model.Instance - node := newNode(config, "127.0.0.1:7172") + node := newNode(newConfig(), "127.0.0.1:7173") node.client.SetTransport(gock.DefaultTransport) - httpMock("POST", "http://127.0.0.1:7172/discovery/register").Reply(200).JSON(`{"ts":1514341945,"code":-409,"data":{"region":"shsb","zone":"fuck","appid":"main.arch.account-service","env":"pre","hostname":"cs4sq","http":"","rpc":"0.0.0.0:18888","weight":2}}`) + httpMock("POST", "http://127.0.0.1:7174/discovery/register").Reply(200).JSON(`{"ts":1514341945,"code":-409,"data":{"region":"shsb","zone":"fuck","appid":"main.arch.account-service","env":"pre","hostname":"cs4sq","http":"","rpc":"0.0.0.0:18888","weight":2}}`) i := model.NewInstance(reg) - err := node.call(context.TODO(), model.Register, i, "http://127.0.0.1:7172/discovery/register", &res) + err := node.call(context.TODO(), model.Register, i, "http://127.0.0.1:7174/discovery/register", &res) So(err, ShouldResemble, ecode.Conflict) So(res.AppID, ShouldResemble, "main.arch.account-service") }) diff --git a/registry/nodes.go b/registry/nodes.go index d63280d..2c375aa 100644 --- a/registry/nodes.go +++ b/registry/nodes.go @@ -67,7 +67,32 @@ func (ns *Nodes) Replicate(c context.Context, action model.Action, i *model.Inst err = eg.Wait() return } - +// ReplicateSet replicate set information to all nodes except for this node. +func (ns *Nodes) ReplicateSet(c context.Context, arg *model.ArgSet, otherZone bool) (err error) { + if len(ns.nodes) == 0 { + return + } + eg, c := errgroup.WithContext(c) + for _, n := range ns.nodes { + if !ns.Myself(n.addr) { + eg.Go(func() error { + return n.Set(c, arg) + }) + } + } + if !otherZone { + for _, zns := range ns.zones { + if n := len(zns); n > 0 { + node := zns[rand.Intn(n)] + eg.Go(func() error { + return node.Set(c, arg) + }) + } + } + } + err = eg.Wait() + return +} func (ns *Nodes) action(c context.Context, eg *errgroup.Group, action model.Action, n *Node, i *model.Instance) { switch action { case model.Register: diff --git a/registry/registry_test.go b/registry/registry_test.go index b119421..440cc4b 100644 --- a/registry/registry_test.go +++ b/registry/registry_test.go @@ -315,7 +315,7 @@ func TestSet(t *testing.T) { r := register(t, i) changes := &model.ArgSet{Zone: "sh0001", Env: "pre", AppID: "main.arch.test"} changes.Hostname = []string{"reg"} - changes.Status = []uint32{1} + changes.Status = []int64{1} Convey("test setstatus to 1", t, func() { ok := r.Set(changes) So(ok, ShouldBeTrue) @@ -369,7 +369,8 @@ func BenchmarkSet(b *testing.B) { r, _ := benchRegister(b) changes := &model.ArgSet{Zone: "sh0001", Env: "pre", AppID: "main.arch.test"} changes.Hostname = []string{"reg"} - changes.Status = []uint32{1} + changes.Status = []int64{1} + if ok = r.Set(changes); !ok { b.Errorf("SetStatus(%v) error", arg.AppID) } From 4ffba1b02b4c3402375ac05e0092e453ee6d15de Mon Sep 17 00:00:00 2001 From: Tanghui Lin Date: Wed, 3 Jul 2019 11:15:43 +0800 Subject: [PATCH 3/4] set replicate --- CHANGELOG.md | 4 +++- cmd/discovery/discovery-example.toml | 4 +++- conf/conf.go | 15 ++++++++------- discovery/discovery.go | 7 ++++--- http/discovery.go | 1 - http/http.go | 5 +++-- model/param.go | 10 +++++----- naming/client_test.go | 3 +-- registry/node.go | 3 ++- 9 files changed, 29 insertions(+), 23 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 41f4907..3f0fa8b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,7 +2,9 @@ ##### Version 1.1.0 > 1. use kratos pkg > 2. replace gin by kratos/bm -> 3. fix poll return nil when been canceled. +> 3. fix poll return nil when be canceled. +> 4. add init protect mode +> 5. supoort set. ##### Version 1.0.2 > 1.fix nodesproc. get all zone nodes. diff --git a/cmd/discovery/discovery-example.toml b/cmd/discovery/discovery-example.toml index b69fc95..6289f2b 100644 --- a/cmd/discovery/discovery-example.toml +++ b/cmd/discovery/discovery-example.toml @@ -1,7 +1,7 @@ # 同一discovery集群的所有node节点地址,包含本node nodes = ["127.0.0.1:7171"] - +enableprotect=false # 本可用区zone(一般指机房)标识 [env] region = "sh" @@ -19,6 +19,7 @@ DeployEnv = "dev" # 注意:ip别配置为0.0.0.0或者127.0.0.1 [httpServer] addr = "127.0.0.1:7171" +timeout="40s" # 当前节点同步其他节点使用的http client # dial 连接建立超时时间 @@ -26,3 +27,4 @@ addr = "127.0.0.1:7171" [httpClient] dial = "1s" keepAlive = "120s" +timeout="40s" diff --git a/conf/conf.go b/conf/conf.go index 2776274..d93c31f 100644 --- a/conf/conf.go +++ b/conf/conf.go @@ -33,13 +33,14 @@ func init() { // Config config. type Config struct { - Nodes []string - Zones map[string][]string - HTTPServer *http.ServerConfig - HTTPClient *http.ClientConfig - Env *Env - Log *log.Config - Scheduler []byte + Nodes []string + Zones map[string][]string + HTTPServer *http.ServerConfig + HTTPClient *http.ClientConfig + Env *Env + Log *log.Config + Scheduler []byte + EnableProtect bool } // Fix fix env config. diff --git a/discovery/discovery.go b/discovery/discovery.go index 1041190..c77d35b 100644 --- a/discovery/discovery.go +++ b/discovery/discovery.go @@ -22,9 +22,10 @@ type Discovery struct { // New get a discovery. func New(c *conf.Config) (d *Discovery, cancel context.CancelFunc) { d = &Discovery{ - c: c, - client: http.NewClient(c.HTTPClient), - registry: registry.NewRegistry(c), + protected: c.EnableProtect, + c: c, + client: http.NewClient(c.HTTPClient), + registry: registry.NewRegistry(c), } d.nodes.Store(registry.NewNodes(c)) d.syncUp() diff --git a/http/discovery.go b/http/discovery.go index 9c6b386..7343969 100644 --- a/http/discovery.go +++ b/http/discovery.go @@ -74,7 +74,6 @@ func fetch(c *bm.Context) { func fetchs(c *bm.Context) { arg := new(model.ArgFetchs) if err := c.Bind(arg); err != nil { - return } c.JSON(dis.Fetchs(c, arg)) diff --git a/http/http.go b/http/http.go index 318c4ff..37a3612 100644 --- a/http/http.go +++ b/http/http.go @@ -47,7 +47,8 @@ func innerRouter(e *bm.Engine) { func initProtect(ctx *bm.Context) { if dis.Protected() { - ctx.JSON(nil, errProtected) - ctx.AbortWithStatus(503) + ctx.JSONMap(map[string]interface{}{ + "message": errProtected.Error(), + }, errProtected) } } diff --git a/model/param.go b/model/param.go index bccf388..34383c7 100644 --- a/model/param.go +++ b/model/param.go @@ -8,7 +8,7 @@ type ArgRegister struct { AppID string `form:"appid" validate:"required"` Hostname string `form:"hostname" validate:"required"` Status uint32 `form:"status" validate:"required"` - Addrs []string `form:"addrs" validate:"gt=0"` + Addrs []string `form:"addrs,split" validate:"gt=0"` Version string `form:"version"` Metadata string `form:"metadata"` Replication bool `form:"replication"` @@ -51,7 +51,7 @@ type ArgFetch struct { type ArgFetchs struct { Zone string `form:"zone"` Env string `form:"env" validate:"required"` - AppID []string `form:"appid" validate:"gt=0"` + AppID []string `form:"appid,split" validate:"gt=0"` Status uint32 `form:"status" validate:"required"` } @@ -68,9 +68,9 @@ type ArgPoll struct { type ArgPolls struct { Zone string `form:"zone"` Env string `form:"env" validate:"required"` - AppID []string `form:"appid" validate:"gt=0"` + AppID []string `form:"appid,split" validate:"gt=0"` Hostname string `form:"hostname" validate:"required"` - LatestTimestamp []int64 `form:"latest_timestamp"` + LatestTimestamp []int64 `form:"latest_timestamp,split"` } // ArgSet define set param. @@ -81,7 +81,7 @@ type ArgSet struct { AppID string `form:"appid" validate:"required"` Hostname []string `form:"hostname,split" validate:"gte=0"` Status []int64 `form:"status,split" validate:"gte=0"` - Metadata []string `form:"metadata" validate:"gte=0"` + Metadata []string `form:"metadata" validate:"gte=0"` // metadata may contain `,` , use metadata=xx&metadata=xx instead of split by ',' Replication bool `form:"replication"` FromZone bool `form:"from_zone"` SetTimestamp int64 `form:"set_timestamp"` diff --git a/naming/client_test.go b/naming/client_test.go index 801f611..c778b09 100644 --- a/naming/client_test.go +++ b/naming/client_test.go @@ -37,7 +37,7 @@ func mockDiscoverySvr() { Nodes: []string{"127.0.0.1:7171"}, HTTPServer: &xhttp.ServerConfig{ Addr: "127.0.0.1:7171", - Timeout: xtime.Duration(time.Second * 1), + Timeout: xtime.Duration(time.Second * 31), }, HTTPClient: &xhttp.ClientConfig{ Timeout: xtime.Duration(time.Second * 40), @@ -59,7 +59,6 @@ func TestDiscovery(t *testing.T) { Host: "test-host", } dis := New(conf) - println("new") appid := "test1" Convey("test discovery register", t, func() { instance := &Instance{ diff --git a/registry/node.go b/registry/node.go index 92fe07b..c1fbff3 100644 --- a/registry/node.go +++ b/registry/node.go @@ -109,10 +109,12 @@ func (n *Node) Set(c context.Context, arg *model.ArgSet) (err error) { } func (n *Node) call(c context.Context, action model.Action, i *model.Instance, uri string, data interface{}) (err error) { params := url.Values{} + params.Set("region", i.Region) params.Set("zone", i.Zone) params.Set("env", i.Env) params.Set("appid", i.AppID) params.Set("hostname", i.Hostname) + params.Set("from_zone", "true") if n.otherZone { params.Set("replication", "false") } else { @@ -141,7 +143,6 @@ func (n *Node) call(c context.Context, action model.Action, i *model.Instance, u log.Error("node be called(%s) instance(%v) error(%v)", uri, i, err) return } - fmt.Println(res) if res.Code != 0 { log.Error("node be called(%s) instance(%v) response code(%v)", uri, i, res.Code) if err = ecode.Int(res.Code); err == ecode.Conflict { From 655e8a4cb4e8f4dd608e64e59d2b6e3fdd722190 Mon Sep 17 00:00:00 2001 From: Tanghui Lin Date: Wed, 3 Jul 2019 12:22:27 +0800 Subject: [PATCH 4/4] fix test --- naming/client.go | 10 +++------- naming/client_test.go | 6 +++--- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/naming/client.go b/naming/client.go index c19d011..d9ae8aa 100644 --- a/naming/client.go +++ b/naming/client.go @@ -8,7 +8,6 @@ import ( "math/rand" "net/url" "os" - "strconv" "strings" "sync" "sync/atomic" @@ -17,6 +16,7 @@ import ( ecode "github.com/bilibili/kratos/pkg/ecode" log "github.com/bilibili/kratos/pkg/log" http "github.com/bilibili/kratos/pkg/net/http/blademaster" + xstr "github.com/bilibili/kratos/pkg/str" xtime "github.com/bilibili/kratos/pkg/time" ) @@ -541,12 +541,8 @@ func (d *Discovery) polls(ctx context.Context) (apps map[string]*InstancesInfo, params := url.Values{} params.Set("env", c.Env) params.Set("hostname", c.Host) - for _, appid := range appIDs { - params.Add("appid", appid) - } - for _, ts := range lastTss { - params.Add("latest_timestamp", strconv.FormatInt(ts, 10)) - } + params.Set("appid", strings.Join(appIDs, ",")) + params.Set("latest_timestamp", xstr.JoinInts(lastTss)) if err = d.httpClient.Get(ctx, uri, "", params, res); err != nil { d.switchNode() log.Error("discovery: client.Get(%s) error(%+v)", uri+"?"+params.Encode(), err) diff --git a/naming/client_test.go b/naming/client_test.go index c778b09..8b474f5 100644 --- a/naming/client_test.go +++ b/naming/client_test.go @@ -37,12 +37,12 @@ func mockDiscoverySvr() { Nodes: []string{"127.0.0.1:7171"}, HTTPServer: &xhttp.ServerConfig{ Addr: "127.0.0.1:7171", - Timeout: xtime.Duration(time.Second * 31), + Timeout: xtime.Duration(time.Second * 1), }, HTTPClient: &xhttp.ClientConfig{ - Timeout: xtime.Duration(time.Second * 40), + Timeout: xtime.Duration(time.Second * 1), Dial: xtime.Duration(time.Second), - KeepAlive: xtime.Duration(time.Second * 30), + KeepAlive: xtime.Duration(time.Second * 1), }, } _ = c.Fix()