Skip to content

Commit

Permalink
✨ [patch] Now Agents can connect to multiple Servers
Browse files Browse the repository at this point in the history
Signed-off-by: Rintaro Okamura <[email protected]>
  • Loading branch information
rinx committed Feb 26, 2021
1 parent 0854377 commit aac31e9
Show file tree
Hide file tree
Showing 13 changed files with 161 additions and 105 deletions.
28 changes: 14 additions & 14 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ require (
github.com/aws/aws-sdk-go v1.27.0
github.com/cespare/xxhash/v2 v2.1.1
github.com/fsnotify/fsnotify v1.4.9
github.com/go-redis/redis/v8 v8.4.11
github.com/go-redis/redis/v8 v8.6.0
github.com/go-sql-driver/mysql v1.5.0
github.com/gocql/gocql v0.0.0-20200131111108-92af2e088537
github.com/gocraft/dbr/v2 v2.7.1
Expand All @@ -59,29 +59,29 @@ require (
github.com/klauspost/compress v0.0.0-00010101000000-000000000000
github.com/kpango/fastime v1.0.16
github.com/kpango/fuid v0.0.0-20200823100533-287aa95e0641
github.com/kpango/gache v1.2.4
github.com/kpango/glg v1.5.1
github.com/mattn/go-sqlite3 v2.0.3+incompatible // indirect
github.com/kpango/gache v1.2.5
github.com/kpango/glg v1.5.5
github.com/pierrec/lz4/v3 v3.3.2
github.com/rancher/remotedialer v0.2.5
github.com/scylladb/gocqlx v1.5.0
github.com/tensorflow/tensorflow v0.0.0-00010101000000-000000000000
github.com/urfave/cli/v2 v2.2.0
github.com/urfave/cli/v2 v2.3.0
github.com/vdaas/vald v0.0.0-00010101000000-000000000000
go.opencensus.io v0.22.6
go.opentelemetry.io/otel v0.16.0
go.opentelemetry.io/otel/exporters/metric/prometheus v0.16.0
go.opencensus.io v0.23.0
go.opentelemetry.io/otel v0.17.0
go.opentelemetry.io/otel/exporters/metric/prometheus v0.17.0
go.opentelemetry.io/otel/metric v0.17.0
go.uber.org/automaxprocs v1.4.0
go.uber.org/goleak v1.1.10
go.uber.org/zap v1.16.0
golang.org/x/net v0.0.0-20210119194325-5f4716e94777
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c
google.golang.org/api v0.38.0
golang.org/x/net v0.0.0-20210224082022-3d97a244fca7
golang.org/x/sys v0.0.0-20210225134936-a50acf3fe073
google.golang.org/api v0.40.0
google.golang.org/grpc v1.35.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/api v0.20.2
k8s.io/apimachinery v0.20.2
k8s.io/client-go v0.20.2
k8s.io/api v0.20.4
k8s.io/apimachinery v0.20.4
k8s.io/client-go v0.20.4
k8s.io/metrics v0.0.0-00010101000000-000000000000
sigs.k8s.io/controller-runtime v0.0.0-00010101000000-000000000000
)
87 changes: 49 additions & 38 deletions go.sum

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion pkg/alvd/agent/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

type Config struct {
ServerAddress string
ServerAddresses []string

AgentName string

Expand Down
6 changes: 2 additions & 4 deletions pkg/alvd/agent/config/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,9 @@ func WithAgentName(name string) OptionFunc {
}
}

func WithServerAddress(addr string) OptionFunc {
func WithServerAddresses(addrs []string) OptionFunc {
return func(c *Config) error {
if addr != "" {
c.ServerAddress = addr
}
c.ServerAddresses = addrs

return nil
}
Expand Down
26 changes: 13 additions & 13 deletions pkg/alvd/agent/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
)

type daemon struct {
serverAddress string
serverAddresses []string

agentName string
grpcPort int
Expand Down Expand Up @@ -65,31 +65,31 @@ func New(cfg *config.Config) (Daemon, error) {
}

return &daemon{
serverAddress: cfg.ServerAddress,
agentName: cfg.AgentName,
grpcPort: cfg.GRPCPort,
agent: a,
ngt: ngt,
handler: h,
serverAddresses: cfg.ServerAddresses,
agentName: cfg.AgentName,
grpcPort: cfg.GRPCPort,
agent: a,
ngt: ngt,
handler: h,
}, nil
}

func (d *daemon) Start(ctx context.Context) <-chan error {
ctx, d.cancel = context.WithCancel(ctx)

var tunEch <-chan error
d.tunnel, tunEch = tunnel.Connect(ctx, &tunnel.Config{
ServerAddress: d.serverAddress,
AgentName: d.agentName,
AgentPort: d.grpcPort,
})
d.tunnel = tunnel.New(d.agentName, d.grpcPort)
tunEch := d.tunnel.Start(ctx)
for _, addr := range d.serverAddresses {
d.tunnel.Connect(addr)
}

nech := d.ngt.Start(ctx)
gech := d.agent.Start(ctx)

ech := make(chan error, 1)

go func() {
defer close(ech)
var err error
for {
select {
Expand Down
85 changes: 67 additions & 18 deletions pkg/alvd/agent/service/tunnel/tunnel.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,40 +12,78 @@ import (
)

type tunnel struct {
*Config
cancel context.CancelFunc
}
agentName string
agentPort int

type Config struct {
ServerAddress string
cancel context.CancelFunc
cancelByAddr map[string]context.CancelFunc

AgentName string
AgentPort int
connectCh chan string
disconnectCh chan string
}

type Tunnel interface {
Start(ctx context.Context) <-chan error
Connect(addr string)
Disconnect(addr string)
Close()
}

func Connect(ctx context.Context, cfg *Config) (Tunnel, <-chan error) {
ctx, cancel := context.WithCancel(ctx)
func New(name string, port int) Tunnel {
return &tunnel{
agentName: name,
agentPort: port,
cancelByAddr: make(map[string]context.CancelFunc, 0),
}
}

func (t *tunnel) Start(ctx context.Context) <-chan error {
ctx, t.cancel = context.WithCancel(ctx)
ech := make(chan error, 1)
t.connectCh = make(chan string, 10)
t.disconnectCh = make(chan string, 10)

go func() {
defer close(ech)
defer close(t.connectCh)
defer close(t.disconnectCh)
var err error
for {
select {
case <-ctx.Done():
err = ctx.Err()
if err != nil && err != context.Canceled {
log.Errorf("error: %s", err)
}
return
case addr := <-t.connectCh:
t.connect(ctx, addr)
case addr := <-t.disconnectCh:
t.disconnect(ctx, addr)
}
}
}()

return ech
}

func (t *tunnel) connect(ctx context.Context, addr string) {
ctx, t.cancelByAddr[addr] = context.WithCancel(ctx)

headers := http.Header{
"X-ALVD-ID": []string{cfg.AgentName},
"X-ALVD-GRPC-PORT": []string{strconv.Itoa(cfg.AgentPort)},
"X-ALVD-ID": []string{t.agentName},
"X-ALVD-GRPC-PORT": []string{strconv.Itoa(t.agentPort)},
}

go func() {
defer close(ech)
for {
remotedialer.ClientConnect(
ctx,
fmt.Sprintf("ws://%s/connect", cfg.ServerAddress),
fmt.Sprintf("ws://%s/connect", addr),
headers,
nil,
connectAuthorizer,
onConnectFunc(cfg.ServerAddress),
onConnectFunc(addr),
)

select {
Expand All @@ -59,11 +97,22 @@ func Connect(ctx context.Context, cfg *Config) (Tunnel, <-chan error) {
}
}
}()
}

return &tunnel{
Config: cfg,
cancel: cancel,
}, ech
func (t *tunnel) disconnect(ctx context.Context, addr string) {
cancel, ok := t.cancelByAddr[addr]
if ok {
cancel()
delete(t.cancelByAddr, addr)
}
}

func (t *tunnel) Connect(addr string) {
t.connectCh <- addr
}

func (t *tunnel) Disconnect(addr string) {
t.disconnectCh <- addr
}

func (t *tunnel) Close() {
Expand Down
12 changes: 6 additions & 6 deletions pkg/alvd/cli/agent/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

type Opts struct {
ServerAddress string
ServerAddresses []string
AgentName string
LogLevel string
Dimension int
Expand All @@ -37,10 +37,10 @@ var Flags = []cli.Flag{
Value: "",
Usage: "agent name (if not specified, uuid will be generated)",
},
&cli.StringFlag{
&cli.StringSliceFlag{
Name: "server",
Value: "0.0.0.0:8000",
Usage: "alvd server address",
Value: cli.NewStringSlice("0.0.0.0:8000"),
Usage: "alvd server addresses",
},
&cli.StringFlag{
Name: "log-level",
Expand Down Expand Up @@ -117,7 +117,7 @@ var Flags = []cli.Flag{
func ParseOpts(c *cli.Context) *Opts {
return &Opts{
AgentName: c.String("name"),
ServerAddress: c.String("server"),
ServerAddresses: c.StringSlice("server"),
LogLevel: c.String("log-level"),
Dimension: c.Int("dimension"),
DistanceType: c.String("distance-type"),
Expand Down Expand Up @@ -177,7 +177,7 @@ func NewCommand() *cli.Command {
func ToConfig(opts *Opts) (*config.Config, error) {
cfg, err := config.New(
config.WithAgentName(opts.AgentName),
config.WithServerAddress(opts.ServerAddress),
config.WithServerAddresses(opts.ServerAddresses),
config.WithDimension(opts.Dimension),
config.WithDistanceType(opts.DistanceType),
config.WithObjectType(opts.ObjectType),
Expand Down
2 changes: 1 addition & 1 deletion pkg/alvd/cli/server/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func ToConfig(opts *Opts) (*config.Config, error) {
cfg, err := config.New(
config.WithAgentEnabled(opts.AgentEnabled),
config.WithAgentOpts(opts.Opts),
config.WithAddr(opts.ServerAddress),
config.WithAddrs(opts.ServerAddresses),
config.WithGRPCHost(opts.ServerGRPCHost),
config.WithGRPCPort(opts.ServerGRPCPort),
config.WithReplicas(opts.Replicas),
Expand Down
4 changes: 2 additions & 2 deletions pkg/alvd/observability/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@ import (
"time"

"github.com/rinx/alvd/internal/log"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/label"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/metric/global"
)

var (
Expand Down Expand Up @@ -40,7 +40,7 @@ type Meter interface {
func Init(interval time.Duration) {
once.Do(func() {
instance = &meter{
meter: otel.Meter("rinx.github.io/alvd"),
meter: global.Meter("rinx.github.io/alvd"),
interval: interval,
mch: make(chan metric.Measurement, 10),
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/alvd/observability/prometheus/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ package prometheus
import (
"net/http"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/metric/prometheus"
"go.opentelemetry.io/otel/metric/global"
)

type prom struct {
Expand All @@ -23,7 +23,7 @@ func New() (Prometheus, error) {
return nil, err
}

otel.SetMeterProvider(exporter.MeterProvider())
global.SetMeterProvider(exporter.MeterProvider())

return &prom{
exporter: exporter,
Expand Down
2 changes: 1 addition & 1 deletion pkg/alvd/server/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ type Config struct {
AgentEnabled bool
AgentOpts *agent.Opts

Addr string
Addrs []string
GRPCHost string
GRPCPort int

Expand Down
6 changes: 2 additions & 4 deletions pkg/alvd/server/config/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,9 @@ func WithAgentOpts(opts *agent.Opts) OptionFunc {
}
}

func WithAddr(addr string) OptionFunc {
func WithAddrs(addrs []string) OptionFunc {
return func(c *Config) error {
if addr != "" {
c.Addr = addr
}
c.Addrs = addrs

return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/alvd/server/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func New(cfg *config.Config) (Daemon, error) {
}

return &daemon{
addr: cfg.Addr,
addr: cfg.Addrs[0],
gateway: g,
tunnel: tun,
manager: m,
Expand Down

0 comments on commit aac31e9

Please sign in to comment.