Skip to content

Commit

Permalink
Small updates to allow TLS connections for AWS MSK, etc.
Browse files Browse the repository at this point in the history
  • Loading branch information
genzgd authored and yuzhichang committed Dec 9, 2022
1 parent 827b20b commit 932f50c
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 21 deletions.
4 changes: 2 additions & 2 deletions cmd/clickhouse_sinker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,7 +420,7 @@ func (s *Sinker) applyFirstConfig(newCfg *config.Config) (err error) {
// 1. Initialize clickhouse connections
chCfg := &newCfg.Clickhouse
if err = pool.InitClusterConn(chCfg.Hosts, chCfg.Port, chCfg.DB, chCfg.Username, chCfg.Password,
chCfg.DsnParams, chCfg.Secure, chCfg.InsecureSkipVerify, chCfg.MaxOpenConns); err != nil {
chCfg.DsnParams, chCfg.Secure, chCfg.InsecureSkipVerify, chCfg.MaxOpenConns, chCfg.DialTimeout); err != nil {
return
}

Expand Down Expand Up @@ -456,7 +456,7 @@ func (s *Sinker) applyAnotherConfig(newCfg *config.Config) (err error) {
// 2. Initialize clickhouse connections.
chCfg := &newCfg.Clickhouse
if err = pool.InitClusterConn(chCfg.Hosts, chCfg.Port, chCfg.DB, chCfg.Username, chCfg.Password,
chCfg.DsnParams, chCfg.Secure, chCfg.InsecureSkipVerify, chCfg.MaxOpenConns); err != nil {
chCfg.DsnParams, chCfg.Secure, chCfg.InsecureSkipVerify, chCfg.MaxOpenConns, chCfg.DialTimeout); err != nil {
return
}

Expand Down
12 changes: 9 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type KafkaConfig struct {
Security map[string]string
TLS struct {
Enable bool
CaCertFiles string // Required. It's the CA cert.pem with which Kafka brokers certs be signed.
CaCertFiles string // CA cert.pem with which Kafka brokers certs be signed. Leave empty for certificates trusted by the OS
ClientCertFile string // Required for client authentication. It's client cert.pem.
ClientKeyFile string // Required if and only if ClientCertFile is present. It's client key.pem.

Expand Down Expand Up @@ -97,9 +97,10 @@ type ClickHouseConfig struct {

RetryTimes int //<=0 means retry infinitely
MaxOpenConns int
DialTimeout int // Connection dial timeout in seconds
}

// Task configuration parameters
// TaskConfig parameters
type TaskConfig struct {
Name string

Expand Down Expand Up @@ -170,6 +171,7 @@ const (
defaultLogLevel = "info"
defaultKerberosConfigPath = "/etc/krb5.conf"
defaultMaxOpenConns = 1
defaultDialTimeout = 2
)

func ParseLocalCfgFile(cfgPath string) (cfg *Config, err error) {
Expand All @@ -187,7 +189,7 @@ func ParseLocalCfgFile(cfgPath string) (cfg *Config, err error) {
return
}

// normallize and validate configuration
// Normalize and validate configuration
func (cfg *Config) Normallize() (err error) {
if len(cfg.Clickhouse.Hosts) == 0 || cfg.Kafka.Brokers == "" {
err = errors.Newf("invalid configuration")
Expand Down Expand Up @@ -230,6 +232,10 @@ func (cfg *Config) Normallize() (err error) {
cfg.Clickhouse.MaxOpenConns = defaultMaxOpenConns
}

if cfg.Clickhouse.DialTimeout <= 0 {
cfg.Clickhouse.DialTimeout = defaultDialTimeout
}

if cfg.Task != nil {
cfg.Tasks = append(cfg.Tasks, cfg.Task)
cfg.Task = nil
Expand Down
5 changes: 3 additions & 2 deletions input/kafka_franz.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,9 @@ func (k *KafkaFranz) Init(cfg *config.Config, taskCfg *config.TaskConfig, putFn
func GetFranzConfig(kfkCfg *config.KafkaConfig) (opts []kgo.Opt, err error) {
opts = []kgo.Opt{
kgo.SeedBrokers(strings.Split(kfkCfg.Brokers, ",")...),
kgo.FetchMaxBytes(1 << 27), //134 MB
kgo.BrokerMaxReadBytes(1 << 27), //134 MB
kgo.DisableAutoCommit(),
kgo.FetchMaxBytes(20971520), // 20 MB -- Larger numbers are likely to cause OOM. Should be configurable
kgo.BrokerMaxReadBytes(20971520),
//kgo.MetadataMaxAge(...) corresponds to sarama.Config.Metadata.RefreshFrequency
kgo.WithLogger(kzap.New(util.Logger)),
}
Expand Down
13 changes: 8 additions & 5 deletions pool/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,8 @@ func (sc *ShardConn) NextGoodReplica(failedVer int) (db clickhouse.Conn, dbVer i

// Each shard has a clickhouse.Conn which connects to one replica inside the shard.
// We need more control than replica single-point-failure.
func InitClusterConn(hosts [][]string, port int, db, username, password, dsnParams string, secure, skipVerify bool, maxOpenConns int) (err error) {
func InitClusterConn(hosts [][]string, port int, db, username, password, dsnParams string, secure, skipVerify bool,
maxOpenConns int, dialTimeout int) (err error) {
lock.Lock()
defer lock.Unlock()
freeClusterConn()
Expand All @@ -113,8 +114,11 @@ func InitClusterConn(hosts [][]string, port int, db, username, password, dsnPara
numReplicas := len(replicas)
replicaAddrs := make([]string, numReplicas)
for i, ip := range replicas {
if ips2, err := util.GetIP4Byname(ip); err == nil {
ip = ips2[0]
// Changing hostnames to IPs breaks TLS connections in many cases
if !secure {
if ips2, err := util.GetIP4Byname(ip); err == nil {
ip = ips2[0]
}
}
replicaAddrs[i] = fmt.Sprintf("%s:%d", ip, port)
}
Expand All @@ -126,8 +130,7 @@ func InitClusterConn(hosts [][]string, port int, db, username, password, dsnPara
Username: username,
Password: password,
},
//Debug: true,
DialTimeout: time.Second,
DialTimeout: time.Second * time.Duration(dialTimeout),
MaxOpenConns: maxOpenConns,
MaxIdleConns: 1,
ConnMaxLifetime: time.Hour,
Expand Down
20 changes: 11 additions & 9 deletions util/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,17 +165,19 @@ func NewTLSConfig(caCertFiles, clientCertFile, clientKeyFile string, insecureSki
tlsConfig.Certificates = []tls.Certificate{cert}
}

// Load CA cert
caCertPool := x509.NewCertPool()
for _, caCertFile := range strings.Split(caCertFiles, ",") {
caCert, err := os.ReadFile(caCertFile)
if err != nil {
err = errors.Wrapf(err, "")
return &tlsConfig, err
// Load CA cert if it exists. Not needed for OS trusted certs
if caCertFiles != "" {
caCertPool := x509.NewCertPool()
for _, caCertFile := range strings.Split(caCertFiles, ",") {
caCert, err := os.ReadFile(caCertFile)
if err != nil {
err = errors.Wrapf(err, "")
return &tlsConfig, err
}
caCertPool.AppendCertsFromPEM(caCert)
}
caCertPool.AppendCertsFromPEM(caCert)
tlsConfig.RootCAs = caCertPool
}
tlsConfig.RootCAs = caCertPool
tlsConfig.InsecureSkipVerify = insecureSkipVerify
return &tlsConfig, nil
}
Expand Down

0 comments on commit 932f50c

Please sign in to comment.