Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

update the capacity to zero on shutdown/reset #502

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
93 changes: 61 additions & 32 deletions pkg/resources/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,23 +34,25 @@ import (
)

type resourceServer struct {
resourcePool types.ResourcePool
pluginWatch bool
endPoint string // Socket file
sockPath string // Socket file path
resourceNamePrefix string
grpcServer *grpc.Server
termSignal chan bool
updateSignal chan bool
stopWatcher chan bool
checkIntervals int // health check intervals in seconds
useCdi bool
cdi cdiPkg.CDI
resourcePool types.ResourcePool
pluginWatch bool
endPoint string // Socket file
sockPath string // Socket file path
resourceNamePrefix string
grpcServer *grpc.Server
listAndWatchStopSignal chan bool
listAndWatchFinishedSignal chan bool
updateSignal chan bool
stopWatcher chan bool
checkIntervals int // health check intervals in seconds
useCdi bool
cdi cdiPkg.CDI
}

const (
rsWatchInterval = 5 * time.Second
serverStartTimeout = 5 * time.Second
rsWatchInterval = 5 * time.Second
serverStartTimeout = 5 * time.Second
terminatedSignalTimeOut = 5 * time.Second
)

// NewResourceServer returns an instance of ResourceServer
Expand All @@ -61,18 +63,19 @@ func NewResourceServer(prefix, suffix string, pluginWatch, useCdi bool, rp types
sockPath = filepath.Join(types.DeprecatedSockDir, sockName)
}
return &resourceServer{
resourcePool: rp,
pluginWatch: pluginWatch,
endPoint: sockName,
sockPath: sockPath,
resourceNamePrefix: prefix,
useCdi: useCdi,
grpcServer: grpc.NewServer(),
termSignal: make(chan bool, 1),
updateSignal: make(chan bool),
stopWatcher: make(chan bool),
checkIntervals: 20, // updates every 20 seconds
cdi: cdiPkg.New(),
resourcePool: rp,
pluginWatch: pluginWatch,
endPoint: sockName,
sockPath: sockPath,
resourceNamePrefix: prefix,
useCdi: useCdi,
grpcServer: grpc.NewServer(),
listAndWatchStopSignal: make(chan bool, 1),
listAndWatchFinishedSignal: make(chan bool, 1),
updateSignal: make(chan bool),
stopWatcher: make(chan bool),
checkIntervals: 20, // updates every 20 seconds
cdi: cdiPkg.New(),
}
}

