From ee1e0e974cbb363d199ed096addae8712cd093ac Mon Sep 17 00:00:00 2001 From: Sebastian Sch Date: Mon, 2 Oct 2023 10:41:30 +0300 Subject: [PATCH] update the capacity to zero on shutdown/reset When the device plugin is restarted, kubelet marks the resource as unhealthy, but still reports the resource as existing for a grace period (5 mins). If a pod is scheduled before the device plugin comes up, pod creation fails without a retry loop, with the error message "Pod was rejected: Allocate failed due to no healthy devices present; cannot allocate unhealthy devices", which is unexpected. This commit allows the device plugin to send an empty list of devices before a reset or shutdown. Signed-off-by: Sebastian Sch --- pkg/resources/server.go | 35 ++++++++++++++++++++++++++++++++--- pkg/resources/server_test.go | 6 ++++++ 2 files changed, 38 insertions(+), 3 deletions(-) diff --git a/pkg/resources/server.go b/pkg/resources/server.go index bb4fb0135..99a987525 100644 --- a/pkg/resources/server.go +++ b/pkg/resources/server.go @@ -41,6 +41,7 @@ type resourceServer struct { resourceNamePrefix string grpcServer *grpc.Server termSignal chan bool + terminatedSignal chan bool updateSignal chan bool stopWatcher chan bool checkIntervals int // health check intervals in seconds @@ -49,8 +50,9 @@ type resourceServer struct { } const ( - rsWatchInterval = 5 * time.Second - serverStartTimeout = 5 * time.Second + rsWatchInterval = 5 * time.Second + serverStartTimeout = 5 * time.Second + terminatedSignalTimeOut = 5 * time.Second ) // NewResourceServer returns an instance of ResourceServer @@ -69,6 +71,7 @@ func NewResourceServer(prefix, suffix string, pluginWatch, useCdi bool, rp types useCdi: useCdi, grpcServer: grpc.NewServer(), termSignal: make(chan bool, 1), + terminatedSignal: make(chan bool, 1), updateSignal: make(chan bool), stopWatcher: make(chan bool), checkIntervals: 20, // updates every 20 seconds @@ -178,8 +181,18 @@ func (rs *resourceServer) ListAndWatch(empty *pluginapi.Empty, 
stream pluginapi. for { select { case <-rs.termSignal: - // Terminate signal received; return from mehtod call + // Terminate signal received; send an empty list of devices and return from method call glog.Infof("%s: terminate signal received", methodID) + resp = new(pluginapi.ListAndWatchResponse) + resp.Devices = make([]*pluginapi.Device, 0) + + if err := stream.Send(resp); err != nil { + glog.Errorf("%s: error: cannot update device states: %v\n", methodID, err) + return err + } + // Releasing the terminate process to close the grpc server + rs.terminatedSignal <- true + return nil case <-rs.updateSignal: // Device health changed; so send new device list @@ -309,6 +322,14 @@ func (rs *resourceServer) restart() error { // Send terminate signal to ListAndWatch() rs.termSignal <- true + // wait for the terminated signal or 5 second + select { + case <-rs.terminatedSignal: + break + case <-time.After(terminatedSignalTimeOut): + glog.Warningf("Timed out waiting for ListAndWatch terminated signal") + } + rs.grpcServer = grpc.NewServer() // new instance of a grpc server return rs.Start() } @@ -325,6 +346,14 @@ func (rs *resourceServer) Stop() error { rs.stopWatcher <- true } + // wait for the terminated signal or 5 second + select { + case <-rs.terminatedSignal: + break + case <-time.After(terminatedSignalTimeOut): + glog.Warningf("Timed out waiting for ListAndWatch terminated signal") + } + rs.grpcServer.Stop() rs.grpcServer = nil diff --git a/pkg/resources/server_test.go b/pkg/resources/server_test.go index 5608b7413..753b4df8b 100644 --- a/pkg/resources/server_test.go +++ b/pkg/resources/server_test.go @@ -169,12 +169,16 @@ var _ = Describe("Server", func() { err := rs.Start() Expect(err).NotTo(HaveOccurred()) + // demo the listAndWatch + rs.terminatedSignal <- true err = rs.restart() Expect(err).NotTo(HaveOccurred()) Eventually(rs.termSignal, time.Second*10).Should(Receive()) go func() { rp.On("CleanDeviceInfoFile", "fake").Return(nil) + // demo the listAndWatch + 
rs.terminatedSignal <- true err := rs.Stop() Expect(err).NotTo(HaveOccurred()) rp.AssertCalled(t, "CleanDeviceInfoFile", "fake") @@ -210,6 +214,7 @@ var _ = Describe("Server", func() { go func() { rp.On("CleanDeviceInfoFile", "fake").Return(nil) + rs.terminatedSignal <- true err := rs.Stop() Expect(err).NotTo(HaveOccurred()) rp.AssertCalled(t, "CleanDeviceInfoFile", "fake") @@ -252,6 +257,7 @@ var _ = Describe("Server", func() { // sleep 1 second to let watcher perform at least a single socket-file check time.Sleep(time.Second) + rs.terminatedSignal <- true err = rs.Stop() Expect(err).NotTo(HaveOccurred())