Skip to content

Commit

Permalink
refactor(apis): add rollingUpdate (#213)
Browse files Browse the repository at this point in the history
  • Loading branch information
zyy17 authored Nov 22, 2024
1 parent 15a3726 commit 9833a3b
Show file tree
Hide file tree
Showing 22 changed files with 452 additions and 5 deletions.
71 changes: 66 additions & 5 deletions apis/v1alpha1/defaulting.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,11 @@
package v1alpha1

import (
"reflect"
"strings"

"dario.cat/mergo"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/intstr"
Expand All @@ -34,7 +36,7 @@ func (in *GreptimeDBCluster) SetDefaults() error {
in.Spec.Version = getVersionFromImage(in.GetBaseMainContainer().GetImage())

// Merge the default settings into the GreptimeDBClusterSpec.
if err := mergo.Merge(&in.Spec, in.defaultSpec()); err != nil {
if err := mergo.Merge(&in.Spec, in.defaultSpec(), mergo.WithTransformers(intOrStringTransformer{})); err != nil {
return err
}

Expand Down Expand Up @@ -171,6 +173,7 @@ func (in *GreptimeDBCluster) defaultFrontend() *FrontendSpec {
Service: &ServiceSpec{
Type: corev1.ServiceTypeClusterIP,
},
RollingUpdate: defaultRollingUpdateForDeployment(),
}
}

Expand All @@ -188,6 +191,7 @@ func (in *GreptimeDBCluster) defaultMeta() *MetaSpec {
RPCPort: DefaultMetaRPCPort,
HTTPPort: DefaultHTTPPort,
EnableRegionFailover: &enableRegionFailover,
RollingUpdate: defaultRollingUpdateForDeployment(),
}
}

Expand All @@ -198,9 +202,10 @@ func (in *GreptimeDBCluster) defaultDatanode() *DatanodeSpec {
Replicas: pointer.Int32(DefaultReplicas),
Logging: &LoggingSpec{},
},
RPCPort: DefaultRPCPort,
HTTPPort: DefaultHTTPPort,
Storage: defaultDatanodeStorage(),
RPCPort: DefaultRPCPort,
HTTPPort: DefaultHTTPPort,
Storage: defaultDatanodeStorage(),
RollingUpdate: defaultRollingUpdateForStatefulSet(),
}
}

Expand Down Expand Up @@ -312,7 +317,7 @@ func (in *GreptimeDBStandalone) SetDefaults() error {

in.Spec.Version = getVersionFromImage(in.GetBaseMainContainer().GetImage())

if err := mergo.Merge(&in.Spec, in.defaultSpec()); err != nil {
if err := mergo.Merge(&in.Spec, in.defaultSpec(), mergo.WithTransformers(intOrStringTransformer{})); err != nil {
return err
}

Expand Down Expand Up @@ -344,6 +349,7 @@ func (in *GreptimeDBStandalone) defaultSpec() *GreptimeDBStandaloneSpec {
OnlyLogToStdout: pointer.Bool(false),
},
DatanodeStorage: defaultDatanodeStorage(),
RollingUpdate: defaultRollingUpdateForStatefulSet(),
}

return defaultSpec
Expand Down Expand Up @@ -427,3 +433,58 @@ func defaultReadinessProbe() *corev1.Probe {
FailureThreshold: 10,
}
}

// Same as the default rolling update strategy of Deployment.
func defaultRollingUpdateForDeployment() *appsv1.RollingUpdateDeployment {
return &appsv1.RollingUpdateDeployment{
MaxUnavailable: &intstr.IntOrString{Type: intstr.String, StrVal: "25%"},
MaxSurge: &intstr.IntOrString{Type: intstr.String, StrVal: "25%"},
}
}

// Same as the default rolling update strategy of StatefulSet.
func defaultRollingUpdateForStatefulSet() *appsv1.RollingUpdateStatefulSetStrategy {
return &appsv1.RollingUpdateStatefulSetStrategy{
Partition: pointer.Int32(0),
MaxUnavailable: &intstr.IntOrString{
Type: intstr.Int,
IntVal: 1,
},
}
}

// This transformer handles merging of intstr.IntOrString values.
// The `Type` field in IntOrString is an int starting from 0, which means it would be considered "empty" during merging and get overwritten.
// We want to preserve the original Type of the destination value while only merging the actual int/string content.
type intOrStringTransformer struct{}

