Skip to content

Commit

Permalink
fix(natsjsregistry): spread load evenly among instances
Browse files Browse the repository at this point in the history
Signed-off-by: jkoberg <[email protected]>
  • Loading branch information
kobergj committed Jul 17, 2024
1 parent 38bc0a5 commit a092419
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 31 deletions.
5 changes: 5 additions & 0 deletions changelog/unreleased/fix-natsjskv-registry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
Fix: Repair nats-js-kv registry

The registry would always send traffic to only one pod. This is now fixed and load should be spread evenly. Also implements watcher method so the cache can use it.

https://github.com/owncloud/ocis/pull/9618
79 changes: 48 additions & 31 deletions ocis-pkg/natsjsregistry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"time"

natsjskv "github.com/go-micro/plugins/v4/store/nats-js-kv"
"github.com/google/uuid"
"github.com/nats-io/nats.go"
"go-micro.dev/v4/registry"
"go-micro.dev/v4/store"
Expand Down Expand Up @@ -80,76 +81,92 @@ func (n *storeregistry) Register(s *registry.Service, _ ...registry.RegisterOpti
if s == nil {
return errors.New("wont store nil service")
}

unique := uuid.New().String()
if s.Metadata == nil {
s.Metadata = make(map[string]string)
}
s.Metadata["uuid"] = unique

b, err := json.Marshal(s)
if err != nil {
return err
}
return n.store.Write(&store.Record{
Key: s.Name,
Key: s.Name + "-" + unique,
Value: b,
Expiry: n.expiry,
})
}

// Deregister removes a service from the registry
// Deregister removes a service from the registry.
func (n *storeregistry) Deregister(s *registry.Service, _ ...registry.DeregisterOption) error {
n.lock.RLock()
defer n.lock.RUnlock()

return n.store.Delete(s.Name)
var unique string
if s.Metadata != nil {
unique = s.Metadata["uuid"]
}

return n.store.Delete(s.Name + "-" + unique)
}

// GetService gets a specific service from the registry
func (n *storeregistry) GetService(s string, _ ...registry.GetOption) ([]*registry.Service, error) {
n.lock.RLock()
defer n.lock.RUnlock()

recs, err := n.store.Read(s)
if err != nil {
return nil, err
}
svcs := make([]*registry.Service, 0, len(recs))
for _, rec := range recs {
var s registry.Service
if err := json.Unmarshal(rec.Value, &s); err != nil {
return nil, err
}
svcs = append(svcs, &s)
}
return svcs, nil
return n.listServices(store.ListPrefix(s))
}

// ListServices lists all registered services
func (n *storeregistry) ListServices(...registry.ListOption) ([]*registry.Service, error) {
return n.listServices()
}

// Watch allowes following the changes in the registry if it would be implemented
func (n *storeregistry) Watch(...registry.WatchOption) (registry.Watcher, error) {
return NewWatcher(n)
}

// String returns the name of the registry
func (n *storeregistry) String() string {
return n.typ
}

func (n *storeregistry) listServices(opts ...store.ListOption) ([]*registry.Service, error) {
n.lock.RLock()
defer n.lock.RUnlock()

keys, err := n.store.List()
keys, err := n.store.List(opts...)
if err != nil {
return nil, err
}

var svcs []*registry.Service
svcs := make([]*registry.Service, 0, len(keys))
for _, k := range keys {
s, err := n.GetService(k)
s, err := n.getService(k)
if err != nil {
// TODO: continue ?
return nil, err
}
svcs = append(svcs, s...)
svcs = append(svcs, s)

}
return svcs, nil
}

// Watch allowes following the changes in the registry if it would be implemented
func (n *storeregistry) Watch(...registry.WatchOption) (registry.Watcher, error) {
return nil, errors.New("watcher not implemented")
}

// String returns the name of the registry
func (n *storeregistry) String() string {
return n.typ
func (n *storeregistry) getService(s string) (*registry.Service, error) {
recs, err := n.store.Read(s)
if err != nil {
return nil, err
}
if len(recs) == 0 {
return nil, registry.ErrNotFound
}
var svc registry.Service
if err := json.Unmarshal(recs[0].Value, &svc); err != nil {
return nil, err
}
return &svc, nil
}

func (n *storeregistry) storeOptions(opts registry.Options) []store.Option {
Expand Down
74 changes: 74 additions & 0 deletions ocis-pkg/natsjsregistry/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package natsjsregistry

import (
"errors"

"github.com/nats-io/nats.go"
"go-micro.dev/v4/registry"
)

// NatsWatcher is the watcher of the nats interface
type NatsWatcher interface {
Watch(bucket string) (nats.KeyWatcher, error)
}

// Watcher is used to keep track of changes in the registry
type Watcher struct {
watch nats.KeyWatcher
updates <-chan nats.KeyValueEntry
reg *storeregistry
}

// NewWatcher returns a new watcher
func NewWatcher(s *storeregistry) (*Watcher, error) {
w, ok := s.store.(NatsWatcher)
if !ok {
return nil, errors.New("store does not implement watcher interface")
}

watcher, err := w.Watch("service-registry")
if err != nil {
return nil, err
}

return &Watcher{
watch: watcher,
updates: watcher.Updates(),
reg: s,
}, nil
}

// Next returns the next result. It is a blocking call
func (w *Watcher) Next() (*registry.Result, error) {
kve := <-w.updates
if kve == nil {
return nil, errors.New("watcher stopped")
}

service, err := w.reg.getService(kve.Key())
if err != nil {
return nil, err
}

var action string
switch kve.Operation() {
default:
action = "create"
case nats.KeyValuePut:
action = "create"
case nats.KeyValueDelete:
action = "delete"
case nats.KeyValuePurge:
action = "delete"
}

return &registry.Result{
Service: service,
Action: action,
}, nil
}

// Stop stops the watcher
func (w *Watcher) Stop() {
_ = w.watch.Stop()
}

0 comments on commit a092419

Please sign in to comment.