Skip to content

Commit

Permalink
perf: only sync current zookpath when create table
Browse files Browse the repository at this point in the history
  • Loading branch information
YenchangChan committed Aug 22, 2022
1 parent 5220fbb commit 979eda9
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 45 deletions.
30 changes: 18 additions & 12 deletions controller/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,15 +306,19 @@ func (ck *ClickHouseController) CreateTable(c *gin.Context) {
}

// sync zookeeper path
err = clickhouse.GetReplicaZkPath(&conf)
if err != nil {
model.WrapMsg(c, model.CREAT_CK_TABLE_FAIL, err)
return
}
if conf.IsReplica {
path, err := clickhouse.GetZkPath(ckService.DB, params.DB, params.Name)
if err != nil {
model.WrapMsg(c, model.CREAT_CK_TABLE_FAIL, err)
return
}
tableName := fmt.Sprintf("%s.%s", params.DB, params.Name)
conf.ZooPath[tableName] = path

if err = repository.Ps.UpdateCluster(conf); err != nil {
model.WrapMsg(c, model.CREAT_CK_TABLE_FAIL, err)
return
if err = repository.Ps.UpdateCluster(conf); err != nil {
model.WrapMsg(c, model.CREAT_CK_TABLE_FAIL, err)
return
}
}

if req.DryRun {
Expand Down Expand Up @@ -844,10 +848,12 @@ func (ck *ClickHouseController) StopCluster(c *gin.Context) {
we cant't get zookeeper path by querying ck,
so need to save the ZooKeeper path before stopping the cluster.
*/
err = clickhouse.GetReplicaZkPath(&conf)
if err != nil {
model.WrapMsg(c, model.STOP_CK_CLUSTER_FAIL, err)
return
if conf.IsReplica {
err = clickhouse.GetReplicaZkPath(&conf)
if err != nil {
model.WrapMsg(c, model.STOP_CK_CLUSTER_FAIL, err)
return
}
}

common.CloseConns(conf.Hosts)
Expand Down
4 changes: 2 additions & 2 deletions service/clickhouse/clickhouse_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -898,7 +898,7 @@ func GetReplicaZkPath(conf *model.CKManClickHouseConfig) error {
for _, database := range databases {
if tables, ok := dbtables[database]; ok {
for _, table := range tables {
path, err := getReplicaZkPath(service.DB, database, table)
path, err := GetZkPath(service.DB, database, table)
if err != nil {
return err
}
Expand All @@ -910,7 +910,7 @@ func GetReplicaZkPath(conf *model.CKManClickHouseConfig) error {
return nil
}

func getReplicaZkPath(db *sql.DB, database, table string) (string, error) {
func GetZkPath(db *sql.DB, database, table string) (string, error) {
var err error
var path string
var rows *sql.Rows
Expand Down
64 changes: 33 additions & 31 deletions service/runner/ck.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,34 +106,36 @@ func DeleteCkClusterNode(task *model.Task, conf *model.CKManClickHouseConfig, ip
index++
}

service, err := zookeeper.NewZkService(conf.ZkNodes, conf.ZkPort)
if err != nil {
return errors.Wrapf(err, "[%s]", model.NodeStatusClearData.EN)
}
_ = clickhouse.GetReplicaZkPath(conf)
var zooPaths []string
for _, path := range conf.ZooPath {
zooPath := strings.Replace(path, "{cluster}", conf.Cluster, -1)
zooPath = strings.Replace(zooPath, "{shard}", fmt.Sprintf("%d", shardNum+1), -1)
zooPaths = append(zooPaths, zooPath)
}
if conf.IsReplica {
service, err := zookeeper.NewZkService(conf.ZkNodes, conf.ZkPort)
if err != nil {
return errors.Wrapf(err, "[%s]", model.NodeStatusClearData.EN)
}
_ = clickhouse.GetReplicaZkPath(conf)
var zooPaths []string
for _, path := range conf.ZooPath {
zooPath := strings.Replace(path, "{cluster}", conf.Cluster, -1)
zooPath = strings.Replace(zooPath, "{shard}", fmt.Sprintf("%d", shardNum+1), -1)
zooPaths = append(zooPaths, zooPath)
}

for _, path := range zooPaths {
if ifDeleteShard {
//delete the shard
shardNode := fmt.Sprintf("%d", shardNum+1)
err = service.DeletePathUntilNode(path, shardNode)
if err != nil {
return errors.Wrapf(err, "[%s]", model.NodeStatusClearData.EN)
}
} else {
// delete replica path
replicaName := conf.Shards[shardNum].Replicas[replicaNum].Ip
replicaPath := fmt.Sprintf("%s/replicas/%s", path, replicaName)
log.Logger.Debugf("replicaPath: %s", replicaPath)
err = service.DeleteAll(replicaPath)
if err != nil {
return errors.Wrapf(err, "[%s]", model.NodeStatusClearData.EN)
for _, path := range zooPaths {
if ifDeleteShard {
//delete the shard
shardNode := fmt.Sprintf("%d", shardNum+1)
err = service.DeletePathUntilNode(path, shardNode)
if err != nil {
return errors.Wrapf(err, "[%s]", model.NodeStatusClearData.EN)
}
} else {
// delete replica path
replicaName := conf.Shards[shardNum].Replicas[replicaNum].Ip
replicaPath := fmt.Sprintf("%s/replicas/%s", path, replicaName)
log.Logger.Debugf("replicaPath: %s", replicaPath)
err = service.DeleteAll(replicaPath)
if err != nil {
return errors.Wrapf(err, "[%s]", model.NodeStatusClearData.EN)
}
}
}
}
Expand All @@ -143,12 +145,12 @@ func DeleteCkClusterNode(task *model.Task, conf *model.CKManClickHouseConfig, ip
d := deploy.NewCkDeploy(*conf)
d.Packages = deploy.BuildPackages(conf.Version, conf.PkgType, conf.Cwd)
d.Conf.Hosts = []string{ip}
if err = d.Stop(); err != nil {
if err := d.Stop(); err != nil {
log.Logger.Warnf("can't stop node %s, ignore it", ip)
}

deploy.SetNodeStatus(task, model.NodeStatusUninstall, model.ALL_NODES_DEFAULT)
if err = d.Uninstall(); err != nil {
if err := d.Uninstall(); err != nil {
log.Logger.Warnf("can't uninsatll node %s, ignore it", ip)
}

Expand Down Expand Up @@ -179,10 +181,10 @@ func DeleteCkClusterNode(task *model.Task, conf *model.CKManClickHouseConfig, ip
d = deploy.NewCkDeploy(*conf)
d.Conf.Hosts = hosts
d.Conf.Shards = shards
if err = d.Init(); err != nil {
if err := d.Init(); err != nil {
return errors.Wrapf(err, "[%s]", model.NodeStatusConfigExt.EN)
}
if err = d.Config(); err != nil {
if err := d.Config(); err != nil {
return errors.Wrapf(err, "[%s]", model.NodeStatusConfigExt.EN)
}

Expand Down
3 changes: 3 additions & 0 deletions service/zookeeper/zookeeper_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ func GetZkService(clusterName string) (*ZkService, error) {
}

func (z *ZkService) GetReplicatedTableStatus(conf *model.CKManClickHouseConfig) ([]model.ZkReplicatedTableStatus, error) {
if !conf.IsReplica {
return nil, nil
}
err := clickhouse.GetReplicaZkPath(conf)
if err != nil {
return nil, err
Expand Down

0 comments on commit 979eda9

Please sign in to comment.