Skip to content

Commit

Permalink
systest pass
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzhichang committed Dec 9, 2022
1 parent 8fab1c3 commit e12a7b4
Show file tree
Hide file tree
Showing 6 changed files with 24 additions and 34 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,4 @@ systest: build
lint:
golangci-lint run --timeout=3m
run: pre
go run cmd/clickhouse_sinker/main.go --local-cfg-file docker/test_dynamic_schema.json
go run cmd/clickhouse_sinker/main.go --local-cfg-file docker/test_dynamic_schema.hjson
24 changes: 8 additions & 16 deletions docker/test_auto_schema.hjson
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,26 @@
clickhouse: {
hosts: [
[
192.168.110.6
# 192.168.110.8
]
[
192.168.110.10
192.168.110.12
]
[
192.168.110.14
127.0.0.1
]
]
port: 19000
port: 9000
db: default
username: ""
password: 123456
password: ""
retryTimes: 0
}
kafka: {
brokers: 192.168.110.12:9092, 192.168.110.8:9092, 192.168.110.16:9092
brokers: 127.0.0.1:9092
}
task: {
name: test_ck_query
topic: kafka_gen_ckdata22
consumerGroup: test_ck_query
name: test_auto_schema
topic: topic1
consumerGroup: test_auto_schema
earliest: true
parser: json
autoSchema: true
tableName: test_ck_query_lowcardinality_r1
tableName: test_auto_schema
excludeColumns: []
bufferSize: 50000
}
Expand Down
4 changes: 2 additions & 2 deletions docs/guide/run.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ Note: Ensure `clickhouse-server` and `kafka` work before running clickhouse_sink
- For local file:

`clickhouse_sinker --local-cfg-file docker/test_auto_schema.json`
`clickhouse_sinker --local-cfg-file docker/test_auto_schema.hjson`

- For Nacos:

Expand Down Expand Up @@ -71,7 +71,7 @@ Let's follow up a piece of the systest script.
* Run clickhouse_sinker

```bash
$ ./clickhouse_sinker --local-cfg-file docker/test_auto_schema.json
$ ./clickhouse_sinker --local-cfg-file docker/test_auto_schema.hjson
```


Expand Down
12 changes: 6 additions & 6 deletions go.test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ sudo docker cp send.sh kafka:/tmp/
sudo docker exec kafka sh /tmp/send.sh

echo "start clickhouse_sinker to consume"
timeout 30 ./clickhouse_sinker --local-cfg-file docker/test_fixed_schema.json
timeout 30 ./clickhouse_sinker --local-cfg-file docker/test_auto_schema.json
timeout 60 ./clickhouse_sinker --local-cfg-file docker/test_dynamic_schema.json
timeout 30 ./clickhouse_sinker --local-cfg-file docker/test_fixed_schema.hjson
timeout 30 ./clickhouse_sinker --local-cfg-file docker/test_auto_schema.hjson
timeout 60 ./clickhouse_sinker --local-cfg-file docker/test_dynamic_schema.hjson

echo "check result 1"
count=`curl "localhost:8123" -d 'select count() from test_fixed_schema'`
Expand All @@ -74,9 +74,9 @@ curl "localhost:8123" -d 'TRUNCATE TABLE test_auto_schema'
curl "localhost:8123" -d 'TRUNCATE TABLE test_dynamic_schema'

echo "publish clickhouse_sinker config"
./nacos_publish_config --nacos-addr 127.0.0.1:8848 --nacos-username nacos --nacos-password nacos --nacos-dataid test_fixed_schema --local-cfg-file docker/test_fixed_schema.json
./nacos_publish_config --nacos-addr 127.0.0.1:8848 --nacos-username nacos --nacos-password nacos --nacos-dataid test_auto_schema --local-cfg-file docker/test_auto_schema.json
./nacos_publish_config --nacos-addr 127.0.0.1:8848 --nacos-username nacos --nacos-password nacos --nacos-dataid test_dynamic_schema --local-cfg-file docker/test_dynamic_schema.json
./nacos_publish_config --nacos-addr 127.0.0.1:8848 --nacos-username nacos --nacos-password nacos --nacos-dataid test_fixed_schema --local-cfg-file docker/test_fixed_schema.hjson
./nacos_publish_config --nacos-addr 127.0.0.1:8848 --nacos-username nacos --nacos-password nacos --nacos-dataid test_auto_schema --local-cfg-file docker/test_auto_schema.hjson
./nacos_publish_config --nacos-addr 127.0.0.1:8848 --nacos-username nacos --nacos-password nacos --nacos-dataid test_dynamic_schema --local-cfg-file docker/test_dynamic_schema.hjson

echo "start clickhouse_sinker to consume"
sudo docker exec kafka kafka-consumer-groups --bootstrap-server localhost:9093 --execute --reset-offsets --group test_fixed_schema --all-topics --to-earliest
Expand Down
12 changes: 5 additions & 7 deletions output/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,14 @@ func NewClickHouse(cfg *config.Config, taskCfg *config.TaskConfig) *ClickHouse {

// Init the clickhouse intance
func (c *ClickHouse) Init() (err error) {
// FIXME: goroutine leak?
go func() {
ticker := time.NewTicker(10 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.mux.Lock()
c.wrSeries = 0
c.mux.Unlock()
}
for range ticker.C {
c.mux.Lock()
c.wrSeries = 0
c.mux.Unlock()
}
}()
return c.initSchema()
Expand Down
4 changes: 2 additions & 2 deletions output/clickhouse_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,14 +65,14 @@ func writeRows(prepareSQL string, rows model.Rows, idxBegin, idxEnd int, conn cl
}
if err = batch.Send(); err != nil {
err = errors.Wrapf(err, "driver.Batch.Send")
batch.Abort()
_ = batch.Abort()
return
}
return
}
if err = batch.Send(); err != nil {
err = errors.Wrapf(err, "driver.Batch.Send")
batch.Abort()
_ = batch.Abort()
return
}
return
Expand Down

0 comments on commit e12a7b4

Please sign in to comment.