Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove watch timeout to allow call staggering #1296

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ require (
gopkg.in/natefinch/lumberjack.v2 v2.0.0
k8s.io/api v0.29.0
k8s.io/apimachinery v0.29.0
k8s.io/client-go v1.5.2
k8s.io/client-go v0.29.0
michaely-cb marked this conversation as resolved.
Show resolved Hide resolved
k8s.io/klog v1.0.0
k8s.io/klog/v2 v2.110.1
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect
Expand Down
2 changes: 2 additions & 0 deletions pkg/k8sclient/k8sclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ type NoK8sNetworkError struct {
// ClientInfo contains information given from k8s client
type ClientInfo struct {
Client kubernetes.Interface
WatchClient kubernetes.Interface
NetClient netclient.Interface
NetWatchClient netclient.Interface
EventBroadcaster record.EventBroadcaster
EventRecorder record.EventRecorder

Expand Down
17 changes: 15 additions & 2 deletions pkg/k8sclient/kubeconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,19 @@ func newClientInfo(config *rest.Config) (*ClientInfo, error) {
if err != nil {
return nil, err
}
netClient, err := netclient.NewForConfig(config)
if err != nil {
return nil, err
}

netclient, err := netclient.NewForConfig(config)
watchConfig := rest.CopyConfig(config)
// Do not set timeout for watches and delegate timeout to client-go
adrianchiris marked this conversation as resolved.
Show resolved Hide resolved
watchConfig.Timeout = 0
watchClient, err := kubernetes.NewForConfig(watchConfig)
if err != nil {
return nil, err
}
netWatchClient, err := netclient.NewForConfig(watchConfig)
if err != nil {
return nil, err
}
Expand All @@ -240,7 +251,9 @@ func newClientInfo(config *rest.Config) (*ClientInfo, error) {
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "multus"})
return &ClientInfo{
Client: client,
NetClient: netclient,
WatchClient: watchClient,
NetClient: netClient,
NetWatchClient: netWatchClient,
EventBroadcaster: broadcaster,
EventRecorder: recorder,
}, nil
Expand Down
16 changes: 8 additions & 8 deletions pkg/multus/multus_cni100_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ import (
netdefinformerv1 "github.com/k8snetworkplumbingwg/network-attachment-definition-client/pkg/client/informers/externalversions/k8s.cni.cncf.io/v1"
)

func newPodInformer(ctx context.Context, kclient kubernetes.Interface) cache.SharedIndexInformer {
informerFactory := informerfactory.NewSharedInformerFactory(kclient, 0*time.Second)
func newPodInformer(ctx context.Context, watchClient kubernetes.Interface) cache.SharedIndexInformer {
informerFactory := informerfactory.NewSharedInformerFactory(watchClient, 0*time.Second)

podInformer := informerFactory.InformerFor(&kapi.Pod{}, func(c kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return v1coreinformers.NewFilteredPodInformer(
Expand All @@ -71,8 +71,8 @@ func newPodInformer(ctx context.Context, kclient kubernetes.Interface) cache.Sha
return podInformer
}

func newNetDefInformer(ctx context.Context, client netdefclient.Interface) cache.SharedIndexInformer {
informerFactory := netdefinformer.NewSharedInformerFactory(client, 0*time.Second)
func newNetDefInformer(ctx context.Context, netWatchClient netdefclient.Interface) cache.SharedIndexInformer {
informerFactory := netdefinformer.NewSharedInformerFactory(netWatchClient, 0*time.Second)

netdefInformer := informerFactory.InformerFor(&netdefv1.NetworkAttachmentDefinition{}, func(client netdefclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return netdefinformerv1.NewNetworkAttachmentDefinitionInformer(
Expand Down Expand Up @@ -941,8 +941,8 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() {
_, err := fKubeClient.AddNetAttachDef(testhelpers.NewFakeNetAttachDef("kube-system", "net1", net1))
Expect(err).NotTo(HaveOccurred())

podInformer := newPodInformer(ctx, fKubeClient.Client)
netdefInformer := newNetDefInformer(ctx, fKubeClient.NetClient)
podInformer := newPodInformer(ctx, fKubeClient.WatchClient)
netdefInformer := newNetDefInformer(ctx, fKubeClient.NetWatchClient)
fKubeClient.SetK8sClientInformers(podInformer, netdefInformer)

result, err := CmdAdd(args, fExec, fKubeClient)
Expand Down Expand Up @@ -991,8 +991,8 @@ var _ = Describe("multus operations cniVersion 1.0.0 config", func() {
_, err := fKubeClient.AddNetAttachDef(testhelpers.NewFakeNetAttachDef("kube-system", "net1", net1))
Expect(err).NotTo(HaveOccurred())

podInformer := newPodInformer(ctx, fKubeClient.Client)
netdefInformer := newNetDefInformer(ctx, fKubeClient.NetClient)
podInformer := newPodInformer(ctx, fKubeClient.WatchClient)
netdefInformer := newNetDefInformer(ctx, fKubeClient.NetWatchClient)
fKubeClient.SetK8sClientInformers(podInformer, netdefInformer)

wg := sync.WaitGroup{}
Expand Down
8 changes: 5 additions & 3 deletions pkg/multus/multus_suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,11 @@ func (f *fakeExec) FindInPath(plugin string, paths []string) (string, error) {
// NewFakeClientInfo returns fake client (just for testing)
func NewFakeClientInfo() *k8sclient.ClientInfo {
return &k8sclient.ClientInfo{
Client: fake.NewSimpleClientset(),
NetClient: netfake.NewSimpleClientset(),
EventRecorder: record.NewFakeRecorder(10),
Client: fake.NewSimpleClientset(),
WatchClient: fake.NewSimpleClientset(),
NetClient: netfake.NewSimpleClientset(),
NetWatchClient: netfake.NewSimpleClientset(),
EventRecorder: record.NewFakeRecorder(10),
}
}

Expand Down
17 changes: 8 additions & 9 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,10 +158,10 @@ func informerObjectTrim(obj interface{}) (interface{}, error) {
return obj, nil
}

func newNetDefInformer(netdefClient netdefclient.Interface) (netdefinformer.SharedInformerFactory, cache.SharedIndexInformer) {
func newNetDefInformer(netWatchClient netdefclient.Interface) (netdefinformer.SharedInformerFactory, cache.SharedIndexInformer) {
const resyncInterval time.Duration = 1 * time.Second

informerFactory := netdefinformer.NewSharedInformerFactoryWithOptions(netdefClient, resyncInterval)
informerFactory := netdefinformer.NewSharedInformerFactoryWithOptions(netWatchClient, resyncInterval)
netdefInformer := informerFactory.InformerFor(&netdefv1.NetworkAttachmentDefinition{}, func(client netdefclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return netdefinformerv1.NewNetworkAttachmentDefinitionInformer(
client,
Expand All @@ -173,7 +173,7 @@ func newNetDefInformer(netdefClient netdefclient.Interface) (netdefinformer.Shar
return informerFactory, netdefInformer
}

func newPodInformer(kubeClient kubernetes.Interface, nodeName string) (internalinterfaces.SharedInformerFactory, cache.SharedIndexInformer) {
func newPodInformer(watchClient kubernetes.Interface, nodeName string) (internalinterfaces.SharedInformerFactory, cache.SharedIndexInformer) {
var tweakFunc internalinterfaces.TweakListOptionsFunc
if nodeName != "" {
logging.Verbosef("Filtering pod watch for node %q", nodeName)
Expand All @@ -185,7 +185,7 @@ func newPodInformer(kubeClient kubernetes.Interface, nodeName string) (internali

const resyncInterval time.Duration = 1 * time.Second

informerFactory := informerfactory.NewSharedInformerFactoryWithOptions(kubeClient, resyncInterval, informerfactory.WithTransform(informerObjectTrim))
informerFactory := informerfactory.NewSharedInformerFactoryWithOptions(watchClient, resyncInterval, informerfactory.WithTransform(informerObjectTrim))
podInformer := informerFactory.InformerFor(&kapi.Pod{}, func(c kubernetes.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
return v1coreinformers.NewFilteredPodInformer(
c,
Expand All @@ -194,7 +194,6 @@ func newPodInformer(kubeClient kubernetes.Interface, nodeName string) (internali
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
tweakFunc)
})

return informerFactory, podInformer
}

Expand Down Expand Up @@ -255,8 +254,8 @@ func NewCNIServer(daemonConfig *ControllerNetConf, serverConfig []byte, ignoreRe
}

func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, servConfig []byte, ignoreReadinessIndicator bool) (*Server, error) {
informerFactory, podInformer := newPodInformer(kubeClient.Client, os.Getenv("MULTUS_NODE_NAME"))
netdefInformerFactory, netdefInformer := newNetDefInformer(kubeClient.NetClient)
podInformerFactory, podInformer := newPodInformer(kubeClient.WatchClient, os.Getenv("MULTUS_NODE_NAME"))
netdefInformerFactory, netdefInformer := newNetDefInformer(kubeClient.NetWatchClient)
kubeClient.SetK8sClientInformers(podInformer, netdefInformer)

router := http.NewServeMux()
Expand All @@ -277,7 +276,7 @@ func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, s
[]string{"handler", "code", "method"},
),
},
informerFactory: informerFactory,
podInformerFactory: podInformerFactory,
podInformer: podInformer,
netdefInformerFactory: netdefInformerFactory,
netdefInformer: netdefInformer,
Expand Down Expand Up @@ -356,7 +355,7 @@ func newCNIServer(rundir string, kubeClient *k8s.ClientInfo, exec invoke.Exec, s

// Start starts the server and begins serving on the given listener
func (s *Server) Start(ctx context.Context, l net.Listener) {
s.informerFactory.Start(ctx.Done())
s.podInformerFactory.Start(ctx.Done())
s.netdefInformerFactory.Start(ctx.Done())

// Give the initial sync some time to complete in large clusters, but
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ type Server struct {
exec invoke.Exec
serverConfig []byte
metrics *Metrics
informerFactory internalinterfaces.SharedInformerFactory
podInformerFactory internalinterfaces.SharedInformerFactory
podInformer cache.SharedIndexInformer
netdefInformerFactory netdefinformer.SharedInformerFactory
netdefInformer cache.SharedIndexInformer
Expand Down
2 changes: 1 addition & 1 deletion vendor/modules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -478,7 +478,7 @@ k8s.io/apimachinery/pkg/version
k8s.io/apimachinery/pkg/watch
k8s.io/apimachinery/third_party/forked/golang/json
k8s.io/apimachinery/third_party/forked/golang/reflect
# k8s.io/client-go v1.5.2 => k8s.io/client-go v0.29.0
# k8s.io/client-go v0.29.0
## explicit; go 1.21
k8s.io/client-go/applyconfigurations/admissionregistration/v1
k8s.io/client-go/applyconfigurations/admissionregistration/v1alpha1
Expand Down
Loading