Skip to content

Commit

Permalink
Integrate new version go-chassis/cari, to support multi-az address ma…
Browse files Browse the repository at this point in the history
…nagement (#59)
  • Loading branch information
humingcheng authored Dec 18, 2023
1 parent e0ff6c8 commit e395a35
Show file tree
Hide file tree
Showing 9 changed files with 746 additions and 254 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ jobs:
- name: golangci-lint
uses: golangci/golangci-lint-action@v2
with:
version: v1.48.0
version: v1.55.2
args: --skip-dirs=examples --skip-files=.*_test.go$
46 changes: 16 additions & 30 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ import (
"encoding/json"
"errors"
"fmt"
"github.com/go-chassis/cari/rbac"
"github.com/patrickmn/go-cache"
"io/ioutil"
"net/http"
"net/url"
Expand All @@ -17,12 +15,14 @@ import (
"time"

"github.com/cenkalti/backoff/v4"
"github.com/gorilla/websocket"

"github.com/go-chassis/cari/addresspool"
"github.com/go-chassis/cari/discovery"
"github.com/go-chassis/cari/rbac"
"github.com/go-chassis/foundation/httpclient"
"github.com/go-chassis/foundation/httputil"
"github.com/go-chassis/openlog"
"github.com/gorilla/websocket"
"github.com/patrickmn/go-cache"
)

// Define constants for the client
Expand All @@ -47,16 +47,14 @@ const (
DefaultTokenExpiration = 10 * time.Hour
HeaderRevision = "X-Resource-Revision"
EnvProjectID = "CSE_PROJECT_ID"
// EnvCheckSCIInterval sc instance health check interval in second
EnvCheckSCIInterval = "CHASSIS_SC_HEALTH_CHECK_INTERVAL"
)

// Define variables for the client
var (
MSAPIPath = ""
GovernAPIPATH = ""
TenantHeader = "X-Domain-Name"
defineOnce = sync.Once{}
defineOnce = sync.Once{}
)
var (
// ErrNotModified means instance is not changed
Expand All @@ -81,13 +79,13 @@ type Client struct {
// record the websocket connection with the service center
conns map[string]*websocket.Conn
revision string
pool *AddressPool
pool *addresspool.Pool
}

// URLParameter maintains the list of parameters to be added in URL
type URLParameter map[string]string

//ResetRevision reset the revision to 0
// ResetRevision reset the revision to 0
func (c *Client) ResetRevision() {
c.revision = "0"
}
Expand Down Expand Up @@ -116,9 +114,7 @@ func NewClient(opt Options) (*Client, error) {
}
//Update the API Base Path based on the project
c.updateAPIPath()
c.pool = NewPool(c.protocol)
c.pool.SetAddress(opt.Endpoints)
c.pool.Monitor()
c.pool = addresspool.NewPool(opt.Endpoints)
return c, nil
}

Expand Down Expand Up @@ -175,23 +171,13 @@ func (c *Client) SyncEndpoints() error {
if err != nil {
return fmt.Errorf("sync SC ep failed. err:%s", err.Error())
}
eps := make([]string, 0)
for _, instance := range instances {
m := getProtocolMap(instance.Endpoints)
eps = append(eps, m["rest"])
}
if len(eps) != 0 {
c.pool.SetAddress(eps)
openlog.Info("Sync service center endpoints " + strings.Join(eps, ","))
return nil
}
return fmt.Errorf("sync endpoints failed")
return c.pool.SetAddressByInstances(instances)
}

func (c *Client) formatURL(api string, querys []URLParameter, options *CallOptions) string {
builder := URLBuilder{
Protocol: c.protocol,
Host: c.getAddress(),
Host: c.GetAddress(),
Path: api,
URLParameters: querys,
CallOptions: options,
Expand Down Expand Up @@ -490,8 +476,8 @@ func (c *Client) GetMicroService(microServiceID string, opts ...CallOption) (*di
return nil, fmt.Errorf("GetMicroService failed, MicroServiceId: %s, response StatusCode: %d, response body: %s\n, microserviceURL: %s", microServiceID, resp.StatusCode, string(body), microserviceURL)
}

//BatchFindInstances fetch instances based on service name, env, app and version
//finally it return instances grouped by service name
// BatchFindInstances fetch instances based on service name, env, app and version
// finally it return instances grouped by service name
func (c *Client) BatchFindInstances(consumerID string, keys []*discovery.FindService, opts ...CallOption) (*discovery.BatchFindInstancesResponse, error) {
copts := &CallOptions{Revision: c.revision}
for _, opt := range opts {
Expand Down Expand Up @@ -779,7 +765,7 @@ func (c *Client) setupWSConnection(microServiceID, microServiceInstanceID string

u := url.URL{
Scheme: scheme,
Host: c.getAddress(),
Host: c.GetAddress(),
Path: fmt.Sprintf("%s%s/%s%s/%s%s", MSAPIPath, MicroservicePath, microServiceID,
InstancePath, microServiceInstanceID, "/heartbeat"),
}
Expand Down Expand Up @@ -951,7 +937,7 @@ func (c *Client) WatchMicroService(microServiceID string, callback func(*MicroSe
}
u := url.URL{
Scheme: scheme,
Host: c.getAddress(),
Host: c.GetAddress(),
Path: fmt.Sprintf("%s%s/%s%s", MSAPIPath,
MicroservicePath, microServiceID, WatchPath),
}
Expand Down Expand Up @@ -991,7 +977,7 @@ func (c *Client) WatchMicroService(microServiceID string, callback func(*MicroSe
return nil
}

func (c *Client) getAddress() string {
func (c *Client) GetAddress() string {
return c.pool.GetAvailableAddress()
}

Expand All @@ -1007,7 +993,7 @@ func (c *Client) startBackOff(microServiceID string, callback func(*MicroService
operation := func() error {
c.mutex.Lock()
c.watchers[microServiceID] = false
c.getAddress()
c.GetAddress()
c.mutex.Unlock()
err := c.WatchMicroService(microServiceID, callback)
if err != nil {
Expand Down
52 changes: 46 additions & 6 deletions client_test.go
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
package sc_test

import (
"encoding/json"
"net/http"
"net/http/httptest"
"os"
"sync"
"testing"
"time"

"github.com/go-chassis/cari/discovery"
"github.com/go-chassis/cari/rbac"
"github.com/go-chassis/openlog"
"github.com/go-chassis/sc-client"
"github.com/stretchr/testify/assert"

"os"
"testing"
"time"
"github.com/go-chassis/sc-client"
)

func TestClient_RegisterService(t *testing.T) {
Expand All @@ -24,8 +27,7 @@ func TestClient_RegisterService(t *testing.T) {
hostname, err := os.Hostname()
assert.NoError(t, err)

MSList, err := c.GetAllMicroServices()
assert.NotEmpty(t, MSList)
_, err = c.GetAllMicroServices()
assert.NoError(t, err)

t.Run("given instance with no service id, should return err", func(t *testing.T) {
Expand Down Expand Up @@ -321,3 +323,41 @@ func TestClient_DataRace(t *testing.T) {
wg.Wait()
})
}

func TestClient_SyncEndpoints(t *testing.T) {
mockHealthApiServer := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
resp := &discovery.GetInstancesResponse{
Instances: []*discovery.MicroServiceInstance{
{
Endpoints: []string{"rest://127.0.0.1:3000"},
HostName: "test",
Status: sc.MSInstanceUP,
DataCenterInfo: &discovery.DataCenterInfo{
Name: "engine1",
Region: "cn",
AvailableZone: "az1",
},
},
},
}
instanceBytes, err := json.Marshal(resp)
if err != nil {
writer.Write([]byte(err.Error()))
writer.WriteHeader(http.StatusInternalServerError)
}
writer.Write(instanceBytes)
writer.WriteHeader(http.StatusOK)
return
}))

c, err := sc.NewClient(
sc.Options{
Endpoints: []string{mockHealthApiServer.Listener.Addr().String()},
})
assert.NoError(t, err)

// should use the synced address
err = c.SyncEndpoints()
assert.NoError(t, err)
assert.Equal(t, "127.0.0.1:3000", c.GetAddress())
}
10 changes: 5 additions & 5 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,12 @@ module github.com/go-chassis/sc-client

require (
github.com/cenkalti/backoff/v4 v4.1.1
github.com/go-chassis/cari v0.5.1-0.20210823023004-74041d1363c4
github.com/go-chassis/foundation v0.3.0
github.com/go-chassis/openlog v1.1.2
github.com/gorilla/websocket v1.4.2
github.com/go-chassis/cari v0.9.1-0.20231218072126-352554981fb4
github.com/go-chassis/foundation v0.4.0
github.com/go-chassis/openlog v1.1.3
github.com/gorilla/websocket v1.4.3-0.20210424162022-e8629af678b7
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/stretchr/testify v1.6.1
github.com/stretchr/testify v1.7.0
)

go 1.13
Loading

0 comments on commit e395a35

Please sign in to comment.