diff --git a/Makefile b/Makefile index fd785f50..acdcdacb 100644 --- a/Makefile +++ b/Makefile @@ -21,4 +21,4 @@ systest: build lint: golangci-lint run --timeout=3m run: pre - go run cmd/clickhouse_sinker/main.go --local-cfg-file docker/test_dynamic_schema.json + go run cmd/clickhouse_sinker/main.go --local-cfg-file docker/test_dynamic_schema.hjson diff --git a/docker/test_auto_schema.hjson b/docker/test_auto_schema.hjson index b7d95346..c4557e94 100644 --- a/docker/test_auto_schema.hjson +++ b/docker/test_auto_schema.hjson @@ -2,34 +2,26 @@ clickhouse: { hosts: [ [ - 192.168.110.6 - # 192.168.110.8 - ] - [ - 192.168.110.10 - 192.168.110.12 - ] - [ - 192.168.110.14 + 127.0.0.1 ] ] - port: 19000 + port: 9000 db: default username: "" - password: 123456 + password: "" retryTimes: 0 } kafka: { - brokers: 192.168.110.12:9092, 192.168.110.8:9092, 192.168.110.16:9092 + brokers: 127.0.0.1:9092 } task: { - name: test_ck_query - topic: kafka_gen_ckdata22 - consumerGroup: test_ck_query + name: test_auto_schema + topic: topic1 + consumerGroup: test_auto_schema earliest: true parser: json autoSchema: true - tableName: test_ck_query_lowcardinality_r1 + tableName: test_auto_schema excludeColumns: [] bufferSize: 50000 } diff --git a/docs/guide/run.md b/docs/guide/run.md index 38e7ed21..bef02631 100644 --- a/docs/guide/run.md +++ b/docs/guide/run.md @@ -10,7 +10,7 @@ Note: Ensure `clickhouse-server` and `kafka` work before running clickhouse_sink - For local file: - `clickhouse_sinker --local-cfg-file docker/test_auto_schema.json` + `clickhouse_sinker --local-cfg-file docker/test_auto_schema.hjson` - For Nacos: @@ -71,7 +71,7 @@ Let's follow up a piece of the systest script. * Run clickhouse_sinker ```bash - $ ./clickhouse_sinker --local-cfg-file docker/test_auto_schema.json + $ ./clickhouse_sinker --local-cfg-file docker/test_auto_schema.hjson ``` diff --git a/go.test.sh b/go.test.sh index 9fcdc43a..6fbef090 100755 --- a/go.test.sh +++ b/go.test.sh @@ -48,9 +48,9 @@ sudo docker cp send.sh kafka:/tmp/ sudo docker exec kafka sh /tmp/send.sh echo "start clickhouse_sinker to consume" -timeout 30 ./clickhouse_sinker --local-cfg-file docker/test_fixed_schema.json -timeout 30 ./clickhouse_sinker --local-cfg-file docker/test_auto_schema.json -timeout 60 ./clickhouse_sinker --local-cfg-file docker/test_dynamic_schema.json +timeout 30 ./clickhouse_sinker --local-cfg-file docker/test_fixed_schema.hjson +timeout 30 ./clickhouse_sinker --local-cfg-file docker/test_auto_schema.hjson +timeout 60 ./clickhouse_sinker --local-cfg-file docker/test_dynamic_schema.hjson echo "check result 1" count=`curl "localhost:8123" -d 'select count() from test_fixed_schema'` @@ -74,9 +74,9 @@ curl "localhost:8123" -d 'TRUNCATE TABLE test_auto_schema' curl "localhost:8123" -d 'TRUNCATE TABLE test_dynamic_schema' echo "publish clickhouse_sinker config" -./nacos_publish_config --nacos-addr 127.0.0.1:8848 --nacos-username nacos --nacos-password nacos --nacos-dataid test_fixed_schema --local-cfg-file docker/test_fixed_schema.json -./nacos_publish_config --nacos-addr 127.0.0.1:8848 --nacos-username nacos --nacos-password nacos --nacos-dataid test_auto_schema --local-cfg-file docker/test_auto_schema.json -./nacos_publish_config --nacos-addr 127.0.0.1:8848 --nacos-username nacos --nacos-password nacos --nacos-dataid test_dynamic_schema --local-cfg-file docker/test_dynamic_schema.json +./nacos_publish_config --nacos-addr 127.0.0.1:8848 --nacos-username nacos --nacos-password nacos --nacos-dataid test_fixed_schema --local-cfg-file docker/test_fixed_schema.hjson +./nacos_publish_config --nacos-addr 127.0.0.1:8848 --nacos-username nacos --nacos-password nacos --nacos-dataid test_auto_schema --local-cfg-file docker/test_auto_schema.hjson +./nacos_publish_config --nacos-addr 127.0.0.1:8848 --nacos-username nacos --nacos-password nacos --nacos-dataid test_dynamic_schema --local-cfg-file docker/test_dynamic_schema.hjson echo "start clickhouse_sinker to consume" sudo docker exec kafka kafka-consumer-groups --bootstrap-server localhost:9093 --execute --reset-offsets --group test_fixed_schema --all-topics --to-earliest diff --git a/output/clickhouse.go b/output/clickhouse.go index 2461ad90..91008e07 100644 --- a/output/clickhouse.go +++ b/output/clickhouse.go @@ -83,16 +83,14 @@ func NewClickHouse(cfg *config.Config, taskCfg *config.TaskConfig) *ClickHouse { // Init the clickhouse intance func (c *ClickHouse) Init() (err error) { + // FIXME: goroutine leak? go func() { ticker := time.NewTicker(10 * time.Second) defer ticker.Stop() - for { - select { - case <-ticker.C: - c.mux.Lock() - c.wrSeries = 0 - c.mux.Unlock() - } + for range ticker.C { + c.mux.Lock() + c.wrSeries = 0 + c.mux.Unlock() } }() return c.initSchema() diff --git a/output/clickhouse_util.go b/output/clickhouse_util.go index 16f691d4..d213609a 100644 --- a/output/clickhouse_util.go +++ b/output/clickhouse_util.go @@ -65,14 +65,14 @@ func writeRows(prepareSQL string, rows model.Rows, idxBegin, idxEnd int, conn cl } if err = batch.Send(); err != nil { err = errors.Wrapf(err, "driver.Batch.Send") - batch.Abort() + _ = batch.Abort() return } return } if err = batch.Send(); err != nil { err = errors.Wrapf(err, "driver.Batch.Send") - batch.Abort() + _ = batch.Abort() return } return