Skip to content

Commit

Permalink
Reintroduce the random connection strategy (#1313)
Browse files Browse the repository at this point in the history
  • Loading branch information
larry-cdn77 authored May 31, 2024
1 parent 6f3c8e7 commit f373529
Show file tree
Hide file tree
Showing 9 changed files with 80 additions and 87 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,8 +151,9 @@ conn.SetConnMaxLifetime(time.Hour)
* username/password - auth credentials
* database - select the current default database
* dial_timeout - a duration string is a possibly signed sequence of decimal numbers, each with optional fraction and a unit suffix such as "300ms", "1s". Valid time units are "ms", "s", "m". (default 30s)
* connection_open_strategy - round_robin/in_order (default in_order).
* round_robin - choose a round-robin server from the set
* connection_open_strategy - random/round_robin/in_order (default in_order).
* random - choose random server from the set
* round_robin - choose a round-robin server from the set
* in_order - first live server is chosen in specified order
* debug - enable debug output (boolean value)
* compress - compress - specify the compression algorithm - “none” (default), `zstd`, `lz4`, `gzip`, `deflate`, `br`. If set to `true`, `lz4` will be used.
Expand Down
4 changes: 4 additions & 0 deletions clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"errors"
"fmt"
"math/rand"
"sync/atomic"
"time"

Expand Down Expand Up @@ -233,13 +234,16 @@ func (ch *clickhouse) dial(ctx context.Context) (conn *connect, err error) {
}

func DefaultDialStrategy(ctx context.Context, connID int, opt *Options, dial Dial) (r DialResult, err error) {
random := rand.Int()
for i := range opt.Addr {
var num int
switch opt.ConnOpenStrategy {
case ConnOpenInOrder:
num = i
case ConnOpenRoundRobin:
num = (int(connID) + i) % len(opt.Addr)
case ConnOpenRandom:
num = (random + i) % len(opt.Addr)
}

if r, err = dial(ctx, opt.Addr[num], opt); err == nil {
Expand Down
3 changes: 3 additions & 0 deletions clickhouse_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ type ConnOpenStrategy uint8
const (
ConnOpenInOrder ConnOpenStrategy = iota
ConnOpenRoundRobin
ConnOpenRandom
)

type Protocol int
Expand Down Expand Up @@ -265,6 +266,8 @@ func (o *Options) fromDSN(in string) error {
o.ConnOpenStrategy = ConnOpenInOrder
case "round_robin":
o.ConnOpenStrategy = ConnOpenRoundRobin
case "random":
o.ConnOpenStrategy = ConnOpenRandom
}
case "max_open_conns":
maxOpenConns, err := strconv.Atoi(params.Get(v))
Expand Down
4 changes: 4 additions & 0 deletions clickhouse_std.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"fmt"
"io"
"log"
"math/rand"
"os"
"reflect"
"strings"
Expand Down Expand Up @@ -81,13 +82,16 @@ func (o *stdConnOpener) Connect(ctx context.Context) (_ driver.Conn, err error)
return nil, ErrAcquireConnNoAddress
}

random := rand.Int()
for i := range o.opt.Addr {
var num int
switch o.opt.ConnOpenStrategy {
case ConnOpenInOrder:
num = i
case ConnOpenRoundRobin:
num = (int(connID) + i) % len(o.opt.Addr)
case ConnOpenRandom:
num = (random + i) % len(o.opt.Addr)
}
if conn, err = dialFunc(ctx, o.opt.Addr[num], connID, o.opt); err == nil {
var debugf = func(format string, v ...any) {}
Expand Down
1 change: 1 addition & 0 deletions examples/clickhouse_api/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@ func TestIterableOrderedMapInsertRead(t *testing.T) {
func TestMultiHostConnect(t *testing.T) {
require.NoError(t, MultiHostVersion())
require.NoError(t, MultiHostRoundRobinVersion())
require.NoError(t, MultiHostRandomVersion())
}

func TestNested(t *testing.T) {
Expand Down
40 changes: 18 additions & 22 deletions examples/clickhouse_api/multi_host.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,40 +23,36 @@ import (
)

func MultiHostVersion() error {
return multiHostVersion(nil)
}

func MultiHostRoundRobinVersion() error {
connOpenStrategy := clickhouse.ConnOpenRoundRobin
return multiHostVersion(&connOpenStrategy)
}

func MultiHostRandomVersion() error {
connOpenStrategy := clickhouse.ConnOpenRandom
return multiHostVersion(&connOpenStrategy)
}

func multiHostVersion(connOpenStrategy *clickhouse.ConnOpenStrategy) error {
env, err := GetNativeTestEnvironment()
if err != nil {
return err
}
conn, err := clickhouse.Open(&clickhouse.Options{
options := clickhouse.Options{
Addr: []string{"127.0.0.1:9001", "127.0.0.1:9002", fmt.Sprintf("%s:%d", env.Host, env.Port)},
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
})
if err != nil {
return err
}
v, err := conn.ServerVersion()
if err != nil {
return err
if connOpenStrategy != nil {
options.ConnOpenStrategy = *connOpenStrategy
}
fmt.Println(v.String())
return nil
}

func MultiHostRoundRobinVersion() error {
env, err := GetNativeTestEnvironment()
conn, err := clickhouse.Open(&clickhouse.Options{
Addr: []string{"127.0.0.1:9001", "127.0.0.1:9002", fmt.Sprintf("%s:%d", env.Host, env.Port)},
ConnOpenStrategy: clickhouse.ConnOpenRoundRobin,
Auth: clickhouse.Auth{
Database: env.Database,
Username: env.Username,
Password: env.Password,
},
})
conn, err := clickhouse.Open(&options)
if err != nil {
return err
}
Expand Down
54 changes: 19 additions & 35 deletions tests/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,39 +67,20 @@ func TestBadConn(t *testing.T) {
}

func TestConnFailover(t *testing.T) {
env, err := GetNativeTestEnvironment()
require.NoError(t, err)
useSSL, err := strconv.ParseBool(GetEnv("CLICKHOUSE_USE_SSL", "false"))
require.NoError(t, err)
port := env.Port
var tlsConfig *tls.Config
if useSSL {
port = env.SslPort
tlsConfig = &tls.Config{}
}
conn, err := GetConnectionWithOptions(&clickhouse.Options{
Addr: []string{
"127.0.0.1:9001",
"127.0.0.1:9002",
fmt.Sprintf("%s:%d", env.Host, port),
},
Auth: clickhouse.Auth{
Database: "default",
Username: env.Username,
Password: env.Password,
},
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
},
TLS: tlsConfig,
})
require.NoError(t, err)
require.NoError(t, conn.Ping(context.Background()))
t.Log(conn.ServerVersion())
t.Log(conn.Ping(context.Background()))
testConnFailover(t, nil)
}

func TestConnFailoverConnOpenRoundRobin(t *testing.T) {
func TestConnFailoverRoundRobin(t *testing.T) {
strategy := clickhouse.ConnOpenRoundRobin
testConnFailover(t, &strategy)
}

func TestConnFailoverRandom(t *testing.T) {
strategy := clickhouse.ConnOpenRandom
testConnFailover(t, &strategy)
}

func testConnFailover(t *testing.T, connOpenStrategy *clickhouse.ConnOpenStrategy) {
env, err := GetNativeTestEnvironment()
require.NoError(t, err)
useSSL, err := strconv.ParseBool(GetEnv("CLICKHOUSE_USE_SSL", "false"))
Expand All @@ -110,7 +91,7 @@ func TestConnFailoverConnOpenRoundRobin(t *testing.T) {
port = env.SslPort
tlsConfig = &tls.Config{}
}
conn, err := GetConnectionWithOptions(&clickhouse.Options{
options := clickhouse.Options{
Addr: []string{
"127.0.0.1:9001",
"127.0.0.1:9002",
Expand All @@ -124,9 +105,12 @@ func TestConnFailoverConnOpenRoundRobin(t *testing.T) {
Compression: &clickhouse.Compression{
Method: clickhouse.CompressionLZ4,
},
ConnOpenStrategy: clickhouse.ConnOpenRoundRobin,
TLS: tlsConfig,
})
TLS: tlsConfig,
}
if connOpenStrategy != nil {
options.ConnOpenStrategy = *connOpenStrategy
}
conn, err := GetConnectionWithOptions(&options)
require.NoError(t, err)
require.NoError(t, conn.Ping(context.Background()))
t.Log(conn.ServerVersion())
Expand Down
54 changes: 26 additions & 28 deletions tests/std/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,42 +51,40 @@ func TestStdConn(t *testing.T) {
}

func TestStdConnFailover(t *testing.T) {
env, err := GetStdTestEnvironment()
require.NoError(t, err)
useSSL, err := strconv.ParseBool(clickhouse_tests.GetEnv("CLICKHOUSE_USE_SSL", "false"))
require.NoError(t, err)
dsns := map[string]string{"Native": fmt.Sprintf("clickhouse://%s:%[email protected]:9001,127.0.0.1:9002,%s:%d", env.Username, env.Password, env.Host, env.Port),
"Http": fmt.Sprintf("http://%s:%[email protected]:8124,127.0.0.1:8125,%s:%d", env.Username, env.Password, env.Host, env.HttpPort)}
if useSSL {
dsns = map[string]string{"Native": fmt.Sprintf("clickhouse://%s:%[email protected]:9001,127.0.0.1:9002,%s:%d?secure=true", env.Username, env.Password, env.Host, env.SslPort),
"Http": fmt.Sprintf("https://%s:%[email protected]:8124,127.0.0.1:8125,%s:%d?secure=true", env.Username, env.Password, env.Host, env.HttpsPort)}
}
for name, dsn := range dsns {
t.Run(fmt.Sprintf("%s Protocol", name), func(t *testing.T) {
testStdConnFailover(t, "")
}

if conn, err := sql.Open("clickhouse", dsn); assert.NoError(t, err) {
if err := conn.PingContext(context.Background()); assert.NoError(t, err) {
t.Log(conn.PingContext(context.Background()))
}
}
})
}
func TestStdConnFailoverRoundRobin(t *testing.T) {
testStdConnFailover(t, "round_robin")
}

func TestStdConnFailoverConnOpenRoundRobin(t *testing.T) {
func TestStdConnFailoverRandom(t *testing.T) {
testStdConnFailover(t, "random")
}

func testStdConnFailover(t *testing.T, openStrategy string) {
env, err := GetStdTestEnvironment()
require.NoError(t, err)
dsns := map[string]string{
"Native": fmt.Sprintf("clickhouse://%s:%[email protected]:9001,127.0.0.1:9002,127.0.0.1:9003,127.0.0.1:9004,127.0.0.1:9005,127.0.0.1:9006,%s:%d/?connection_open_strategy=round_robin", env.Username, env.Password, env.Host, env.Port),
"Http": fmt.Sprintf("http://%s:%[email protected]:8124,127.0.0.1:8125,127.0.0.1:8126,127.0.0.1:8127,127.0.0.1:8128,127.0.0.1:8129,%s:%d/?connection_open_strategy=round_robin", env.Username, env.Password, env.Host, env.HttpPort),
}
useSSL, err := strconv.ParseBool(clickhouse_tests.GetEnv("CLICKHOUSE_USE_SSL", "false"))
require.NoError(t, err)
nativePort := env.Port
httpPort := env.HttpPort
argsList := []string{}
if useSSL {
dsns = map[string]string{
"Native": fmt.Sprintf("clickhouse://%s:%[email protected]:9001,127.0.0.1:9002,127.0.0.1:9003,127.0.0.1:9004,127.0.0.1:9005,127.0.0.1:9006,%s:%d/?connection_open_strategy=round_robin&secure=true", env.Username, env.Password, env.Host, env.SslPort),
"Http": fmt.Sprintf("https://%s:%[email protected]:8124,127.0.0.1:8125,127.0.0.1:8126,127.0.0.1:8127,127.0.0.1:8128,127.0.0.1:8129,%s:%d/?connection_open_strategy=round_robin&secure=true", env.Username, env.Password, env.Host, env.HttpsPort),
}
nativePort = env.SslPort
httpPort = env.HttpsPort
argsList = append(argsList, "secure=true")
}
if len(openStrategy) > 0 {
argsList = append(argsList, fmt.Sprintf("connection_open_strategy=%s", openStrategy))
}
args := ""
if len(argsList) > 0 {
args = "?" + strings.Join(argsList, "&")
}
dsns := map[string]string{
"Native": fmt.Sprintf("clickhouse://%s:%[email protected]:9001,127.0.0.1:9002,127.0.0.1:9003,127.0.0.1:9004,127.0.0.1:9005,127.0.0.1:9006,%s:%d/%s", env.Username, env.Password, env.Host, nativePort, args),
"Http": fmt.Sprintf("http://%s:%[email protected]:8124,127.0.0.1:8125,127.0.0.1:8126,127.0.0.1:8127,127.0.0.1:8128,127.0.0.1:8129,%s:%d/%s", env.Username, env.Password, env.Host, httpPort, args),
}
for name, dsn := range dsns {
t.Run(fmt.Sprintf("%s Protocol", name), func(t *testing.T) {
Expand Down
2 changes: 2 additions & 0 deletions tests/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,8 @@ func optionsToDSN(o *clickhouse.Options) string {
strategy = "in_order"
case clickhouse.ConnOpenRoundRobin:
strategy = "round_robin"
case clickhouse.ConnOpenRandom:
strategy = "random"
}

params.Set("connection_open_strategy", strategy)
Expand Down

0 comments on commit f373529

Please sign in to comment.