Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

master #190

Merged
merged 13 commits into from
Mar 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Dockerfile_goreleaser
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
FROM alpine:3.19
RUN apk --no-cache add ca-certificates tzdata && \
RUN sed -i 's/dl-cdn.alpinelinux.org/mirrors.aliyun.com/g' /etc/apk/repositories && \
apk --no-cache add ca-certificates tzdata && \
echo "UTC" > /etc/timezone
ADD dist/clickhouse_sinker_linux_amd64_v1/clickhouse_sinker /usr/local/bin/clickhouse_sinker
ADD dist/nacos_publish_config_linux_amd64_v1/nacos_publish_config /usr/local/bin/nacos_publish_config
Expand Down
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ SINKER_LDFLAGS += -X "main.date=$(shell date --iso-8601=s)"
SINKER_LDFLAGS += -X "main.commit=$(shell git rev-parse HEAD)"
SINKER_LDFLAGS += -X "main.builtBy=$(shell echo `whoami`@`hostname`)"
DEFAULT_CFG_PATH = /etc/clickhouse_sinker.hjson
IMG_TAGGED = hub.eoitek.net/storage/clickhouse_sinker:${VERSION}
IMG_LATEST = hub.eoitek.net/storage/clickhouse_sinker:latest
IMG_TAGGED = hub.eoitek.net/aimeter/clickhouse_sinker:${VERSION}
IMG_LATEST = hub.eoitek.net/aimeter/clickhouse_sinker:latest
export GOPROXY=https://goproxy.cn,direct

GO := CGO_ENABLED=0 go
Expand Down
20 changes: 20 additions & 0 deletions cmd/clickhouse_sinker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,26 @@ func main() {
}
}
})
health.Health.AddLivenessCheck("task", func() error {
var err error
if runner != nil && runner.GetCurrentConfig() != nil {
var stateLags map[string]cm.StateLag
var count int
if stateLags, err = cm.GetTaskStateAndLags(runner.GetCurrentConfig()); err == nil {
for _, value := range stateLags {
if value.State == "Dead" {
count++
}
}
if count == len(stateLags) {
return fmt.Errorf("All task is Dead.")
}
} else {
return err
}
}
return nil
})
mux.Handle("/metrics", httpMetrics)
mux.HandleFunc("/ready", health.Health.ReadyEndpoint) // GET /ready?full=1
mux.HandleFunc("/live", health.Health.LiveEndpoint) // GET /live?full=1
Expand Down
28 changes: 14 additions & 14 deletions cmd/kafka_gen_prom/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,57 +18,57 @@ CREATE TABLE default.dist_prom_extend ON CLUSTER abc AS prom_extend ENGINE = Dis

-- Prometheus metric solution 2 - separate tables for datapoints and series labels, which can be joined on series id
CREATE TABLE default.prom_metric ON CLUSTER abc (
__series_id Int64,
__series_id__ Int64,
timestamp DateTime CODEC(DoubleDelta, LZ4),
value Float32 CODEC(ZSTD(15))
) ENGINE=ReplicatedReplacingMergeTree()
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (__series_id, timestamp);
ORDER BY (__series_id__, timestamp);

CREATE TABLE default.dist_prom_metric ON CLUSTER abc AS prom_metric ENGINE = Distributed(abc, default, prom_metric);

CREATE TABLE default.prom_metric_series ON CLUSTER abc (
__series_id Int64,
__mgmt_id Int64,
__series_id__ Int64,
__mgmt_id__ Int64,
labels String,
__name__ String
) ENGINE=ReplicatedReplacingMergeTree()
ORDER BY (__name__, __series_id);
ORDER BY (__name__, __series_id__);

CREATE TABLE default.dist_prom_metric_series ON CLUSTER abc AS prom_metric_series ENGINE = Distributed(abc, default, prom_metric_series);

CREATE TABLE default.prom_metric_agg ON CLUSTER abc (
__series_id Int64,
__series_id__ Int64,
timestamp DateTime CODEC(DoubleDelta, LZ4),
max_value AggregateFunction(max, Float32),
min_value AggregateFunction(min, Float32),
avg_value AggregateFunction(avg, Float32)
) ENGINE=ReplicatedReplacingMergeTree()
PARTITION BY toYYYYMMDD(timestamp)
ORDER BY (__series_id, timestamp);
ORDER BY (__series_id__, timestamp);

CREATE TABLE default.dist_prom_metric_agg ON CLUSTER abc AS prom_metric_agg ENGINE = Distributed(abc, default, prom_metric_agg);

SELECT __series_id,
SELECT __series_id__,
toStartOfDay(timestamp) AS timestamp,
maxMerge(max_value) AS max_value,
minMerge(min_value) AS min_value,
avgMerge(avg_value) AS avg_value
FROM default.dist_prom_metric_agg
WHERE __series_id IN (-9223014754132113609, -9223015002162651005)
GROUP BY __series_id, timestamp
ORDER BY __series_id, timestamp;
WHERE __series_id__ IN (-9223014754132113609, -9223015002162651005)
GROUP BY __series_id__, timestamp
ORDER BY __series_id__, timestamp;

-- Activate aggregation for future datapoints by creating a materialized view
CREATE MATERIALIZED VIEW default.prom_metric_mv ON CLUSTER abc
TO prom_metric_agg
AS SELECT __series_id,
AS SELECT __series_id__,
toStartOfHour(timestamp) AS timestamp,
maxState(value) AS max_value,
minState(value) AS min_value,
avgState(value) AS avg_value
FROM prom_metric
GROUP BY __series_id, timestamp;
GROUP BY __series_id__, timestamp;

