From 2286fbafb454efe30f4ab8ea51cc047023ec6f83 Mon Sep 17 00:00:00 2001 From: Michael Yue Date: Tue, 11 Jun 2024 09:55:50 -0700 Subject: [PATCH 1/2] Remove watch timeout to allow call staggering --- go.mod | 2 +- pkg/k8sclient/k8sclient.go | 2 ++ pkg/k8sclient/kubeconfig.go | 17 +++++++++++++++-- pkg/multus/multus_cni100_test.go | 16 ++++++++-------- pkg/multus/multus_suite_test.go | 8 +++++--- pkg/server/server.go | 17 ++++++++--------- pkg/server/types.go | 2 +- vendor/modules.txt | 2 +- 8 files changed, 41 insertions(+), 25 deletions(-) diff --git a/go.mod b/go.mod index 531d4c635..e64872b35 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,7 @@ require ( gopkg.in/natefinch/lumberjack.v2 v2.0.0 k8s.io/api v0.29.0 k8s.io/apimachinery v0.29.0 - k8s.io/client-go v1.5.2 + k8s.io/client-go v0.29.0 k8s.io/klog v1.0.0 k8s.io/klog/v2 v2.110.1 k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect diff --git a/pkg/k8sclient/k8sclient.go b/pkg/k8sclient/k8sclient.go index 83f306f80..10fa71a8c 100644 --- a/pkg/k8sclient/k8sclient.go +++ b/pkg/k8sclient/k8sclient.go @@ -59,7 +59,9 @@ type NoK8sNetworkError struct { // ClientInfo contains information given from k8s client type ClientInfo struct { Client kubernetes.Interface + WatchClient kubernetes.Interface NetClient netclient.Interface + NetWatchClient netclient.Interface EventBroadcaster record.EventBroadcaster EventRecorder record.EventRecorder diff --git a/pkg/k8sclient/kubeconfig.go b/pkg/k8sclient/kubeconfig.go index b9ca1b402..14fe80d91 100644 --- a/pkg/k8sclient/kubeconfig.go +++ b/pkg/k8sclient/kubeconfig.go @@ -228,8 +228,19 @@ func newClientInfo(config *rest.Config) (*ClientInfo, error) { if err != nil { return nil, err } + netClient, err := netclient.NewForConfig(config) + if err != nil { + return nil, err + } - netclient, err := netclient.NewForConfig(config) + watchConfig := rest.CopyConfig(config) + // Do not set timeout for watches and delegate timeout to client-go + watchConfig.Timeout = 0 + watchClient, err := kubernetes.NewForConfig(watchConfig) + if err != nil { + return nil, err + } + netWatchClient, err := netclient.NewForConfig(watchConfig) if err != nil { return nil, err } @@ -240,7 +251,9 @@ func newClientInfo(config *rest.Config) (*ClientInfo, error) { recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "multus"}) return &ClientInfo{ Client: client, - NetClient: netclient, + WatchClient: watchClient, + NetClient: netClient, + NetWatchClient: netWatchClient, EventBroadcaster: broadcaster, EventRecorder: recorder, }, nil diff --git a/pkg/multus/multus_cni100_test.go b/pkg/multus/multus_cni100_test.go index 7c4f7013c..41e7b73c2 100644 --- a/pkg/multus/multus_cni100_test.go +++ b/pkg/multus/multus_cni100_test.go @@ -48,8 +48,8 @@ import ( netdefinformerv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions/k8s.cni.cncf.io/v1" ) -func newPodInformer(ctx context.Context, kclient kubernetes.Interface) cache.SharedIndexInformer { - informerFactory := informerfactory.NewSharedInformerFactory(kclient, 0*time.Second) +func newPodInformer(ctx context.Context, watchClient kubernetes.Interface) cache.SharedIndexInformer { + informerFactory := informerfactory.NewSharedInformerFactory(watchClient, 0*time.Second) podInformer := informerFactory.InformerFor(&kapi.Pod{}, func(c kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { return v1coreinformers.NewFilteredPodInformer( @@ -71,8 +71,8 @@ func newPodInformer(ctx context.Context, kclient kubernetes.Interface) cache.Sha return podInformer } -func newNetDefInformer(ctx context.Context, client netdefclient.Interface) cache.SharedIndexInformer { - informerFactory := netdefinformer.NewSharedInformerFactory(client, 0*time.Second) +func newNetDefInformer(ctx context.Context, netWatchClient netdefclient.Interface) cache.SharedIndexInformer { + informerFactory := netdefinformer.NewSharedInformerFactory(netWatchClient, 0*time.Second) netdefInformer := informerFactory.InformerFor(&netdefv1.NetworkAttachmentDefinition{}, func(client netdefclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { return netdefinformerv1.NewNetworkAttachmentDefinitionInformer( @@ -941,8 +941,8 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { _, err := fKubeClient.AddNetAttachDef(testhelpers.NewFakeNetAttachDef("kube-system", "net1", net1)) Expect(err).NotTo(HaveOccurred()) - podInformer := newPodInformer(ctx, fKubeClient.Client) - netdefInformer := newNetDefInformer(ctx, fKubeClient.NetClient) + podInformer := newPodInformer(ctx, fKubeClient.WatchClient) + netdefInformer := newNetDefInformer(ctx, fKubeClient.NetWatchClient) fKubeClient.SetK8sClientInformers(podInformer, netdefInformer) result, err := CmdAdd(args, fExec, fKubeClient) @@ -991,8 +991,8 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { _, err := fKubeClient.AddNetAttachDef(testhelpers.NewFakeNetAttachDef("kube-system", "net1", net1)) Expect(err).NotTo(HaveOccurred()) - podInformer := newPodInformer(ctx, fKubeClient.Client) - netdefInformer := newNetDefInformer(ctx, fKubeClient.NetClient) + podInformer := newPodInformer(ctx, fKubeClient.WatchClient) + netdefInformer := newNetDefInformer(ctx, fKubeClient.NetWatchClient) fKubeClient.SetK8sClientInformers(podInformer, netdefInformer) wg := sync.WaitGroup{} diff --git a/pkg/multus/multus_suite_test.go b/pkg/multus/multus_suite_test.go index a9ad866f2..6b1a6deff 100644 --- a/pkg/multus/multus_suite_test.go +++ b/pkg/multus/multus_suite_test.go @@ -225,9 +225,11 @@ func (f *fakeExec) FindInPath(plugin string, paths []string) (string, error) { // NewFakeClientInfo returns fake client (just for testing) func NewFakeClientInfo() *k8sclient.ClientInfo { return &k8sclient.ClientInfo{ - Client: fake.NewSimpleClientset(), - NetClient: netfake.NewSimpleClientset(), - EventRecorder: record.NewFakeRecorder(10), + Client: fake.NewSimpleClientset(), + WatchClient: fake.NewSimpleClientset(), + NetClient: netfake.NewSimpleClientset(), + NetWatchClient: netfake.NewSimpleClientset(), + EventRecorder: record.NewFakeRecorder(10), } } diff --git a/pkg/server/server.go b/pkg/server/server.go index d469af23a..bf62927c9 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -158,10 +158,10 @@ func informerObjectTrim(obj interface{}) (interface{}, error) { return obj, nil } -func newNetDefInformer(netdefClient netdefclient.Interface) (netdefinformer.SharedInformerFactory, cache.SharedIndexInformer) { +func newNetDefInformer(netWatchClient netdefclient.Interface) (netdefinformer.SharedInformerFactory, cache.SharedIndexInformer) { const resyncInterval time.Duration = 1 * time.Second - informerFactory := netdefinformer.NewSharedInformerFactoryWithOptions(netdefClient, resyncInterval) + informerFactory := netdefinformer.NewSharedInformerFactoryWithOptions(netWatchClient, resyncInterval) netdefInformer := informerFactory.InformerFor(&netdefv1.NetworkAttachmentDefinition{}, func(client netdefclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { return netdefinformerv1.NewNetworkAttachmentDefinitionInformer( client, @@ -173,7 +173,7 @@ func newNetDefInformer(netdefClient netdefclient.Interface) (netdefinformer.Shar return informerFactory, netdefInformer } -func newPodInformer(kubeClient kubernetes.Interface, nodeName string) (internalinterfaces.SharedInformerFactory, cache.SharedIndexInformer) { +func newPodInformer(watchClient kubernetes.Interface, nodeName string) (internalinterfaces.SharedInformerFactory, cache.SharedIndexInformer) { var tweakFunc internalinterfaces.TweakListOptionsFunc if nodeName != "" { logging.Verbosef("Filtering pod watch for node %q", nodeName) @@ -185,7 +185,7 @@ func newPodInformer(kubeClient kubernetes.Interface, nodeName string) (internali const resyncInterval time.Duration = 1 * time.Second - informerFactory := informerfactory.NewSharedInformerFactoryWithOptions(kubeClient, resyncInterval, informerfactory.WithTransform(informerObjectTrim)) + informerFactory := informerfactory.NewSharedInformerFactoryWithOptions(watchClient, resyncInterval, informerfactory.WithTransform(informerObjectTrim)) podInformer := informerFactory.InformerFor(&kapi.Pod{}, func(c kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { return v1coreinformers.NewFilteredPodInformer( c, @@ -194,7 +194,6 @@ func newPodInformer(kubeClient kubernetes.Interface, nodeName string) (internali cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}, tweakFunc) }) - return informerFactory, podInformer } @@ -255,8 +254,8 @@ func NewCNIServer(daemonConfig *ControllerNetConf, serverConfig []byte, ignoreRe } func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, servConfig []byte, ignoreReadinessIndicator bool) (*Server, error) { - informerFactory, podInformer := newPodInformer(kubeClient.Client, os.Getenv("MULTUS_NODE_NAME")) - netdefInformerFactory, netdefInformer := newNetDefInformer(kubeClient.NetClient) + podInformerFactory, podInformer := newPodInformer(kubeClient.WatchClient, os.Getenv("MULTUS_NODE_NAME")) + netdefInformerFactory, netdefInformer := newNetDefInformer(kubeClient.NetWatchClient) kubeClient.SetK8sClientInformers(podInformer, netdefInformer) router := http.NewServeMux() @@ -277,7 +276,7 @@ func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, s []string{"handler", "code", "method"}, ), }, - informerFactory: informerFactory, + podInformerFactory: podInformerFactory, podInformer: podInformer, netdefInformerFactory: netdefInformerFactory, netdefInformer: netdefInformer, @@ -356,7 +355,7 @@ func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, s // Start starts the server and begins serving on the given listener func (s *Server) Start(ctx context.Context, l net.Listener) { - s.informerFactory.Start(ctx.Done()) + s.podInformerFactory.Start(ctx.Done()) s.netdefInformerFactory.Start(ctx.Done()) // Give the initial sync some time to complete in large clusters, but diff --git a/pkg/server/types.go b/pkg/server/types.go index 81d4d6819..225877d12 100644 --- a/pkg/server/types.go +++ b/pkg/server/types.go @@ -54,7 +54,7 @@ type Server struct { exec invoke.Exec serverConfig []byte metrics *Metrics - informerFactory internalinterfaces.SharedInformerFactory + podInformerFactory internalinterfaces.SharedInformerFactory podInformer cache.SharedIndexInformer netdefInformerFactory netdefinformer.SharedInformerFactory netdefInformer cache.SharedIndexInformer diff --git a/vendor/modules.txt b/vendor/modules.txt index 26cb7a525..6a2a152dc 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -478,7 +478,7 @@ k8s.io/apimachinery/pkg/version k8s.io/apimachinery/pkg/watch k8s.io/apimachinery/third_party/forked/golang/json k8s.io/apimachinery/third_party/forked/golang/reflect -# k8s.io/client-go v1.5.2 => k8s.io/client-go v0.29.0 +# k8s.io/client-go v0.29.0 ## explicit; go 1.21 k8s.io/client-go/applyconfigurations/admissionregistration/v1 k8s.io/client-go/applyconfigurations/admissionregistration/v1alpha1 From c4db076d90337e93def8e3e3a2229e67f780ac8d Mon Sep 17 00:00:00 2001 From: Michael Yue Date: Fri, 21 Jun 2024 08:36:58 -0400 Subject: [PATCH 2/2] fix tests --- pkg/multus/multus_cni100_test.go | 17 +++++++++-------- pkg/multus/multus_suite_test.go | 13 +++++++------ 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/pkg/multus/multus_cni100_test.go b/pkg/multus/multus_cni100_test.go index 41e7b73c2..fa04755dc 100644 --- a/pkg/multus/multus_cni100_test.go +++ b/pkg/multus/multus_cni100_test.go @@ -28,6 +28,7 @@ import ( cni100 "github.com/containernetworking/cni/pkg/types/100" "github.com/containernetworking/plugins/pkg/ns" "github.com/containernetworking/plugins/pkg/testutils" + "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/k8sclient" "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/logging" testhelpers "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/testing" @@ -48,8 +49,8 @@ import ( netdefinformerv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions/k8s.cni.cncf.io/v1" ) -func newPodInformer(ctx context.Context, watchClient kubernetes.Interface) cache.SharedIndexInformer { - informerFactory := informerfactory.NewSharedInformerFactory(watchClient, 0*time.Second) +func newPodInformer(ctx context.Context, kclient kubernetes.Interface) cache.SharedIndexInformer { + informerFactory := informerfactory.NewSharedInformerFactory(kclient, 0*time.Second) podInformer := informerFactory.InformerFor(&kapi.Pod{}, func(c kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { return v1coreinformers.NewFilteredPodInformer( @@ -71,8 +72,8 @@ func newPodInformer(ctx context.Context, watchClient kubernetes.Interface) cache return podInformer } -func newNetDefInformer(ctx context.Context, netWatchClient netdefclient.Interface) cache.SharedIndexInformer { - informerFactory := netdefinformer.NewSharedInformerFactory(netWatchClient, 0*time.Second) +func newNetDefInformer(ctx context.Context, client netdefclient.Interface) cache.SharedIndexInformer { + informerFactory := netdefinformer.NewSharedInformerFactory(client, 0*time.Second) netdefInformer := informerFactory.InformerFor(&netdefv1.NetworkAttachmentDefinition{}, func(client netdefclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer { return netdefinformerv1.NewNetworkAttachmentDefinitionInformer( @@ -941,8 +942,8 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { _, err := fKubeClient.AddNetAttachDef(testhelpers.NewFakeNetAttachDef("kube-system", "net1", net1)) Expect(err).NotTo(HaveOccurred()) - podInformer := newPodInformer(ctx, fKubeClient.WatchClient) - netdefInformer := newNetDefInformer(ctx, fKubeClient.NetWatchClient) + podInformer := newPodInformer(ctx, fKubeClient.Client) + netdefInformer := newNetDefInformer(ctx, fKubeClient.NetClient) fKubeClient.SetK8sClientInformers(podInformer, netdefInformer) result, err := CmdAdd(args, fExec, fKubeClient) @@ -991,8 +992,8 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() { _, err := fKubeClient.AddNetAttachDef(testhelpers.NewFakeNetAttachDef("kube-system", "net1", net1)) Expect(err).NotTo(HaveOccurred()) - podInformer := newPodInformer(ctx, fKubeClient.WatchClient) - netdefInformer := newNetDefInformer(ctx, fKubeClient.NetWatchClient) + podInformer := newPodInformer(ctx, fKubeClient.Client) + netdefInformer := newNetDefInformer(ctx, fKubeClient.NetClient) fKubeClient.SetK8sClientInformers(podInformer, netdefInformer) wg := sync.WaitGroup{} diff --git a/pkg/multus/multus_suite_test.go b/pkg/multus/multus_suite_test.go index 6b1a6deff..330c848f0 100644 --- a/pkg/multus/multus_suite_test.go +++ b/pkg/multus/multus_suite_test.go @@ -31,10 +31,11 @@ import ( cni100 "github.com/containernetworking/cni/pkg/types/100" cniversion "github.com/containernetworking/cni/pkg/version" netfake "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/clientset/versioned/fake" - "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/k8sclient" "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/tools/record" + "gopkg.in/k8snetworkplumbingwg/multus-cni.v4/pkg/k8sclient" + . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) @@ -225,11 +226,11 @@ func (f *fakeExec) FindInPath(plugin string, paths []string) (string, error) { // NewFakeClientInfo returns fake client (just for testing) func NewFakeClientInfo() *k8sclient.ClientInfo { return &k8sclient.ClientInfo{ - Client: fake.NewSimpleClientset(), - WatchClient: fake.NewSimpleClientset(), - NetClient: netfake.NewSimpleClientset(), - NetWatchClient: netfake.NewSimpleClientset(), - EventRecorder: record.NewFakeRecorder(10), + // We use watch clients to avoid reconnections in fixed intervals in production, though we do not + // distinguish between non-watch clients and watch clients in tests for simplicity + Client: fake.NewSimpleClientset(), + NetClient: netfake.NewSimpleClientset(), + EventRecorder: record.NewFakeRecorder(10), } }