Skip to content

Commit

Permalink
Merge pull request #33 from bilibili/v1.1
Browse files Browse the repository at this point in the history
V1.1
  • Loading branch information
lintanghui authored Jul 4, 2019
2 parents f49d4a1 + 655e8a4 commit 895e035
Show file tree
Hide file tree
Showing 27 changed files with 510 additions and 604 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,11 @@
#### Discovery
##### Version 1.1.0
> 1. use kratos pkg
> 2. replace gin by kratos/bm
> 3. fix poll return nil when be canceled.
> 4. add init protect mode
> 5. supoort set.
##### Version 1.0.2
> 1.fix nodesproc. get all zone nodes.
Expand Down
4 changes: 3 additions & 1 deletion cmd/discovery/discovery-example.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

# 同一discovery集群的所有node节点地址,包含本node
nodes = ["127.0.0.1:7171"]

enableprotect=false
# 本可用区zone(一般指机房)标识
[env]
region = "sh"
Expand All @@ -19,10 +19,12 @@ DeployEnv = "dev"
# 注意:ip别配置为0.0.0.0或者127.0.0.1
[httpServer]
addr = "127.0.0.1:7171"
timeout="40s"

# 当前节点同步其他节点使用的http client
# dial 连接建立超时时间
# keepAlive 连接复用保持时间
[httpClient]
dial = "1s"
keepAlive = "120s"
timeout="40s"
9 changes: 4 additions & 5 deletions cmd/discovery/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,29 @@ import (
"github.com/bilibili/discovery/conf"
"github.com/bilibili/discovery/discovery"
"github.com/bilibili/discovery/http"

log "github.com/golang/glog"
log "github.com/bilibili/kratos/pkg/log"
)