Expand Down Expand Up @@ -177,9 +180,19 @@ func (rs *resourceServer) ListAndWatch(empty *pluginapi.Empty, stream pluginapi.
// listen for events: if updateSignal send new list of devices
for {
select {
case <-rs.termSignal:
// Terminate signal received; return from mehtod call
case <-rs.listAndWatchStopSignal:
// Terminate signal received; send an empty list of devices and return from method call
glog.Infof("%s: terminate signal received", methodID)
resp = new(pluginapi.ListAndWatchResponse)
resp.Devices = make([]*pluginapi.Device, 0)

if err := stream.Send(resp); err != nil {
glog.Errorf("%s: error: cannot update device states: %v\n", methodID, err)
return err
}
// Releasing the terminate process to close the grpc server
rs.listAndWatchFinishedSignal <- true

return nil
case <-rs.updateSignal:
// Device health changed; so send new device list
Expand Down Expand Up @@ -304,11 +317,19 @@ func (rs *resourceServer) restart() error {
if rs.grpcServer == nil {
return fmt.Errorf("grpc server instance not found for %s", resourceName)
}
rs.grpcServer.Stop()
rs.grpcServer = nil
// Send terminate signal to ListAndWatch()
rs.termSignal <- true
rs.listAndWatchStopSignal <- true

// wait for the terminated signal or 5 second
Copy link
Member

@zeeke zeeke Oct 2, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In resourceServer.Stop() we do the termSignal-terminatedSignal handshake before stopping the grpcServer. In restart(), we do it after rs.grpcServer.Stop(). Is this intentional?

It's not strictly related to this PR, but with these new changes I think it can raise an error on L189
stream.Send(resp)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you are right! sorry

select {
case <-rs.listAndWatchFinishedSignal:
break
case <-time.After(terminatedSignalTimeOut):
glog.Warningf("Timed out waiting for ListAndWatch terminated signal")
}

rs.grpcServer.Stop()
rs.grpcServer = nil
rs.grpcServer = grpc.NewServer() // new instance of a grpc server
return rs.Start()
}
Expand All @@ -320,11 +341,19 @@ func (rs *resourceServer) Stop() error {
return nil
}
// Send terminate signal to ListAndWatch()
rs.termSignal <- true
rs.listAndWatchStopSignal <- true
if !rs.pluginWatch {
rs.stopWatcher <- true
}

// wait for the terminated signal or 5 second
select {
case <-rs.listAndWatchFinishedSignal:
break
case <-time.After(terminatedSignalTimeOut):
glog.Warningf("Timed out waiting for ListAndWatch terminated signal")
}

rs.grpcServer.Stop()
rs.grpcServer = nil

Expand Down
18 changes: 12 additions & 6 deletions pkg/resources/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,17 +169,21 @@ var _ = Describe("Server", func() {
err := rs.Start()
Expect(err).NotTo(HaveOccurred())

// demo the listAndWatch
rs.listAndWatchFinishedSignal <- true
err = rs.restart()
Expect(err).NotTo(HaveOccurred())
Eventually(rs.termSignal, time.Second*10).Should(Receive())
Eventually(rs.listAndWatchStopSignal, time.Second*10).Should(Receive())

go func() {
rp.On("CleanDeviceInfoFile", "fake").Return(nil)
// demo the listAndWatch
rs.listAndWatchFinishedSignal <- true
err := rs.Stop()
Expect(err).NotTo(HaveOccurred())
rp.AssertCalled(t, "CleanDeviceInfoFile", "fake")
}()
Eventually(rs.termSignal, time.Second*10).Should(Receive())
Eventually(rs.listAndWatchStopSignal, time.Second*10).Should(Receive())
Eventually(rs.stopWatcher, time.Second*10).Should(Receive())

close(done)
Expand Down Expand Up @@ -210,11 +214,12 @@ var _ = Describe("Server", func() {

go func() {
rp.On("CleanDeviceInfoFile", "fake").Return(nil)
rs.listAndWatchFinishedSignal <- true
err := rs.Stop()
Expect(err).NotTo(HaveOccurred())
rp.AssertCalled(t, "CleanDeviceInfoFile", "fake")
}()
Eventually(rs.termSignal, time.Second*10).Should(Receive())
Eventually(rs.listAndWatchStopSignal, time.Second*10).Should(Receive())

close(done)
}, 12.0)
Expand Down Expand Up @@ -252,10 +257,11 @@ var _ = Describe("Server", func() {

// sleep 1 second to let watcher perform at least a single socket-file check
time.Sleep(time.Second)
rs.listAndWatchFinishedSignal <- true
err = rs.Stop()
Expect(err).NotTo(HaveOccurred())

Eventually(rs.termSignal, time.Second*10).Should(Receive())
Eventually(rs.listAndWatchStopSignal, time.Second*10).Should(Receive())

close(done)
}, 12.0)
Expand Down Expand Up @@ -459,7 +465,7 @@ var _ = Describe("Server", func() {
Eventually(lwSrv.updates, time.Second*10).Should(Receive())

// finally send term signal
rs.termSignal <- true
rs.listAndWatchStopSignal <- true

close(done)
}, 30.0)
Expand Down Expand Up @@ -501,7 +507,7 @@ var _ = Describe("Server", func() {
Eventually(lwSrv.updates, time.Second*10).Should(Receive())

// finally send term signal
rs.termSignal <- true
rs.listAndWatchStopSignal <- true

close(done)
}, 30)
Expand Down
Loading