-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathzk_server.go
108 lines (91 loc) · 2.41 KB
/
zk_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package main
import (
"bufio"
"bytes"
"fmt"
"io"
"net"
"strings"
"time"
)
const (
monitorCMD = "mntr"
okCMD = "ruok"
enviCMD = "envi" // Might use this in the future?
)
// zkServer object
type zkServer struct {
ipPort string
}
// zkServer constructor
func newZKServer(ipPort string) *zkServer {
return &zkServer{ipPort: ipPort}
}
// zkServer.getStats() - runs mntr and ruok commands
func (zk *zkServer) getStats() (map[string]string, error) {
stats, err := zk.getMNTR()
if err != nil {
return stats, err
}
isOK, err := zk.getOKStatus()
if err != nil {
return stats, err
}
stats[zkOK] = isOK
return stats, nil
}
func (zk *zkServer) getMNTR() (map[string]string, error) {
stats := make(map[string]string)
byts, err := zk.sendCommand(monitorCMD)
if err != nil {
return stats, err
}
scanner := bufio.NewScanner(bytes.NewReader(byts))
for scanner.Scan() {
splits := strings.Split(scanner.Text(), "\t")
if splits[0] == "This ZooKeeper instance is not currently serving requests" {
log.Warnf("[%v] is up but not currently serving requests", zk.ipPort)
return stats, nil
}
if len(splits) != 2 {
log.Warningf("[%v] expected key:value, got this instead: %v", zk.ipPort, splits)
continue
}
stats[splits[0]] = splits[1]
}
return stats, nil
}
func (zk *zkServer) getOKStatus() (string, error) {
byts, err := zk.sendCommand(okCMD)
return string(byts), err
}
func (zk *zkServer) sendCommand(cmd string) ([]byte, error) {
dialer := net.Dialer{Timeout: time.Duration(*zkTimeout) * time.Second}
conn, err := dialer.Dial("tcp", zk.ipPort)
if err != nil {
return []byte{}, err
}
defer func() {
if err := conn.Close(); err != nil {
log.Errorf("[%v] failed to close connection: %v", zk.ipPort, err)
}
}()
// ensure these socket fail fast if ZK having problems
RWDeadLine := time.Duration(*zkRWDeadLine) * time.Second
if err := conn.SetReadDeadline(time.Now().Add(RWDeadLine)); err != nil {
log.Errorf("[%v] failed to set Read Deadline on conn: %v", zk.ipPort, err)
}
if err := conn.SetWriteDeadline(time.Now().Add(RWDeadLine)); err != nil {
log.Errorf("[%v] failed to set Write Deadline on conn: %v", zk.ipPort, err)
}
_, err = fmt.Fprintf(conn, fmt.Sprintf("%s\n", cmd))
if err != nil {
log.Errorf("[%v] failed to close connection: %v", zk.ipPort, err)
}
var buf bytes.Buffer
_, err = io.Copy(&buf, conn)
if err != nil {
return []byte{}, err
}
return buf.Bytes(), nil
}