fix: Attempt to repair disconnected/failed master nodes before failing over (#1105)

* add support for repairing leaders

Signed-off-by: mluffman <[email protected]>

* add make lint directive

Signed-off-by: mluffman <[email protected]>

* add test

Signed-off-by: drivebyer <[email protected]>

* fix lint

Signed-off-by: drivebyer <[email protected]>

* fix lint

Signed-off-by: drivebyer <[email protected]>

* chainsaw

Signed-off-by: drivebyer <[email protected]>

* update

Signed-off-by: drivebyer <[email protected]>

* update

Signed-off-by: drivebyer <[email protected]>

* no parallel

Signed-off-by: drivebyer <[email protected]>

---------

Signed-off-by: mluffman <[email protected]>
Signed-off-by: drivebyer <[email protected]>
Co-authored-by: mluffman <[email protected]>
Co-authored-by: drivebyer <[email protected]>
3 people authored Oct 18, 2024
1 parent bee2bf7 commit de6b066
Showing 13 changed files with 297 additions and 31 deletions.
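
The core idea of this change: when a master shows up as failed or disconnected in CLUSTER NODES, first try to re-introduce it to the cluster with CLUSTER MEET at its current address, and only fail over if that does not help. The sketch below is a minimal, standalone illustration of that repair primitive using go-redis; the address, port, and parsing here are placeholders, and the operator itself resolves the node's fresh IP from its pod name (see pkg/k8sutils/redis.go in the diff below).

```go
package main

import (
	"context"
	"fmt"
	"strings"

	"github.com/redis/go-redis/v9"
)

// Standalone sketch of the repair primitive: find masters whose CLUSTER NODES
// record says fail/disconnected and CLUSTER MEET them back into the cluster.
func main() {
	ctx := context.Background()
	rdb := redis.NewClient(&redis.Options{Addr: "redis-cluster-leader-0:6379"}) // placeholder address
	defer rdb.Close()

	out, err := rdb.ClusterNodes(ctx).Result()
	if err != nil {
		panic(err)
	}
	for _, line := range strings.Split(strings.TrimSpace(out), "\n") {
		fields := strings.Fields(line)
		if len(fields) < 8 || !strings.Contains(fields[2], "master") {
			continue
		}
		if strings.Contains(fields[2], "fail") || fields[7] == "disconnected" {
			// The operator looks up the pod's current IP instead of reusing the
			// stale address from the record; this placeholder just reuses it.
			ip := strings.Split(strings.Split(fields[1], "@")[0], ":")[0]
			fmt.Println("re-joining", ip)
			if err := rdb.ClusterMeet(ctx, ip, "6379").Err(); err != nil {
				fmt.Println("cluster meet failed:", err)
			}
		}
	}
}
```
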
2 changes: 1 addition & 1 deletion Dockerfile
@@ -1,5 +1,5 @@
# Build the manager binary
FROM golang:1.21 as builder
FROM golang:1.21 AS builder
ARG BUILDOS
ARG BUILDPLATFORM
ARG BUILDARCH
3 changes: 3 additions & 0 deletions Makefile
@@ -66,6 +66,9 @@ manifests: controller-gen
fmt:
go fmt ./...

lint: golangci-lint
$(GOLANGCI_LINT) run

# Run go vet against code
vet:
go vet ./...
4 changes: 2 additions & 2 deletions api/status/redis-cluster_status.go
@@ -14,6 +14,6 @@ const (
RedisClusterInitializing RedisClusterState = "Initializing"
RedisClusterBootstrap RedisClusterState = "Bootstrap"
// RedisClusterReady means the RedisCluster is ready for use, we use redis-cli --cluster check 127.0.0.1:6379 to check the cluster status
RedisClusterReady RedisClusterState = "Ready"
// RedisClusterFailed RedisClusterState = "Failed"
RedisClusterReady RedisClusterState = "Ready"
RedisClusterFailed RedisClusterState = "Failed"
)
1 change: 1 addition & 0 deletions go.mod
@@ -20,6 +20,7 @@ require (

require (
emperror.dev/errors v0.8.0 // indirect
github.com/avast/retry-go v3.0.0+incompatible // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
2 changes: 2 additions & 0 deletions go.sum
@@ -7,6 +7,8 @@ github.com/PuerkitoBio/purell v1.0.0/go.mod h1:c11w/QuzBsJSee3cPx9rAFu61PvFxuPbt
github.com/PuerkitoBio/urlesc v0.0.0-20160726150825-5bd2802263f2/go.mod h1:uGdkoq3SwY9Y+13GIhn11/XLaGBb4BfwItxLd5jeuXE=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5 h1:0CwZNZbxp69SHPdPJAN/hZIm0C4OItdklCFmMRWYpio=
github.com/armon/go-socks5 v0.0.0-20160902184237-e75332964ef5/go.mod h1:wHh0iHkYZB8zMSxRWpUBQtwG5a7fFgvEO+odwuTv2gs=
github.com/avast/retry-go v3.0.0+incompatible h1:4SOWQ7Qs+oroOTQOYnAHqelpCO0biHSxpiH9JdtuBj0=
github.com/avast/retry-go v3.0.0+incompatible/go.mod h1:XtSnn+n/sHqQIpZ10K1qAevBhOOCWBLXXy3hyiqqBrY=
github.com/banzaicloud/k8s-objectmatcher v1.8.0 h1:Nugn25elKtPMTA2br+JgHNeSQ04sc05MDPmpJnd1N2A=
github.com/banzaicloud/k8s-objectmatcher v1.8.0/go.mod h1:p2LSNAjlECf07fbhDyebTkPUIYnU05G+WfGgkTmgeMg=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
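
go.mod and go.sum above pick up github.com/avast/retry-go, which the controller change below uses to poll cluster health after the repair attempt. As a reminder of how its option-based API composes, here is a minimal, self-contained sketch; the polled condition is a stand-in, not the operator's actual health check.

```go
package main

import (
	"errors"
	"fmt"
	"time"

	retry "github.com/avast/retry-go"
)

func main() {
	attempts := 0
	// retry.Do re-runs the function until it returns nil or the attempt budget
	// is exhausted; Attempts and Delay mirror the controller's usage below.
	err := retry.Do(func() error {
		attempts++
		if attempts < 3 {
			return errors.New("still unhealthy") // stand-in condition
		}
		return nil
	}, retry.Attempts(3), retry.Delay(5*time.Second))
	fmt.Println("attempts:", attempts, "err:", err)
}
```
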
40 changes: 35 additions & 5 deletions pkg/controllers/rediscluster/rediscluster_controller.go
@@ -18,12 +18,14 @@ package rediscluster

import (
"context"
"fmt"
"time"

"github.com/OT-CONTAINER-KIT/redis-operator/api/status"
redisv1beta2 "github.com/OT-CONTAINER-KIT/redis-operator/api/v1beta2"
intctrlutil "github.com/OT-CONTAINER-KIT/redis-operator/pkg/controllerutil"
"github.com/OT-CONTAINER-KIT/redis-operator/pkg/k8sutils"
retry "github.com/avast/retry-go"
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/runtime"
@@ -45,7 +47,7 @@ type RedisClusterReconciler struct {

func (r *RedisClusterReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
reqLogger := r.Log.WithValues("Request.Namespace", req.Namespace, "Request.Name", req.Name)
reqLogger.Info("Reconciling opstree redis Cluster controller")
reqLogger.V(1).Info("Reconciling opstree redis Cluster controller")
instance := &redisv1beta2.RedisCluster{}

err := r.Client.Get(context.TODO(), req.NamespacedName, instance)
@@ -186,13 +188,41 @@
return intctrlutil.RequeueAfter(reqLogger, time.Second*60, "Redis cluster count is not desired", "Current.Count", nc, "Desired.Count", totalReplicas)
}

reqLogger.Info("Redis cluster count is desired")
if int(totalReplicas) > 1 && k8sutils.CheckRedisClusterState(ctx, r.K8sClient, r.Log, instance) >= int(totalReplicas)-1 {
reqLogger.Info("Redis leader is not desired, executing failover operation")
err = k8sutils.ExecuteFailoverOperation(ctx, r.K8sClient, r.Log, instance)
reqLogger.V(1).Info("Number of Redis nodes match desired")
unhealthyNodeCount, err := k8sutils.UnhealthyNodesInCluster(ctx, r.K8sClient, r.Log, instance)
if err != nil {
reqLogger.Error(err, "failed to determine unhealthy node count in cluster")
}
if int(totalReplicas) > 1 && unhealthyNodeCount >= int(totalReplicas)-1 {
err = k8sutils.UpdateRedisClusterStatus(instance, status.RedisClusterFailed, "RedisCluster has too many unhealthy nodes", leaderReplicas, followerReplicas, r.Dk8sClient)
if err != nil {
return intctrlutil.RequeueWithError(err, reqLogger, "")
}

reqLogger.Info("healthy leader count does not match desired; attempting to repair disconnected masters")
if err = k8sutils.RepairDisconnectedMasters(ctx, r.K8sClient, r.Log, instance); err != nil {
reqLogger.Error(err, "failed to repair disconnected masters")
}

err = retry.Do(func() error {
nc, nErr := k8sutils.UnhealthyNodesInCluster(ctx, r.K8sClient, r.Log, instance)
if nErr != nil {
return nErr
}
if nc == 0 {
return nil
}
return fmt.Errorf("%d unhealthy nodes", nc)
}, retry.Attempts(3), retry.Delay(time.Second*5))

if err == nil {
reqLogger.Info("repairing unhealthy masters successful, no unhealthy masters left")
return intctrlutil.RequeueAfter(reqLogger, time.Second*30, "no unhealthy nodes found after repairing disconnected masters")
}
reqLogger.Info("unhealthy nodes exist after attempting to repair disconnected masters; starting failover")
if err = k8sutils.ExecuteFailoverOperation(ctx, r.K8sClient, r.Log, instance); err != nil {
return intctrlutil.RequeueWithError(err, reqLogger, "")
}
}

// Check If there is No Empty Master Node
99 changes: 84 additions & 15 deletions pkg/k8sutils/redis.go
@@ -85,6 +85,53 @@ func CreateSingleLeaderRedisCommand(logger logr.Logger, cr *redisv1beta2.RedisCl
return cmd
}

// RepairDisconnectedMasters attempts to repair disconnected/failed masters by issuing
// a CLUSTER MEET with the updated address of the host
func RepairDisconnectedMasters(ctx context.Context, client kubernetes.Interface, logger logr.Logger, cr *redisv1beta2.RedisCluster) error {
redisClient := configureRedisClient(client, logger, cr, cr.ObjectMeta.Name+"-leader-0")
defer redisClient.Close()
return repairDisconnectedMasters(ctx, client, logger, cr, redisClient)
}

func repairDisconnectedMasters(ctx context.Context, client kubernetes.Interface, logger logr.Logger, cr *redisv1beta2.RedisCluster, redisClient *redis.Client) error {
nodes, err := clusterNodes(ctx, redisClient, logger)
if err != nil {
return err
}
masterNodeType := "master"
for _, node := range nodes {
if !nodeIsOfType(node, masterNodeType) {
continue
}
if !nodeFailedOrDisconnected(node) {
continue
}
log.V(1).Info("found disconnected master node", "node", node)
podName, err := getMasterHostFromClusterNode(node)
if err != nil {
return err
}
ip := getRedisServerIP(client, logger, RedisDetails{
PodName: podName,
Namespace: cr.Namespace,
})
err = redisClient.ClusterMeet(ctx, ip, strconv.Itoa(*cr.Spec.Port)).Err()
if err != nil {
return fmt.Errorf("failed to issue cluster meet: %w", err)
}
}
return nil
}

func getMasterHostFromClusterNode(node clusterNodesResponse) (string, error) {
addressAndHost := node[1]
s := strings.Split(addressAndHost, ",")
if len(s) != 2 {
return "", fmt.Errorf("failed to extract host from host and address string, unexpected number of elements: %d", len(s))
}
return strings.Split(addressAndHost, ",")[1], nil
}

// CreateMultipleLeaderRedisCommand will create command for multiple leader cluster creation
func CreateMultipleLeaderRedisCommand(client kubernetes.Interface, logger logr.Logger, cr *redisv1beta2.RedisCluster) []string {
cmd := []string{"redis-cli", "--cluster", "create"}
@@ -189,7 +236,10 @@ func ExecuteRedisReplicationCommand(ctx context.Context, client kubernetes.Inter
redisClient := configureRedisClient(client, logger, cr, cr.ObjectMeta.Name+"-leader-0")
defer redisClient.Close()

nodes := checkRedisCluster(ctx, redisClient, logger)
nodes, err := clusterNodes(ctx, redisClient, logger)
if err != nil {
logger.Error(err, "failed to get cluster nodes")
}
for followerIdx := 0; followerIdx <= int(followerCounts)-1; {
for i := 0; i < int(followerPerLeader) && followerIdx <= int(followerCounts)-1; i++ {
followerPod := RedisDetails{
@@ -225,22 +275,27 @@ func ExecuteRedisReplicationCommand(ctx context.Context, client kubernetes.Inter
}
}

// checkRedisCluster will check the redis cluster have sufficient nodes or not
func checkRedisCluster(ctx context.Context, redisClient *redis.Client, logger logr.Logger) [][]string {
type clusterNodesResponse []string

// clusterNodes returns the response of CLUSTER NODES
func clusterNodes(ctx context.Context, redisClient *redis.Client, logger logr.Logger) ([]clusterNodesResponse, error) {
output, err := redisClient.ClusterNodes(ctx).Result()
if err != nil {
logger.Error(err, "Error in getting Redis cluster nodes")
return nil, err
}
logger.V(1).Info("Redis cluster nodes are listed", "Output", output)

csvOutput := csv.NewReader(strings.NewReader(output))
csvOutput.Comma = ' '
csvOutput.FieldsPerRecord = -1
csvOutputRecords, err := csvOutput.ReadAll()
if err != nil {
logger.Error(err, "Error parsing Node Counts", "output", output)
return nil, err
}
response := make([]clusterNodesResponse, 0, len(csvOutputRecords))
for _, record := range csvOutputRecords {
response = append(response, record)
}
return csvOutputRecords
return response, nil
}

// ExecuteFailoverOperation will execute redis failover operations
Expand Down Expand Up @@ -297,7 +352,10 @@ func CheckRedisNodeCount(ctx context.Context, client kubernetes.Interface, logge
redisClient := configureRedisClient(client, logger, cr, cr.ObjectMeta.Name+"-leader-0")
defer redisClient.Close()
var redisNodeType string
clusterNodes := checkRedisCluster(ctx, redisClient, logger)
clusterNodes, err := clusterNodes(ctx, redisClient, logger)
if err != nil {
logger.Error(err, "failed to get cluster nodes")
}
count := len(clusterNodes)

switch nodeType {
@@ -311,7 +369,7 @@ if nodeType != "" {
if nodeType != "" {
count = 0
for _, node := range clusterNodes {
if strings.Contains(node[2], redisNodeType) {
if nodeIsOfType(node, redisNodeType) {
count++
}
}
@@ -350,19 +408,30 @@ func RedisClusterStatusHealth(ctx context.Context, client kubernetes.Interface,
return true
}

// CheckRedisClusterState will check the redis cluster state
func CheckRedisClusterState(ctx context.Context, client kubernetes.Interface, logger logr.Logger, cr *redisv1beta2.RedisCluster) int {
// UnhealthyNodesInCluster returns the number of unhealthy nodes in the cluster cr
func UnhealthyNodesInCluster(ctx context.Context, client kubernetes.Interface, logger logr.Logger, cr *redisv1beta2.RedisCluster) (int, error) {
redisClient := configureRedisClient(client, logger, cr, cr.ObjectMeta.Name+"-leader-0")
defer redisClient.Close()
clusterNodes := checkRedisCluster(ctx, redisClient, logger)
clusterNodes, err := clusterNodes(ctx, redisClient, logger)
if err != nil {
return 0, err
}
count := 0
for _, node := range clusterNodes {
if strings.Contains(node[2], "fail") || strings.Contains(node[7], "disconnected") {
if nodeFailedOrDisconnected(node) {
count++
}
}
logger.V(1).Info("Number of failed nodes in cluster", "Failed Node Count", count)
return count
return count, nil
}

func nodeIsOfType(node clusterNodesResponse, nodeType string) bool {
return strings.Contains(node[2], nodeType)
}

func nodeFailedOrDisconnected(node clusterNodesResponse) bool {
return strings.Contains(node[2], "fail") || strings.Contains(node[7], "disconnected")
}

// configureRedisClient will configure the Redis Client
Expand Down Expand Up @@ -469,7 +538,7 @@ func getContainerID(client kubernetes.Interface, logger logr.Logger, cr *redisv1
}

// checkRedisNodePresence will check if the redis node exist in cluster or not
func checkRedisNodePresence(cr *redisv1beta2.RedisCluster, nodeList [][]string, nodeName string) bool {
func checkRedisNodePresence(cr *redisv1beta2.RedisCluster, nodeList []clusterNodesResponse, nodeName string) bool {
logger := generateRedisManagerLogger(cr.Namespace, cr.ObjectMeta.Name)
logger.V(1).Info("Checking if Node is in cluster", "Node", nodeName)
for _, node := range nodeList {
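
The new helpers in redis.go (clusterNodes, getMasterHostFromClusterNode, nodeIsOfType, nodeFailedOrDisconnected) all index into a space-separated CLUSTER NODES record. For orientation, here is the field layout per the Redis CLUSTER NODES documentation, annotated with the indexes the code above relies on; the record value is taken from the test fixture further down, and the hostname after the comma is assumed to be the announced pod name.

```go
package main

import "fmt"

// clusterNodesResponse mirrors the []string record type used in redis.go.
type clusterNodesResponse []string

func main() {
	// One parsed CLUSTER NODES record:
	//   [0] node ID
	//   [1] ip:port@cport,hostname -> getMasterHostFromClusterNode returns the part after the comma
	//   [2] flags (master, slave, myself, fail, ...) -> nodeIsOfType / nodeFailedOrDisconnected
	//   [3] primary ID ("-" for masters)  [4] ping-sent  [5] pong-recv  [6] config-epoch
	//   [7] link-state (connected / disconnected) -> nodeFailedOrDisconnected
	//   [8+] slot ranges (masters only)
	node := clusterNodesResponse{
		"67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1",
		"127.0.0.1:30002@31002,redis-cluster-leader-0",
		"master", "-", "0", "1426238316232", "2", "disconnected", "5461-10922",
	}
	// This node is a master with a disconnected link, so it would be picked up for repair.
	fmt.Println("flags:", node[2], "link-state:", node[7])
}
```
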
53 changes: 47 additions & 6 deletions pkg/k8sutils/redis_test.go
@@ -29,10 +29,15 @@ func TestCheckRedisNodePresence(t *testing.T) {
csvOutput := csv.NewReader(strings.NewReader(output))
csvOutput.Comma = ' '
csvOutput.FieldsPerRecord = -1
nodes, _ := csvOutput.ReadAll()
rawNodes, _ := csvOutput.ReadAll()

nodes := make([]clusterNodesResponse, 0, len(rawNodes))
for _, node := range rawNodes {
nodes = append(nodes, node)
}

tests := []struct {
nodes [][]string
nodes []clusterNodesResponse
ip string
want bool
}{
@@ -52,6 +57,40 @@
}
}

func TestRepairDisconnectedMasters(t *testing.T) {
ctx := context.Background()
redisClient, mock := redismock.NewClientMock()
mock.ExpectClusterNodes().SetVal(`
07c37dfeb235213a872192d90877d0cd55635b91 127.0.0.1:30004@31004,redis-cluster-follower-0 slave e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 0 1426238317239 4 connected
67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 127.0.0.1:30002@31002,redis-cluster-leader-0 master - 0 1426238316232 2 disconnected 5461-10922
824fe116063bc5fcf9f4ffd895bc17aee7731ac3 127.0.0.1:30006@31006,redis-cluster-follower-1 slave 292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f 0 1426238317741 6 disconnected
e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@31001,redis-cluster-leader-1 myself,master - 0 0 1 connected 0-5460
`)

namespace := "default"
newPodIP := "0.0.0.0"
k8sClient := k8sClientFake.NewSimpleClientset(&corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "redis-cluster-leader-0",
Namespace: namespace,
},
Status: corev1.PodStatus{
PodIP: newPodIP,
},
})
mock.ExpectClusterMeet(newPodIP, "6379").SetVal("OK")
port := 6379
err := repairDisconnectedMasters(ctx, k8sClient, logr.Discard(), &redisv1beta2.RedisCluster{
ObjectMeta: metav1.ObjectMeta{
Namespace: namespace,
},
Spec: redisv1beta2.RedisClusterSpec{
Port: &port,
},
}, redisClient)
assert.NoError(t, err)
}

func TestGetRedisServerIP(t *testing.T) {
tests := []struct {
name string
@@ -772,14 +811,14 @@ func Test_checkRedisServerRole(t *testing.T) {
}
}

func TestCheckRedisCluster(t *testing.T) {
func TestClusterNodes(t *testing.T) {
logger := logr.Discard() // Discard logs

tests := []struct {
name string
expectError error
clusterNodesOutput string
expectedResult [][]string
expectedResult []clusterNodesResponse
}{
{
name: "Detailed cluster nodes output",
@@ -789,7 +828,7 @@
6ec23923021cf3ffec47632106199cb7f496ce01 127.0.0.1:30005@31005,hostname5 slave 67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1 0 1426238316232 5 connected
824fe116063bc5fcf9f4ffd895bc17aee7731ac3 127.0.0.1:30006@31006,hostname6 slave 292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f 0 1426238317741 6 connected
e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@31001,hostname1 myself,master - 0 0 1 connected 0-5460`,
expectedResult: [][]string{
expectedResult: []clusterNodesResponse{
{"07c37dfeb235213a872192d90877d0cd55635b91", "127.0.0.1:30004@31004,hostname4", "slave", "e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca", "0", "1426238317239", "4", "connected"},
{"67ed2db8d677e59ec4a4cefb06858cf2a1a89fa1", "127.0.0.1:30002@31002,hostname2", "master", "-", "0", "1426238316232", "2", "connected", "5461-10922"},
{"292f8b365bb7edb5e285caf0b7e6ddc7265d2f4f", "127.0.0.1:30003@31003,hostname3", "master", "-", "0", "1426238318243", "3", "connected", "10923-16383"},
@@ -814,11 +853,13 @@ e7d1eecce10fd6bb5eb35b9f99a514335d9ba9ca 127.0.0.1:30001@31001,hostname1 myself,
} else {
mock.ExpectClusterNodes().SetVal(tc.clusterNodesOutput)
}
result := checkRedisCluster(context.TODO(), db, logger)
result, err := clusterNodes(context.TODO(), db, logger)

if tc.expectError != nil {
assert.Nil(t, result)
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.ElementsMatch(t, tc.expectedResult, result)
}

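
A possible follow-up for TestRepairDisconnectedMasters above (not part of this diff): redismock can also confirm that every registered expectation, including the ClusterMeet call, was actually consumed. A hedged sketch, assuming the redismock version in use exposes ExpectationsWereMet:

```go
// Hypothetical closing assertions for TestRepairDisconnectedMasters:
// ExpectationsWereMet returns an error if ExpectClusterNodes or
// ExpectClusterMeet was registered but never matched by the code under test.
assert.NoError(t, err)
assert.NoError(t, mock.ExpectationsWereMet())
```
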
7 changes: 5 additions & 2 deletions tests/_config/chainsaw-configuration.yaml
@@ -1,11 +1,14 @@
---
# yaml-language-server: $schema=https://raw.githubusercontent.com/kyverno/chainsaw/main/.schemas/json/configuration-chainsaw-v1alpha1.json
apiVersion: chainsaw.kyverno.io/v1alpha1
apiVersion: chainsaw.kyverno.io/v1alpha2
kind: Configuration
metadata:
name: chainsaw-configuration
spec:
delayBeforeCleanup: 10s
execution:
failFast: true
cleanup:
delayBeforeCleanup: 10s
timeouts:
apply: 5m
delete: 5m