Skip to content

Commit

Permalink
clickhouse-keeper with tgz
Browse files Browse the repository at this point in the history
  • Loading branch information
YenchangChan committed Jun 23, 2024
1 parent eb45e8e commit 5011d72
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 24 deletions.
8 changes: 0 additions & 8 deletions controller/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -1590,10 +1590,6 @@ func (controller *ClickHouseController) AddNode(c *gin.Context) {
// install clickhouse and start service on the new node
d := deploy.NewCkDeploy(conf)
d.Conf.Hosts = req.Ips
if d.Conf.Keeper == model.ClickhouseKeeper && d.Conf.KeeperConf.Runtime == model.KeeperRuntimeInternal {
d.Ext.Restart = true
d.Conf.KeeperConf.KeeperNodes = append(d.Conf.KeeperConf.KeeperNodes, req.Ips...)
}

d.Packages = deploy.BuildPackages(conf.Version, conf.PkgType, conf.Cwd)
if reflect.DeepEqual(d.Packages, deploy.Packages{}) {
Expand Down Expand Up @@ -1703,10 +1699,6 @@ SETTINGS skip_unavailable_shards = 1`
d := deploy.NewCkDeploy(conf)
d.Packages = deploy.BuildPackages(conf.Version, conf.PkgType, conf.Cwd)
d.Conf.Hosts = []string{ip}
if d.Conf.Keeper == model.ClickhouseKeeper && d.Conf.KeeperConf.Runtime == model.KeeperRuntimeInternal {
d.Ext.Restart = true
d.Conf.KeeperConf.KeeperNodes = common.ArrayRemove(conf.Hosts, ip)
}

taskId, err := deploy.CreateNewTask(clusterName, model.TaskTypeCKDeleteNode, d)
if err != nil {
Expand Down
13 changes: 13 additions & 0 deletions controller/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ func checkDeployParams(conf *model.CKManClickHouseConfig, force bool) error {
if conf.Cwd == "" {
return errors.Errorf("cwd can't be empty for tgz deployment")
}
if !strings.HasSuffix(conf.Cwd, "/") {
return errors.Errorf(fmt.Sprintf("path %s must end with '/'", conf.Cwd))
}
conf.NeedSudo = false
if err = checkAccess(conf.Cwd, conf); err != nil {
return errors.Wrapf(err, "check access error")
Expand Down Expand Up @@ -140,11 +143,21 @@ func checkDeployParams(conf *model.CKManClickHouseConfig, force bool) error {
return errors.Errorf("keeper nodes must not be empty")
}
} else if conf.KeeperConf.Runtime == model.KeeperRuntimeInternal {
if strings.HasSuffix(conf.PkgType, common.PkgSuffixTgz) {
return errors.Errorf("keeper internal runtime doesn't support tgz deployment")
}
conf.KeeperConf.KeeperNodes = make([]string, len(conf.Hosts))
copy(conf.KeeperConf.KeeperNodes, conf.Hosts)

} else {
return errors.Errorf("keeper runtime %s is not supported", conf.KeeperConf.Runtime)
}
if !strings.HasSuffix(conf.KeeperConf.LogPath, "/") {
return errors.Errorf(fmt.Sprintf("path %s must end with '/'", conf.KeeperConf.LogPath))
}
if !strings.HasSuffix(conf.KeeperConf.SnapshotPath, "/") {
return errors.Errorf(fmt.Sprintf("path %s must end with '/'", conf.KeeperConf.SnapshotPath))
}
if err := checkAccess(conf.KeeperConf.LogPath, conf); err != nil {
return errors.Wrapf(err, "check access error")
}
Expand Down
1 change: 1 addition & 0 deletions controller/schema_ui.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func RegistCreateClusterSchema() common.ConfigParams {
DescriptionZH: "工作路径,仅tgz部署时需要",
DescriptionEN: "Working directory, only required for tgz deployment",
Visiable: "PkgType.indexOf('tgz') !== -1",
Regexp: "^/.+/$",
})
params.MustRegister(conf, "SshUser", &common.Parameter{
LabelZH: "系统账户名",
Expand Down
1 change: 1 addition & 0 deletions deploy/ck.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,7 @@ func (d *CKDeploy) Config() error {
if d.Conf.Keeper == model.ClickhouseKeeper && !d.Conf.KeeperWithStanalone() {
cmds = append(cmds, fmt.Sprintf("mv %s %s", path.Join(remotePath, "config.d", keeperFile.BaseName), path.Join(remotePath, "config.d", "keeper_config.xml")))
}
cmds = append(cmds, "rm -rf /tmp/host* /tmp/users*")
if d.Conf.NeedSudo {
cmds = append(cmds, "chown -R clickhouse:clickhouse /etc/clickhouse-server")
}
Expand Down
31 changes: 21 additions & 10 deletions deploy/cmd_tgz.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,34 +16,45 @@ func (TgzFacotry) Create() CmdAdpt {
type TgzPkg struct{}

func (p *TgzPkg) StartCmd(svr, cwd string) string {
return fmt.Sprintf("%s/bin/%s --config-file=%s/etc/%s/config.xml --pid-file=%s/run/%s.pid --daemon", cwd, svr, cwd, svr, cwd, svr)
if svr == KeeperSvrName {
return fmt.Sprintf("%sbin/%s --config-file=%setc/%s/keeper_config.xml --pid-file=%srun/%s.pid --daemon", cwd, svr, cwd, svr, cwd, svr)
} else {
return fmt.Sprintf("%sbin/%s --config-file=%setc/%s/config.xml --pid-file=%srun/%s.pid --daemon", cwd, svr, cwd, svr, cwd, svr)
}
}
func (p *TgzPkg) StopCmd(svr, cwd string) string {
return fmt.Sprintf("ps -ef |grep %s/bin/%s |grep -v grep |awk '{print $2}' |xargs kill", cwd, svr)
return fmt.Sprintf("ps -ef |grep %sbin/%s |grep -v grep |awk '{print $2}' |xargs kill", cwd, svr)
}

func (p *TgzPkg) RestartCmd(svr, cwd string) string {
return p.StopCmd(svr, cwd) + ";" + p.StartCmd(svr, cwd)
}

func (p *TgzPkg) InstallCmd(svr string, pkgs Packages) string {
content := fmt.Sprintf("mkdir -p %s/bin %s/etc/clickhouse-server/config.d %s/etc/clickhouse-server/users.d %s/log/clickhouse-server %s/run %s/data/clickhouse;", pkgs.Cwd, pkgs.Cwd, pkgs.Cwd, pkgs.Cwd, pkgs.Cwd, pkgs.Cwd)
content := ""
if svr == CkSvrName {
content = fmt.Sprintf("mkdir -p %sbin %setc/clickhouse-server/config.d %setc/clickhouse-server/users.d %slog/clickhouse-server %srun;",
pkgs.Cwd, pkgs.Cwd, pkgs.Cwd, pkgs.Cwd, pkgs.Cwd)
for _, pkg := range pkgs.PkgLists {
lastIndex := strings.LastIndex(pkg, "-")
extractDir := pkg[:lastIndex]
content += fmt.Sprintf("tar -xvf /tmp/%s -C /tmp;", pkg)
content += fmt.Sprintf("cp -rf /tmp/%s/usr/bin/* %s/bin;", extractDir, pkgs.Cwd)
if !strings.Contains(extractDir, common.PkgModuleCommon) {
content += fmt.Sprintf("cp -rf /tmp/%s/etc/clickhouse-* %s/etc/;", extractDir, pkgs.Cwd)
content += fmt.Sprintf("tar -xf /tmp/%s -C /tmp;", pkg)
content += fmt.Sprintf("cp -rf /tmp/%s/usr/bin/* %sbin;", extractDir, pkgs.Cwd)
if strings.Contains(extractDir, common.PkgModuleClient) {
content += fmt.Sprintf("cp -rf /tmp/%s/etc/clickhouse-client %setc/;", extractDir, pkgs.Cwd)
} else if strings.Contains(extractDir, common.PkgModuleServer) {
content += fmt.Sprintf("cp -rf /tmp/%s/etc/clickhouse-server %setc/;", extractDir, pkgs.Cwd)
}
}
} else if svr == KeeperSvrName {
content = fmt.Sprintf("mkdir -p %sbin %s/etc/clickhouse-keeper %slog/clickhouse-keeper %srun;",
pkgs.Cwd, pkgs.Cwd, pkgs.Cwd, pkgs.Cwd)
pkg := pkgs.Keeper
lastIndex := strings.LastIndex(pkg, "-")
extractDir := pkg[:lastIndex]
content += fmt.Sprintf("tar -xvf /tmp/%s -C /tmp;", pkg)
content += fmt.Sprintf("cp -rf /tmp/%s/usr/bin/* %s/bin;", extractDir, pkgs.Cwd)
content += fmt.Sprintf("tar -xf /tmp/%s -C /tmp;", pkg)
content += fmt.Sprintf("cp -rf /tmp/%s/usr/bin/* %sbin;", extractDir, pkgs.Cwd)
content += fmt.Sprintf("cp -rf /tmp/%s/etc/clickhouse-keeper/* %setc/clickhouse-keeper/;", extractDir, pkgs.Cwd)
}
return strings.TrimSuffix(content, ";")
}
Expand All @@ -53,5 +64,5 @@ func (p *TgzPkg) UpgradeCmd(svr string, pkgs Packages) string {
}

func (p *TgzPkg) Uninstall(svr string, pkgs Packages, version string) string {
return fmt.Sprintf("rm -rf %s/*", pkgs.Cwd)
return fmt.Sprintf("rm -rf %s*", pkgs.Cwd)
}
6 changes: 3 additions & 3 deletions deploy/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,7 @@ func (d *KeeperDeploy) Install() error {
lastIndex := strings.LastIndex(pkg, "-")
extractDir := pkg[:lastIndex]

cmd3 := fmt.Sprintf("cp /tmp/%s/etc/init.d/clickhouse-keeper /etc/init.d/;", extractDir)
cmd3 += fmt.Sprintf("cp /tmp/%s/lib/systemd/system/clickhouse-keeper.service /etc/systemd/system/", extractDir)
cmd3 := fmt.Sprintf("cp /tmp/%s/lib/systemd/system/clickhouse-keeper.service /etc/systemd/system/", extractDir)
sshOpts.NeedSudo = true
_, err = common.RemoteExecute(sshOpts, cmd3)
if err != nil {
Expand All @@ -197,7 +196,7 @@ func (d *KeeperDeploy) Uninstall() error {
cmdIns := GetSuitableCmdAdpt(d.Conf.PkgType)
cmds := make([]string, 0)
cmds = append(cmds, cmdIns.Uninstall(KeeperSvrName, d.Packages, d.Conf.Version))
cmds = append(cmds, fmt.Sprintf("rm -rf %s %s", d.Conf.KeeperConf.LogPath, d.Conf.KeeperConf.SnapshotPath))
cmds = append(cmds, fmt.Sprintf("rm -rf %s/* %s/*", d.Conf.KeeperConf.LogPath, d.Conf.KeeperConf.SnapshotPath))
if d.Conf.NeedSudo {
cmds = append(cmds, "rm -rf /etc/clickhouse-keeper")
}
Expand Down Expand Up @@ -327,6 +326,7 @@ func (d *KeeperDeploy) Config() error {
if d.Conf.NeedSudo {
cmds = append(cmds, "chown -R clickhouse:clickhouse /etc/clickhouse-keeper")
}
cmds = append(cmds, "rm -rf /tmp/keeper_config*")
cmd := strings.Join(cmds, ";")
if _, err = common.RemoteExecute(sshOpts, cmd); err != nil {
lastError = err
Expand Down
13 changes: 10 additions & 3 deletions service/runner/ck.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,10 @@ func DeleteCkClusterNode(task *model.Task, conf *model.CKManClickHouseConfig, ip
d = deploy.NewCkDeploy(*conf)
d.Conf.Hosts = hosts
d.Conf.Shards = shards
if d.Conf.Keeper == model.ClickhouseKeeper && d.Conf.KeeperConf.Runtime == model.KeeperRuntimeInternal {
d.Ext.Restart = true
d.Conf.KeeperConf.KeeperNodes = common.ArrayRemove(conf.Hosts, ip)
}
if err := d.Init(); err != nil {
return errors.Wrapf(err, "[%s]", model.NodeStatusConfigExt.EN)
}
Expand All @@ -202,7 +206,8 @@ func DeleteCkClusterNode(task *model.Task, conf *model.CKManClickHouseConfig, ip
return errors.Wrapf(err, "[%s]", model.NodeStatusRestart.EN)
}
if err := d.Check(300); err != nil {
return errors.Wrapf(err, "[%s]", model.NodeStatusCheck.EN)
log.Logger.Warnf("[%s]delnode check failed: %v", d.Conf.Cluster, err)
//return errors.Wrapf(err, "[%s]", model.NodeStatusCheck.EN)
}
}

Expand Down Expand Up @@ -243,7 +248,8 @@ func AddCkClusterNode(task *model.Task, conf *model.CKManClickHouseConfig, d *de

deploy.SetNodeStatus(task, model.NodeStatusCheck, model.ALL_NODES_DEFAULT)
if err := d.Check(30); err != nil {
return errors.Wrapf(err, "[%s]", model.NodeStatusCheck.EN)
log.Logger.Warnf("[%s]addnode check failed: %v", d.Conf.Cluster, err)
//return errors.Wrapf(err, "[%s]", model.NodeStatusCheck.EN)
}

// update other nodes config
Expand All @@ -267,7 +273,8 @@ func AddCkClusterNode(task *model.Task, conf *model.CKManClickHouseConfig, d *de
}

if err := d2.Check(300); err != nil {
return errors.Wrapf(err, "[%s]", model.NodeStatusCheck.EN)
log.Logger.Warnf("[%s]addnode check failed: %v", d.Conf.Cluster, err)
//return errors.Wrapf(err, "[%s]", model.NodeStatusCheck.EN)
}
}

Expand Down
5 changes: 5 additions & 0 deletions service/runner/handle.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,11 @@ func CKAddNodeHandle(task *model.Task) error {
return err
}

if d.Conf.Keeper == model.ClickhouseKeeper && d.Conf.KeeperConf.Runtime == model.KeeperRuntimeInternal {
d.Ext.Restart = true
d.Conf.KeeperConf.KeeperNodes = append(d.Conf.KeeperConf.KeeperNodes, d.Conf.Hosts...)
}

conf, err := repository.Ps.GetClusterbyName(d.Conf.Cluster)
if err != nil {
return nil
Expand Down

0 comments on commit 5011d72

Please sign in to comment.