Skip to content

Commit

Permalink
update the capacity to zero on shutdown/reset
Browse files Browse the repository at this point in the history
When the device plugin is restarted, kubelet marks the resource as unhealthy but still reports the resource as existing for a grace period (5 mins). If a pod is scheduled before the device plugin comes back up, pod creation fails without a retry loop, with the error message "Pod was rejected: Allocate failed due to no healthy devices present; cannot allocate unhealthy devices <DEVICE_NAME>", which is unexpected.

This commit allows the device plugin to send an empty list of devices before the reset or shutdown.

Signed-off-by: Sebastian Sch <[email protected]>
  • Loading branch information
SchSeba committed Oct 2, 2023
1 parent 2cc723d commit ee1e0e9
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 3 deletions.
35 changes: 32 additions & 3 deletions pkg/resources/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type resourceServer struct {
resourceNamePrefix string
grpcServer *grpc.Server
termSignal chan bool
terminatedSignal chan bool
updateSignal chan bool
stopWatcher chan bool
checkIntervals int // health check intervals in seconds
Expand All @@ -49,8 +50,9 @@ type resourceServer struct {
}

const (
rsWatchInterval = 5 * time.Second
serverStartTimeout = 5 * time.Second
rsWatchInterval = 5 * time.Second
serverStartTimeout = 5 * time.Second
terminatedSignalTimeOut = 5 * time.Second
)

// NewResourceServer returns an instance of ResourceServer
Expand All @@ -69,6 +71,7 @@ func NewResourceServer(prefix, suffix string, pluginWatch, useCdi bool, rp types
useCdi: useCdi,
grpcServer: grpc.NewServer(),
termSignal: make(chan bool, 1),
terminatedSignal: make(chan bool, 1),
updateSignal: make(chan bool),
stopWatcher: make(chan bool),
checkIntervals: 20, // updates every 20 seconds
Expand Down Expand Up @@ -178,8 +181,18 @@ func (rs *resourceServer) ListAndWatch(empty *pluginapi.Empty, stream pluginapi.
for {
select {
case <-rs.termSignal:
// Terminate signal received; return from mehtod call
// Terminate signal received; send an empty list of devices and return from method call
glog.Infof("%s: terminate signal received", methodID)
resp = new(pluginapi.ListAndWatchResponse)
resp.Devices = make([]*pluginapi.Device, 0)

if err := stream.Send(resp); err != nil {
glog.Errorf("%s: error: cannot update device states: %v\n", methodID, err)
return err
}
// Releasing the terminate process to close the grpc server
rs.terminatedSignal <- true

return nil
case <-rs.updateSignal:
// Device health changed; so send new device list
Expand Down Expand Up @@ -309,6 +322,14 @@ func (rs *resourceServer) restart() error {
// Send terminate signal to ListAndWatch()
rs.termSignal <- true

// wait for the terminated signal, or give up after terminatedSignalTimeOut (5 seconds)
select {
case <-rs.terminatedSignal:
break
case <-time.After(terminatedSignalTimeOut):
glog.Warningf("Timed out waiting for ListAndWatch terminated signal")
}

rs.grpcServer = grpc.NewServer() // new instance of a grpc server
return rs.Start()
}
Expand All @@ -325,6 +346,14 @@ func (rs *resourceServer) Stop() error {
rs.stopWatcher <- true
}

// wait for the terminated signal, or give up after terminatedSignalTimeOut (5 seconds)
select {
case <-rs.terminatedSignal:
break
case <-time.After(terminatedSignalTimeOut):
glog.Warningf("Timed out waiting for ListAndWatch terminated signal")
}

rs.grpcServer.Stop()
rs.grpcServer = nil

Expand Down
6 changes: 6 additions & 0 deletions pkg/resources/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,12 +169,16 @@ var _ = Describe("Server", func() {
err := rs.Start()
Expect(err).NotTo(HaveOccurred())

// demo the listAndWatch
rs.terminatedSignal <- true
err = rs.restart()
Expect(err).NotTo(HaveOccurred())
Eventually(rs.termSignal, time.Second*10).Should(Receive())

go func() {
rp.On("CleanDeviceInfoFile", "fake").Return(nil)
// demo the listAndWatch
rs.terminatedSignal <- true
err := rs.Stop()
Expect(err).NotTo(HaveOccurred())
rp.AssertCalled(t, "CleanDeviceInfoFile", "fake")
Expand Down Expand Up @@ -210,6 +214,7 @@ var _ = Describe("Server", func() {

go func() {
rp.On("CleanDeviceInfoFile", "fake").Return(nil)
rs.terminatedSignal <- true
err := rs.Stop()
Expect(err).NotTo(HaveOccurred())
rp.AssertCalled(t, "CleanDeviceInfoFile", "fake")
Expand Down Expand Up @@ -252,6 +257,7 @@ var _ = Describe("Server", func() {

// sleep 1 second to let watcher perform at least a single socket-file check
time.Sleep(time.Second)
rs.terminatedSignal <- true
err = rs.Stop()
Expect(err).NotTo(HaveOccurred())

Expand Down

0 comments on commit ee1e0e9

Please sign in to comment.