func main() {
flag.Parse()
if err := conf.Init(); err != nil {
log.Errorf("conf.Init() error(%v)", err)
log.Error("conf.Init() error(%v)", err)
panic(err)
}
log.Init(conf.Conf.Log)
dis, cancel := discovery.New(conf.Conf)
http.Init(conf.Conf, dis)
// init signal
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
for {
s := <-c
log.Infof("discovery get a signal %s", s.String())
log.Info("discovery get a signal %s", s.String())
switch s {
case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
cancel()
time.Sleep(time.Second)
log.Info("discovery quit !!!")
log.Flush()
return
case syscall.SIGHUP:
default:
Expand Down
26 changes: 10 additions & 16 deletions conf/conf.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import (
"io/ioutil"
"os"

"github.com/bilibili/discovery/lib/http"

"github.com/BurntSushi/toml"
log "github.com/bilibili/kratos/pkg/log"
http "github.com/bilibili/kratos/pkg/net/http/blademaster"
)

var (
Expand All @@ -27,21 +27,20 @@ func init() {
hostname = os.Getenv("HOSTNAME")
}
flag.StringVar(&confPath, "conf", "discovery-example.toml", "config path")
flag.StringVar(&region, "region", os.Getenv("REGION"), "avaliable region. or use REGION env variable, value: sh etc.")
flag.StringVar(&zone, "zone", os.Getenv("ZONE"), "avaliable zone. or use ZONE env variable, value: sh001/sh002 etc.")
flag.StringVar(&deployEnv, "deploy.env", os.Getenv("DEPLOY_ENV"), "deploy env. or use DEPLOY_ENV env variable, value: dev/fat1/uat/pre/prod etc.")
flag.StringVar(&hostname, "hostname", hostname, "machine hostname")
flag.StringVar(&schedulerPath, "scheduler", "scheduler.json", "scheduler info")
}

// Config config.
type Config struct {
Nodes []string
Zones map[string][]string
HTTPServer *ServerConfig
HTTPClient *http.ClientConfig
Env *Env
Scheduler []byte
Nodes []string
Zones map[string][]string
HTTPServer *http.ServerConfig
HTTPClient *http.ClientConfig
Env *Env
Log *log.Config
Scheduler []byte
EnableProtect bool
}

// Fix fix env config.
Expand Down Expand Up @@ -72,11 +71,6 @@ type Env struct {
DeployEnv string
}

// ServerConfig Http Servers conf.
type ServerConfig struct {
Addr string
}

// Init init conf
func Init() (err error) {
if _, err = toml.DecodeFile(confPath, &Conf); err != nil {
Expand Down
25 changes: 17 additions & 8 deletions discovery/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,39 @@ package discovery
import (
"context"
"sync/atomic"
"time"

"github.com/bilibili/discovery/conf"
"github.com/bilibili/discovery/lib/http"
"github.com/bilibili/discovery/registry"
http "github.com/bilibili/kratos/pkg/net/http/blademaster"
)

// Discovery discovery.
type Discovery struct {
c *conf.Config
client *http.Client
registry *registry.Registry
nodes atomic.Value
c *conf.Config
protected bool
client *http.Client
registry *registry.Registry
nodes atomic.Value
}

// New get a discovery.
func New(c *conf.Config) (d *Discovery, cancel context.CancelFunc) {
d = &Discovery{
c: c,
client: http.NewClient(c.HTTPClient),
registry: registry.NewRegistry(c),
protected: c.EnableProtect,
c: c,
client: http.NewClient(c.HTTPClient),
registry: registry.NewRegistry(c),
}
d.nodes.Store(registry.NewNodes(c))
d.syncUp()
cancel = d.regSelf()
go d.nodesproc()
go d.exitProtect()
return
}

func (d *Discovery) exitProtect() {
time.Sleep(time.Second * 90)
d.protected = false
}
29 changes: 16 additions & 13 deletions discovery/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,37 @@ package discovery
import (
"context"

"github.com/bilibili/discovery/errors"
"github.com/bilibili/discovery/model"
"github.com/bilibili/discovery/registry"
"github.com/bilibili/kratos/pkg/ecode"

log "github.com/golang/glog"
log "github.com/bilibili/kratos/pkg/log"
)

// Register a new instance.
func (d *Discovery) Register(c context.Context, ins *model.Instance, latestTimestamp int64, replication bool) {
func (d *Discovery) Register(c context.Context, ins *model.Instance, latestTimestamp int64, replication bool, fromzone bool) {
_ = d.registry.Register(ins, latestTimestamp)
if !replication {
_ = d.nodes.Load().(*registry.Nodes).Replicate(c, model.Register, ins, ins.Zone != d.c.Env.Zone)
_ = d.nodes.Load().(*registry.Nodes).Replicate(c, model.Register, ins, fromzone)
}
}

// Renew marks the given instance of the given app name as renewed, and also marks whether it originated from replication.
func (d *Discovery) Renew(c context.Context, arg *model.ArgRenew) (i *model.Instance, err error) {
i, ok := d.registry.Renew(arg)
if !ok {
err = errors.NothingFound
log.Errorf("renew appid(%s) hostname(%s) zone(%s) env(%s) error", arg.AppID, arg.Hostname, arg.Zone, arg.Env)
err = ecode.NothingFound
log.Error("renew appid(%s) hostname(%s) zone(%s) env(%s) error", arg.AppID, arg.Hostname, arg.Zone, arg.Env)
return
}
if !arg.Replication {
_ = d.nodes.Load().(*registry.Nodes).Replicate(c, model.Renew, i, arg.Zone != d.c.Env.Zone)
return
}
if arg.DirtyTimestamp > i.DirtyTimestamp {
err = errors.NothingFound
err = ecode.NothingFound
} else if arg.DirtyTimestamp < i.DirtyTimestamp {
err = errors.Conflict
err = ecode.Conflict
}
return
}
Expand All @@ -42,8 +42,8 @@ func (d *Discovery) Renew(c context.Context, arg *model.ArgRenew) (i *model.Inst
func (d *Discovery) Cancel(c context.Context, arg *model.ArgCancel) (err error) {
i, ok := d.registry.Cancel(arg)
if !ok {
err = errors.NothingFound
log.Errorf("cancel appid(%s) hostname(%s) error", arg.AppID, arg.Hostname)
err = ecode.NothingFound
log.Error("cancel appid(%s) hostname(%s) error", arg.AppID, arg.Hostname)
return
}
if !arg.Replication {
Expand All @@ -68,7 +68,7 @@ func (d *Discovery) Fetchs(c context.Context, arg *model.ArgFetchs) (is map[stri
for _, appid := range arg.AppID {
i, err := d.registry.Fetch(arg.Zone, arg.Env, appid, 0, arg.Status)
if err != nil {
log.Errorf("Fetchs fetch appid(%v) err", err)
log.Error("Fetchs fetch appid(%v) err", err)
continue
}
is[appid] = i
Expand All @@ -77,7 +77,7 @@ func (d *Discovery) Fetchs(c context.Context, arg *model.ArgFetchs) (is map[stri
}

// Polls hangs request and then write instances when that has changes, or return NotModified.
func (d *Discovery) Polls(c context.Context, arg *model.ArgPolls) (ch chan map[string]*model.InstanceInfo, new bool, err error) {
func (d *Discovery) Polls(c context.Context, arg *model.ArgPolls) (ch chan map[string]*model.InstanceInfo, new bool, miss string, err error) {
return d.registry.Polls(arg)
}

Expand All @@ -94,7 +94,10 @@ func (d *Discovery) Nodes(c context.Context) (nsi []*model.Node) {
// Set set metadata,color,status of instance.
func (d *Discovery) Set(c context.Context, arg *model.ArgSet) (err error) {
if !d.registry.Set(arg) {
err = errors.ParamsErr
err = ecode.RequestErr
}
if !arg.Replication {
d.nodes.Load().(*registry.Nodes).ReplicateSet(c, arg, arg.FromZone)
}
return
}
Loading

0 comments on commit 895e035

Please sign in to comment.