diff --git a/go.mod b/go.mod index b103877cb..eae719d06 100644 --- a/go.mod +++ b/go.mod @@ -33,6 +33,8 @@ require ( require ( github.com/agiledragon/gomonkey v2.0.2+incompatible github.com/agiledragon/gomonkey/v2 v2.9.0 + go.etcd.io/etcd/api/v3 v3.5.6 + go.etcd.io/etcd/client/v3 v3.5.6 ) require ( @@ -45,6 +47,8 @@ require ( github.com/bytedance/sonic v1.9.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/chenzhuoyu/base64x v0.0.0-20221115062448-fe3a3abad311 // indirect + github.com/coreos/go-semver v0.3.0 // indirect + github.com/coreos/go-systemd/v22 v22.3.2 // indirect github.com/creasty/defaults v1.5.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/gabriel-vasile/mimetype v1.4.2 // indirect @@ -52,6 +56,7 @@ require ( github.com/go-ole/go-ole v1.2.6 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect + github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/protobuf v1.5.3 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/gorilla/websocket v1.4.2 // indirect @@ -82,6 +87,7 @@ require ( github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.11 // indirect github.com/yusufpapurcu/wmi v1.2.2 // indirect + go.etcd.io/etcd/client/pkg/v3 v3.5.6 // indirect go.uber.org/multierr v1.8.0 // indirect golang.org/x/arch v0.3.0 // indirect golang.org/x/text v0.14.0 // indirect diff --git a/go.sum b/go.sum index 1d60d0952..9a370bcc4 100644 --- a/go.sum +++ b/go.sum @@ -148,10 +148,12 @@ github.com/codahale/hdrhistogram v0.0.0-20161010025455-3a0bb77429bd/go.mod h1:sE github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= +github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmfM= github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk= github.com/coreos/go-systemd v0.0.0-20180511133405-39ca1b05acc7/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= +github.com/coreos/go-systemd/v22 v22.3.2 h1:D9/bQk5vlXQFZ6Kwuu6zaiXJ9oTPe68++AzAJc1DzSI= github.com/coreos/go-systemd/v22 v22.3.2/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA= @@ -270,6 +272,7 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o= +github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/goji/httpauth v0.0.0-20160601135302-2da839ab0f4d/go.mod h1:nnjvkQ9ptGaCkuDUx6wNykzzlUixGxvkme+H/lnzb+A= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= @@ -766,12 +769,15 @@ go.etcd.io/bbolt v1.3.5/go.mod h1:G5EMThwa9y8QZGBClrRx5EY+Yw9kAhnjy3bSjsnlVTQ= go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg= go.etcd.io/etcd/api/v3 v3.5.0-alpha.0/go.mod h1:mPcW6aZJukV6Aa81LSKpBjQXTWlXB5r74ymPoSWa3Sw= go.etcd.io/etcd/api/v3 v3.5.4/go.mod h1:5GB2vv4A4AOn3yk7MftYGHkUfGtDHnEraIjym4dYz5A= +go.etcd.io/etcd/api/v3 v3.5.6 h1:Cy2qx3npLcYqTKqGJzMypnMv2tiRyifZJ17BlWIWA7A= go.etcd.io/etcd/api/v3 v3.5.6/go.mod h1:KFtNaxGDw4Yx/BA4iPPwevUTAuqcsPxzyX8PHydchN8= go.etcd.io/etcd/client/pkg/v3 v3.5.4/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g= +go.etcd.io/etcd/client/pkg/v3 v3.5.6 h1:TXQWYceBKqLp4sa87rcPs11SXxUA/mHwH975v+BDvLU= go.etcd.io/etcd/client/pkg/v3 v3.5.6/go.mod h1:ggrwbk069qxpKPq8/FKkQ3Xq9y39kbFR4LnKszpRXeQ= go.etcd.io/etcd/client/v2 v2.305.0-alpha.0/go.mod h1:kdV+xzCJ3luEBSIeQyB/OEKkWKd8Zkux4sbDeANrosU= go.etcd.io/etcd/client/v3 v3.5.0-alpha.0/go.mod h1:wKt7jgDgf/OfKiYmCq5WFGxOFAkVMLxiiXgLDFhECr8= go.etcd.io/etcd/client/v3 v3.5.4/go.mod h1:ZaRkVgBZC+L+dLCjTcF1hRXpgZXQPOvnA/Ak/gq3kiY= +go.etcd.io/etcd/client/v3 v3.5.6 h1:coLs69PWCXE9G4FKquzNaSHrRyMCAXwF+IX1tAPVO8E= go.etcd.io/etcd/client/v3 v3.5.6/go.mod h1:f6GRinRMCsFVv9Ht42EyY7nfsVGwrNO0WEoS2pRKzQk= go.etcd.io/etcd/pkg/v3 v3.5.0-alpha.0/go.mod h1:tV31atvwzcybuqejDoY3oaNRTtlD2l/Ot78Pc9w7DMY= go.etcd.io/etcd/raft/v3 v3.5.0-alpha.0/go.mod h1:FAwse6Zlm5v4tEWZaTjmNhe17Int4Oxbu7+2r0DiD3w= diff --git a/pkg/discovery/etcd3.go b/pkg/discovery/etcd3.go index 12fa18e72..667a86058 100644 --- a/pkg/discovery/etcd3.go +++ b/pkg/discovery/etcd3.go @@ -17,14 +17,247 @@ package discovery -type EtcdRegistryService struct{} +import ( + "context" + "fmt" + "github.com/seata/seata-go/pkg/util/log" + etcd3 "go.etcd.io/etcd/client/v3" + "strconv" + "strings" + "sync" +) + +const ( + clusterNameSplitChar = "-" + addressSplitChar = ":" + etcdClusterPrefix = "registry-seata" +) + +type EtcdRegistryService struct { + client *etcd3.Client + cfg etcd3.Config + vgroupMapping map[string]string + grouplist map[string][]*ServiceInstance + rwLock sync.RWMutex + + stopCh chan struct{} +} + +func newEtcdRegistryService(config *ServiceConfig, etcd3Config *Etcd3Config) RegistryService { + + if etcd3Config == nil { + log.Fatalf("etcd config is nil") + panic("etcd config is nil") + } + + cfg := etcd3.Config{ + Endpoints: []string{etcd3Config.ServerAddr}, + } + cli, err := etcd3.New(cfg) + if err != nil { + log.Fatalf("failed to create etcd3 client") + panic("failed to create etcd3 client") + } + + vgroupMapping := config.VgroupMapping + grouplist := make(map[string][]*ServiceInstance, 0) + + etcdRegistryService := &EtcdRegistryService{ + client: cli, + cfg: cfg, + vgroupMapping: vgroupMapping, + grouplist: grouplist, + stopCh: make(chan struct{}), + } + go etcdRegistryService.watch(etcdClusterPrefix) + + return etcdRegistryService +} + +func (s *EtcdRegistryService) watch(key string) { + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + resp, err := s.client.Get(ctx, key, etcd3.WithPrefix()) + if err != nil { + log.Infof("cant get server instances from etcd") + } + + if resp != nil { + for _, kv := range resp.Kvs { + k := kv.Key + v := kv.Value + clusterName, err := getClusterName(k) + if err != nil { + log.Errorf("etcd key has an incorrect format: ", err) + return + } + serverInstance, err := getServerInstance(v) + if err != nil { + log.Errorf("etcd value has an incorrect format: ", err) + return + } + s.rwLock.Lock() + if s.grouplist[clusterName] == nil { + s.grouplist[clusterName] = []*ServiceInstance{serverInstance} + } else { + s.grouplist[clusterName] = append(s.grouplist[clusterName], serverInstance) + } + s.rwLock.Unlock() + } + + } + // watch the changes of endpoints + watchCh := s.client.Watch(ctx, key, etcd3.WithPrefix()) + + for { + select { + case watchResp, ok := <-watchCh: + if !ok { + log.Warnf("Watch channel closed") + return + } + for _, event := range watchResp.Events { + switch event.Type { + case etcd3.EventTypePut: + log.Infof("Key %s updated. New value: %s\n", event.Kv.Key, event.Kv.Value) + + k := event.Kv.Key + v := event.Kv.Value + clusterName, err := getClusterName(k) + if err != nil { + log.Errorf("etcd key err: ", err) + return + } + serverInstance, err := getServerInstance(v) + if err != nil { + log.Errorf("etcd value err: ", err) + return + } + + s.rwLock.Lock() + if s.grouplist[clusterName] == nil { + s.grouplist[clusterName] = []*ServiceInstance{serverInstance} + s.rwLock.Unlock() + continue + } + if ifHaveSameServiceInstances(s.grouplist[clusterName], serverInstance) { + s.rwLock.Unlock() + continue + } + s.grouplist[clusterName] = append(s.grouplist[clusterName], serverInstance) + s.rwLock.Unlock() + + case etcd3.EventTypeDelete: + log.Infof("Key %s deleted.\n", event.Kv.Key) + + cluster, ip, port, err := getClusterAndAddress(event.Kv.Key) + if err != nil { + log.Errorf("etcd key err: ", err) + return + } + + s.rwLock.Lock() + serviceInstances := s.grouplist[cluster] + if serviceInstances == nil { + log.Warnf("etcd doesnt exit cluster: ", cluster) + s.rwLock.Unlock() + continue + } + s.grouplist[cluster] = removeValueFromList(serviceInstances, ip, port) + s.rwLock.Unlock() + } + } + case <-s.stopCh: + log.Warn("stop etcd watch") + return + } + } +} + +func getClusterName(key []byte) (string, error) { + stringKey := string(key) + keySplit := strings.Split(stringKey, clusterNameSplitChar) + if len(keySplit) != 4 { + return "", fmt.Errorf("etcd key has an incorrect format. key: %s", stringKey) + } + + cluster := keySplit[2] + return cluster, nil +} + +func getServerInstance(value []byte) (*ServiceInstance, error) { + stringValue := string(value) + valueSplit := strings.Split(stringValue, addressSplitChar) + if len(valueSplit) != 2 { + return nil, fmt.Errorf("etcd value has an incorrect format. value: %s", stringValue) + } + ip := valueSplit[0] + port, err := strconv.Atoi(valueSplit[1]) + if err != nil { + return nil, fmt.Errorf("etcd port has an incorrect format. err: %w", err) + } + serverInstance := &ServiceInstance{ + Addr: ip, + Port: port, + } + + return serverInstance, nil +} + +func getClusterAndAddress(key []byte) (string, string, int, error) { + stringKey := string(key) + keySplit := strings.Split(stringKey, clusterNameSplitChar) + if len(keySplit) != 4 { + return "", "", 0, fmt.Errorf("etcd key has an incorrect format. key: %s", stringKey) + } + cluster := keySplit[2] + address := strings.Split(keySplit[3], addressSplitChar) + ip := address[0] + port, err := strconv.Atoi(address[1]) + if err != nil { + return "", "", 0, fmt.Errorf("etcd port has an incorrect format. err: %w", err) + } + + return cluster, ip, port, nil +} + +func ifHaveSameServiceInstances(list []*ServiceInstance, value *ServiceInstance) bool { + for _, v := range list { + if v.Addr == value.Addr && v.Port == value.Port { + return true + } + } + return false +} + +func removeValueFromList(list []*ServiceInstance, ip string, port int) []*ServiceInstance { + for k, v := range list { + if v.Addr == ip && v.Port == port { + result := list[:k] + if k < len(list)-1 { + result = append(result, list[k+1:]...) + } + return result + } + } + + return list +} func (s *EtcdRegistryService) Lookup(key string) ([]*ServiceInstance, error) { - //TODO implement me - panic("implement me") + s.rwLock.RLock() + defer s.rwLock.RUnlock() + cluster := s.vgroupMapping[key] + if cluster == "" { + return nil, fmt.Errorf("cluster doesnt exit") + } + + list := s.grouplist[cluster] + return list, nil } func (s *EtcdRegistryService) Close() { - //TODO implement me - panic("implement me") + s.stopCh <- struct{}{} } diff --git a/pkg/discovery/etcd3_test.go b/pkg/discovery/etcd3_test.go new file mode 100644 index 000000000..fbde6fb43 --- /dev/null +++ b/pkg/discovery/etcd3_test.go @@ -0,0 +1,141 @@ +package discovery + +import ( + "github.com/golang/mock/gomock" + "github.com/seata/seata-go/pkg/discovery/mock" + "github.com/stretchr/testify/assert" + "go.etcd.io/etcd/api/v3/mvccpb" + "go.etcd.io/etcd/client/v3" + "reflect" + "testing" + "time" +) + +func TestEtcd3RegistryService_Lookup(t *testing.T) { + + tests := []struct { + name string + getResp *clientv3.GetResponse + watchResp *clientv3.WatchResponse + want []*ServiceInstance + }{ + { + name: "normal", + getResp: &clientv3.GetResponse{ + Kvs: []*mvccpb.KeyValue{ + { + Key: []byte("registry-seata-default-172.0.0.1:8091"), + Value: []byte("172.0.0.1:8091"), + }, + }, + }, + watchResp: nil, + want: []*ServiceInstance{ + { + Addr: "172.0.0.1", + Port: 8091, + }, + }, + }, + { + name: "use watch update ServiceInstances", + getResp: nil, + watchResp: &clientv3.WatchResponse{ + Events: []*clientv3.Event{ + { + Type: clientv3.EventTypePut, + Kv: &mvccpb.KeyValue{ + Key: []byte("registry-seata-default-172.0.0.1:8091"), + Value: []byte("172.0.0.1:8091"), + }, + }, + }, + }, + want: []*ServiceInstance{ + { + Addr: "172.0.0.1", + Port: 8091, + }, + }, + }, + { + name: "use watch del ServiceInstances", + getResp: &clientv3.GetResponse{ + Kvs: []*mvccpb.KeyValue{ + { + Key: []byte("registry-seata-default-172.0.0.1:8091"), + Value: []byte("172.0.0.1:8091"), + }, + { + Key: []byte("registry-seata-default-172.0.0.1:8092"), + Value: []byte("172.0.0.1:8092"), + }, + }, + }, + watchResp: &clientv3.WatchResponse{ + Events: []*clientv3.Event{ + { + Type: clientv3.EventTypeDelete, + Kv: &mvccpb.KeyValue{ + Key: []byte("registry-seata-default-172.0.0.1:8091"), + Value: []byte("172.0.0.1:8091"), + }, + }, + }, + }, + want: []*ServiceInstance{ + { + Addr: "172.0.0.1", + Port: 8092, + }, + }, + }, + } + + for _, tt := range tests { + ctrl := gomock.NewController(t) + mockEtcdClient := mock.NewMockEtcdClient(ctrl) + etcdRegistryService := &EtcdRegistryService{ + client: &clientv3.Client{ + KV: mockEtcdClient, + Watcher: mockEtcdClient, + }, + vgroupMapping: map[string]string{ + "default_tx_group": "default", + }, + grouplist: make(map[string][]*ServiceInstance, 0), + stopCh: make(chan struct{}), + } + + mockEtcdClient.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any()).Return(tt.getResp, nil) + ch := make(chan clientv3.WatchResponse) + mockEtcdClient.EXPECT().Watch(gomock.Any(), gomock.Any(), gomock.Any()).Return(ch) + + go func() { + etcdRegistryService.watch("registry-seata") + }() + // wait a second for watch + time.Sleep(1 * time.Second) + + if tt.watchResp != nil { + go func() { + ch <- *tt.watchResp + }() + } + + // wait one more second for update + time.Sleep(1 * time.Second) + serviceInstances, err := etcdRegistryService.Lookup("default_tx_group") + if err != nil { + t.Errorf("error happen when look up . err = %e", err) + } + t.Logf(tt.name) + for i := range serviceInstances { + t.Log(serviceInstances[i].Addr) + t.Log(serviceInstances[i].Port) + } + assert.True(t, reflect.DeepEqual(serviceInstances, tt.want)) + + etcdRegistryService.Close() + } +} diff --git a/pkg/discovery/init.go b/pkg/discovery/init.go index c3afe27cc..8e7f5e38f 100644 --- a/pkg/discovery/init.go +++ b/pkg/discovery/init.go @@ -33,7 +33,8 @@ func InitRegistry(serviceConfig *ServiceConfig, registryConfig *RegistryConfig) //init file registry registryService = newFileRegistryService(serviceConfig) case ETCD: - //TODO: init etcd registry + //init etcd registry + registryService = newEtcdRegistryService(serviceConfig, ®istryConfig.Etcd3) case NACOS: //TODO: init nacos registry case EUREKA: diff --git a/pkg/discovery/init_test.go b/pkg/discovery/init_test.go index 0cfce0e4d..e78350674 100644 --- a/pkg/discovery/init_test.go +++ b/pkg/discovery/init_test.go @@ -34,7 +34,7 @@ func TestInitRegistry(t *testing.T) { expectedType string }{ { - name: "normal", + name: "file", args: args{ registryConfig: &RegistryConfig{ Type: FILE, @@ -43,6 +43,25 @@ func TestInitRegistry(t *testing.T) { }, expectedType: "FileRegistryService", }, + { + name: "etcd", + args: args{ + serviceConfig: &ServiceConfig{ + VgroupMapping: map[string]string{ + "default_tx_group": "default", + }, + }, + registryConfig: &RegistryConfig{ + Type: ETCD, + Etcd3: Etcd3Config{ + ServerAddr: "127.0.0.1:2379", + Cluster: "default", + }, + }, + }, + hasPanic: false, + expectedType: "EtcdRegistryService", + }, { name: "unknown type", args: args{ diff --git a/pkg/discovery/mock/mock_etcd_client.go b/pkg/discovery/mock/mock_etcd_client.go new file mode 100644 index 000000000..5c3f63c1d --- /dev/null +++ b/pkg/discovery/mock/mock_etcd_client.go @@ -0,0 +1,192 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: test_etcd_client.go + +// Package mock is a generated GoMock package. +package mock + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + clientv3 "go.etcd.io/etcd/client/v3" +) + +// MockEtcdClient is a mock of EtcdClient interface. +type MockEtcdClient struct { + ctrl *gomock.Controller + recorder *MockEtcdClientMockRecorder +} + +// MockEtcdClientMockRecorder is the mock recorder for MockEtcdClient. +type MockEtcdClientMockRecorder struct { + mock *MockEtcdClient +} + +// NewMockEtcdClient creates a new mock instance. +func NewMockEtcdClient(ctrl *gomock.Controller) *MockEtcdClient { + mock := &MockEtcdClient{ctrl: ctrl} + mock.recorder = &MockEtcdClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockEtcdClient) EXPECT() *MockEtcdClientMockRecorder { + return m.recorder +} + +// Close mocks base method. +func (m *MockEtcdClient) Close() error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Close") + ret0, _ := ret[0].(error) + return ret0 +} + +// Close indicates an expected call of Close. +func (mr *MockEtcdClientMockRecorder) Close() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Close", reflect.TypeOf((*MockEtcdClient)(nil).Close)) +} + +// Compact mocks base method. +func (m *MockEtcdClient) Compact(ctx context.Context, rev int64, opts ...clientv3.CompactOption) (*clientv3.CompactResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, rev} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Compact", varargs...) + ret0, _ := ret[0].(*clientv3.CompactResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Compact indicates an expected call of Compact. +func (mr *MockEtcdClientMockRecorder) Compact(ctx, rev interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, rev}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Compact", reflect.TypeOf((*MockEtcdClient)(nil).Compact), varargs...) +} + +// Delete mocks base method. +func (m *MockEtcdClient) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, key} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Delete", varargs...) + ret0, _ := ret[0].(*clientv3.DeleteResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Delete indicates an expected call of Delete. +func (mr *MockEtcdClientMockRecorder) Delete(ctx, key interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, key}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Delete", reflect.TypeOf((*MockEtcdClient)(nil).Delete), varargs...) +} + +// Do mocks base method. +func (m *MockEtcdClient) Do(ctx context.Context, op clientv3.Op) (clientv3.OpResponse, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Do", ctx, op) + ret0, _ := ret[0].(clientv3.OpResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Do indicates an expected call of Do. +func (mr *MockEtcdClientMockRecorder) Do(ctx, op interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Do", reflect.TypeOf((*MockEtcdClient)(nil).Do), ctx, op) +} + +// Get mocks base method. +func (m *MockEtcdClient) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, key} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Get", varargs...) + ret0, _ := ret[0].(*clientv3.GetResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Get indicates an expected call of Get. +func (mr *MockEtcdClientMockRecorder) Get(ctx, key interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, key}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Get", reflect.TypeOf((*MockEtcdClient)(nil).Get), varargs...) +} + +// Put mocks base method. +func (m *MockEtcdClient) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, key, val} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Put", varargs...) + ret0, _ := ret[0].(*clientv3.PutResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// Put indicates an expected call of Put. +func (mr *MockEtcdClientMockRecorder) Put(ctx, key, val interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, key, val}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Put", reflect.TypeOf((*MockEtcdClient)(nil).Put), varargs...) +} + +// RequestProgress mocks base method. +func (m *MockEtcdClient) RequestProgress(ctx context.Context) error { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "RequestProgress", ctx) + ret0, _ := ret[0].(error) + return ret0 +} + +// RequestProgress indicates an expected call of RequestProgress. +func (mr *MockEtcdClientMockRecorder) RequestProgress(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestProgress", reflect.TypeOf((*MockEtcdClient)(nil).RequestProgress), ctx) +} + +// Txn mocks base method. +func (m *MockEtcdClient) Txn(ctx context.Context) clientv3.Txn { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Txn", ctx) + ret0, _ := ret[0].(clientv3.Txn) + return ret0 +} + +// Txn indicates an expected call of Txn. +func (mr *MockEtcdClientMockRecorder) Txn(ctx interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Txn", reflect.TypeOf((*MockEtcdClient)(nil).Txn), ctx) +} + +// Watch mocks base method. +func (m *MockEtcdClient) Watch(ctx context.Context, key string, opts ...clientv3.OpOption) clientv3.WatchChan { + m.ctrl.T.Helper() + varargs := []interface{}{ctx, key} + for _, a := range opts { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "Watch", varargs...) + ret0, _ := ret[0].(clientv3.WatchChan) + return ret0 +} + +// Watch indicates an expected call of Watch. +func (mr *MockEtcdClientMockRecorder) Watch(ctx, key interface{}, opts ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{ctx, key}, opts...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Watch", reflect.TypeOf((*MockEtcdClient)(nil).Watch), varargs...) +} diff --git a/pkg/discovery/mock/test_etcd_client.go b/pkg/discovery/mock/test_etcd_client.go new file mode 100644 index 000000000..d780e0ea8 --- /dev/null +++ b/pkg/discovery/mock/test_etcd_client.go @@ -0,0 +1,8 @@ +package mock + +import "go.etcd.io/etcd/client/v3" + +type EtcdClient interface { + clientv3.KV + clientv3.Watcher +}