Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove quotes in case of tuple partition name #265

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion business/ck_rebalance.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,14 @@ func (this *CKRebalance) ExecutePlan(database string, tbl *TblPartitions) (err e
fmt.Sprintf("ALTER TABLE %s FETCH PARTITION '%s' FROM '%s'", tbl.Table, patt, tbl.ZooPath),
fmt.Sprintf("ALTER TABLE %s ATTACH PARTITION '%s'", tbl.Table, patt),
}
if strings.Contains(patt, "(") && strings.Contains(patt, ")") {
// This implies that the partition name contains tuple. Removing quotes
dstQuires = []string{
fmt.Sprintf("ALTER TABLE %s DROP DETACHED PARTITION %s ", tbl.Table, patt),
fmt.Sprintf("ALTER TABLE %s FETCH PARTITION %s FROM '%s'", tbl.Table, patt, tbl.ZooPath),
fmt.Sprintf("ALTER TABLE %s ATTACH PARTITION %s", tbl.Table, patt),
}
}
for _, query := range dstQuires {
log.Logger.Infof("host %s: query: %s", dstHost, query)
if _, err = dstChConn.Exec(query); err != nil {
Expand All @@ -246,6 +254,10 @@ func (this *CKRebalance) ExecutePlan(database string, tbl *TblPartitions) (err e
return fmt.Errorf("can't get connection: %s", tbl.Host)
}
query := fmt.Sprintf("ALTER TABLE %s DROP PARTITION '%s'", tbl.Table, patt)
if strings.Contains(patt, "(") && strings.Contains(patt, ")") {
// This implies that the partition name contains tuple. Removing quotes
query = fmt.Sprintf("ALTER TABLE %s DROP PARTITION %s", tbl.Table, patt)
}
if _, err = srcChConn.Exec(query); err != nil {
log.Logger.Infof("host %s: query: %s", tbl.Host, query)
err = errors.Wrapf(err, "")
Expand All @@ -271,6 +283,10 @@ func (this *CKRebalance) ExecutePlan(database string, tbl *TblPartitions) (err e
srcDir := dstDir + "/"

query := fmt.Sprintf("ALTER TABLE %s DETACH PARTITION '%s'", tbl.Table, patt)
if strings.Contains(patt, "(") && strings.Contains(patt, ")") {
// This implies that the partition name contains tuple. Removing quotes
query = fmt.Sprintf("ALTER TABLE %s DETACH PARTITION %s", tbl.Table, patt)
}
log.Logger.Infof("host: %s, query: %s", tbl.Host, query)
if _, err = srcCkConn.Exec(query); err != nil {
err = errors.Wrapf(err, "")
Expand All @@ -297,8 +313,10 @@ func (this *CKRebalance) ExecutePlan(database string, tbl *TblPartitions) (err e
return
}
log.Logger.Debugf("host: %s, output: %s", tbl.Host, out)

query = fmt.Sprintf("ALTER TABLE %s ATTACH PARTITION '%s'", tbl.Table, patt)
if strings.Contains(patt, "(") && strings.Contains(patt, ")") {
query = fmt.Sprintf("ALTER TABLE %s ATTACH PARTITION %s", tbl.Table, patt)
}
log.Logger.Infof("host: %s, query: %s", dstHost, query)
if _, err = dstCkConn.Exec(query); err != nil {
err = errors.Wrapf(err, "")
Expand All @@ -308,6 +326,9 @@ func (this *CKRebalance) ExecutePlan(database string, tbl *TblPartitions) (err e
lock.Unlock()

query = fmt.Sprintf("ALTER TABLE %s DROP DETACHED PARTITION '%s'", tbl.Table, patt)
if strings.Contains(patt, "(") && strings.Contains(patt, ")") {
query = fmt.Sprintf("ALTER TABLE %s DROP DETACHED PARTITION %s", tbl.Table, patt)
}
log.Logger.Infof("host: %s, query: %s", tbl.Host, query)
if _, err = srcCkConn.Exec(query); err != nil {
err = errors.Wrapf(err, "")
Expand Down
30 changes: 18 additions & 12 deletions controller/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,15 +306,19 @@ func (ck *ClickHouseController) CreateTable(c *gin.Context) {
}

// sync zookeeper path
err = clickhouse.GetReplicaZkPath(&conf)
if err != nil {
model.WrapMsg(c, model.CREAT_CK_TABLE_FAIL, err)
return
}
if conf.IsReplica {
path, err := clickhouse.GetZkPath(ckService.DB, params.DB, params.Name)
if err != nil {
model.WrapMsg(c, model.CREAT_CK_TABLE_FAIL, err)
return
}
tableName := fmt.Sprintf("%s.%s", params.DB, params.Name)
conf.ZooPath[tableName] = path

if err = repository.Ps.UpdateCluster(conf); err != nil {
model.WrapMsg(c, model.CREAT_CK_TABLE_FAIL, err)
return
if err = repository.Ps.UpdateCluster(conf); err != nil {
model.WrapMsg(c, model.CREAT_CK_TABLE_FAIL, err)
return
}
}

if req.DryRun {
Expand Down Expand Up @@ -844,10 +848,12 @@ func (ck *ClickHouseController) StopCluster(c *gin.Context) {
we cant't get zookeeper path by querying ck,
so need to save the ZooKeeper path before stopping the cluster.
*/
err = clickhouse.GetReplicaZkPath(&conf)
if err != nil {
model.WrapMsg(c, model.STOP_CK_CLUSTER_FAIL, err)
return
if conf.IsReplica {
err = clickhouse.GetReplicaZkPath(&conf)
if err != nil {
model.WrapMsg(c, model.STOP_CK_CLUSTER_FAIL, err)
return
}
}

common.CloseConns(conf.Hosts)
Expand Down
11 changes: 5 additions & 6 deletions deploy/ck.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,13 @@ func (d *CKDeploy) Init() error {
NeedSudo: d.Conf.NeedSudo,
AuthenticateType: d.Conf.AuthenticateType,
}
cmd := "hostname -f"
output, err := common.RemoteExecute(sshOpts, cmd)
if err != nil {
lastError = err
return
}
cmd := "hostname"
output, _ := common.RemoteExecute(sshOpts, cmd)

hostname := strings.Trim(output, "\n")
if hostname == "" {
hostname = innerReplica.Ip
}
d.Conf.Shards[innerShardIndex].Replicas[innerReplicaIndex].HostName = hostname
lock.Lock()
HostNameMap[hostname] = true
Expand Down
4 changes: 2 additions & 2 deletions service/clickhouse/clickhouse_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -898,7 +898,7 @@ func GetReplicaZkPath(conf *model.CKManClickHouseConfig) error {
for _, database := range databases {
if tables, ok := dbtables[database]; ok {
for _, table := range tables {
path, err := getReplicaZkPath(service.DB, database, table)
path, err := GetZkPath(service.DB, database, table)
if err != nil {
return err
}
Expand All @@ -910,7 +910,7 @@ func GetReplicaZkPath(conf *model.CKManClickHouseConfig) error {
return nil
}

func getReplicaZkPath(db *sql.DB, database, table string) (string, error) {
func GetZkPath(db *sql.DB, database, table string) (string, error) {
var err error
var path string
var rows *sql.Rows
Expand Down
64 changes: 33 additions & 31 deletions service/runner/ck.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,34 +106,36 @@ func DeleteCkClusterNode(task *model.Task, conf *model.CKManClickHouseConfig, ip
index++
}

service, err := zookeeper.NewZkService(conf.ZkNodes, conf.ZkPort)
if err != nil {
return errors.Wrapf(err, "[%s]", model.NodeStatusClearData.EN)
}
_ = clickhouse.GetReplicaZkPath(conf)
var zooPaths []string
for _, path := range conf.ZooPath {
zooPath := strings.Replace(path, "{cluster}", conf.Cluster, -1)
zooPath = strings.Replace(zooPath, "{shard}", fmt.Sprintf("%d", shardNum+1), -1)
zooPaths = append(zooPaths, zooPath)
}
if conf.IsReplica {
service, err := zookeeper.NewZkService(conf.ZkNodes, conf.ZkPort)
if err != nil {
return errors.Wrapf(err, "[%s]", model.NodeStatusClearData.EN)
}
_ = clickhouse.GetReplicaZkPath(conf)
var zooPaths []string
for _, path := range conf.ZooPath {
zooPath := strings.Replace(path, "{cluster}", conf.Cluster, -1)
zooPath = strings.Replace(zooPath, "{shard}", fmt.Sprintf("%d", shardNum+1), -1)
zooPaths = append(zooPaths, zooPath)
}

for _, path := range zooPaths {
if ifDeleteShard {
//delete the shard
shardNode := fmt.Sprintf("%d", shardNum+1)
err = service.DeletePathUntilNode(path, shardNode)
if err != nil {
return errors.Wrapf(err, "[%s]", model.NodeStatusClearData.EN)
}
} else {
// delete replica path
replicaName := conf.Shards[shardNum].Replicas[replicaNum].Ip
replicaPath := fmt.Sprintf("%s/replicas/%s", path, replicaName)
log.Logger.Debugf("replicaPath: %s", replicaPath)
err = service.DeleteAll(replicaPath)
if err != nil {
return errors.Wrapf(err, "[%s]", model.NodeStatusClearData.EN)
for _, path := range zooPaths {
if ifDeleteShard {
//delete the shard
shardNode := fmt.Sprintf("%d", shardNum+1)
err = service.DeletePathUntilNode(path, shardNode)
if err != nil {
return errors.Wrapf(err, "[%s]", model.NodeStatusClearData.EN)
}
} else {
// delete replica path
replicaName := conf.Shards[shardNum].Replicas[replicaNum].Ip
replicaPath := fmt.Sprintf("%s/replicas/%s", path, replicaName)
log.Logger.Debugf("replicaPath: %s", replicaPath)
err = service.DeleteAll(replicaPath)
if err != nil {
return errors.Wrapf(err, "[%s]", model.NodeStatusClearData.EN)
}
}
}
}
Expand All @@ -143,12 +145,12 @@ func DeleteCkClusterNode(task *model.Task, conf *model.CKManClickHouseConfig, ip
d := deploy.NewCkDeploy(*conf)
d.Packages = deploy.BuildPackages(conf.Version, conf.PkgType, conf.Cwd)
d.Conf.Hosts = []string{ip}
if err = d.Stop(); err != nil {
if err := d.Stop(); err != nil {
log.Logger.Warnf("can't stop node %s, ignore it", ip)
}

deploy.SetNodeStatus(task, model.NodeStatusUninstall, model.ALL_NODES_DEFAULT)
if err = d.Uninstall(); err != nil {
if err := d.Uninstall(); err != nil {
log.Logger.Warnf("can't uninsatll node %s, ignore it", ip)
}

Expand Down Expand Up @@ -179,10 +181,10 @@ func DeleteCkClusterNode(task *model.Task, conf *model.CKManClickHouseConfig, ip
d = deploy.NewCkDeploy(*conf)
d.Conf.Hosts = hosts
d.Conf.Shards = shards
if err = d.Init(); err != nil {
if err := d.Init(); err != nil {
return errors.Wrapf(err, "[%s]", model.NodeStatusConfigExt.EN)
}
if err = d.Config(); err != nil {
if err := d.Config(); err != nil {
return errors.Wrapf(err, "[%s]", model.NodeStatusConfigExt.EN)
}

Expand Down
3 changes: 3 additions & 0 deletions service/zookeeper/zookeeper_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ func GetZkService(clusterName string) (*ZkService, error) {
}

func (z *ZkService) GetReplicatedTableStatus(conf *model.CKManClickHouseConfig) ([]model.ZkReplicatedTableStatus, error) {
if !conf.IsReplica {
return nil, nil
}
err := clickhouse.GetReplicaZkPath(conf)
if err != nil {
return nil, err
Expand Down