func (t intOrStringTransformer) Transformer(typ reflect.Type) func(dst, src reflect.Value) error {
if typ != reflect.TypeOf(&intstr.IntOrString{}) {
return nil
}

return func(dst, src reflect.Value) error {
if dst.IsNil() || src.IsNil() {
return nil
}

dstVal, srcVal := dst.Interface().(*intstr.IntOrString), src.Interface().(*intstr.IntOrString)

// Don't override the type of dst.
if dstVal.Type == intstr.Int {
if dstVal.IntVal == 0 {
dstVal.IntVal = srcVal.IntVal
}
dstVal.StrVal = ""
}

if dstVal.Type == intstr.String {
if dstVal.StrVal == "" {
dstVal.StrVal = srcVal.StrVal
}
dstVal.IntVal = 0
}

return nil
}
}
46 changes: 46 additions & 0 deletions apis/v1alpha1/defaulting_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (
"reflect"
"testing"

"dario.cat/mergo"
"github.com/sergi/go-diff/diffmatchpatch"
"k8s.io/apimachinery/pkg/util/intstr"
"sigs.k8s.io/yaml"
)

Expand Down Expand Up @@ -211,3 +213,47 @@ func TestStandaloneSetDefaults(t *testing.T) {
}
}
}

func TestIntOrStringTransformer(t *testing.T) {
type foo struct {
Val *intstr.IntOrString
}
type testStruct struct {
Src foo
Dst foo
Expect foo
}

tests := []testStruct{
{
Src: foo{Val: &intstr.IntOrString{Type: intstr.String, StrVal: "1"}},
Dst: foo{Val: &intstr.IntOrString{Type: intstr.Int, IntVal: 10}},
Expect: foo{Val: &intstr.IntOrString{Type: intstr.Int, IntVal: 10}},
},
{
Src: foo{Val: &intstr.IntOrString{Type: intstr.String, StrVal: "25%"}},
Dst: foo{Val: &intstr.IntOrString{Type: intstr.String, StrVal: "75%"}},
Expect: foo{Val: &intstr.IntOrString{Type: intstr.String, StrVal: "75%"}},
},
{
Src: foo{Val: &intstr.IntOrString{Type: intstr.String, StrVal: "25%"}},
Dst: foo{},
Expect: foo{Val: &intstr.IntOrString{Type: intstr.String, StrVal: "25%"}},
},
{
Src: foo{Val: &intstr.IntOrString{Type: intstr.Int, IntVal: 10}},
Dst: foo{},
Expect: foo{Val: &intstr.IntOrString{Type: intstr.Int, IntVal: 10}},
},
}

for i, tt := range tests {
if err := mergo.Merge(&tt.Dst, &tt.Src, mergo.WithTransformers(intOrStringTransformer{})); err != nil {
t.Errorf("test [%d] failed: %v", i, err)
}

if !reflect.DeepEqual(tt.Dst, tt.Expect) {
t.Errorf("test [%d] failed: expected '%v', got '%v'", i, tt.Expect, tt.Dst)
}
}
}
17 changes: 17 additions & 0 deletions apis/v1alpha1/greptimedbcluster_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package v1alpha1