-- Deactivate aggregation by dropping the materialized view. You can revise and create it later as you will.
DROP TABLE default.prom_metric_mv ON CLUSTER abc SYNC;
Expand Down Expand Up @@ -144,7 +144,7 @@ func (dp Datapoint) MarshalJSON() ([]byte, error) {
return nil, err
}
labels2 := labels[1 : len(labels)-1]
msg := fmt.Sprintf(`{"timestamp":"%s", "value":%f, "value1":%g, "value2":%d, "value3":%t, "__name__":"%s", %s, "__series_id":%d, "__mgmt_id":%d}`,
msg := fmt.Sprintf(`{"timestamp":"%s", "value":%f, "value1":%g, "value2":%d, "value3":%t, "__name__":"%s", %s, "__series_id__":%d, "__mgmt_id__":%d}`,
dp.Timestamp.Format(time.RFC3339), dp.Value, dp.Value1, dp.Value2, dp.Value3, dp.Name, labels2, seriesID, mgmtID)
return []byte(msg), nil
}
Expand Down
34 changes: 31 additions & 3 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type Config struct {
Tasks []*TaskConfig
Assignment Assignment
LogLevel string
LogTrace bool
RecordPoolSize int64
ReloadSeriesMapInterval int
ActiveSeriesRange int

Expand All @@ -46,7 +48,13 @@ type Config struct {

// KafkaConfig configuration parameters
type KafkaConfig struct {
Brokers string
Brokers string
Properties struct {
HeartbeatInterval int `json:"heartbeat.interval.ms"`
SessionTimeout int `json:"session.timeout.ms"`
RebalanceTimeout int `json:"rebalance.timeout.ms"`
RequestTimeoutOverhead int `json:"request.timeout.ms"`
}
ResetSaslRealm bool
Security map[string]string
TLS struct {
Expand Down Expand Up @@ -187,8 +195,12 @@ const (
defaultLogLevel = "info"
defaultKerberosConfigPath = "/etc/krb5.conf"
defaultMaxOpenConns = 1
defaultReloadSeriesMapInterval = 3600 // 1 hour
defaultActiveSeriesRange = 86400 // 1 day
defaultReloadSeriesMapInterval = 3600 // 1 hour
defaultActiveSeriesRange = 86400 // 1 day
defaultHeartbeatInterval = 3000 // 3 s
defaultSessionTimeout = 120000 // 2 min
defaultRebalanceTimeout = 120000 // 2 min
defaultRequestTimeoutOverhead = 60000 // 1 min
)

func ParseLocalCfgFile(cfgPath string) (cfg *Config, err error) {
Expand Down Expand Up @@ -259,6 +271,19 @@ func (cfg *Config) Normallize(constructGroup bool, httpAddr string, cred util.Cr
os.Setenv("DOMAIN_REALM", net.JoinHostPort("hadoop."+strings.ToLower(cfg.Kafka.Sasl.GSSAPI.Realm), port))
}
}
if cfg.Kafka.Properties.HeartbeatInterval == 0 {
cfg.Kafka.Properties.HeartbeatInterval = defaultHeartbeatInterval
}
if cfg.Kafka.Properties.RebalanceTimeout == 0 {
cfg.Kafka.Properties.RebalanceTimeout = defaultRebalanceTimeout
}
if cfg.Kafka.Properties.RequestTimeoutOverhead == 0 {
cfg.Kafka.Properties.RequestTimeoutOverhead = defaultRequestTimeoutOverhead
}
if cfg.Kafka.Properties.SessionTimeout == 0 {
cfg.Kafka.Properties.SessionTimeout = defaultSessionTimeout
}

if cfg.Clickhouse.RetryTimes < 0 {
cfg.Clickhouse.RetryTimes = 0
}
Expand Down Expand Up @@ -316,6 +341,9 @@ func (cfg *Config) Normallize(constructGroup bool, httpAddr string, cred util.Cr
}
}
}
if cfg.RecordPoolSize == 0 {
cfg.RecordPoolSize = MaxBufferSize
}
switch strings.ToLower(cfg.LogLevel) {
case "debug", "info", "warn", "error", "dpanic", "panic", "fatal":
default:
Expand Down
8 changes: 4 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ services:
restart: always
hostname: zookeeper
ports:
- "2181:2181"
- "52181:2181"
environment:
ALLOW_ANONYMOUS_LOGIN: 1
ZOO_4LW_COMMANDS_WHITELIST: "*"
Expand Down Expand Up @@ -37,8 +37,8 @@ services:
image: clickhouse/clickhouse-server:23.8
restart: always
ports:
- "8123:8123"
- "9000:9000"
- "58123:8123"
- "59000:9000"
ulimits:
nofile:
soft: 262144
Expand All @@ -55,4 +55,4 @@ services:
- PREFER_HOST_MODE=hostname
- MODE=standalone
ports:
- "8848:8848"
- "58848:8848"
2 changes: 1 addition & 1 deletion docker/test_auto_schema.hjson
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
127.0.0.1
]
]
port: 9000
port: 59000
db: default
username: ""
password: ""
Expand Down
2 changes: 1 addition & 1 deletion docker/test_dynamic_schema.hjson
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
127.0.0.1
]
]
port: 9000
port: 59000
db: default
username: ""
password: ""
Expand Down
2 changes: 1 addition & 1 deletion docker/test_fixed_schema.hjson
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
127.0.0.1
]
]
port: 9000
port: 59000
db: default
username: ""
password: ""
Expand Down
Loading
Loading