Skip to content

Commit

Permalink
clickhouse-keeper is ready
Browse files Browse the repository at this point in the history
  • Loading branch information
YenchangChan committed Jun 22, 2024
1 parent 0d104da commit eb45e8e
Show file tree
Hide file tree
Showing 36 changed files with 1,537 additions and 338 deletions.
4 changes: 2 additions & 2 deletions Dockerfile.test
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
# You can run command like: "docker build -f Dockerfile.test -t ckman-clickhouse:centos-7 ."
# the offical image is eoitek/ckman-clickhouse:centos-7, You can pull it from dockerhub.

#FROM centos:7
FROM ccr.ccs.tencentyun.com/library/centos:7
FROM centos:7
#FROM ccr.ccs.tencentyun.com/library/centos:7
WORKDIR /var/

RUN yum -y update && yum install -y openssh* \
Expand Down
10 changes: 10 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ GOARCH?=$(shell go env GOARCH)
TARNAME=${PKGDIR}-${VERSION}-${DATE}.${OS}.$(GOARCH).tar.gz
TAG?=$(shell date +%y%m%d)
LDFLAGS=-ldflags "-X main.BuildTimeStamp=${TIME} -X main.GitCommitHash=${REVISION} -X main.Version=${VERSION}"
GCFLAGS=-gcflags "all=-N -l"
PUB_KEY=$(shell cat resources/eoi_public_key.pub 2>/dev/null)
export GOPROXY=https://goproxy.cn,direct

Expand All @@ -33,6 +34,15 @@ backend:
go build ${LDFLAGS} -o znodefix cmd/znodefix/znodefix.go
go build ${LDFLAGS} -o znode_count cmd/znodecnt/znodecount.go

.PHONY: debug
debug:
@rm -rf ${PKGFULLDIR}
go build ${GCFLAGS} ${LDFLAGS}
go build ${GCFLAGS} ${LDFLAGS} -o ckmanpasswd cmd/password/password.go
go build ${GCFLAGS} ${LDFLAGS} -o migrate cmd/migrate/migrate.go
go build ${GCFLAGS} ${LDFLAGS} -o znodefix cmd/znodefix/znodefix.go
go build ${GCFLAGS} ${LDFLAGS} -o znode_count cmd/znodecnt/znodecount.go

.PHONY: pre
pre:
go mod tidy
Expand Down
93 changes: 93 additions & 0 deletions ckconfig/keeper.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package ckconfig

import (
"path"

"github.com/housepower/ckman/common"
"github.com/housepower/ckman/model"
"github.com/imdario/mergo"
)

func keeper_root(ipv6Enable bool) map[string]interface{} {
output := make(map[string]interface{})
if ipv6Enable {
output["listen_host"] = "::"
} else {
output["listen_host"] = "0.0.0.0"
}
output["max_connections"] = 4096
return output
}

func keeper_server(conf *model.CKManClickHouseConfig, ipv6Enable bool, idx int) map[string]interface{} {
output := keeper_root(ipv6Enable)
output["logger"] = keeper_logger(conf)
keeper_server := make(map[string]interface{})
mergo.Merge(&keeper_server, conf.KeeperConf.Expert)
keeper_server["tcp_port"] = conf.KeeperConf.TcpPort
keeper_server["server_id"] = idx
keeper_server["log_storage_path"] = conf.KeeperConf.LogPath + "clickhouse/coordination/logs"
keeper_server["snapshot_storage_path"] = conf.KeeperConf.SnapshotPath + "clickhouse/coordination/snapshots"
keeper_server["coordination_settings"] = coordination_settings(conf.KeeperConf.Coordination)
keeper_server["raft_configuration"] = raft_configuration(conf.KeeperConf)
output["keeper_server"] = keeper_server
return output
}

// https://github.com/ClickHouse/ClickHouse/blob/master/src/Coordination/CoordinationSettings.h
func coordination_settings(coordination model.Coordination) map[string]interface{} {
output := make(map[string]interface{})
mergo.Merge(&output, coordination.Expert)
output["operation_timeout_ms"] = coordination.OperationTimeoutMs
output["session_timeout_ms"] = coordination.SessionTimeoutMs
if coordination.ForceSync {
output["force_sync"] = "true"
} else {
output["force_sync"] = "false"
}
if coordination.AutoForwarding {
output["auto_forwarding"] = "true"
} else {
output["auto_forwarding"] = "false"
}
return output
}

func keeper_logger(conf *model.CKManClickHouseConfig) map[string]interface{} {
output := make(map[string]interface{})
output["level"] = "debug"
output["size"] = "1000M"
output["count"] = 10
if !conf.NeedSudo {
output["log"] = path.Join(conf.Cwd, "log", "clickhouse-keeper", "clickhouse-keeper.log")
output["errorlog"] = path.Join(conf.Cwd, "log", "clickhouse-keeper", "clickhouse-keeper.err.log")
}
return output

}