import (
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
Expand Down Expand Up @@ -70,6 +71,10 @@ type MetaSpec struct {
// StoreKeyPrefix is the prefix of the key in the etcd. We can use it to isolate the data of different clusters.
// +optional
StoreKeyPrefix string `json:"storeKeyPrefix,omitempty"`

// RollingUpdate is the rolling update configuration. We always use `RollingUpdate` strategyt.
// +optional
RollingUpdate *appsv1.RollingUpdateDeployment `json:"rollingUpdate,omitempty"`
}

func (in *MetaSpec) GetConfig() string {
Expand Down Expand Up @@ -143,6 +148,10 @@ type FrontendSpec struct {
// TLS is the TLS configuration of the frontend.
// +optional
TLS *TLSSpec `json:"tls,omitempty"`

// RollingUpdate is the rolling update configuration. We always use `RollingUpdate` strategyt.
// +optional
RollingUpdate *appsv1.RollingUpdateDeployment `json:"rollingUpdate,omitempty"`
}

func (in *FrontendSpec) GetTLS() *TLSSpec {
Expand Down Expand Up @@ -192,6 +201,10 @@ type DatanodeSpec struct {
// Storage is the default file storage of the datanode. For example, WAL, cache, index etc.
// +optional
Storage *DatanodeStorageSpec `json:"storage,omitempty"`

// RollingUpdate is the rolling update configuration. We always use `RollingUpdate` strategy.
// +optional
RollingUpdate *appsv1.RollingUpdateStatefulSetStrategy `json:"rollingUpdate,omitempty"`
}

func (in *DatanodeSpec) GetConfig() string {
Expand Down Expand Up @@ -231,6 +244,10 @@ type FlownodeSpec struct {
// +kubebuilder:validation:Maximum=65535
// +optional
RPCPort int32 `json:"rpcPort,omitempty"`

// RollingUpdate is the rolling update configuration. We always use `RollingUpdate` strategy.
// +optional
RollingUpdate *appsv1.RollingUpdateStatefulSetStrategy `json:"rollingUpdate,omitempty"`
}

func (in *FlownodeSpec) GetConfig() string {
Expand Down
5 changes: 5 additions & 0 deletions apis/v1alpha1/greptimedbstandalone_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package v1alpha1

import (
appsv1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -87,6 +88,10 @@ type GreptimeDBStandaloneSpec struct {
// Logging defines the logging configuration for the component.
// +optional
Logging *LoggingSpec `json:"logging,omitempty"`

// RollingUpdate is the rolling update configuration. We always use `RollingUpdate` strategy.
// +optional
RollingUpdate *appsv1.RollingUpdateStatefulSetStrategy `json:"rollingUpdate,omitempty"`
}

// GreptimeDBStandaloneStatus defines the observed state of GreptimeDBStandalone
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ spec:
logsDir: /data/greptimedb/logs
onlyLogToStdout: false
persistentWithData: false
rollingUpdate:
maxUnavailable: 1
maxSurge: 2
template:
main:
image: greptime/greptimedb:test
Expand Down Expand Up @@ -100,6 +103,9 @@ spec:
logsDir: /data/greptimedb/logs
onlyLogToStdout: false
persistentWithData: false
rollingUpdate:
maxSurge: 25%
maxUnavailable: 25%
template:
main:
image: greptime/greptimedb:latest
Expand Down Expand Up @@ -145,6 +151,9 @@ spec:
logsDir: /data/greptimedb/logs
onlyLogToStdout: false
persistentWithData: false
rollingUpdate:
maxUnavailable: 25%
partition: 1
template:
main:
image: greptime/greptimedb:latest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ spec:
template:
main:
image: greptime/greptimedb:test
rollingUpdate:
maxUnavailable: 1
maxSurge: 2
meta:
etcdEndpoints:
- etcd.etcd-cluster.svc.cluster.local:2379
Expand All @@ -37,4 +40,7 @@ spec:
replicas: 3
logging:
level: debug
rollingUpdate:
partition: 1
maxUnavailable: 25%
httpPort: 5000
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ spec:
type: ClusterIP
logging: {}
template: {}
rollingUpdate:
maxSurge: 25%
maxUnavailable: 25%
meta:
etcdEndpoints:
- etcd.etcd-cluster.svc.cluster.local:2379
Expand All @@ -57,6 +60,9 @@ spec:
replicas: 1
logging: {}
template: {}
rollingUpdate:
maxSurge: 25%
maxUnavailable: 25%
datanode:
httpPort: 4000
rpcPort: 4001
Expand All @@ -70,3 +76,6 @@ spec:
storageSize: 10Gi
logging: {}
template: {}
rollingUpdate:
maxUnavailable: 1
partition: 0
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ spec:
args:
- --metasrv-addrs
- meta.default:3002
rollingUpdate:
maxUnavailable: 25%
maxSurge: 25%
meta:
enableRegionFailover: false
etcdEndpoints:
Expand All @@ -74,6 +77,9 @@ spec:
args:
- --store-addr
- etcd.default:2379
rollingUpdate:
maxUnavailable: 25%
maxSurge: 25%
datanode:
httpPort: 4000
rpcPort: 4001
Expand All @@ -89,3 +95,6 @@ spec:
template:
main:
image: greptime/greptimedb:latest
rollingUpdate:
maxUnavailable: 1
partition: 0
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ spec:
type: ClusterIP
logging: {}
template: {}
rollingUpdate:
maxUnavailable: 25%
maxSurge: 25%
meta:
etcdEndpoints:
- etcd.etcd-cluster.svc.cluster.local:2379
Expand All @@ -69,6 +72,9 @@ spec:
rpcPort: 3002
replicas: 1
template: {}
rollingUpdate:
maxUnavailable: 25%
maxSurge: 25%
datanode:
httpPort: 4000
rpcPort: 4001
Expand All @@ -82,3 +88,6 @@ spec:
storageRetainPolicy: Retain
storageSize: 10Gi
template: {}
rollingUpdate:
maxUnavailable: 1
partition: 0
Loading

0 comments on commit 9833a3b

Please sign in to comment.