Skip to content

Commit

Permalink
fix: sync dist table issue
Browse files Browse the repository at this point in the history
  • Loading branch information
YenchangChan committed May 11, 2024
1 parent f6f7626 commit 381cac1
Showing 1 changed file with 72 additions and 27 deletions.
99 changes: 72 additions & 27 deletions service/cron/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ func syncLogicbyTable(clusters []string, database, localTable string) error {
}

// logic table
if err = alterTable(ckService.Conn, database, common.ClickHouseLocalTablePrefix+localTable, onCluster, columnsExpr); err != nil {
if err = alterTable(ckService.Conn, database, common.ClickHouseDistTableOnLogicPrefix+localTable, onCluster, columnsExpr); err != nil {
return err
}

Expand Down Expand Up @@ -270,20 +270,10 @@ func syncDistTable(distTable, localTable, database string, conf model.CKManClick
if conn == nil {
continue
}
query := fmt.Sprintf("SELECT name, type FROM system.columns WHERE database = '%s' AND table = '%s'", database, localTable)
rows, err := conn.Query(query)
tblMap, err := getColumns(conn, database, localTable)
if err != nil {
return errors.Wrap(err, host)
}
defer rows.Close()
tblMap := make(common.Map)
for rows.Next() {
var name, typ string
if err = rows.Scan(&name, &typ); err != nil {
return errors.Wrap(err, host)
}
tblMap[name] = typ
}
tableLists[host] = tblMap
dbLists[host] = conn
}
Expand All @@ -303,26 +293,45 @@ func syncDistTable(distTable, localTable, database string, conf model.CKManClick
if needAlter {
log.Logger.Debugf("need alter table, table %s.%s have different columns on cluster %s", database, localTable, conf.Cluster)
for host, cols := range tableLists {
needAdds := allCols.Difference(cols).(common.Map)
var columns []string
for k, v := range needAdds {
columns = append(columns, fmt.Sprintf("ADD COLUMN IF NOT EXISTS `%s` %s ", k, v))
}

// 当前节点是全量的列, 无需更新
if len(columns) == 0 {
continue
}
// local table
if err := alterTable(dbLists[host], database, localTable, "", strings.Join(columns, ",")); err != nil {
if err := syncSchema(dbLists[host], allCols, cols, database, localTable, ""); err != nil {
return err
}
}
}

// distributed table
if err := alterTable(dbLists[host], database, distTable, "", strings.Join(columns, ",")); err != nil {
return err
// 如果集群间本地表都是一致的,但是本地表与分布式表/逻辑表不同步?
for _, host := range conf.Hosts {
conn := common.GetConnection(host)
if conn == nil {
continue
}
distCols, err := getColumns(conn, database, distTable)
if err != nil {
return errors.Wrap(err, host)
}
onCluster := fmt.Sprintf("ON CLUSTER %s", conf.Cluster)
if err = syncSchema(conn, allCols, distCols, database, distTable, onCluster); err != nil {
return errors.Wrap(err, host)
}

logicTable := common.ClickHouseDistTableOnLogicPrefix + localTable
logicCols, err := getColumns(conn, database, logicTable)
if err != nil {
err = common.ClikHouseExceptionDecode(err)
var exception *client.Exception
if errors.As(err, &exception) {
// 逻辑表不存在没关系,不报错
if exception.Code == 60 {
continue
}
}
return errors.Wrap(err, host)
}

if err = syncSchema(conn, allCols, logicCols, database, logicTable, onCluster); err != nil {
return errors.Wrap(err, host)
}

}
return nil
}
Expand All @@ -347,3 +356,39 @@ func alterTable(conn *common.Conn, database, table, onCluster, col string) error
log.Logger.Debug(query)
return conn.Exec(query)
}

func getColumns(conn *common.Conn, database, table string) (common.Map, error) {
query := fmt.Sprintf("SELECT name, type FROM system.columns WHERE database = '%s' AND table = '%s'", database, table)
rows, err := conn.Query(query)
if err != nil {
return nil, err
}
defer rows.Close()
tblMap := make(common.Map)
for rows.Next() {
var name, typ string
if err = rows.Scan(&name, &typ); err != nil {
return nil, err
}
tblMap[name] = typ
}
return tblMap, nil
}

func syncSchema(conn *common.Conn, allCols, cols common.Map, database, table, oncluster string) error {
needAdds := allCols.Difference(cols).(common.Map)
var columns []string
for k, v := range needAdds {
columns = append(columns, fmt.Sprintf("ADD COLUMN IF NOT EXISTS `%s` %s ", k, v))
}

// 当前节点是全量的列, 无需更新
if len(columns) == 0 {
return nil
}
// local table
if err := alterTable(conn, database, table, oncluster, strings.Join(columns, ",")); err != nil {
return err
}
return nil
}

0 comments on commit 381cac1

Please sign in to comment.