func raft_configuration(conf *model.KeeperConf) []map[string]interface{} {
var outputs []map[string]interface{}
for idx, node := range conf.KeeperNodes {
output := make(map[string]interface{})
output["server"] = map[string]interface{}{
"id": idx + 1,
"hostname": node,
"port": conf.RaftPort,
}
outputs = append(outputs, output)
}
return outputs
}

func GenerateKeeperXML(filename string, conf *model.CKManClickHouseConfig, ipv6Enable bool, idx int) (string, error) {
xml := common.NewXmlFile(filename)
rootTag := "clickhouse"
xml.Begin(rootTag)
xml.Merge(keeper_server(conf, ipv6Enable, idx))
xml.End(rootTag)
if err := xml.Dump(); err != nil {
return filename, err
}
return filename, nil
}
37 changes: 37 additions & 0 deletions ckconfig/keeper_fake.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
<clickhouse>
<keeper_server>
<coordination_settings>
<auto_forwarding>false</auto_forwarding>
<force_sync>true</force_sync>
<operation_timeout_ms>10000</operation_timeout_ms>
<raft_log_level>Information</raft_log_level>
<session_timeout_ms>30000</session_timeout_ms>
</coordination_settings>
<log_storage_path>/var/lib/clickhouse/coordination/log</log_storage_path>
<raft_configuration>
<server>
<hostname>192.168.101.102</hostname>
<id>1</id>
<port>9181</port>
</server>
<server>
<hostname>192.168.101.105</hostname>
<id>2</id>
<port>9181</port>
</server>
<server>
<hostname>192.168.101.107</hostname>
<id>3</id>
<port>9181</port>
</server>
</raft_configuration>
<server_id>2</server_id>
<snapshot_storage_path>/var/lib/clickhouse/coordination/snapshots</snapshot_storage_path>
<tcp_port>9181</tcp_port>
</keeper_server>
<listen_host>::</listen_host>
<logger>
<level>debug</level>
</logger>
<max_connections>4096</max_connections>
</clickhouse>
27 changes: 27 additions & 0 deletions ckconfig/keeper_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package ckconfig

import (
"testing"

"github.com/housepower/ckman/model"
"github.com/stretchr/testify/assert"
)

func TestGenerateKeeperXML(t *testing.T) {
conf := model.CKManClickHouseConfig{
KeeperConf: &model.KeeperConf{
KeeperNodes: []string{"192.168.101.102", "192.168.101.105", "192.168.101.107"},
TcpPort: 9181,
RaftPort: 9234,
LogPath: "/var/lib/clickhouse/coordination/log",
SnapshotPath: "/var/lib/clickhouse/coordination/snapshots",
Coordination: model.Coordination{
OperationTimeoutMs: 10000,
SessionTimeoutMs: 30000,
ForceSync: true,
},
},
}
_, err := GenerateKeeperXML("keeper_fake.xml", &conf, true, 2)
assert.Nil(t, err)
}
25 changes: 4 additions & 21 deletions ckconfig/metrika.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ckconfig
import (
"github.com/housepower/ckman/common"
"github.com/housepower/ckman/model"
"github.com/housepower/ckman/service/zookeeper"
)

