From eb45e8e145b725d2e83b7e029446fd7464683a41 Mon Sep 17 00:00:00 2001 From: YenchangChan Date: Sat, 22 Jun 2024 16:30:05 +0800 Subject: [PATCH] clickhouse-keeper is ready --- Dockerfile.test | 4 +- Makefile | 10 + ckconfig/keeper.go | 93 +++++ ckconfig/keeper_fake.xml | 37 ++ ckconfig/keeper_test.go | 27 ++ ckconfig/metrika.go | 25 +- cmd/znodefix/znodefix.go | 3 +- common/math.go | 9 - common/math_test.go | 15 + common/package.go | 66 +--- common/util.go | 24 +- controller/clickhouse.go | 15 +- controller/deploy.go | 51 ++- controller/package.go | 73 +--- controller/schema_ui.go | 172 +++++---- controller/zookeeper.go | 8 +- deploy/base.go | 1 + deploy/ck.go | 37 +- deploy/cmd.go | 6 +- deploy/cmd_deb.go | 27 +- deploy/cmd_pkg_test.go | 18 +- deploy/cmd_rpm.go | 28 +- deploy/cmd_tgz.go | 30 +- deploy/keeper.go | 503 +++++++++++++++++++++++++ docker-compose.yml | 12 +- docs/docs.go | 102 ++++- docs/swagger.json | 102 ++++- docs/swagger.yaml | 71 +++- frontend | 2 +- model/deploy_ck.go | 41 +- model/task.go | 2 + service/runner/ck.go | 37 +- service/runner/handle.go | 32 ++ service/runner/keeper.go | 171 +++++++++ service/zookeeper/zk_test.go | 2 +- service/zookeeper/zookeeper_service.go | 19 +- 36 files changed, 1537 insertions(+), 338 deletions(-) create mode 100644 ckconfig/keeper.go create mode 100644 ckconfig/keeper_fake.xml create mode 100644 ckconfig/keeper_test.go create mode 100644 deploy/keeper.go create mode 100644 service/runner/keeper.go diff --git a/Dockerfile.test b/Dockerfile.test index 12db2d28..73cac1ab 100644 --- a/Dockerfile.test +++ b/Dockerfile.test @@ -2,8 +2,8 @@ # You can run command like: "docker build -f Dockerfile.test -t ckman-clickhouse:centos-7 ." # the offical image is eoitek/ckman-clickhouse:centos-7, You can pull it from dockerhub. -#FROM centos:7 -FROM ccr.ccs.tencentyun.com/library/centos:7 +FROM centos:7 +#FROM ccr.ccs.tencentyun.com/library/centos:7 WORKDIR /var/ RUN yum -y update && yum install -y openssh* \ diff --git a/Makefile b/Makefile index d8d3d5b9..8735d3e9 100644 --- a/Makefile +++ b/Makefile @@ -14,6 +14,7 @@ GOARCH?=$(shell go env GOARCH) TARNAME=${PKGDIR}-${VERSION}-${DATE}.${OS}.$(GOARCH).tar.gz TAG?=$(shell date +%y%m%d) LDFLAGS=-ldflags "-X main.BuildTimeStamp=${TIME} -X main.GitCommitHash=${REVISION} -X main.Version=${VERSION}" +GCFLAGS=-gcflags "all=-N -l" PUB_KEY=$(shell cat resources/eoi_public_key.pub 2>/dev/null) export GOPROXY=https://goproxy.cn,direct @@ -33,6 +34,15 @@ backend: go build ${LDFLAGS} -o znodefix cmd/znodefix/znodefix.go go build ${LDFLAGS} -o znode_count cmd/znodecnt/znodecount.go +.PHONY: debug +debug: + @rm -rf ${PKGFULLDIR} + go build ${GCFLAGS} ${LDFLAGS} + go build ${GCFLAGS} ${LDFLAGS} -o ckmanpasswd cmd/password/password.go + go build ${GCFLAGS} ${LDFLAGS} -o migrate cmd/migrate/migrate.go + go build ${GCFLAGS} ${LDFLAGS} -o znodefix cmd/znodefix/znodefix.go + go build ${GCFLAGS} ${LDFLAGS} -o znode_count cmd/znodecnt/znodecount.go + .PHONY: pre pre: go mod tidy diff --git a/ckconfig/keeper.go b/ckconfig/keeper.go new file mode 100644 index 00000000..112dcab3 --- /dev/null +++ b/ckconfig/keeper.go @@ -0,0 +1,93 @@ +package ckconfig + +import ( + "path" + + "github.com/housepower/ckman/common" + "github.com/housepower/ckman/model" + "github.com/imdario/mergo" +) + +func keeper_root(ipv6Enable bool) map[string]interface{} { + output := make(map[string]interface{}) + if ipv6Enable { + output["listen_host"] = "::" + } else { + output["listen_host"] = "0.0.0.0" + } + output["max_connections"] = 4096 + return output +} + +func keeper_server(conf *model.CKManClickHouseConfig, ipv6Enable bool, idx int) map[string]interface{} { + output := keeper_root(ipv6Enable) + output["logger"] = keeper_logger(conf) + keeper_server := make(map[string]interface{}) + mergo.Merge(&keeper_server, conf.KeeperConf.Expert) + keeper_server["tcp_port"] = conf.KeeperConf.TcpPort + keeper_server["server_id"] = idx + keeper_server["log_storage_path"] = conf.KeeperConf.LogPath + "clickhouse/coordination/logs" + keeper_server["snapshot_storage_path"] = conf.KeeperConf.SnapshotPath + "clickhouse/coordination/snapshots" + keeper_server["coordination_settings"] = coordination_settings(conf.KeeperConf.Coordination) + keeper_server["raft_configuration"] = raft_configuration(conf.KeeperConf) + output["keeper_server"] = keeper_server + return output +} + +// https://github.com/ClickHouse/ClickHouse/blob/master/src/Coordination/CoordinationSettings.h +func coordination_settings(coordination model.Coordination) map[string]interface{} { + output := make(map[string]interface{}) + mergo.Merge(&output, coordination.Expert) + output["operation_timeout_ms"] = coordination.OperationTimeoutMs + output["session_timeout_ms"] = coordination.SessionTimeoutMs + if coordination.ForceSync { + output["force_sync"] = "true" + } else { + output["force_sync"] = "false" + } + if coordination.AutoForwarding { + output["auto_forwarding"] = "true" + } else { + output["auto_forwarding"] = "false" + } + return output +} + +func keeper_logger(conf *model.CKManClickHouseConfig) map[string]interface{} { + output := make(map[string]interface{}) + output["level"] = "debug" + output["size"] = "1000M" + output["count"] = 10 + if !conf.NeedSudo { + output["log"] = path.Join(conf.Cwd, "log", "clickhouse-keeper", "clickhouse-keeper.log") + output["errorlog"] = path.Join(conf.Cwd, "log", "clickhouse-keeper", "clickhouse-keeper.err.log") + } + return output + +} + +func raft_configuration(conf *model.KeeperConf) []map[string]interface{} { + var outputs []map[string]interface{} + for idx, node := range conf.KeeperNodes { + output := make(map[string]interface{}) + output["server"] = map[string]interface{}{ + "id": idx + 1, + "hostname": node, + "port": conf.RaftPort, + } + outputs = append(outputs, output) + } + return outputs +} + +func GenerateKeeperXML(filename string, conf *model.CKManClickHouseConfig, ipv6Enable bool, idx int) (string, error) { + xml := common.NewXmlFile(filename) + rootTag := "clickhouse" + xml.Begin(rootTag) + xml.Merge(keeper_server(conf, ipv6Enable, idx)) + xml.End(rootTag) + if err := xml.Dump(); err != nil { + return filename, err + } + return filename, nil +} diff --git a/ckconfig/keeper_fake.xml b/ckconfig/keeper_fake.xml new file mode 100644 index 00000000..63d88b4d --- /dev/null +++ b/ckconfig/keeper_fake.xml @@ -0,0 +1,37 @@ + + + + false + true + 10000 + Information + 30000 + + /var/lib/clickhouse/coordination/log + + + 192.168.101.102 + 1 + 9181 + + + 192.168.101.105 + 2 + 9181 + + + 192.168.101.107 + 3 + 9181 + + + 2 + /var/lib/clickhouse/coordination/snapshots + 9181 + + :: + + debug + + 4096 + diff --git a/ckconfig/keeper_test.go b/ckconfig/keeper_test.go new file mode 100644 index 00000000..05fb0ced --- /dev/null +++ b/ckconfig/keeper_test.go @@ -0,0 +1,27 @@ +package ckconfig + +import ( + "testing" + + "github.com/housepower/ckman/model" + "github.com/stretchr/testify/assert" +) + +func TestGenerateKeeperXML(t *testing.T) { + conf := model.CKManClickHouseConfig{ + KeeperConf: &model.KeeperConf{ + KeeperNodes: []string{"192.168.101.102", "192.168.101.105", "192.168.101.107"}, + TcpPort: 9181, + RaftPort: 9234, + LogPath: "/var/lib/clickhouse/coordination/log", + SnapshotPath: "/var/lib/clickhouse/coordination/snapshots", + Coordination: model.Coordination{ + OperationTimeoutMs: 10000, + SessionTimeoutMs: 30000, + ForceSync: true, + }, + }, + } + _, err := GenerateKeeperXML("keeper_fake.xml", &conf, true, 2) + assert.Nil(t, err) +} diff --git a/ckconfig/metrika.go b/ckconfig/metrika.go index 4e9973f9..3c3591a2 100644 --- a/ckconfig/metrika.go +++ b/ckconfig/metrika.go @@ -3,6 +3,7 @@ package ckconfig import ( "github.com/housepower/ckman/common" "github.com/housepower/ckman/model" + "github.com/housepower/ckman/service/zookeeper" ) func GenerateMetrikaXML(filename string, conf *model.CKManClickHouseConfig) (string, error) { @@ -40,10 +41,11 @@ func GenZookeeperMetrika(indent int, conf *model.CKManClickHouseConfig) string { xml := common.NewXmlFile("") xml.SetIndent(indent) xml.Begin("zookeeper") - for index, zk := range conf.ZkNodes { + nodes, port := zookeeper.GetZkInfo(conf) + for index, zk := range nodes { xml.BeginwithAttr("node", []common.XMLAttr{{Key: "index", Value: index + 1}}) xml.Write("host", zk) - xml.Write("port", conf.ZkPort) + xml.Write("port", port) xml.End("node") } xml.End("zookeeper") @@ -59,25 +61,6 @@ func GenLocalMetrika(indent int, conf *model.CKManClickHouseConfig) string { secret = false } if secret { - xml.Comment(`Inter-server per-cluster secret for Distributed queries - default: no secret (no authentication will be performed) - - If set, then Distributed queries will be validated on shards, so at least: - - such cluster should exist on the shard, - - such cluster should have the same secret. - - And also (and which is more important), the initial_user will - be used as current user for the query. - - Right now the protocol is pretty simple and it only takes into account: - - cluster name - - query - - Also it will be nice if the following will be implemented: - - source hostname (see interserver_http_host), but then it will depends from DNS, - it can use IP address instead, but then the you need to get correct on the initiator node. - - target hostname / ip address (same notes as for source hostname) - - time-based security tokens`) xml.Write("secret", "foo") } for _, shard := range conf.Shards { diff --git a/cmd/znodefix/znodefix.go b/cmd/znodefix/znodefix.go index e610217a..63221b1a 100644 --- a/cmd/znodefix/znodefix.go +++ b/cmd/znodefix/znodefix.go @@ -82,7 +82,8 @@ func main() { if err != nil { log.Logger.Fatalf("get cluster %s failed:%v", cmdOps.ClusterName, err) } - service, err := zookeeper.NewZkService(cluster.ZkNodes, cluster.ZkPort) + nodes, port := zookeeper.GetZkInfo(&cluster) + service, err := zookeeper.NewZkService(nodes, port) if err != nil { log.Logger.Fatalf("can't create zookeeper instance:%v", err) } diff --git a/common/math.go b/common/math.go index 828f2e6a..bd445efb 100644 --- a/common/math.go +++ b/common/math.go @@ -19,15 +19,6 @@ func Decimal(value float64) float64 { return value } -func ArraySearch(target string, str_array []string) bool { - for _, str := range str_array { - if target == str { - return true - } - } - return false -} - func Md5CheckSum(s string) string { sum := md5.Sum([]byte(s)) return hex.EncodeToString(sum[:16]) diff --git a/common/math_test.go b/common/math_test.go index 921672ea..ccd042e2 100644 --- a/common/math_test.go +++ b/common/math_test.go @@ -131,3 +131,18 @@ func TestArraySearch(t *testing.T) { "aaa", "bbb", "ccc", "kkk", })) } + +func TestArrayRemove(t *testing.T) { + assert.Equal(t, []string{"aaa", "ccc", "kkk"}, ArrayRemove([]string{ + "aaa", "bbb", "ccc", "kkk", + }, "bbb")) + assert.Equal(t, []int{1, 2, 3, 4}, ArrayRemove([]int{ + 1, 2, 3, 4, 5, + }, 5)) + + arr := []string{ + "aaa", "bbb", "ccc", "kkk", + } + ArrayRemove(arr, "bbb") + assert.Equal(t, []string{"aaa", "bbb", "ccc", "kkk"}, arr) +} diff --git a/common/package.go b/common/package.go index 48060c04..18cb7527 100644 --- a/common/package.go +++ b/common/package.go @@ -15,12 +15,11 @@ import ( const ( DefaultPackageDirectory string = "package/clickhouse" - DefaultKeeperDirectory string = "package/keeper" PkgModuleCommon string = "common" PkgModuleClient string = "client" PkgModuleServer string = "server" - PkgKeeper string = "keeper" + PkgModuleKeeper string = "keeper" PkgSuffixRpm string = "rpm" PkgSuffixTgz string = "tgz" @@ -43,10 +42,7 @@ func (v CkPackageFiles) Less(i, j int) bool { return CompareClickHouseVersion(v[i].Version, v[j].Version) < 0 } -var ( - CkPackages sync.Map - KeeperPackages sync.Map -) +var CkPackages sync.Map func parsePkgName(fname string) CkPackageFile { var module, version, arch, suffix string @@ -238,61 +234,3 @@ func GetAllPackages() map[string]CkPackageFiles { }) return pkgs } - -func LoadKeeperPackages() error { - var files CkPackageFiles - CkPackages.Range(func(k, v interface{}) bool { - CkPackages.Delete(k) - return true - }) - dir, err := os.ReadDir(path.Join(config.GetWorkDirectory(), DefaultKeeperDirectory)) - if err != nil { - return errors.Wrap(err, "") - } - - for _, fi := range dir { - if !fi.IsDir() { - fname := fi.Name() - file := parsePkgName(fname) - files = append(files, file) - } - } - - sort.Sort(sort.Reverse(files)) - for _, file := range files { - key := fmt.Sprintf("%s.%s", file.Arch, file.Suffix) - value, ok := CkPackages.Load(key) - if !ok { - pkgs := CkPackageFiles{file} - CkPackages.Store(key, pkgs) - } else { - pkgs := value.(CkPackageFiles) - found := false - for _, pkg := range pkgs { - if reflect.DeepEqual(pkg, file) { - found = true - break - } - } - if !found { - pkgs = append(pkgs, file) - } - CkPackages.Store(key, pkgs) - } - } - - return nil -} - -func GetAllKeeperPackages() map[string]CkPackageFiles { - pkgs := make(map[string]CkPackageFiles, 0) - _ = LoadKeeperPackages() - CkPackages.Range(func(k, v interface{}) bool { - key := k.(string) - files := v.(CkPackageFiles) - sort.Sort(sort.Reverse(files)) - pkgs[key] = files - return true - }) - return pkgs -} diff --git a/common/util.go b/common/util.go index b4da2407..8ed35795 100644 --- a/common/util.go +++ b/common/util.go @@ -259,8 +259,8 @@ func Shuffle(value []string) []string { return arr } -func ArrayDistinct(arr []string) []string { - set := make(map[string]struct{}, len(arr)) +func ArrayDistinct[T string | int | int64 | int32 | uint | uint64 | uint32 | float32 | float64](arr []T) []T { + set := make(map[T]struct{}, len(arr)) j := 0 for _, v := range arr { _, ok := set[v] @@ -274,6 +274,26 @@ func ArrayDistinct(arr []string) []string { return arr[:j] } +func ArrayRemove[T string | int | int64 | int32 | uint | uint64 | uint32 | float32 | float64](arr []T, elem T) []T { + var res []T + for _, v := range arr { + if v == elem { + continue + } + res = append(res, v) + } + return res +} + +func ArraySearch[T string | int | int64 | int32 | uint | uint64 | uint32 | float32 | float64](target T, str_array []T) bool { + for _, str := range str_array { + if target == str { + return true + } + } + return false +} + func TernaryExpression(condition bool, texpr, fexpr interface{}) interface{} { if condition { return texpr diff --git a/controller/clickhouse.go b/controller/clickhouse.go index b95064e1..eb28257a 100644 --- a/controller/clickhouse.go +++ b/controller/clickhouse.go @@ -1590,6 +1590,11 @@ func (controller *ClickHouseController) AddNode(c *gin.Context) { // install clickhouse and start service on the new node d := deploy.NewCkDeploy(conf) d.Conf.Hosts = req.Ips + if d.Conf.Keeper == model.ClickhouseKeeper && d.Conf.KeeperConf.Runtime == model.KeeperRuntimeInternal { + d.Ext.Restart = true + d.Conf.KeeperConf.KeeperNodes = append(d.Conf.KeeperConf.KeeperNodes, req.Ips...) + } + d.Packages = deploy.BuildPackages(conf.Version, conf.PkgType, conf.Cwd) if reflect.DeepEqual(d.Packages, deploy.Packages{}) { err := errors.Errorf("package %s %s not found in localpath", conf.Version, conf.PkgType) @@ -1698,6 +1703,10 @@ SETTINGS skip_unavailable_shards = 1` d := deploy.NewCkDeploy(conf) d.Packages = deploy.BuildPackages(conf.Version, conf.PkgType, conf.Cwd) d.Conf.Hosts = []string{ip} + if d.Conf.Keeper == model.ClickhouseKeeper && d.Conf.KeeperConf.Runtime == model.KeeperRuntimeInternal { + d.Ext.Restart = true + d.Conf.KeeperConf.KeeperNodes = common.ArrayRemove(conf.Hosts, ip) + } taskId, err := deploy.CreateNewTask(clusterName, model.TaskTypeCKDeleteNode, d) if err != nil { @@ -2821,6 +2830,7 @@ func mergeClickhouseConfig(conf *model.CKManClickHouseConfig, force bool) (bool, userconfChanged := !reflect.DeepEqual(cluster.UsersConf, conf.UsersConf) logicChaned := !reflect.DeepEqual(cluster.LogicCluster, conf.LogicCluster) zkChanged := !reflect.DeepEqual(cluster.ZkNodes, conf.ZkNodes) + keeperChanged := !reflect.DeepEqual(cluster.KeeperConf, conf.KeeperConf) noChangedFn := func() bool { return cluster.Port == conf.Port && @@ -2831,7 +2841,7 @@ func mergeClickhouseConfig(conf *model.CKManClickHouseConfig, force bool) (bool, cluster.Password == conf.Password && !storageChanged && !expertChanged && cluster.PromHost == conf.PromHost && cluster.PromPort == conf.PromPort && cluster.ZkPort == conf.ZkPort && - !userconfChanged && !logicChaned && !zkChanged + !userconfChanged && !logicChaned && !zkChanged && !keeperChanged } if !force { @@ -2954,7 +2964,7 @@ func mergeClickhouseConfig(conf *model.CKManClickHouseConfig, force bool) (bool, } // need restart - if cluster.Port != conf.Port || storageChanged || expertChanged { + if cluster.Port != conf.Port || storageChanged || expertChanged || keeperChanged { restart = true } @@ -2973,6 +2983,7 @@ func mergeClickhouseConfig(conf *model.CKManClickHouseConfig, force bool) (bool, cluster.LogicCluster = conf.LogicCluster cluster.ZkPort = conf.ZkPort cluster.ZkNodes = conf.ZkNodes + cluster.KeeperConf = conf.KeeperConf if err = common.DeepCopyByGob(conf, cluster); err != nil { return false, err } diff --git a/controller/deploy.go b/controller/deploy.go index 266044a4..c1642412 100644 --- a/controller/deploy.go +++ b/controller/deploy.go @@ -55,7 +55,12 @@ func (controller *DeployController) DeployCk(c *gin.Context) { tmp := deploy.NewCkDeploy(conf) tmp.Packages = deploy.BuildPackages(conf.Version, conf.PkgType, conf.Cwd) - + if conf.KeeperWithStanalone() { + if tmp.Packages.Keeper == "" { + controller.wrapfunc(c, model.E_DATA_CHECK_FAILED, errors.Errorf("keeper package not found")) + return + } + } taskId, err := deploy.CreateNewTask(conf.Cluster, model.TaskTypeCKDeploy, tmp) if err != nil { controller.wrapfunc(c, model.E_DATA_INSERT_FAILED, err) @@ -112,10 +117,6 @@ func checkDeployParams(conf *model.CKManClickHouseConfig, force bool) error { } } - // if conf.Hosts, err = common.ParseHosts(conf.Hosts); err != nil { - // return err - // } - if !force { if err := common.CheckCkInstance(conf); err != nil { return err @@ -125,16 +126,38 @@ func checkDeployParams(conf *model.CKManClickHouseConfig, force bool) error { if err = MatchingPlatfrom(conf); err != nil { return err } - //if conf.IsReplica && len(conf.Hosts)%2 == 1 { - // return errors.Errorf("When supporting replica, the number of nodes must be even") - //} - //conf.Shards = GetShardsbyHosts(conf.Hosts, conf.IsReplica) + conf.IsReplica = true - if len(conf.ZkNodes) == 0 { - return errors.Errorf("zookeeper nodes must not be empty") - } - if conf.ZkNodes, err = common.ParseHosts(conf.ZkNodes); err != nil { - return err + if conf.Keeper == model.ClickhouseKeeper { + if conf.KeeperConf == nil { + return errors.Errorf("keeper conf must not be empty") + } + if conf.KeeperConf.Runtime == model.KeeperRuntimeStandalone { + if conf.KeeperConf.KeeperNodes, err = common.ParseHosts(conf.KeeperConf.KeeperNodes); err != nil { + return err + } + if len(conf.KeeperConf.KeeperNodes) == 0 { + return errors.Errorf("keeper nodes must not be empty") + } + } else if conf.KeeperConf.Runtime == model.KeeperRuntimeInternal { + conf.KeeperConf.KeeperNodes = make([]string, len(conf.Hosts)) + copy(conf.KeeperConf.KeeperNodes, conf.Hosts) + } else { + return errors.Errorf("keeper runtime %s is not supported", conf.KeeperConf.Runtime) + } + if err := checkAccess(conf.KeeperConf.LogPath, conf); err != nil { + return errors.Wrapf(err, "check access error") + } + if err := checkAccess(conf.KeeperConf.SnapshotPath, conf); err != nil { + return errors.Wrapf(err, "check access error") + } + } else { + if len(conf.ZkNodes) == 0 { + return errors.Errorf("zookeeper nodes must not be empty") + } + if conf.ZkNodes, err = common.ParseHosts(conf.ZkNodes); err != nil { + return err + } } if conf.LogicCluster != nil { if conf.Cluster == *conf.LogicCluster { diff --git a/controller/package.go b/controller/package.go index f54f617f..d6e927bc 100644 --- a/controller/package.go +++ b/controller/package.go @@ -48,10 +48,6 @@ func NewPackageController(config *config.CKManConfig, wrapfunc Wrapfunc) *Packag // @Success 200 {string} json "{"code":"0000","msg":"success","data":null}" // @Router /api/v2/package [post] func (controller *PackageController) Upload(c *gin.Context) { - kind := c.Query("pkgKind") - if kind == "" { - kind = "clickhouse" - } localFile, err := ParserFormData(c.Request) if err != nil { controller.wrapfunc(c, model.E_UPLOAD_FAILED, err) @@ -67,14 +63,14 @@ func (controller *PackageController) Upload(c *gin.Context) { for _, peer := range config.GetClusterPeers() { peerUrl := "" if controller.config.Server.Https { - peerUrl = fmt.Sprintf("https://%s:%d/api/v1/package?pkgKind=%s", peer.Ip, peer.Port, kind) + peerUrl = fmt.Sprintf("https://%s:%d/api/v1/package", peer.Ip, peer.Port) err = UploadFileByURL(peerUrl, localFile) if err != nil { controller.wrapfunc(c, model.E_UPLOAD_FAILED, err) return } } else { - peerUrl = fmt.Sprintf("http://%s:%d/api/v1/package?pkgKind=%s", peer.Ip, peer.Port, kind) + peerUrl = fmt.Sprintf("http://%s:%d/api/v1/package", peer.Ip, peer.Port) err = UploadFileByURL(peerUrl, localFile) if err != nil { controller.wrapfunc(c, model.E_UPLOAD_FAILED, err) @@ -83,13 +79,8 @@ func (controller *PackageController) Upload(c *gin.Context) { } } } - if kind == "clickhouse" { - err = common.LoadPackages() - } else if kind == "keeper" { - err = common.LoadKeeperPackages() - } else { - err = fmt.Errorf("invalid package kind") - } + + err = common.LoadPackages() if err != nil { controller.wrapfunc(c, model.E_UPLOAD_FAILED, err) return @@ -186,21 +177,12 @@ func UploadFileByURL(url string, localFile string) error { // @Success 200 {string} json "{"code":"0000","msg":"ok","data":[{"version":"22.3.9.19","pkgType":"x86_64.rpm","pkgName":"clickhouse-common-static-22.3.9.19.x86_64.rpm"}]}" // @Router /api/v2/package [get] func (controller *PackageController) List(c *gin.Context) { - pkgKind := c.Query("pkgKind") pkgType := c.Query("pkgType") if pkgType == "" { pkgType = model.PkgTypeDefault } - if pkgKind == "" { - pkgKind = "clickhouse" - } - var pkgs map[string]common.CkPackageFiles - if pkgKind == "clickhouse" { - pkgs = common.GetAllPackages() - } else if pkgKind == "keeper" { - pkgs = common.GetAllKeeperPackages() - } + pkgs := common.GetAllPackages() var resp []model.PkgInfo if pkgType == "all" { for k, v := range pkgs { @@ -243,32 +225,11 @@ func (controller *PackageController) List(c *gin.Context) { func (controller *PackageController) Delete(c *gin.Context) { packageVersion := c.Query("packageVersion") packageType := c.Query("pkgType") - kind := c.Query("pkgKind") - if kind == "" { - kind = "clickhouse" - } - if kind == "clickhouse" { - packages := deploy.BuildPackages(packageVersion, packageType, "") - for _, packageName := range packages.PkgLists { - if err := os.Remove(path.Join(config.GetWorkDirectory(), common.DefaultPackageDirectory, packageName)); err != nil { - controller.wrapfunc(c, model.E_FILE_NOT_EXIST, err) - return - } - } - } else if kind == "keeper" { - pkgs := common.GetAllKeeperPackages() - LOOP: - for _, pkg := range pkgs { - for _, p := range pkg { - if p.Version == packageVersion && p.Module == packageType { - packageName := p.PkgName - if err := os.Remove(path.Join(config.GetWorkDirectory(), common.DefaultKeeperDirectory, packageName)); err != nil { - controller.wrapfunc(c, model.E_FILE_NOT_EXIST, err) - return - } - break LOOP - } - } + packages := deploy.BuildPackages(packageVersion, packageType, "") + for _, packageName := range packages.PkgLists { + if err := os.Remove(path.Join(config.GetWorkDirectory(), common.DefaultPackageDirectory, packageName)); err != nil { + controller.wrapfunc(c, model.E_FILE_NOT_EXIST, err) + return } } @@ -282,14 +243,14 @@ func (controller *PackageController) Delete(c *gin.Context) { for _, peer := range config.GetClusterPeers() { peerUrl := "" if controller.config.Server.Https { - peerUrl = fmt.Sprintf("https://%s:%d/api/v1/package?packageVersion=%s&packageType=%s&packageKind=%s", peer.Ip, peer.Port, packageVersion, packageType, kind) + peerUrl = fmt.Sprintf("https://%s:%d/api/v1/package?packageVersion=%s&packageType=%s", peer.Ip, peer.Port, packageVersion, packageType) err := DeleteFileByURL(peerUrl) if err != nil { controller.wrapfunc(c, model.E_DATA_DELETE_FAILED, err) return } } else { - peerUrl = fmt.Sprintf("http://%s:%d/api/v1/package?packageVersion=%s&packageType=%s&packageKind=%s", peer.Ip, peer.Port, packageVersion, packageType, kind) + peerUrl = fmt.Sprintf("http://%s:%d/api/v1/package?packageVersion=%s&packageType=%s", peer.Ip, peer.Port, packageVersion, packageType) err := DeleteFileByURL(peerUrl) if err != nil { controller.wrapfunc(c, model.E_DATA_DELETE_FAILED, err) @@ -298,14 +259,8 @@ func (controller *PackageController) Delete(c *gin.Context) { } } } - var err error - if kind == "clickhouse" { - err = common.LoadPackages() - } else if kind == "keeper" { - err = common.LoadKeeperPackages() - } else { - err = fmt.Errorf("invalid package kind") - } + + err := common.LoadPackages() if err != nil { controller.wrapfunc(c, model.E_DATA_SELECT_FAILED, err) return diff --git a/controller/schema_ui.go b/controller/schema_ui.go index ddfd6508..c94fe1ca 100644 --- a/controller/schema_ui.go +++ b/controller/schema_ui.go @@ -30,13 +30,8 @@ var schemaHandleFunc = map[string]func() common.ConfigParams{ GET_SCHEMA_UI_REBALANCE: RegistRebalanceClusterSchema, } -func getPkgType(kind string) []common.Candidate { - var pkgs map[string]common.CkPackageFiles - if kind == "clickhouse" { - pkgs = common.GetAllPackages() - } else if kind == "keeper" { - pkgs = common.GetAllKeeperPackages() - } +func getPkgType() []common.Candidate { + pkgs := common.GetAllPackages() var lists []common.Candidate for pkgType := range pkgs { can := common.Candidate{ @@ -47,13 +42,8 @@ func getPkgType(kind string) []common.Candidate { return lists } -func getPkgLists(kind string) []common.Candidate { - var packages map[string]common.CkPackageFiles - if kind == "clickhouse" { - packages = common.GetAllPackages() - } else if kind == "keeper" { - packages = common.GetAllKeeperPackages() - } +func getPkgLists() []common.Candidate { + packages := common.GetAllPackages() var pkgLists []common.Candidate for _, pkgs := range packages { for _, pkg := range pkgs { @@ -184,8 +174,8 @@ func RegistCreateClusterSchema() common.ConfigParams { Default: "zookeeper", DescriptionZH: "如果使用clickhouse-keeper, 则默认由ckman托管;如果使用已有zookeeper或已经创建好的keeper集群,都视同zookeeper", Candidates: []common.Candidate{ - {Value: "zookeeper", LabelEN: "Zookeeper", LabelZH: "Zookeeper"}, - {Value: "clickhouse-keeper", LabelEN: "ClickHouse-Keeper", LabelZH: "ClickHouse-Keeper"}, + {Value: model.Zookeeper, LabelEN: "Zookeeper", LabelZH: "Zookeeper"}, + {Value: model.ClickhouseKeeper, LabelEN: "ClickHouse-Keeper", LabelZH: "ClickHouse-Keeper"}, }, }) @@ -200,56 +190,80 @@ func RegistCreateClusterSchema() common.ConfigParams { params.MustRegister(keeper, "Runtime", &common.Parameter{ LabelZH: "运行方式", LabelEN: "Runtime", - Default: "standalone", + Default: model.KeeperRuntimeStandalone, DescriptionZH: "如果单独部署,则和clickhouse-server 分开进程;如果内置,则和clickhouse-server放在一块", Candidates: []common.Candidate{ - {Value: "standalone", LabelEN: "Standalone", LabelZH: "单独部署"}, - {Value: "internal", LabelEN: "Internal", LabelZH: "内置"}, + {Value: model.KeeperRuntimeStandalone, LabelEN: "Standalone", LabelZH: "单独部署"}, + {Value: model.KeeperRuntimeInternal, LabelEN: "Internal", LabelZH: "内置"}, }, }) - params.MustRegister(keeper, "KeeperPkg", &common.Parameter{ - LabelZH: "Keeper版本", - LabelEN: "Keeper Package Name", - DescriptionZH: "需要部署的ClickHouse-Keeper集群的安装包", - DescriptionEN: "which package of clickhouse will deployed, need upload rpm package before", - Candidates: getPkgLists("keeper"), - Visiable: "Runtime == 'standalone'", - Filter: "\"KeeperPkg\".indexOf(KeeperPkgType) !== -1", - }) - params.MustRegister(keeper, "KeeperPkgType", &common.Parameter{ - LabelZH: "安装包类型", - LabelEN: "Package Type", - DescriptionZH: "安装包的类型,表示当前安装包是什么系统架构,什么压缩格式", - DescriptionEN: "The type of the installation package, indicating what system architecture and compression format", - Candidates: getPkgType("keeper"), - Visiable: "Runtime == 'standalone'", - }) params.MustRegister(keeper, "KeeperNodes", &common.Parameter{ LabelZH: "Keeper节点", LabelEN: "KeeperNodes", Visiable: "Runtime == 'standalone'", }) - params.MustRegister(keeper, "KeeperPort", &common.Parameter{ + params.MustRegister(keeper, "TcpPort", &common.Parameter{ LabelZH: "Keeper端口", - LabelEN: "KeeperPort", + LabelEN: "TcpPort", Default: "9181", }) + params.MustRegister(keeper, "RaftPort", &common.Parameter{ + LabelZH: "Raft通信端口", + LabelEN: "RaftPort", + Default: "9234", + }) params.MustRegister(keeper, "LogPath", &common.Parameter{ LabelZH: "Log路径", LabelEN: "LogPath", - Default: "/var/lib/clickhouse/coordination/logs", + Default: "/var/lib/", + Regexp: "^/.+/$", }) params.MustRegister(keeper, "SnapshotPath", &common.Parameter{ LabelZH: "Snapshot路径", LabelEN: "SnapshotPath", - Default: "/var/lib/clickhouse/coordination/snapshots", + Default: "/var/lib/", + Regexp: "^/.+/$", }) params.MustRegister(keeper, "Expert", &common.Parameter{ LabelZH: "专家配置", LabelEN: "Expert", Required: "false", }) + params.MustRegister(keeper, "Coordination", &common.Parameter{ + LabelZH: "协作配置", + LabelEN: "Coordination", + Required: "false", + }) + + var coordination model.Coordination + params.MustRegister(coordination, "OperationTimeoutMs", &common.Parameter{ + LabelZH: "OperationTimeoutMs", + LabelEN: "OperationTimeoutMs", + Default: "10000", + Required: "false", + }) + params.MustRegister(coordination, "SessionTimeoutMs", &common.Parameter{ + LabelZH: "SessionTimeoutMs", + LabelEN: "SessionTimeoutMs", + Default: "30000", + Required: "false", + }) + params.MustRegister(coordination, "ForceSync", &common.Parameter{ + LabelZH: "ForceSync", + LabelEN: "ForceSync", + Required: "false", + }) + params.MustRegister(coordination, "AutoForwarding", &common.Parameter{ + LabelZH: "AutoForwarding", + LabelEN: "AutoForwarding", + Required: "false", + }) + params.MustRegister(coordination, "Expert", &common.Parameter{ + LabelZH: "专家配置", + LabelEN: "Expert", + Required: "false", + }) params.MustRegister(conf, "ZkNodes", &common.Parameter{ LabelZH: "ZooKeeper集群结点列表", @@ -857,8 +871,8 @@ func RegistUpdateConfigSchema() common.ConfigParams { params.MustRegister(conf, "Keeper", &common.Parameter{ DescriptionZH: "如果使用clickhouse-keeper, 则默认由ckman托管;如果使用已有zookeeper或已经创建好的keeper集群,都视同zookeeper", Candidates: []common.Candidate{ - {Value: "zookeeper", LabelEN: "Zookeeper", LabelZH: "Zookeeper"}, - {Value: "clickhouse-keeper", LabelEN: "ClickHouse-Keeper", LabelZH: "ClickHouse-Keeper"}, + {Value: model.Zookeeper, LabelEN: "Zookeeper", LabelZH: "Zookeeper"}, + {Value: model.ClickhouseKeeper, LabelEN: "ClickHouse-Keeper", LabelZH: "ClickHouse-Keeper"}, }, Editable: "false", }) @@ -876,44 +890,74 @@ func RegistUpdateConfigSchema() common.ConfigParams { LabelEN: "Runtime", DescriptionZH: "如果单独部署,则和clickhouse-server 分开进程;如果内置,则和clickhouse-server放在一块", Candidates: []common.Candidate{ - {Value: "standalone", LabelEN: "Standalone", LabelZH: "单独部署"}, - {Value: "internal", LabelEN: "Internal", LabelZH: "内置"}, + {Value: model.KeeperRuntimeStandalone, LabelEN: "Standalone", LabelZH: "单独部署"}, + {Value: model.KeeperRuntimeInternal, LabelEN: "Internal", LabelZH: "内置"}, }, + Editable: "false", }) params.MustRegister(keeper, "KeeperNodes", &common.Parameter{ LabelZH: "Keeper节点", LabelEN: "KeeperNodes", - Visiable: "Runtime == 'standalone'", - }) - - params.MustRegister(keeper, "KeeperVersion", &common.Parameter{ - LabelZH: "Keeper版本", - LabelEN: "KeeperVersion", - Visiable: "Runtime == 'standalone'", - }) - params.MustRegister(keeper, "KeeperNodes", &common.Parameter{ - LabelZH: "Keeper节点", - LabelEN: "KeeperNodes", - Visiable: "Runtime == 'standalone'", + Editable: "false", }) - params.MustRegister(keeper, "KeeperPort", &common.Parameter{ + params.MustRegister(keeper, "TcpPort", &common.Parameter{ LabelZH: "Keeper端口", - LabelEN: "KeeperPort", + LabelEN: "TcpPort", + }) + params.MustRegister(keeper, "RaftPort", &common.Parameter{ + LabelZH: "Raft通信端口", + LabelEN: "RaftPort", }) params.MustRegister(keeper, "LogPath", &common.Parameter{ - LabelZH: "Log路径", - LabelEN: "LogPath", + LabelZH: "Log路径", + LabelEN: "LogPath", + Editable: "false", }) params.MustRegister(keeper, "SnapshotPath", &common.Parameter{ - LabelZH: "Snapshot路径", - LabelEN: "SnapshotPath", + LabelZH: "Snapshot路径", + LabelEN: "SnapshotPath", + Editable: "false", }) params.MustRegister(keeper, "Expert", &common.Parameter{ LabelZH: "专家配置", LabelEN: "Expert", Required: "false", }) + params.MustRegister(keeper, "Coordination", &common.Parameter{ + LabelZH: "协作配置", + LabelEN: "Coordination", + Required: "false", + }) + + var coordination model.Coordination + params.MustRegister(coordination, "OperationTimeoutMs", &common.Parameter{ + LabelZH: "OperationTimeoutMs", + LabelEN: "OperationTimeoutMs", + Default: "10000", + Required: "false", + }) + params.MustRegister(coordination, "SessionTimeoutMs", &common.Parameter{ + LabelZH: "SessionTimeoutMs", + LabelEN: "SessionTimeoutMs", + Default: "30000", + Required: "false", + }) + params.MustRegister(coordination, "ForceSync", &common.Parameter{ + LabelZH: "ForceSync", + LabelEN: "ForceSync", + Required: "false", + }) + params.MustRegister(coordination, "AutoForwarding", &common.Parameter{ + LabelZH: "AutoForwarding", + LabelEN: "AutoForwarding", + Required: "false", + }) + params.MustRegister(coordination, "Expert", &common.Parameter{ + LabelZH: "专家配置", + LabelEN: "Expert", + Required: "false", + }) params.MustRegister(conf, "ZkNodes", &common.Parameter{ LabelZH: "ZooKeeper集群结点列表", @@ -1545,7 +1589,7 @@ func GetSchemaParams(typo string, conf model.CKManClickHouseConfig) common.Confi LabelEN: "Package Name", DescriptionZH: "需要部署的ClickHouse集群的安装包版本,只显示common安装包,但需提前上传common、server、client安装包", DescriptionEN: "which package of clickhouse will deployed, need upload rpm package before", - Candidates: getPkgLists("clickhouse"), + Candidates: getPkgLists(), Filter: "\"PkgName\".indexOf(PkgType) !== -1", }) @@ -1554,7 +1598,7 @@ func GetSchemaParams(typo string, conf model.CKManClickHouseConfig) common.Confi LabelEN: "Package Type", DescriptionZH: "安装包的类型,表示当前安装包是什么系统架构,什么压缩格式", DescriptionEN: "The type of the installation package, indicating what system architecture and compression format", - Candidates: getPkgType("clickhouse"), + Candidates: getPkgType(), }) } return params diff --git a/controller/zookeeper.go b/controller/zookeeper.go index 009c91df..d1bd0722 100644 --- a/controller/zookeeper.go +++ b/controller/zookeeper.go @@ -47,12 +47,14 @@ func (controller *ZookeeperController) GetStatus(c *gin.Context) { return } - zkList := make([]model.ZkStatusRsp, len(conf.ZkNodes)) - for index, node := range conf.ZkNodes { + nodes, port := zookeeper.GetZkInfo(&conf) + + zkList := make([]model.ZkStatusRsp, len(nodes)) + for index, node := range nodes { tmp := model.ZkStatusRsp{ Host: node, } - body, err := zookeeper.ZkMetric(node, conf.ZkPort, "mntr") + body, err := zookeeper.ZkMetric(node, port, "mntr") if err != nil { controller.wrapfunc(c, model.E_ZOOKEEPER_ERROR, fmt.Sprintf("get zookeeper node %s satus fail: %v", node, err)) return diff --git a/deploy/base.go b/deploy/base.go index 043e92a8..61577881 100644 --- a/deploy/base.go +++ b/deploy/base.go @@ -3,6 +3,7 @@ package deploy type Packages struct { PkgLists []string Cwd string + Keeper string } type DeployBase struct { diff --git a/deploy/ck.go b/deploy/ck.go index 98070832..c5ff445a 100644 --- a/deploy/ck.go +++ b/deploy/ck.go @@ -178,7 +178,7 @@ func (d *CKDeploy) Install() error { d.Conf.Normalize() cmdIns := GetSuitableCmdAdpt(d.Conf.PkgType) cmds := make([]string, 0) - cmds = append(cmds, cmdIns.InstallCmd(d.Packages)) + cmds = append(cmds, cmdIns.InstallCmd(CkSvrName, d.Packages)) cmds = append(cmds, fmt.Sprintf("rm -rf %s", path.Join(d.Conf.Path, "clickhouse"))) cmds = append(cmds, fmt.Sprintf("mkdir -p %s", path.Join(d.Conf.Path, "clickhouse"))) if d.Conf.NeedSudo { @@ -243,7 +243,7 @@ func (d *CKDeploy) Uninstall() error { d.Conf.Normalize() cmdIns := GetSuitableCmdAdpt(d.Conf.PkgType) cmds := make([]string, 0) - cmds = append(cmds, cmdIns.Uninstall(d.Packages, d.Conf.Version)) + cmds = append(cmds, cmdIns.Uninstall(CkSvrName, d.Packages, d.Conf.Version)) cmds = append(cmds, fmt.Sprintf("rm -rf %s", path.Join(d.Conf.Path, "clickhouse"))) if d.Conf.NeedSudo { cmds = append(cmds, "rm -rf /etc/clickhouse-server") @@ -284,7 +284,7 @@ func (d *CKDeploy) Uninstall() error { func (d *CKDeploy) Upgrade() error { d.Conf.Normalize() cmdIns := GetSuitableCmdAdpt(d.Conf.PkgType) - cmd := cmdIns.UpgradeCmd(d.Packages) + cmd := cmdIns.UpgradeCmd(CkSvrName, d.Packages) var lastError error var wg sync.WaitGroup for _, host := range d.Conf.Hosts { @@ -391,6 +391,30 @@ func (d *CKDeploy) Config() error { } confFiles = append(confFiles, hostXml) + var keeperFile common.TempFile + if d.Conf.Keeper == model.ClickhouseKeeper && !d.Conf.KeeperWithStanalone() { + var err error + keeperFile, err = common.NewTempFile(path.Join(config.GetWorkDirectory(), "package"), "keeper_config") + if err != nil { + lastError = err + return + } + defer os.Remove(keeperFile.FullName) + idx := 0 + for i, kn := range d.Conf.KeeperConf.KeeperNodes { + if kn == innerHost { + idx = i + break + } + } + keeperXml, err := ckconfig.GenerateKeeperXML(keeperFile.FullName, d.Conf, d.Ext.Ipv6Enable, idx+1) + if err != nil { + lastError = err + return + } + confFiles = append(confFiles, keeperXml) + } + if err := common.ScpUploadFiles(confFiles, path.Join(remotePath, "config.d"), sshOpts); err != nil { lastError = err return @@ -403,6 +427,9 @@ func (d *CKDeploy) Config() error { cmds := make([]string, 0) cmds = append(cmds, fmt.Sprintf("mv %s %s", path.Join(remotePath, "config.d", hostFile.BaseName), path.Join(remotePath, "config.d", "host.xml"))) cmds = append(cmds, fmt.Sprintf("mv %s %s", path.Join(remotePath, "users.d", usersFile.BaseName), path.Join(remotePath, "users.d", "users.xml"))) + if d.Conf.Keeper == model.ClickhouseKeeper && !d.Conf.KeeperWithStanalone() { + cmds = append(cmds, fmt.Sprintf("mv %s %s", path.Join(remotePath, "config.d", keeperFile.BaseName), path.Join(remotePath, "config.d", "keeper_config.xml"))) + } if d.Conf.NeedSudo { cmds = append(cmds, "chown -R clickhouse:clickhouse /etc/clickhouse-server") } @@ -806,6 +833,7 @@ func BuildPackages(version, pkgType, cwd string) Packages { if !ok { return Packages{} } + var keeper string for _, pkg := range pkgs.(common.CkPackageFiles) { if pkg.Version == version { if pkg.Module == common.PkgModuleCommon { @@ -814,6 +842,8 @@ func BuildPackages(version, pkgType, cwd string) Packages { pkgLists[1] = pkg.PkgName } else if pkg.Module == common.PkgModuleClient { pkgLists[2] = pkg.PkgName + } else if pkg.Module == common.PkgModuleKeeper { + keeper = pkg.PkgName } } } @@ -821,5 +851,6 @@ func BuildPackages(version, pkgType, cwd string) Packages { return Packages{ PkgLists: pkgLists, Cwd: cwd, + Keeper: keeper, } } diff --git a/deploy/cmd.go b/deploy/cmd.go index 0795f44c..85df0140 100644 --- a/deploy/cmd.go +++ b/deploy/cmd.go @@ -10,9 +10,9 @@ type CmdAdpt interface { StartCmd(svr, cwd string) string StopCmd(svr, cwd string) string RestartCmd(svr, cwd string) string - InstallCmd(pkgs Packages) string - UpgradeCmd(pkgs Packages) string - Uninstall(pkgs Packages, version string) string + InstallCmd(svr string, pkgs Packages) string + UpgradeCmd(svr string, pkgs Packages) string + Uninstall(svr string, pkgs Packages, version string) string } type CmdFactory interface { diff --git a/deploy/cmd_deb.go b/deploy/cmd_deb.go index 4e42a1da..7ccd41f8 100644 --- a/deploy/cmd_deb.go +++ b/deploy/cmd_deb.go @@ -26,20 +26,25 @@ func (p *DebPkg) RestartCmd(svr, cwd string) string { return "service " + svr + " restart" } -func (p *DebPkg) InstallCmd(pkgs Packages) string { - for idx, pkg := range pkgs.PkgLists { - pkgs.PkgLists[idx] = path.Join(common.TmpWorkDirectory, pkg) +func (p *DebPkg) InstallCmd(svr string, pkgs Packages) string { + if svr == CkSvrName { + for idx, pkg := range pkgs.PkgLists { + pkgs.PkgLists[idx] = path.Join(common.TmpWorkDirectory, pkg) + } + return "DEBIAN_FRONTEND=noninteractive dpkg -i " + strings.Join(pkgs.PkgLists, " ") + } else { + return "dpkg -i " + pkgs.Keeper } - return "DEBIAN_FRONTEND=noninteractive dpkg -i " + strings.Join(pkgs.PkgLists, " ") } -func (p *DebPkg) UpgradeCmd(pkgs Packages) string { - for idx, pkg := range pkgs.PkgLists { - pkgs.PkgLists[idx] = path.Join(common.TmpWorkDirectory, pkg) - } - return "DEBIAN_FRONTEND=noninteractive dpkg -i " + strings.Join(pkgs.PkgLists, " ") +func (p *DebPkg) UpgradeCmd(svr string, pkgs Packages) string { + return p.InstallCmd(svr, pkgs) } -func (p *DebPkg) Uninstall(pkgs Packages, version string) string { - return "dpkg -P clickhouse-client clickhouse-common-static clickhouse-server" +func (p *DebPkg) Uninstall(svr string, pkgs Packages, version string) string { + if svr == CkSvrName { + return "dpkg -P clickhouse-client clickhouse-common-static clickhouse-server" + } else { + return "dpkg -P clickhouse-keeper" + } } diff --git a/deploy/cmd_pkg_test.go b/deploy/cmd_pkg_test.go index 34a248a6..341997d9 100644 --- a/deploy/cmd_pkg_test.go +++ b/deploy/cmd_pkg_test.go @@ -37,7 +37,7 @@ func TestTgzPkg_InstallCmd(t *testing.T) { Cwd: "/home/eoi/clickhouse", } p := TgzFacotry{}.Create() - out := p.InstallCmd(pkgs) + out := p.InstallCmd(CkSvrName, pkgs) expect := `mkdir -p /home/eoi/clickhouse/bin /home/eoi/clickhouse/etc/clickhouse-server/config.d /home/eoi/clickhouse/etc/clickhouse-server/users.d /home/eoi/clickhouse/log/clickhouse-server /home/eoi/clickhouse/run /home/eoi/clickhouse/data/clickhouse;tar -xvf /tmp/clickhouse-common-static-22.3.6.5-amd64.tgz -C /tmp;cp -rf /tmp/clickhouse-common-static-22.3.6.5/usr/bin/* /home/eoi/clickhouse/bin;tar -xvf /tmp/clickhouse-server-22.3.6.5-amd64.tgz -C /tmp;cp -rf /tmp/clickhouse-server-22.3.6.5/usr/bin/* /home/eoi/clickhouse/bin;cp -rf /tmp/clickhouse-server-22.3.6.5/etc/clickhouse-* /home/eoi/clickhouse/etc/;tar -xvf /tmp/clickhouse-client-22.3.6.5-amd64.tgz -C /tmp;cp -rf /tmp/clickhouse-client-22.3.6.5/usr/bin/* /home/eoi/clickhouse/bin;cp -rf /tmp/clickhouse-client-22.3.6.5/etc/clickhouse-* /home/eoi/clickhouse/etc/` assert.Equal(t, expect, out) } @@ -53,7 +53,7 @@ func TestTgzPkg_UninstallCmd(t *testing.T) { } p := TgzFacotry{}.Create() expect := "rm -rf /home/eoi/clickhouse/*" - out := p.Uninstall(pkgs, "22.3.6.5") + out := p.Uninstall(CkSvrName, pkgs, "22.3.6.5") assert.Equal(t, expect, out) } @@ -67,7 +67,7 @@ func TestTgzPkg_UpgradeCmd(t *testing.T) { Cwd: "/home/eoi/clickhouse", } p := TgzFacotry{}.Create() - out := p.UpgradeCmd(pkgs) + out := p.UpgradeCmd(CkSvrName, pkgs) expect := `mkdir -p /home/eoi/clickhouse/bin /home/eoi/clickhouse/etc/clickhouse-server/config.d /home/eoi/clickhouse/etc/clickhouse-server/users.d /home/eoi/clickhouse/log/clickhouse-server /home/eoi/clickhouse/run /home/eoi/clickhouse/data/clickhouse;tar -xvf /tmp/clickhouse-common-static-22.3.6.5-amd64.tgz -C /tmp;cp -rf /tmp/clickhouse-common-static-22.3.6.5/usr/bin/* /home/eoi/clickhouse/bin;tar -xvf /tmp/clickhouse-server-22.3.6.5-amd64.tgz -C /tmp;cp -rf /tmp/clickhouse-server-22.3.6.5/usr/bin/* /home/eoi/clickhouse/bin;cp -rf /tmp/clickhouse-server-22.3.6.5/etc/clickhouse-* /home/eoi/clickhouse/etc/;tar -xvf /tmp/clickhouse-client-22.3.6.5-amd64.tgz -C /tmp;cp -rf /tmp/clickhouse-client-22.3.6.5/usr/bin/* /home/eoi/clickhouse/bin;cp -rf /tmp/clickhouse-client-22.3.6.5/etc/clickhouse-* /home/eoi/clickhouse/etc/` assert.Equal(t, expect, out) } @@ -102,7 +102,7 @@ func TestRpmPkg_InstallCmd(t *testing.T) { }, } p := RpmFacotry{}.Create() - out := p.InstallCmd(pkgs) + out := p.InstallCmd(CkSvrName, pkgs) expect := `DEBIAN_FRONTEND=noninteractive rpm --force --nosignature --nodeps -ivh /tmp/clickhouse-common-static-22.3.6.5-amd64.tgz;DEBIAN_FRONTEND=noninteractive rpm --force --nosignature --nodeps -ivh /tmp/clickhouse-server-22.3.6.5-amd64.tgz;DEBIAN_FRONTEND=noninteractive rpm --force --nosignature --nodeps -ivh /tmp/clickhouse-client-22.3.6.5-amd64.tgz` assert.Equal(t, expect, out) } @@ -116,7 +116,7 @@ func TestRpmPkg_UninstallCmd(t *testing.T) { }, } p := RpmFacotry{}.Create() - out := p.Uninstall(pkgs, "22.3.6.5") + out := p.Uninstall(CkSvrName, pkgs, "22.3.6.5") expect := `rpm -e $(rpm -qa |grep clickhouse |grep 22.3.6.5)` assert.Equal(t, expect, out) } @@ -130,7 +130,7 @@ func TestRpmPkg_UpgradeCmd(t *testing.T) { }, } p := RpmFacotry{}.Create() - out := p.UpgradeCmd(pkgs) + out := p.UpgradeCmd(CkSvrName, pkgs) expect := `DEBIAN_FRONTEND=noninteractive rpm --force --nosignature --nodeps -Uvh /tmp/clickhouse-common-static-22.3.6.5-amd64.tgz;DEBIAN_FRONTEND=noninteractive rpm --force --nosignature --nodeps -Uvh /tmp/clickhouse-server-22.3.6.5-amd64.tgz;DEBIAN_FRONTEND=noninteractive rpm --force --nosignature --nodeps -Uvh /tmp/clickhouse-client-22.3.6.5-amd64.tgz` assert.Equal(t, expect, out) } @@ -165,7 +165,7 @@ func TestDebPkg_InstallCmd(t *testing.T) { }, } p := DebFacotry{}.Create() - out := p.InstallCmd(pkgs) + out := p.InstallCmd(CkSvrName, pkgs) expect := `DEBIAN_FRONTEND=noninteractive dpkg -i /tmp/clickhouse-common-static-22.3.6.5-amd64.tgz /tmp/clickhouse-server-22.3.6.5-amd64.tgz /tmp/clickhouse-client-22.3.6.5-amd64.tgz` assert.Equal(t, expect, out) } @@ -179,7 +179,7 @@ func TestDebPkg_UninstallCmd(t *testing.T) { }, } p := DebFacotry{}.Create() - out := p.Uninstall(pkgs, "22.3.6.5") + out := p.Uninstall(CkSvrName, pkgs, "22.3.6.5") expect := `dpkg -P clickhouse-client clickhouse-common-static clickhouse-server` assert.Equal(t, expect, out) } @@ -194,7 +194,7 @@ func TestDebPkg_UpgradeCmd(t *testing.T) { Cwd: "/home/eoi/clickhouse", } p := DebFacotry{}.Create() - out := p.UpgradeCmd(pkgs) + out := p.UpgradeCmd(CkSvrName, pkgs) expect := `DEBIAN_FRONTEND=noninteractive dpkg -i /tmp/clickhouse-common-static-22.3.6.5-amd64.tgz /tmp/clickhouse-server-22.3.6.5-amd64.tgz /tmp/clickhouse-client-22.3.6.5-amd64.tgz` assert.Equal(t, expect, out) } diff --git a/deploy/cmd_rpm.go b/deploy/cmd_rpm.go index 89a72d53..70b2e0b3 100644 --- a/deploy/cmd_rpm.go +++ b/deploy/cmd_rpm.go @@ -31,22 +31,34 @@ func (p *RpmPkg) RestartCmd(svr, cwd string) string { return "systemctl restart " + svr } -func (p *RpmPkg) InstallCmd(pkgs Packages) string { +func (p *RpmPkg) InstallCmd(svr string, pkgs Packages) string { var cmd string - for _, pkg := range pkgs.PkgLists { - cmd += fmt.Sprintf("%s -ivh %s;", rpmPrefix, path.Join(common.TmpWorkDirectory, pkg)) + if svr == CkSvrName { + for _, pkg := range pkgs.PkgLists { + cmd += fmt.Sprintf("%s -ivh %s;", rpmPrefix, path.Join(common.TmpWorkDirectory, pkg)) + } + } else if svr == KeeperSvrName { + cmd += fmt.Sprintf("%s -ivh %s;", rpmPrefix, path.Join(common.TmpWorkDirectory, pkgs.Keeper)) } return strings.TrimSuffix(cmd, ";") } -func (p *RpmPkg) UpgradeCmd(pkgs Packages) string { +func (p *RpmPkg) UpgradeCmd(svr string, pkgs Packages) string { var cmd string - for _, pkg := range pkgs.PkgLists { - cmd += fmt.Sprintf("%s -Uvh %s;", rpmPrefix, path.Join(common.TmpWorkDirectory, pkg)) + if svr == CkSvrName { + for _, pkg := range pkgs.PkgLists { + cmd += fmt.Sprintf("%s -Uvh %s;", rpmPrefix, path.Join(common.TmpWorkDirectory, pkg)) + } + } else if svr == KeeperSvrName { + cmd += fmt.Sprintf("%s -Uvh %s;", rpmPrefix, path.Join(common.TmpWorkDirectory, pkgs.Keeper)) } return strings.TrimSuffix(cmd, ";") } -func (p *RpmPkg) Uninstall(pkgs Packages, version string) string { - return fmt.Sprintf("rpm -e $(rpm -qa |grep clickhouse |grep %s)", version) +func (p *RpmPkg) Uninstall(svr string, pkgs Packages, version string) string { + if svr == KeeperSvrName { + return fmt.Sprintf("rpm -e $(rpm -qa |grep clickhouse-keeper |grep %s)", version) + } else { + return fmt.Sprintf("rpm -e $(rpm -qa |grep clickhouse |grep %s)", version) + } } diff --git a/deploy/cmd_tgz.go b/deploy/cmd_tgz.go index bd365ee7..0bbdb7e0 100644 --- a/deploy/cmd_tgz.go +++ b/deploy/cmd_tgz.go @@ -26,30 +26,32 @@ func (p *TgzPkg) RestartCmd(svr, cwd string) string { return p.StopCmd(svr, cwd) + ";" + p.StartCmd(svr, cwd) } -func (p *TgzPkg) InstallCmd(pkgs Packages) string { +func (p *TgzPkg) InstallCmd(svr string, pkgs Packages) string { content := fmt.Sprintf("mkdir -p %s/bin %s/etc/clickhouse-server/config.d %s/etc/clickhouse-server/users.d %s/log/clickhouse-server %s/run %s/data/clickhouse;", pkgs.Cwd, pkgs.Cwd, pkgs.Cwd, pkgs.Cwd, pkgs.Cwd, pkgs.Cwd) - for _, pkg := range pkgs.PkgLists { + if svr == CkSvrName { + for _, pkg := range pkgs.PkgLists { + lastIndex := strings.LastIndex(pkg, "-") + extractDir := pkg[:lastIndex] + content += fmt.Sprintf("tar -xvf /tmp/%s -C /tmp;", pkg) + content += fmt.Sprintf("cp -rf /tmp/%s/usr/bin/* %s/bin;", extractDir, pkgs.Cwd) + if !strings.Contains(extractDir, common.PkgModuleCommon) { + content += fmt.Sprintf("cp -rf /tmp/%s/etc/clickhouse-* %s/etc/;", extractDir, pkgs.Cwd) + } + } + } else if svr == KeeperSvrName { + pkg := pkgs.Keeper lastIndex := strings.LastIndex(pkg, "-") extractDir := pkg[:lastIndex] content += fmt.Sprintf("tar -xvf /tmp/%s -C /tmp;", pkg) content += fmt.Sprintf("cp -rf /tmp/%s/usr/bin/* %s/bin;", extractDir, pkgs.Cwd) - if !strings.Contains(extractDir, common.PkgModuleCommon) { - content += fmt.Sprintf("cp -rf /tmp/%s/etc/clickhouse-* %s/etc/;", extractDir, pkgs.Cwd) - } } - //content += fmt.Sprintf(`echo "PATH=$PATH:%s/bin" > %s/.profile;`, pkgs.Cwd, pkgs.Cwd) - //content += fmt.Sprintf(`echo source %s/.profile >> ${HOME}/.bash_profile;`, pkgs.Cwd) - //content += fmt.Sprintf("source ${HOME}/.bash_profile;") - //content += "useradd clickhouse;" - //content += "groupadd clickhouse;" - //content += fmt.Sprintf("chown -R clickhouse:clickhouse %s", pkgs.Cwd) return strings.TrimSuffix(content, ";") } -func (p *TgzPkg) UpgradeCmd(pkgs Packages) string { - return p.InstallCmd(pkgs) +func (p *TgzPkg) UpgradeCmd(svr string, pkgs Packages) string { + return p.InstallCmd(svr, pkgs) } -func (p *TgzPkg) Uninstall(pkgs Packages, version string) string { +func (p *TgzPkg) Uninstall(svr string, pkgs Packages, version string) string { return fmt.Sprintf("rm -rf %s/*", pkgs.Cwd) } diff --git a/deploy/keeper.go b/deploy/keeper.go new file mode 100644 index 00000000..0e4585b0 --- /dev/null +++ b/deploy/keeper.go @@ -0,0 +1,503 @@ +package deploy + +import ( + "encoding/gob" + "fmt" + "os" + "path" + "strings" + "sync" + "time" + + "github.com/housepower/ckman/ckconfig" + "github.com/housepower/ckman/service/zookeeper" + + "github.com/housepower/ckman/config" + "github.com/pkg/errors" + + "github.com/housepower/ckman/common" + "github.com/housepower/ckman/log" + "github.com/housepower/ckman/model" +) + +func init() { + gob.Register(KeeperDeploy{}) +} + +const ( + KeeperSvrName string = "clickhouse-keeper" +) + +type KeeperDeploy struct { + DeployBase + Conf *model.CKManClickHouseConfig + HostInfos []ckconfig.HostInfo + Ext model.CkDeployExt +} + +func NewKeeperDeploy(conf model.CKManClickHouseConfig, packages Packages) *KeeperDeploy { + return &KeeperDeploy{ + Conf: &conf, + DeployBase: DeployBase{ + Packages: packages, + }, + } +} + +func (d *KeeperDeploy) Init() error { + d.Conf.Normalize() + d.HostInfos = make([]ckconfig.HostInfo, len(d.Conf.KeeperConf.KeeperNodes)) + var lastError error + var wg sync.WaitGroup + d.Ext.Ipv6Enable = true + for _, host := range d.Conf.KeeperConf.KeeperNodes { + innerHost := host + wg.Add(1) + _ = common.Pool.Submit(func() { + defer wg.Done() + sshOpts := common.SshOptions{ + User: d.Conf.SshUser, + Password: d.Conf.SshPassword, + Port: d.Conf.SshPort, + Host: innerHost, + NeedSudo: d.Conf.NeedSudo, + AuthenticateType: d.Conf.AuthenticateType, + } + kpath := "" + if !d.Conf.NeedSudo { + kpath = path.Join(d.Conf.Cwd, d.Conf.Path, "clickhouse-keeper") + } else { + kpath = path.Join(d.Conf.Path, "clickhouse-keeper") + } + cmd1 := fmt.Sprintf("mkdir -p %s ; chown -R clickhouse:clickhouse %s", kpath, kpath) + _, err := common.RemoteExecute(sshOpts, cmd1) + if err != nil { + lastError = err + return + } + if d.Ext.Ipv6Enable { + cmd2 := "grep lo /proc/net/if_inet6 >/dev/null 2>&1; echo $?" + output, err := common.RemoteExecute(sshOpts, cmd2) + if err != nil { + lastError = err + return + } + + ipv6Enable := strings.Trim(output, "\n") + if ipv6Enable != "0" { + //file not exists, return 2, file exists but empty, return 1 + d.Ext.Ipv6Enable = false + } + } + }) + } + wg.Wait() + if lastError != nil { + return lastError + } + log.Logger.Infof("init done") + return nil +} + +func (d *KeeperDeploy) Prepare() error { + d.Conf.Normalize() + file := path.Join(config.GetWorkDirectory(), common.DefaultPackageDirectory, d.Packages.Keeper) + + var lastError error + var wg sync.WaitGroup + for _, host := range d.Conf.KeeperConf.KeeperNodes { + innerHost := host + wg.Add(1) + _ = common.Pool.Submit(func() { + defer wg.Done() + sshOpts := common.SshOptions{ + User: d.Conf.SshUser, + Password: d.Conf.SshPassword, + Port: d.Conf.SshPort, + Host: innerHost, + NeedSudo: d.Conf.NeedSudo, + AuthenticateType: d.Conf.AuthenticateType, + } + if err := common.ScpUploadFiles([]string{file}, common.TmpWorkDirectory, sshOpts); err != nil { + lastError = err + return + } + log.Logger.Debugf("host %s prepare done", innerHost) + }) + } + wg.Wait() + if lastError != nil { + return lastError + } + log.Logger.Infof("prepare done") + return nil +} + +func (d *KeeperDeploy) Install() error { + d.Conf.Normalize() + cmdIns := GetSuitableCmdAdpt(d.Conf.PkgType) + cmds := make([]string, 0) + cmds = append(cmds, cmdIns.InstallCmd(KeeperSvrName, d.Packages)) + cmds = append(cmds, fmt.Sprintf("rm -rf %s/* %s/*", d.Conf.KeeperConf.LogPath, d.Conf.KeeperConf.SnapshotPath)) + if d.Conf.NeedSudo { + cmds = append(cmds, fmt.Sprintf("chown clickhouse.clickhouse %s %s -R", d.Conf.KeeperConf.LogPath, d.Conf.KeeperConf.SnapshotPath)) + } + var lastError error + var wg sync.WaitGroup + for _, host := range d.Conf.KeeperConf.KeeperNodes { + innerHost := host + wg.Add(1) + _ = common.Pool.Submit(func() { + defer wg.Done() + sshOpts := common.SshOptions{ + User: d.Conf.SshUser, + Password: d.Conf.SshPassword, + Port: d.Conf.SshPort, + Host: innerHost, + NeedSudo: d.Conf.NeedSudo, + AuthenticateType: d.Conf.AuthenticateType, + } + cmd1 := cmdIns.StopCmd(KeeperSvrName, d.Conf.Cwd) + _, _ = common.RemoteExecute(sshOpts, cmd1) + + cmd2 := strings.Join(cmds, ";") + _, err := common.RemoteExecute(sshOpts, cmd2) + if err != nil { + lastError = err + return + } + if d.Conf.Cwd != "" { + //tgz deployment, try to add auto start + pkg := d.Packages.Keeper + lastIndex := strings.LastIndex(pkg, "-") + extractDir := pkg[:lastIndex] + + cmd3 := fmt.Sprintf("cp /tmp/%s/etc/init.d/clickhouse-keeper /etc/init.d/;", extractDir) + cmd3 += fmt.Sprintf("cp /tmp/%s/lib/systemd/system/clickhouse-keeper.service /etc/systemd/system/", extractDir) + sshOpts.NeedSudo = true + _, err = common.RemoteExecute(sshOpts, cmd3) + if err != nil { + log.Logger.Warnf("try to config autorestart failed:%v", err) + } + } + + log.Logger.Debugf("host %s install done", innerHost) + }) + } + wg.Wait() + if lastError != nil { + return lastError + } + log.Logger.Infof("install done") + return nil +} + +func (d *KeeperDeploy) Uninstall() error { + d.Conf.Normalize() + cmdIns := GetSuitableCmdAdpt(d.Conf.PkgType) + cmds := make([]string, 0) + cmds = append(cmds, cmdIns.Uninstall(KeeperSvrName, d.Packages, d.Conf.Version)) + cmds = append(cmds, fmt.Sprintf("rm -rf %s %s", d.Conf.KeeperConf.LogPath, d.Conf.KeeperConf.SnapshotPath)) + if d.Conf.NeedSudo { + cmds = append(cmds, "rm -rf /etc/clickhouse-keeper") + } + var lastError error + var wg sync.WaitGroup + for _, host := range d.Conf.KeeperConf.KeeperNodes { + innerHost := host + wg.Add(1) + _ = common.Pool.Submit(func() { + defer wg.Done() + sshOpts := common.SshOptions{ + User: d.Conf.SshUser, + Password: d.Conf.SshPassword, + Port: d.Conf.SshPort, + Host: innerHost, + NeedSudo: d.Conf.NeedSudo, + AuthenticateType: d.Conf.AuthenticateType, + } + cmd := strings.Join(cmds, ";") + _, err := common.RemoteExecute(sshOpts, cmd) + if err != nil { + lastError = err + return + } + log.Logger.Debugf("host %s uninstall done", innerHost) + }) + } + wg.Wait() + if lastError != nil { + return lastError + } + log.Logger.Infof("uninstall done") + return nil +} + +func (d *KeeperDeploy) Upgrade() error { + d.Conf.Normalize() + cmdIns := GetSuitableCmdAdpt(d.Conf.PkgType) + cmd := cmdIns.UpgradeCmd(KeeperSvrName, d.Packages) + var lastError error + var wg sync.WaitGroup + for _, host := range d.Conf.KeeperConf.KeeperNodes { + innerHost := host + wg.Add(1) + _ = common.Pool.Submit(func() { + defer wg.Done() + sshOpts := common.SshOptions{ + User: d.Conf.SshUser, + Password: d.Conf.SshPassword, + Port: d.Conf.SshPort, + Host: innerHost, + NeedSudo: d.Conf.NeedSudo, + AuthenticateType: d.Conf.AuthenticateType, + } + _, err := common.RemoteExecute(sshOpts, cmd) + if err != nil { + lastError = err + return + } + log.Logger.Debugf("host %s upgrade done", innerHost) + }) + } + wg.Wait() + if lastError != nil { + return lastError + } + log.Logger.Infof("upgrade done") + return nil +} + +func (d *KeeperDeploy) Config() error { + d.Conf.Normalize() + confFiles := make([]string, 0) + + var remotePath string + if d.Conf.NeedSudo { + remotePath = "/etc/clickhouse-keeper" + } else { + remotePath = path.Join(d.Conf.Cwd, "etc", "clickhouse-keeper") + } + var lastError error + var wg sync.WaitGroup + for index, host := range d.Conf.KeeperConf.KeeperNodes { + innerIndex := index + innerHost := host + confFiles := confFiles + wg.Add(1) + _ = common.Pool.Submit(func() { + defer wg.Done() + sshOpts := common.SshOptions{ + User: d.Conf.SshUser, + Password: d.Conf.SshPassword, + Port: d.Conf.SshPort, + Host: innerHost, + NeedSudo: d.Conf.NeedSudo, + AuthenticateType: d.Conf.AuthenticateType, + } + if d.Conf.NeedSudo { + //clear config first + cmd := "cp /etc/clickhouse-keeper/keeper_config.xml /etc/clickhouse-keeper/keeper_config.xml.last" + if _, err := common.RemoteExecute(sshOpts, cmd); err != nil { + lastError = err + return + } + } + + keeperFile, err := common.NewTempFile(path.Join(config.GetWorkDirectory(), "package"), "keeper_config") + if err != nil { + lastError = err + return + } + defer os.Remove(keeperFile.FullName) + keeperXml, err := ckconfig.GenerateKeeperXML(keeperFile.FullName, d.Conf, d.Ext.Ipv6Enable, innerIndex+1) + if err != nil { + lastError = err + return + } + confFiles = append(confFiles, keeperXml) + + if err := common.ScpUploadFiles(confFiles, remotePath, sshOpts); err != nil { + lastError = err + return + } + + cmds := make([]string, 0) + cmds = append(cmds, fmt.Sprintf("mv %s %s", path.Join(remotePath, keeperFile.BaseName), path.Join(remotePath, "keeper_config.xml"))) + if d.Conf.NeedSudo { + cmds = append(cmds, "chown -R clickhouse:clickhouse /etc/clickhouse-keeper") + } + cmd := strings.Join(cmds, ";") + if _, err = common.RemoteExecute(sshOpts, cmd); err != nil { + lastError = err + return + } + log.Logger.Debugf("host %s config done", innerHost) + }) + } + wg.Wait() + if lastError != nil { + return lastError + } + log.Logger.Infof("config done") + return nil +} + +func (d *KeeperDeploy) Start() error { + d.Conf.Normalize() + cmdIns := GetSuitableCmdAdpt(d.Conf.PkgType) + var lastError error + var wg sync.WaitGroup + for _, host := range d.Conf.KeeperConf.KeeperNodes { + innerHost := host + wg.Add(1) + _ = common.Pool.Submit(func() { + defer wg.Done() + sshOpts := common.SshOptions{ + User: d.Conf.SshUser, + Password: d.Conf.SshPassword, + Port: d.Conf.SshPort, + Host: innerHost, + NeedSudo: d.Conf.NeedSudo, + AuthenticateType: d.Conf.AuthenticateType, + } + // if strings.HasSuffix(d.Conf.PkgType, common.PkgSuffixTgz) { + // // try to modify ulimit nofiles + // sshOpts.NeedSudo = true + // cmds := []string{ + // fmt.Sprintf("sed -i '/%s soft nofile/d' /etc/security/limits.conf", d.Conf.SshUser), + // fmt.Sprintf("sed -i '/%s hard nofile/d' /etc/security/limits.conf", d.Conf.SshUser), + // fmt.Sprintf("echo \"%s soft nofile 500000\" >> /etc/security/limits.conf", d.Conf.SshUser), + // fmt.Sprintf("echo \"%s hard nofile 500000\" >> /etc/security/limits.conf", d.Conf.SshUser), + // } + // _, err := common.RemoteExecute(sshOpts, strings.Join(cmds, ";")) + // if err != nil { + // log.Logger.Warnf("[%s] set ulimit -n failed: %v", host, err) + // } + // sshOpts.NeedSudo = d.Conf.NeedSudo + // } + + cmd := cmdIns.StartCmd(KeeperSvrName, d.Conf.Cwd) + _, err := common.RemoteExecute(sshOpts, cmd) + if err != nil { + lastError = err + return + } + log.Logger.Debugf("host %s start done", innerHost) + }) + } + wg.Wait() + if lastError != nil { + return lastError + } + log.Logger.Infof("start done") + return nil +} + +func (d *KeeperDeploy) Stop() error { + d.Conf.Normalize() + cmdIns := GetSuitableCmdAdpt(d.Conf.PkgType) + var lastError error + var wg sync.WaitGroup + for _, host := range d.Conf.KeeperConf.KeeperNodes { + innerHost := host + wg.Add(1) + _ = common.Pool.Submit(func() { + defer wg.Done() + sshOpts := common.SshOptions{ + User: d.Conf.SshUser, + Password: d.Conf.SshPassword, + Port: d.Conf.SshPort, + Host: innerHost, + NeedSudo: d.Conf.NeedSudo, + AuthenticateType: d.Conf.AuthenticateType, + } + cmd := cmdIns.StopCmd(KeeperSvrName, d.Conf.Cwd) + _, err := common.RemoteExecute(sshOpts, cmd) + if err != nil { + lastError = err + return + } + log.Logger.Debugf("host %s stop done", innerHost) + }) + } + wg.Wait() + if lastError != nil { + return lastError + } + log.Logger.Infof("stop done") + return nil +} + +func (d *KeeperDeploy) Restart() error { + d.Conf.Normalize() + cmdIns := GetSuitableCmdAdpt(d.Conf.PkgType) + var lastError error + var wg sync.WaitGroup + for _, host := range d.Conf.KeeperConf.KeeperNodes { + innerHost := host + wg.Add(1) + _ = common.Pool.Submit(func() { + defer wg.Done() + sshOpts := common.SshOptions{ + User: d.Conf.SshUser, + Password: d.Conf.SshPassword, + Port: d.Conf.SshPort, + Host: innerHost, + NeedSudo: d.Conf.NeedSudo, + AuthenticateType: d.Conf.AuthenticateType, + } + cmd := cmdIns.RestartCmd(KeeperSvrName, d.Conf.Cwd) + _, err := common.RemoteExecute(sshOpts, cmd) + if err != nil { + lastError = err + return + } + log.Logger.Debugf("host %s restart done", innerHost) + }) + } + wg.Wait() + if lastError != nil { + return lastError + } + log.Logger.Infof("restart done") + return nil +} + +func (d *KeeperDeploy) Check(timeout int) error { + d.Conf.Normalize() + var lastError error + var wg sync.WaitGroup + for _, host := range d.Conf.KeeperConf.KeeperNodes { + innerHost := host + wg.Add(1) + _ = common.Pool.Submit(func() { + defer wg.Done() + // Golang <-time.After() is not garbage collected before expiry. + ticker := time.NewTicker(5 * time.Second) + ticker2 := time.NewTicker(time.Duration(timeout) * time.Second) + defer ticker.Stop() + defer ticker2.Stop() + for { + select { + case <-ticker.C: + res, err := zookeeper.ZkMetric(innerHost, d.Conf.KeeperConf.TcpPort, "ruok") + if err == nil && string(res) == "imok" { + log.Logger.Debugf("host %s check done", innerHost) + return + } + case <-ticker2.C: + lastError = errors.Wrapf(model.CheckTimeOutErr, "clickhouse-keeper may start failed, please check the clickhouse-keeper log") + return + } + } + }) + } + + wg.Wait() + if lastError != nil { + return lastError + } + log.Logger.Infof("check done") + return nil +} diff --git a/docker-compose.yml b/docker-compose.yml index 72a9b85f..04d73def 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,7 @@ version: "3" services: node1: - image: eoitek/ckman-clickhouse:centos-7 + image: ckman-clickhouse:centos-7 hostname: node1 privileged: true extra_hosts: @@ -16,7 +16,7 @@ services: ipv4_address: 192.168.122.101 node2: - image: eoitek/ckman-clickhouse:centos-7 + image: ckman-clickhouse:centos-7 hostname: node2 privileged: true extra_hosts: @@ -31,7 +31,7 @@ services: ipv4_address: 192.168.122.102 node3: - image: eoitek/ckman-clickhouse:centos-7 + image: ckman-clickhouse:centos-7 hostname: node3 privileged: true extra_hosts: @@ -46,7 +46,7 @@ services: ipv4_address: 192.168.122.103 node4: - image: eoitek/ckman-clickhouse:centos-7 + image: ckman-clickhouse:centos-7 hostname: node4 privileged: true extra_hosts: @@ -61,7 +61,7 @@ services: ipv4_address: 192.168.122.104 node5: - image: eoitek/ckman-clickhouse:centos-7 + image: ckman-clickhouse:centos-7 hostname: node5 privileged: true extra_hosts: @@ -76,7 +76,7 @@ services: ipv4_address: 192.168.122.105 node6: - image: eoitek/ckman-clickhouse:centos-7 + image: ckman-clickhouse:centos-7 hostname: node6 privileged: true extra_hosts: diff --git a/docs/docs.go b/docs/docs.go index 4f5f6324..d649c8a7 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -214,7 +214,7 @@ var doc = `{ ], "responses": { "200": { - "description": "{\"code\":\"0000\",\"msg\":\"ok\", \"data\":{\"mode\":\"import\",\"hosts\":[\"192.168.0.1\",\"192.168.0.2\",\"192.168.0.3\",\"192.168.0.4\"],\"names\":[\"node1\",\"node2\",\"node3\",\"node4\"],\"port\":9000,\"httpPort\":8123,\"user\":\"ck\",\"password\":\"123456\",\"database\":\"default\",\"cluster\":\"test\",\"zkNodes\":[\"192.168.0.1\",\"192.168.0.2\",\"192.168.0.3\"],\"zkPort\":2181,\"zkStatusPort\":8080,\"isReplica\":true,\"version\":\"20.8.5.45\",\"sshUser\":\"\",\"sshPassword\":\"\",\"shards\":[{\"replicas\":[{\"ip\":\"192.168.0.1\",\"hostname\":\"node1\"},{\"ip\":\"192.168.0.2\",\"hostname\":\"node2\"}]},{\"replicas\":[{\"ip\":\"192.168.0.3\",\"hostname\":\"node3\"},{\"ip\":\"192.168.0.4\",\"hostname\":\"node4\"}]}],\"path\":\"\"}}", + "description": "{\"code\":\"0000\",\"msg\":\"ok\", \"data\":{\"mode\":\"import\",\"hosts\":[\"192.168.0.1\",\"192.168.0.2\",\"192.168.0.3\",\"192.168.0.4\"],\"names\":[\"node1\",\"node2\",\"node3\",\"node4\"],\"port\":9000,\"httpPort\":8123,\"user\":\"ck\",\"password\":\"123456\",\"database\":\"default\",\"cluster\":\"test\",\"zkNodes\":[\"192.168.0.1\",\"192.168.0.2\",\"192.168.0.3\"],\"zkPort\":2181,\"isReplica\":true,\"version\":\"20.8.5.45\",\"sshUser\":\"\",\"sshPassword\":\"\",\"shards\":[{\"replicas\":[{\"ip\":\"192.168.0.1\",\"hostname\":\"node1\"},{\"ip\":\"192.168.0.2\",\"hostname\":\"node2\"}]},{\"replicas\":[{\"ip\":\"192.168.0.3\",\"hostname\":\"node3\"},{\"ip\":\"192.168.0.4\",\"hostname\":\"node4\"}]}],\"path\":\"\"}}", "schema": { "type": "string" } @@ -530,7 +530,7 @@ var doc = `{ ], "responses": { "200": { - "description": "{\"code\":\"0000\",\"msg\":\"success\",\"data\":{\"test\":{\"mode\":\"import\",\"hosts\":[\"192.168.0.1\",\"192.168.0.2\",\"192.168.0.3\",\"192.168.0.4\"],\"names\":[\"node1\",\"node2\",\"node3\",\"node4\"],\"port\":9000,\"httpPort\":8123,\"user\":\"ck\",\"password\":\"123456\",\"database\":\"default\",\"cluster\":\"test\",\"zkNodes\":[\"192.168.0.1\",\"192.168.0.2\",\"192.168.0.3\"],\"zkPort\":2181,\"zkStatusPort\":8080,\"isReplica\":true,\"version\":\"20.8.5.45\",\"sshUser\":\"\",\"sshPassword\":\"\",\"shards\":[{\"replicas\":[{\"ip\":\"192.168.0.1\",\"hostname\":\"node1\"},{\"ip\":\"192.168.0.2\",\"hostname\":\"node2\"}]},{\"replicas\":[{\"ip\":\"192.168.0.3\",\"hostname\":\"node3\"},{\"ip\":\"192.168.0.4\",\"hostname\":\"node4\"}]}],\"path\":\"\"}}}}", + "description": "{\"code\":\"0000\",\"msg\":\"success\",\"data\":{\"test\":{\"mode\":\"import\",\"hosts\":[\"192.168.0.1\",\"192.168.0.2\",\"192.168.0.3\",\"192.168.0.4\"],\"names\":[\"node1\",\"node2\",\"node3\",\"node4\"],\"port\":9000,\"httpPort\":8123,\"user\":\"ck\",\"password\":\"123456\",\"database\":\"default\",\"cluster\":\"test\",\"zkNodes\":[\"192.168.0.1\",\"192.168.0.2\",\"192.168.0.3\"],\"zkPort\":2181,\"isReplica\":true,\"version\":\"20.8.5.45\",\"sshUser\":\"\",\"sshPassword\":\"\",\"shards\":[{\"replicas\":[{\"ip\":\"192.168.0.1\",\"hostname\":\"node1\"},{\"ip\":\"192.168.0.2\",\"hostname\":\"node2\"}]},{\"replicas\":[{\"ip\":\"192.168.0.3\",\"hostname\":\"node3\"},{\"ip\":\"192.168.0.4\",\"hostname\":\"node4\"}]}],\"path\":\"\"}}}}", "schema": { "type": "string" } @@ -2777,6 +2777,13 @@ var doc = `{ "type": "boolean", "example": true }, + "keeper": { + "type": "string", + "example": "zookeeper" + }, + "keeperConf": { + "$ref": "#/definitions/model.KeeperConf" + }, "logic_cluster": { "type": "string", "example": "logic_test" @@ -2805,6 +2812,9 @@ var doc = `{ "type": "string", "example": "127.0.0.1" }, + "promMetricPort": { + "$ref": "#/definitions/model.PromMetricPort" + }, "promPort": { "type": "integer", "example": 9090 @@ -2853,10 +2863,6 @@ var doc = `{ "zkPort": { "type": "integer", "example": 2181 - }, - "zkStatusPort": { - "type": "integer", - "example": 8080 } } }, @@ -2927,10 +2933,6 @@ var doc = `{ "zkPort": { "type": "integer", "example": 2181 - }, - "zkStatusPort": { - "type": "integer", - "example": 8080 } } }, @@ -3052,6 +3054,29 @@ var doc = `{ } } }, + "model.Coordination": { + "type": "object", + "properties": { + "autoForwarding": { + "type": "boolean" + }, + "expert": { + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "forceSync": { + "type": "boolean" + }, + "operationTimeoutMs": { + "type": "integer" + }, + "sessionTimeoutMs": { + "type": "integer" + } + } + }, "model.CreateCkTableReq": { "type": "object", "properties": { @@ -3245,6 +3270,49 @@ var doc = `{ } } }, + "model.KeeperConf": { + "type": "object", + "properties": { + "coordination": { + "$ref": "#/definitions/model.Coordination" + }, + "expert": { + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "keeperNodes": { + "type": "array", + "items": { + "type": "string" + }, + "example": [ + "192.168.101.102", + "192.168.101.105", + "192.168.101.107" + ] + }, + "logPath": { + "type": "string" + }, + "raftPort": { + "type": "integer", + "example": 9234 + }, + "runtime": { + "type": "string", + "example": "standalone" + }, + "snapshotPath": { + "type": "string" + }, + "tcpPort": { + "type": "integer", + "example": 9181 + } + } + }, "model.LoginReq": { "type": "object", "properties": { @@ -3362,6 +3430,20 @@ var doc = `{ } } }, + "model.PromMetricPort": { + "type": "object", + "properties": { + "clickHouse": { + "type": "integer" + }, + "nodeExport": { + "type": "integer" + }, + "zooKeeper": { + "type": "integer" + } + } + }, "model.PurgerTableReq": { "type": "object", "properties": { diff --git a/docs/swagger.json b/docs/swagger.json index 8e6227b4..47258536 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -198,7 +198,7 @@ ], "responses": { "200": { - "description": "{\"code\":\"0000\",\"msg\":\"ok\", \"data\":{\"mode\":\"import\",\"hosts\":[\"192.168.0.1\",\"192.168.0.2\",\"192.168.0.3\",\"192.168.0.4\"],\"names\":[\"node1\",\"node2\",\"node3\",\"node4\"],\"port\":9000,\"httpPort\":8123,\"user\":\"ck\",\"password\":\"123456\",\"database\":\"default\",\"cluster\":\"test\",\"zkNodes\":[\"192.168.0.1\",\"192.168.0.2\",\"192.168.0.3\"],\"zkPort\":2181,\"zkStatusPort\":8080,\"isReplica\":true,\"version\":\"20.8.5.45\",\"sshUser\":\"\",\"sshPassword\":\"\",\"shards\":[{\"replicas\":[{\"ip\":\"192.168.0.1\",\"hostname\":\"node1\"},{\"ip\":\"192.168.0.2\",\"hostname\":\"node2\"}]},{\"replicas\":[{\"ip\":\"192.168.0.3\",\"hostname\":\"node3\"},{\"ip\":\"192.168.0.4\",\"hostname\":\"node4\"}]}],\"path\":\"\"}}", + "description": "{\"code\":\"0000\",\"msg\":\"ok\", \"data\":{\"mode\":\"import\",\"hosts\":[\"192.168.0.1\",\"192.168.0.2\",\"192.168.0.3\",\"192.168.0.4\"],\"names\":[\"node1\",\"node2\",\"node3\",\"node4\"],\"port\":9000,\"httpPort\":8123,\"user\":\"ck\",\"password\":\"123456\",\"database\":\"default\",\"cluster\":\"test\",\"zkNodes\":[\"192.168.0.1\",\"192.168.0.2\",\"192.168.0.3\"],\"zkPort\":2181,\"isReplica\":true,\"version\":\"20.8.5.45\",\"sshUser\":\"\",\"sshPassword\":\"\",\"shards\":[{\"replicas\":[{\"ip\":\"192.168.0.1\",\"hostname\":\"node1\"},{\"ip\":\"192.168.0.2\",\"hostname\":\"node2\"}]},{\"replicas\":[{\"ip\":\"192.168.0.3\",\"hostname\":\"node3\"},{\"ip\":\"192.168.0.4\",\"hostname\":\"node4\"}]}],\"path\":\"\"}}", "schema": { "type": "string" } @@ -514,7 +514,7 @@ ], "responses": { "200": { - "description": "{\"code\":\"0000\",\"msg\":\"success\",\"data\":{\"test\":{\"mode\":\"import\",\"hosts\":[\"192.168.0.1\",\"192.168.0.2\",\"192.168.0.3\",\"192.168.0.4\"],\"names\":[\"node1\",\"node2\",\"node3\",\"node4\"],\"port\":9000,\"httpPort\":8123,\"user\":\"ck\",\"password\":\"123456\",\"database\":\"default\",\"cluster\":\"test\",\"zkNodes\":[\"192.168.0.1\",\"192.168.0.2\",\"192.168.0.3\"],\"zkPort\":2181,\"zkStatusPort\":8080,\"isReplica\":true,\"version\":\"20.8.5.45\",\"sshUser\":\"\",\"sshPassword\":\"\",\"shards\":[{\"replicas\":[{\"ip\":\"192.168.0.1\",\"hostname\":\"node1\"},{\"ip\":\"192.168.0.2\",\"hostname\":\"node2\"}]},{\"replicas\":[{\"ip\":\"192.168.0.3\",\"hostname\":\"node3\"},{\"ip\":\"192.168.0.4\",\"hostname\":\"node4\"}]}],\"path\":\"\"}}}}", + "description": "{\"code\":\"0000\",\"msg\":\"success\",\"data\":{\"test\":{\"mode\":\"import\",\"hosts\":[\"192.168.0.1\",\"192.168.0.2\",\"192.168.0.3\",\"192.168.0.4\"],\"names\":[\"node1\",\"node2\",\"node3\",\"node4\"],\"port\":9000,\"httpPort\":8123,\"user\":\"ck\",\"password\":\"123456\",\"database\":\"default\",\"cluster\":\"test\",\"zkNodes\":[\"192.168.0.1\",\"192.168.0.2\",\"192.168.0.3\"],\"zkPort\":2181,\"isReplica\":true,\"version\":\"20.8.5.45\",\"sshUser\":\"\",\"sshPassword\":\"\",\"shards\":[{\"replicas\":[{\"ip\":\"192.168.0.1\",\"hostname\":\"node1\"},{\"ip\":\"192.168.0.2\",\"hostname\":\"node2\"}]},{\"replicas\":[{\"ip\":\"192.168.0.3\",\"hostname\":\"node3\"},{\"ip\":\"192.168.0.4\",\"hostname\":\"node4\"}]}],\"path\":\"\"}}}}", "schema": { "type": "string" } @@ -2761,6 +2761,13 @@ "type": "boolean", "example": true }, + "keeper": { + "type": "string", + "example": "zookeeper" + }, + "keeperConf": { + "$ref": "#/definitions/model.KeeperConf" + }, "logic_cluster": { "type": "string", "example": "logic_test" @@ -2789,6 +2796,9 @@ "type": "string", "example": "127.0.0.1" }, + "promMetricPort": { + "$ref": "#/definitions/model.PromMetricPort" + }, "promPort": { "type": "integer", "example": 9090 @@ -2837,10 +2847,6 @@ "zkPort": { "type": "integer", "example": 2181 - }, - "zkStatusPort": { - "type": "integer", - "example": 8080 } } }, @@ -2911,10 +2917,6 @@ "zkPort": { "type": "integer", "example": 2181 - }, - "zkStatusPort": { - "type": "integer", - "example": 8080 } } }, @@ -3036,6 +3038,29 @@ } } }, + "model.Coordination": { + "type": "object", + "properties": { + "autoForwarding": { + "type": "boolean" + }, + "expert": { + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "forceSync": { + "type": "boolean" + }, + "operationTimeoutMs": { + "type": "integer" + }, + "sessionTimeoutMs": { + "type": "integer" + } + } + }, "model.CreateCkTableReq": { "type": "object", "properties": { @@ -3229,6 +3254,49 @@ } } }, + "model.KeeperConf": { + "type": "object", + "properties": { + "coordination": { + "$ref": "#/definitions/model.Coordination" + }, + "expert": { + "type": "object", + "additionalProperties": { + "type": "string" + } + }, + "keeperNodes": { + "type": "array", + "items": { + "type": "string" + }, + "example": [ + "192.168.101.102", + "192.168.101.105", + "192.168.101.107" + ] + }, + "logPath": { + "type": "string" + }, + "raftPort": { + "type": "integer", + "example": 9234 + }, + "runtime": { + "type": "string", + "example": "standalone" + }, + "snapshotPath": { + "type": "string" + }, + "tcpPort": { + "type": "integer", + "example": 9181 + } + } + }, "model.LoginReq": { "type": "object", "properties": { @@ -3346,6 +3414,20 @@ } } }, + "model.PromMetricPort": { + "type": "object", + "properties": { + "clickHouse": { + "type": "integer" + }, + "nodeExport": { + "type": "integer" + }, + "zooKeeper": { + "type": "integer" + } + } + }, "model.PurgerTableReq": { "type": "object", "properties": { diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 991f48c9..dbabaedb 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -162,6 +162,11 @@ definitions: isReplica: example: true type: boolean + keeper: + example: zookeeper + type: string + keeperConf: + $ref: '#/definitions/model.KeeperConf' logic_cluster: example: logic_test type: string @@ -183,6 +188,8 @@ definitions: promHost: example: 127.0.0.1 type: string + promMetricPort: + $ref: '#/definitions/model.PromMetricPort' promPort: example: 9090 type: integer @@ -219,9 +226,6 @@ definitions: zkPort: example: 2181 type: integer - zkStatusPort: - example: 8080 - type: integer type: object model.CkImportConfig: properties: @@ -273,9 +277,6 @@ definitions: zkPort: example: 2181 type: integer - zkStatusPort: - example: 8080 - type: integer type: object model.CkTableNameType: properties: @@ -364,6 +365,21 @@ definitions: example: true type: boolean type: object + model.Coordination: + properties: + autoForwarding: + type: boolean + expert: + additionalProperties: + type: string + type: object + forceSync: + type: boolean + operationTimeoutMs: + type: integer + sessionTimeoutMs: + type: integer + type: object model.CreateCkTableReq: properties: database: @@ -498,6 +514,36 @@ definitions: description: minmax, set, bloom_filter, ngrambf_v1, tokenbf_v1 type: string type: object + model.KeeperConf: + properties: + coordination: + $ref: '#/definitions/model.Coordination' + expert: + additionalProperties: + type: string + type: object + keeperNodes: + example: + - 192.168.101.102 + - 192.168.101.105 + - 192.168.101.107 + items: + type: string + type: array + logPath: + type: string + raftPort: + example: 9234 + type: integer + runtime: + example: standalone + type: string + snapshotPath: + type: string + tcpPort: + example: 9181 + type: integer + type: object model.LoginReq: properties: password: @@ -576,6 +622,15 @@ definitions: sql: type: string type: object + model.PromMetricPort: + properties: + clickHouse: + type: integer + nodeExport: + type: integer + zooKeeper: + type: integer + type: object model.PurgerTableReq: properties: begin: @@ -790,7 +845,7 @@ paths: type: string responses: "200": - description: '{"code":"0000","msg":"ok", "data":{"mode":"import","hosts":["192.168.0.1","192.168.0.2","192.168.0.3","192.168.0.4"],"names":["node1","node2","node3","node4"],"port":9000,"httpPort":8123,"user":"ck","password":"123456","database":"default","cluster":"test","zkNodes":["192.168.0.1","192.168.0.2","192.168.0.3"],"zkPort":2181,"zkStatusPort":8080,"isReplica":true,"version":"20.8.5.45","sshUser":"","sshPassword":"","shards":[{"replicas":[{"ip":"192.168.0.1","hostname":"node1"},{"ip":"192.168.0.2","hostname":"node2"}]},{"replicas":[{"ip":"192.168.0.3","hostname":"node3"},{"ip":"192.168.0.4","hostname":"node4"}]}],"path":""}}' + description: '{"code":"0000","msg":"ok", "data":{"mode":"import","hosts":["192.168.0.1","192.168.0.2","192.168.0.3","192.168.0.4"],"names":["node1","node2","node3","node4"],"port":9000,"httpPort":8123,"user":"ck","password":"123456","database":"default","cluster":"test","zkNodes":["192.168.0.1","192.168.0.2","192.168.0.3"],"zkPort":2181,"isReplica":true,"version":"20.8.5.45","sshUser":"","sshPassword":"","shards":[{"replicas":[{"ip":"192.168.0.1","hostname":"node1"},{"ip":"192.168.0.2","hostname":"node2"}]},{"replicas":[{"ip":"192.168.0.3","hostname":"node3"},{"ip":"192.168.0.4","hostname":"node4"}]}],"path":""}}' schema: type: string security: @@ -971,7 +1026,7 @@ paths: type: string responses: "200": - description: '{"code":"0000","msg":"success","data":{"test":{"mode":"import","hosts":["192.168.0.1","192.168.0.2","192.168.0.3","192.168.0.4"],"names":["node1","node2","node3","node4"],"port":9000,"httpPort":8123,"user":"ck","password":"123456","database":"default","cluster":"test","zkNodes":["192.168.0.1","192.168.0.2","192.168.0.3"],"zkPort":2181,"zkStatusPort":8080,"isReplica":true,"version":"20.8.5.45","sshUser":"","sshPassword":"","shards":[{"replicas":[{"ip":"192.168.0.1","hostname":"node1"},{"ip":"192.168.0.2","hostname":"node2"}]},{"replicas":[{"ip":"192.168.0.3","hostname":"node3"},{"ip":"192.168.0.4","hostname":"node4"}]}],"path":""}}}}' + description: '{"code":"0000","msg":"success","data":{"test":{"mode":"import","hosts":["192.168.0.1","192.168.0.2","192.168.0.3","192.168.0.4"],"names":["node1","node2","node3","node4"],"port":9000,"httpPort":8123,"user":"ck","password":"123456","database":"default","cluster":"test","zkNodes":["192.168.0.1","192.168.0.2","192.168.0.3"],"zkPort":2181,"isReplica":true,"version":"20.8.5.45","sshUser":"","sshPassword":"","shards":[{"replicas":[{"ip":"192.168.0.1","hostname":"node1"},{"ip":"192.168.0.2","hostname":"node2"}]},{"replicas":[{"ip":"192.168.0.3","hostname":"node3"},{"ip":"192.168.0.4","hostname":"node4"}]}],"path":""}}}}' schema: type: string security: diff --git a/frontend b/frontend index d62b5213..b5e87a74 160000 --- a/frontend +++ b/frontend @@ -1 +1 @@ -Subproject commit d62b52138a05e24320a520f9ed9f702450e4290a +Subproject commit b5e87a748a10307e5fd03c81d7ec128f460b5aac diff --git a/model/deploy_ck.go b/model/deploy_ck.go index 66f73aa4..b15c4ae8 100644 --- a/model/deploy_ck.go +++ b/model/deploy_ck.go @@ -33,6 +33,13 @@ const ( SshPasswordUsePubkey int = 2 MaxTimeOut int = 3600 + + ClickhouseKeeper string = "clickhouse-keeper" + Zookeeper string = "zookeeper" + ClickHouse string = "clickhouse" + + KeeperRuntimeStandalone = "standalone" + KeeperRuntimeInternal = "internal" ) type CkDeployExt struct { @@ -111,16 +118,23 @@ type CKManClickHouseConfig struct { NeedSudo bool `json:"needSudo" swaggerignore:"true"` } +type Coordination struct { + OperationTimeoutMs int + SessionTimeoutMs int + ForceSync bool + AutoForwarding bool + Expert map[string]string +} + type KeeperConf struct { - Runtime string `json:"runtime" example:"standalone"` - KeeperPkgType string `json:"keeperPkgType" example:"x86_64.rpm"` - KeeperPkg string `json:"keeperPkg" example:"clickhouse-keeper-22.3.3.44.noarch.rpm"` - KeeperVersion string `json:"keeperVersion" example:"22.3.3.44"` - KeeperNodes []string `json:"keeperNodes" example:"192.168.101.102,192.168.101.105,192.168.101.107"` - KeeperPort int `json:"keeperPort" example:"9181"` - LogPath string - SnapshotPath string - Expert map[string]string + Runtime string `json:"runtime" example:"standalone"` + KeeperNodes []string `json:"keeperNodes" example:"192.168.101.102,192.168.101.105,192.168.101.107"` + TcpPort int `json:"tcpPort" example:"9181"` + RaftPort int `json:"raftPort" example:"9234"` + LogPath string + SnapshotPath string + Coordination Coordination + Expert map[string]string } // Refers to https://clickhouse.tech/docs/en/engines/table-engines/mergetree-family/mergetree/#table_engine-mergetree-multiple-volumes @@ -270,7 +284,7 @@ func (config *CKManClickHouseConfig) Normalize() { } if config.Keeper == "" { - config.Keeper = "zookeeper" + config.Keeper = Zookeeper } if !strings.HasSuffix(config.PkgType, "tgz") { @@ -368,3 +382,10 @@ func (config *CKManClickHouseConfig) GetConnOption() ConnetOption { opt.Password = config.Password return opt } + +func (config *CKManClickHouseConfig) KeeperWithStanalone() bool { + if config.Keeper == ClickhouseKeeper { + return config.KeeperConf != nil && config.KeeperConf.Runtime == KeeperRuntimeStandalone + } + return false +} diff --git a/model/task.go b/model/task.go index b2aad096..4ee3eac5 100644 --- a/model/task.go +++ b/model/task.go @@ -31,6 +31,8 @@ const ( TaskTypeKeeperDeploy string = "keeper.deploy" TaskTypeKeeperUpgrade string = "keeper.upgrade" + TaskTypeKeeperDestory string = "keeper.destory" + TaskTypeKeeperSetting string = "keeper.setting" ALL_NODES_DEFAULT string = "all_hosts" ) diff --git a/service/runner/ck.go b/service/runner/ck.go index 741bd69f..f0a21ec3 100644 --- a/service/runner/ck.go +++ b/service/runner/ck.go @@ -56,6 +56,9 @@ func DestroyCkCluster(task *model.Task, d deploy.CKDeploy, conf *model.CKManClic return errors.Wrapf(err, "[%s]", model.NodeStatusInstall.EN) } + if d.Conf.Keeper == model.ClickhouseKeeper { + return nil + } //clear zkNode deploy.SetNodeStatus(task, model.NodeStatusClearData, model.ALL_NODES_DEFAULT) service, err := zookeeper.GetZkService(conf.Cluster) @@ -194,9 +197,21 @@ func DeleteCkClusterNode(task *model.Task, conf *model.CKManClickHouseConfig, ip if err := d.Config(); err != nil { return errors.Wrapf(err, "[%s]", model.NodeStatusConfigExt.EN) } + if d.Ext.Restart { + if err := d.Restart(); err != nil { + return errors.Wrapf(err, "[%s]", model.NodeStatusRestart.EN) + } + if err := d.Check(300); err != nil { + return errors.Wrapf(err, "[%s]", model.NodeStatusCheck.EN) + } + } conf.Hosts = hosts conf.Shards = shards + if d.Conf.Keeper == model.ClickhouseKeeper && d.Conf.KeeperConf.Runtime == model.KeeperRuntimeInternal { + conf.KeeperConf.KeeperNodes = make([]string, len(d.Conf.Hosts)) + copy(conf.KeeperConf.KeeperNodes, d.Conf.Hosts) + } return nil } @@ -235,6 +250,10 @@ func AddCkClusterNode(task *model.Task, conf *model.CKManClickHouseConfig, d *de deploy.SetNodeStatus(task, model.NodeStatusConfigExt, model.ALL_NODES_DEFAULT) d2 := deploy.NewCkDeploy(*conf) d2.Conf.Shards = d.Conf.Shards + if conf.Keeper == model.ClickhouseKeeper && conf.KeeperConf.Runtime == model.KeeperRuntimeInternal { + d2.Conf.KeeperConf.KeeperNodes = make([]string, len(conf.Hosts)+len(d.Conf.Hosts)) + copy(d2.Conf.KeeperConf.KeeperNodes, append(conf.Hosts, d.Conf.Hosts...)) + } if err := d2.Init(); err != nil { return errors.Wrapf(err, "[%s]", model.NodeStatusConfigExt.EN) } @@ -242,8 +261,22 @@ func AddCkClusterNode(task *model.Task, conf *model.CKManClickHouseConfig, d *de return errors.Wrapf(err, "[%s]", model.NodeStatusConfig.EN) } + if d.Ext.Restart { + if err := d2.Restart(); err != nil { + return errors.Wrapf(err, "[%s]", model.NodeStatusConfigExt.EN) + } + + if err := d2.Check(300); err != nil { + return errors.Wrapf(err, "[%s]", model.NodeStatusCheck.EN) + } + } + conf.Shards = d.Conf.Shards conf.Hosts = append(conf.Hosts, d.Conf.Hosts...) + if conf.Keeper == model.ClickhouseKeeper && conf.KeeperConf.Runtime == model.KeeperRuntimeInternal { + conf.KeeperConf.KeeperNodes = make([]string, len(conf.Hosts)) + copy(conf.KeeperConf.KeeperNodes, conf.Hosts) + } return nil } @@ -260,7 +293,7 @@ func UpgradeCkCluster(task *model.Task, d deploy.CKDeploy) error { } case model.PolicyFull: err := upgradePackage(task, d, 10) - if err != model.CheckTimeOutErr { + if err != nil && err != model.CheckTimeOutErr { return err } default: @@ -345,7 +378,7 @@ func ConfigCkCluster(task *model.Task, d deploy.CKDeploy) error { case model.PolicyFull: deploy.SetNodeStatus(task, model.NodeStatusRestart, model.ALL_NODES_DEFAULT) err := d.Restart() - if err != model.CheckTimeOutErr { + if err != nil && err != model.CheckTimeOutErr { return err } _ = d.Check(30) diff --git a/service/runner/handle.go b/service/runner/handle.go index fe485a8f..4c06aecb 100644 --- a/service/runner/handle.go +++ b/service/runner/handle.go @@ -47,6 +47,14 @@ func CKDeployHandle(task *model.Task) error { return err } + if d.Conf.KeeperWithStanalone() { + task.TaskType = model.TaskTypeKeeperDeploy + if err := DeployKeeperCluster(task, d); err != nil { + return err + } + task.TaskType = model.TaskTypeCKDeploy + } + if err := DeployCkCluster(task, d); err != nil { return err } @@ -116,6 +124,14 @@ func CKDestoryHandle(task *model.Task) error { return err } + if d.Conf.KeeperWithStanalone() { + task.TaskType = model.TaskTypeKeeperDestory + if err = DestroyKeeperCluster(task, d, &conf); err != nil { + return err + } + task.TaskType = model.TaskTypeCKDestory + } + deploy.SetNodeStatus(task, model.NodeStatusStore, model.ALL_NODES_DEFAULT) if err = repository.Ps.Begin(); err != nil { return err @@ -258,6 +274,14 @@ func CKUpgradeHandle(task *model.Task) error { return nil } + if d.Conf.KeeperWithStanalone() { + task.TaskType = model.TaskTypeKeeperUpgrade + if err = UpgradeKeeperCluster(task, d); err != nil { + return err + } + task.TaskType = model.TaskTypeCKUpgrade + } + err = UpgradeCkCluster(task, d) if err != nil { return err @@ -278,6 +302,14 @@ func CKSettingHandle(task *model.Task) error { return err } + if d.Conf.KeeperWithStanalone() { + task.TaskType = model.TaskTypeKeeperSetting + if err := ConfigKeeperCluster(task, d); err != nil { + return err + } + task.TaskType = model.TaskTypeCKSetting + } + if err := ConfigCkCluster(task, d); err != nil { return err } diff --git a/service/runner/keeper.go b/service/runner/keeper.go new file mode 100644 index 00000000..dc945bf6 --- /dev/null +++ b/service/runner/keeper.go @@ -0,0 +1,171 @@ +package runner + +import ( + "fmt" + + "github.com/housepower/ckman/common" + "github.com/housepower/ckman/deploy" + "github.com/housepower/ckman/log" + "github.com/housepower/ckman/model" + "github.com/pkg/errors" +) + +func DeployKeeperCluster(task *model.Task, d deploy.CKDeploy) error { + kd := deploy.NewKeeperDeploy(*d.Conf, d.Packages) + deploy.SetNodeStatus(task, model.NodeStatusInit, model.ALL_NODES_DEFAULT) + if err := kd.Init(); err != nil { + return errors.Wrapf(err, "[%s]", model.NodeStatusInit.EN) + } + + deploy.SetNodeStatus(task, model.NodeStatusPrepare, model.ALL_NODES_DEFAULT) + if err := kd.Prepare(); err != nil { + return errors.Wrapf(err, "[%s]", model.NodeStatusPrepare.EN) + } + + deploy.SetNodeStatus(task, model.NodeStatusInstall, model.ALL_NODES_DEFAULT) + if err := kd.Install(); err != nil { + return errors.Wrapf(err, "[%s]", model.NodeStatusInstall.EN) + } + + deploy.SetNodeStatus(task, model.NodeStatusConfig, model.ALL_NODES_DEFAULT) + if err := kd.Config(); err != nil { + return errors.Wrapf(err, "[%s]", model.NodeStatusConfig.EN) + } + + deploy.SetNodeStatus(task, model.NodeStatusStart, model.ALL_NODES_DEFAULT) + if err := kd.Start(); err != nil { + return errors.Wrapf(err, "[%s]", model.NodeStatusStart.EN) + } + + deploy.SetNodeStatus(task, model.NodeStatusCheck, model.ALL_NODES_DEFAULT) + if err := kd.Check(30); err != nil { + //return errors.Wrapf(err, "[%s]", model.NodeStatusCheck.EN) + deploy.SetTaskStatus(task, model.TaskStatusFailed, err.Error()) + } + return nil +} + +func DestroyKeeperCluster(task *model.Task, d deploy.CKDeploy, conf *model.CKManClickHouseConfig) error { + kd := deploy.NewKeeperDeploy(*d.Conf, d.Packages) + deploy.SetNodeStatus(task, model.NodeStatusStop, model.ALL_NODES_DEFAULT) + _ = kd.Stop() + + deploy.SetNodeStatus(task, model.NodeStatusUninstall, model.ALL_NODES_DEFAULT) + if err := kd.Uninstall(); err != nil { + return errors.Wrapf(err, "[%s]", model.NodeStatusInstall.EN) + } + + return nil +} + +func UpgradeKeeperCluster(task *model.Task, d deploy.CKDeploy) error { + kd := deploy.NewKeeperDeploy(*d.Conf, d.Packages) + switch d.Ext.Policy { + case model.PolicyRolling: + var rd deploy.KeeperDeploy + common.DeepCopyByGob(&rd, kd) + for _, host := range d.Conf.KeeperConf.KeeperNodes { + rd.Conf.KeeperConf.KeeperNodes = []string{host} + if err := upgradeKeeperPackage(task, rd, model.MaxTimeOut); err != nil { + return err + } + } + case model.PolicyFull: + err := upgradeKeeperPackage(task, *kd, 30) + if err != nil { + return err + } + default: + return fmt.Errorf("not support policy %s yet", d.Ext.Policy) + } + + return nil +} + +func upgradeKeeperPackage(task *model.Task, d deploy.KeeperDeploy, timeout int) error { + var node string + if d.Ext.Policy == model.PolicyRolling { + node = d.Conf.Hosts[0] + } else { + node = model.ALL_NODES_DEFAULT + } + + deploy.SetNodeStatus(task, model.NodeStatusInit, node) + if err := d.Init(); err != nil { + return errors.Wrapf(err, "[%s]", model.NodeStatusInit.EN) + } + deploy.SetNodeStatus(task, model.NodeStatusStop, node) + if err := d.Stop(); err != nil { + log.Logger.Warnf("stop cluster %s failed: %v", d.Conf.Cluster, err) + } + + deploy.SetNodeStatus(task, model.NodeStatusPrepare, node) + if err := d.Prepare(); err != nil { + return errors.Wrapf(err, "[%s]", model.NodeStatusPrepare.EN) + } + + deploy.SetNodeStatus(task, model.NodeStatusUpgrade, node) + if err := d.Upgrade(); err != nil { + return errors.Wrapf(err, "[%s]", model.NodeStatusUpgrade.EN) + } + + deploy.SetNodeStatus(task, model.NodeStatusConfig, node) + if err := d.Config(); err != nil { + return errors.Wrapf(err, "[%s]", model.NodeStatusConfig.EN) + } + + deploy.SetNodeStatus(task, model.NodeStatusStart, node) + if err := d.Start(); err != nil { + return errors.Wrapf(err, "[%s]", model.NodeStatusStart.EN) + } + + deploy.SetNodeStatus(task, model.NodeStatusCheck, node) + if err := d.Check(timeout); err != nil { + return errors.Wrapf(err, "[%s]", model.NodeStatusCheck.EN) + } + deploy.SetNodeStatus(task, model.NodeStatusDone, node) + + return nil +} + +func ConfigKeeperCluster(task *model.Task, d deploy.CKDeploy) error { + kd := deploy.NewKeeperDeploy(*d.Conf, d.Packages) + deploy.SetNodeStatus(task, model.NodeStatusInit, model.ALL_NODES_DEFAULT) + if err := kd.Init(); err != nil { + return errors.Wrapf(err, "[%s]", model.NodeStatusInit.EN) + } + deploy.SetNodeStatus(task, model.NodeStatusConfig, model.ALL_NODES_DEFAULT) + if err := kd.Config(); err != nil { + return errors.Wrapf(err, "[%s]", model.NodeStatusConfig.EN) + } + + if kd.Ext.Restart { + switch d.Ext.Policy { + case model.PolicyRolling: + var rd deploy.CKDeploy + common.DeepCopyByGob(&rd, kd) + for _, host := range rd.Conf.KeeperConf.KeeperNodes { + deploy.SetNodeStatus(task, model.NodeStatusRestart, host) + rd.Conf.KeeperConf.KeeperNodes = []string{host} + if err := rd.Restart(); err != nil { + return err + } + if err := rd.Check(model.MaxTimeOut); err != nil { + return err + } + deploy.SetNodeStatus(task, model.NodeStatusDone, host) + } + case model.PolicyFull: + deploy.SetNodeStatus(task, model.NodeStatusRestart, model.ALL_NODES_DEFAULT) + err := kd.Restart() + if err != nil && err != model.CheckTimeOutErr { + return err + } + _ = d.Check(30) + default: + return fmt.Errorf("not support policy %s yet", d.Ext.Policy) + } + + } + return nil +} diff --git a/service/zookeeper/zk_test.go b/service/zookeeper/zk_test.go index 4b627084..de3aad73 100644 --- a/service/zookeeper/zk_test.go +++ b/service/zookeeper/zk_test.go @@ -8,7 +8,7 @@ import ( ) func TestMetric(t *testing.T) { - resp, err := ZkMetric("192.168.101.94", 2181, "mntr") + resp, err := ZkMetric("192.168.122.101", 9181, "mntr") assert.Nil(t, err) fmt.Println(string(resp)) } diff --git a/service/zookeeper/zookeeper_service.go b/service/zookeeper/zookeeper_service.go index c71e5bee..fa109c2a 100644 --- a/service/zookeeper/zookeeper_service.go +++ b/service/zookeeper/zookeeper_service.go @@ -57,7 +57,8 @@ func GetZkService(clusterName string) (*ZkService, error) { } else { conf, err := repository.Ps.GetClusterbyName(clusterName) if err == nil { - service, err := NewZkService(conf.ZkNodes, conf.ZkPort) + nodes, port := GetZkInfo(&conf) + service, err := NewZkService(nodes, port) if err != nil { return nil, err } @@ -182,6 +183,9 @@ func ZkMetric(host string, port int, metric string) ([]byte, error) { resp[matches[1]] = matches[2] } } + if len(resp) == 0 { + return b, nil + } return json.Marshal(resp) } @@ -203,3 +207,16 @@ func GetZkClusterNodes(host string, port int) ([]string, error) { } return nodes, nil } + +func GetZkInfo(conf *model.CKManClickHouseConfig) ([]string, int) { + var nodes []string + var port int + if conf.Keeper == model.ClickhouseKeeper { + nodes = conf.KeeperConf.KeeperNodes + port = conf.KeeperConf.TcpPort + } else { + nodes = conf.ZkNodes + port = conf.ZkPort + } + return nodes, port +}