From 932f50ccdd9e5d7f32419a0c6c552c3fe886b490 Mon Sep 17 00:00:00 2001 From: Geoff Genz Date: Thu, 8 Dec 2022 16:19:33 -0700 Subject: [PATCH] Small updates to allow TLS connections for AWS MSK, etc. --- cmd/clickhouse_sinker/main.go | 4 ++-- config/config.go | 12 +++++++++--- input/kafka_franz.go | 5 +++-- pool/conn.go | 13 ++++++++----- util/common.go | 20 +++++++++++--------- 5 files changed, 33 insertions(+), 21 deletions(-) diff --git a/cmd/clickhouse_sinker/main.go b/cmd/clickhouse_sinker/main.go index 4ed773d4..2090e804 100644 --- a/cmd/clickhouse_sinker/main.go +++ b/cmd/clickhouse_sinker/main.go @@ -420,7 +420,7 @@ func (s *Sinker) applyFirstConfig(newCfg *config.Config) (err error) { // 1. Initialize clickhouse connections chCfg := &newCfg.Clickhouse if err = pool.InitClusterConn(chCfg.Hosts, chCfg.Port, chCfg.DB, chCfg.Username, chCfg.Password, - chCfg.DsnParams, chCfg.Secure, chCfg.InsecureSkipVerify, chCfg.MaxOpenConns); err != nil { + chCfg.DsnParams, chCfg.Secure, chCfg.InsecureSkipVerify, chCfg.MaxOpenConns, chCfg.DialTimeout); err != nil { return } @@ -456,7 +456,7 @@ func (s *Sinker) applyAnotherConfig(newCfg *config.Config) (err error) { // 2. Initialize clickhouse connections. chCfg := &newCfg.Clickhouse if err = pool.InitClusterConn(chCfg.Hosts, chCfg.Port, chCfg.DB, chCfg.Username, chCfg.Password, - chCfg.DsnParams, chCfg.Secure, chCfg.InsecureSkipVerify, chCfg.MaxOpenConns); err != nil { + chCfg.DsnParams, chCfg.Secure, chCfg.InsecureSkipVerify, chCfg.MaxOpenConns, chCfg.DialTimeout); err != nil { return } diff --git a/config/config.go b/config/config.go index 064bfead..d0996bf4 100644 --- a/config/config.go +++ b/config/config.go @@ -44,7 +44,7 @@ type KafkaConfig struct { Security map[string]string TLS struct { Enable bool - CaCertFiles string // Required. It's the CA cert.pem with which Kafka brokers certs be signed. + CaCertFiles string // CA cert.pem with which Kafka brokers certs be signed. Leave empty for certificates trusted by the OS ClientCertFile string // Required for client authentication. It's client cert.pem. ClientKeyFile string // Required if and only if ClientCertFile is present. It's client key.pem. @@ -97,9 +97,10 @@ type ClickHouseConfig struct { RetryTimes int //<=0 means retry infinitely MaxOpenConns int + DialTimeout int // Connection dial timeout in seconds } -// Task configuration parameters +// TaskConfig parameters type TaskConfig struct { Name string @@ -170,6 +171,7 @@ const ( defaultLogLevel = "info" defaultKerberosConfigPath = "/etc/krb5.conf" defaultMaxOpenConns = 1 + defaultDialTimeout = 2 ) func ParseLocalCfgFile(cfgPath string) (cfg *Config, err error) { @@ -187,7 +189,7 @@ func ParseLocalCfgFile(cfgPath string) (cfg *Config, err error) { return } -// normallize and validate configuration +// Normalize and validate configuration func (cfg *Config) Normallize() (err error) { if len(cfg.Clickhouse.Hosts) == 0 || cfg.Kafka.Brokers == "" { err = errors.Newf("invalid configuration") @@ -230,6 +232,10 @@ func (cfg *Config) Normallize() (err error) { cfg.Clickhouse.MaxOpenConns = defaultMaxOpenConns } + if cfg.Clickhouse.DialTimeout <= 0 { + cfg.Clickhouse.DialTimeout = defaultDialTimeout + } + if cfg.Task != nil { cfg.Tasks = append(cfg.Tasks, cfg.Task) cfg.Task = nil diff --git a/input/kafka_franz.go b/input/kafka_franz.go index 4f3d5b4d..ea85463a 100644 --- a/input/kafka_franz.go +++ b/input/kafka_franz.go @@ -97,8 +97,9 @@ func (k *KafkaFranz) Init(cfg *config.Config, taskCfg *config.TaskConfig, putFn func GetFranzConfig(kfkCfg *config.KafkaConfig) (opts []kgo.Opt, err error) { opts = []kgo.Opt{ kgo.SeedBrokers(strings.Split(kfkCfg.Brokers, ",")...), - kgo.FetchMaxBytes(1 << 27), //134 MB - kgo.BrokerMaxReadBytes(1 << 27), //134 MB + kgo.DisableAutoCommit(), + kgo.FetchMaxBytes(20971520), // 20 MB -- Larger numbers are likely to cause OOM. Should be configurable + kgo.BrokerMaxReadBytes(20971520), //kgo.MetadataMaxAge(...) corresponds to sarama.Config.Metadata.RefreshFrequency kgo.WithLogger(kzap.New(util.Logger)), } diff --git a/pool/conn.go b/pool/conn.go index 13bd8e47..6c76d3ca 100644 --- a/pool/conn.go +++ b/pool/conn.go @@ -104,7 +104,8 @@ func (sc *ShardConn) NextGoodReplica(failedVer int) (db clickhouse.Conn, dbVer i // Each shard has a clickhouse.Conn which connects to one replica inside the shard. // We need more control than replica single-point-failure. -func InitClusterConn(hosts [][]string, port int, db, username, password, dsnParams string, secure, skipVerify bool, maxOpenConns int) (err error) { +func InitClusterConn(hosts [][]string, port int, db, username, password, dsnParams string, secure, skipVerify bool, + maxOpenConns int, dialTimeout int) (err error) { lock.Lock() defer lock.Unlock() freeClusterConn() @@ -113,8 +114,11 @@ func InitClusterConn(hosts [][]string, port int, db, username, password, dsnPara numReplicas := len(replicas) replicaAddrs := make([]string, numReplicas) for i, ip := range replicas { - if ips2, err := util.GetIP4Byname(ip); err == nil { - ip = ips2[0] + // Changing hostnames to IPs breaks TLS connections in many cases + if !secure { + if ips2, err := util.GetIP4Byname(ip); err == nil { + ip = ips2[0] + } } replicaAddrs[i] = fmt.Sprintf("%s:%d", ip, port) } @@ -126,8 +130,7 @@ func InitClusterConn(hosts [][]string, port int, db, username, password, dsnPara Username: username, Password: password, }, - //Debug: true, - DialTimeout: time.Second, + DialTimeout: time.Second * time.Duration(dialTimeout), MaxOpenConns: maxOpenConns, MaxIdleConns: 1, ConnMaxLifetime: time.Hour, diff --git a/util/common.go b/util/common.go index e69cd38e..ca38a5b8 100644 --- a/util/common.go +++ b/util/common.go @@ -165,17 +165,19 @@ func NewTLSConfig(caCertFiles, clientCertFile, clientKeyFile string, insecureSki tlsConfig.Certificates = []tls.Certificate{cert} } - // Load CA cert - caCertPool := x509.NewCertPool() - for _, caCertFile := range strings.Split(caCertFiles, ",") { - caCert, err := os.ReadFile(caCertFile) - if err != nil { - err = errors.Wrapf(err, "") - return &tlsConfig, err + // Load CA cert if it exists. Not needed for OS trusted certs + if caCertFiles != "" { + caCertPool := x509.NewCertPool() + for _, caCertFile := range strings.Split(caCertFiles, ",") { + caCert, err := os.ReadFile(caCertFile) + if err != nil { + err = errors.Wrapf(err, "") + return &tlsConfig, err + } + caCertPool.AppendCertsFromPEM(caCert) } - caCertPool.AppendCertsFromPEM(caCert) + tlsConfig.RootCAs = caCertPool } - tlsConfig.RootCAs = caCertPool tlsConfig.InsecureSkipVerify = insecureSkipVerify return &tlsConfig, nil }