func GenerateMetrikaXML(filename string, conf *model.CKManClickHouseConfig) (string, error) {
Expand Down Expand Up @@ -40,10 +41,11 @@ func GenZookeeperMetrika(indent int, conf *model.CKManClickHouseConfig) string {
xml := common.NewXmlFile("")
xml.SetIndent(indent)
xml.Begin("zookeeper")
for index, zk := range conf.ZkNodes {
nodes, port := zookeeper.GetZkInfo(conf)
for index, zk := range nodes {
xml.BeginwithAttr("node", []common.XMLAttr{{Key: "index", Value: index + 1}})
xml.Write("host", zk)
xml.Write("port", conf.ZkPort)
xml.Write("port", port)
xml.End("node")
}
xml.End("zookeeper")
Expand All @@ -59,25 +61,6 @@ func GenLocalMetrika(indent int, conf *model.CKManClickHouseConfig) string {
secret = false
}
if secret {
xml.Comment(`Inter-server per-cluster secret for Distributed queries
default: no secret (no authentication will be performed)
If set, then Distributed queries will be validated on shards, so at least:
- such cluster should exist on the shard,
- such cluster should have the same secret.
And also (and which is more important), the initial_user will
be used as current user for the query.
Right now the protocol is pretty simple and it only takes into account:
- cluster name
- query
Also it will be nice if the following will be implemented:
- source hostname (see interserver_http_host), but then it will depends from DNS,
it can use IP address instead, but then the you need to get correct on the initiator node.
- target hostname / ip address (same notes as for source hostname)
- time-based security tokens`)
xml.Write("secret", "foo")
}
for _, shard := range conf.Shards {
Expand Down
3 changes: 2 additions & 1 deletion cmd/znodefix/znodefix.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ func main() {
if err != nil {
log.Logger.Fatalf("get cluster %s failed:%v", cmdOps.ClusterName, err)
}
service, err := zookeeper.NewZkService(cluster.ZkNodes, cluster.ZkPort)
nodes, port := zookeeper.GetZkInfo(&cluster)
service, err := zookeeper.NewZkService(nodes, port)
if err != nil {
log.Logger.Fatalf("can't create zookeeper instance:%v", err)
}
Expand Down
9 changes: 0 additions & 9 deletions common/math.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,6 @@ func Decimal(value float64) float64 {
return value
}

func ArraySearch(target string, str_array []string) bool {
for _, str := range str_array {
if target == str {
return true
}
}
return false
}

func Md5CheckSum(s string) string {
sum := md5.Sum([]byte(s))
return hex.EncodeToString(sum[:16])
Expand Down
15 changes: 15 additions & 0 deletions common/math_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,3 +131,18 @@ func TestArraySearch(t *testing.T) {
"aaa", "bbb", "ccc", "kkk",
}))
}

func TestArrayRemove(t *testing.T) {
assert.Equal(t, []string{"aaa", "ccc", "kkk"}, ArrayRemove([]string{
"aaa", "bbb", "ccc", "kkk",
}, "bbb"))
assert.Equal(t, []int{1, 2, 3, 4}, ArrayRemove([]int{
1, 2, 3, 4, 5,
}, 5))

arr := []string{
"aaa", "bbb", "ccc", "kkk",
}
ArrayRemove(arr, "bbb")
assert.Equal(t, []string{"aaa", "bbb", "ccc", "kkk"}, arr)
}
66 changes: 2 additions & 64 deletions common/package.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,11 @@ import (

const (
DefaultPackageDirectory string = "package/clickhouse"
DefaultKeeperDirectory string = "package/keeper"

PkgModuleCommon string = "common"
PkgModuleClient string = "client"
PkgModuleServer string = "server"
PkgKeeper string = "keeper"
PkgModuleKeeper string = "keeper"

PkgSuffixRpm string = "rpm"
PkgSuffixTgz string = "tgz"
Expand All @@ -43,10 +42,7 @@ func (v CkPackageFiles) Less(i, j int) bool {
return CompareClickHouseVersion(v[i].Version, v[j].Version) < 0
}

var (
CkPackages sync.Map
KeeperPackages sync.Map
)
var CkPackages sync.Map

func parsePkgName(fname string) CkPackageFile {
var module, version, arch, suffix string
Expand Down Expand Up @@ -238,61 +234,3 @@ func GetAllPackages() map[string]CkPackageFiles {
})
return pkgs
}

func LoadKeeperPackages() error {
var files CkPackageFiles
CkPackages.Range(func(k, v interface{}) bool {
CkPackages.Delete(k)
return true
})
dir, err := os.ReadDir(path.Join(config.GetWorkDirectory(), DefaultKeeperDirectory))
if err != nil {
return errors.Wrap(err, "")
}

for _, fi := range dir {
if !fi.IsDir() {
fname := fi.Name()
file := parsePkgName(fname)
files = append(files, file)
}
}

sort.Sort(sort.Reverse(files))
for _, file := range files {
key := fmt.Sprintf("%s.%s", file.Arch, file.Suffix)
value, ok := CkPackages.Load(key)
if !ok {
pkgs := CkPackageFiles{file}
CkPackages.Store(key, pkgs)
} else {
pkgs := value.(CkPackageFiles)
found := false
for _, pkg := range pkgs {
if reflect.DeepEqual(pkg, file) {
found = true
break
}
}
if !found {
pkgs = append(pkgs, file)
}
CkPackages.Store(key, pkgs)
}
}

return nil
}

func GetAllKeeperPackages() map[string]CkPackageFiles {
pkgs := make(map[string]CkPackageFiles, 0)
_ = LoadKeeperPackages()
CkPackages.Range(func(k, v interface{}) bool {
key := k.(string)
files := v.(CkPackageFiles)
sort.Sort(sort.Reverse(files))
pkgs[key] = files
return true
})
return pkgs
}
Loading

0 comments on commit eb45e8e

Please sign in to comment.