Skip to content

Commit

Permalink
Merge pull request #1 from tossmilestone/fix/race-device-manager
Browse files Browse the repository at this point in the history
Fixes the races around devicemanager Allocate() and endpoint deletion
  • Loading branch information
tossmilestone authored Jun 26, 2018
2 parents 9f8ebd1 + cffd865 commit 687841f
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 13 deletions.
43 changes: 42 additions & 1 deletion pkg/kubelet/cm/deviceplugin/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ type endpoint interface {
allocate(devs []string) (*pluginapi.AllocateResponse, error)
getDevices() []pluginapi.Device
callback(resourceName string, added, updated, deleted []pluginapi.Device)
isStopped() bool
stopGracePeriodExpired() bool
}

type endpointImpl struct {
Expand All @@ -46,6 +48,7 @@ type endpointImpl struct {

socketPath string
resourceName string
stopTime time.Time

devices map[string]pluginapi.Device
mutex sync.Mutex
Expand All @@ -54,6 +57,7 @@ type endpointImpl struct {
}

// newEndpoint creates a new endpoint for the given resourceName.
// This is to be used during normal device plugin registration.
func newEndpointImpl(socketPath, resourceName string, devices map[string]pluginapi.Device, callback monitorCallback) (*endpointImpl, error) {
client, c, err := dial(socketPath)
if err != nil {
Expand All @@ -73,6 +77,16 @@ func newEndpointImpl(socketPath, resourceName string, devices map[string]plugina
}, nil
}

// newStoppedEndpointImpl creates a new endpoint for the given resourceName with stopTime set.
// This is to be used during Kubelet restart, before the actual device plugin re-registers.
func newStoppedEndpointImpl(resourceName string, devices map[string]pluginapi.Device) *endpointImpl {
return &endpointImpl{
resourceName: resourceName,
devices: devices,
stopTime: time.Now(),
}
}

func (e *endpointImpl) callback(resourceName string, added, updated, deleted []pluginapi.Device) {
e.cb(resourceName, added, updated, deleted)
}
Expand Down Expand Up @@ -171,15 +185,42 @@ func (e *endpointImpl) run() {
}
}

func (e *endpointImpl) isStopped() bool {
e.mutex.Lock()
defer e.mutex.Unlock()
return !e.stopTime.IsZero()
}

func (e *endpointImpl) stopGracePeriodExpired() bool {
e.mutex.Lock()
defer e.mutex.Unlock()
return !e.stopTime.IsZero() && time.Since(e.stopTime) > endpointStopGracePeriod
}

// used for testing only
func (e *endpointImpl) setStopTime(t time.Time) {
e.mutex.Lock()
defer e.mutex.Unlock()
e.stopTime = t
}

// allocate issues Allocate gRPC call to the device plugin.
func (e *endpointImpl) allocate(devs []string) (*pluginapi.AllocateResponse, error) {
if e.isStopped() {
return nil, fmt.Errorf(errEndpointStopped, e)
}
return e.client.Allocate(context.Background(), &pluginapi.AllocateRequest{
DevicesIDs: devs,
})
}

func (e *endpointImpl) stop() {
e.clientConn.Close()
e.mutex.Lock()
defer e.mutex.Unlock()
if e.clientConn != nil {
e.clientConn.Close()
}
e.stopTime = time.Now()
}

// dial establishes the gRPC communication with the registered device plugin.
Expand Down
24 changes: 16 additions & 8 deletions pkg/kubelet/cm/deviceplugin/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,12 +376,20 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
func (m *ManagerImpl) GetCapacity() (v1.ResourceList, []string) {
needsUpdateCheckpoint := false
var capacity = v1.ResourceList{}
var deletedResources []string
deletedResources := sets.NewString()
m.mutex.Lock()
for resourceName, devices := range m.allDevices {
if _, ok := m.endpoints[resourceName]; !ok {
e, ok := m.endpoints[resourceName]
if (ok && e.stopGracePeriodExpired()) || !ok {
// The resources contained in endpoints and allDevices
// should always be consistent. Otherwise, we run with the risk
// of failing to garbage collect non-existing resources or devices.
if !ok {
glog.Errorf("unexpected: allDevices and endpoints are out of sync")
}
delete(m.endpoints, resourceName)
delete(m.allDevices, resourceName)
deletedResources = append(deletedResources, resourceName)
deletedResources.Insert(resourceName)
needsUpdateCheckpoint = true
} else {
capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
Expand All @@ -391,7 +399,7 @@ func (m *ManagerImpl) GetCapacity() (v1.ResourceList, []string) {
if needsUpdateCheckpoint {
m.writeCheckpoint()
}
return capacity, deletedResources
return capacity, deletedResources.UnsortedList()
}

// checkpointData struct is used to store pod to device allocation information
Expand Down Expand Up @@ -440,11 +448,11 @@ func (m *ManagerImpl) readCheckpoint() error {
defer m.mutex.Unlock()
m.podDevices.fromCheckpointData(data.PodDeviceEntries)
m.allocatedDevices = m.podDevices.devices()
for resource, devices := range data.RegisteredDevices {
for resource := range data.RegisteredDevices {
// During start up, creates empty allDevices list so that the resource capacity
// will stay zero till the corresponding device plugin re-registers.
m.allDevices[resource] = sets.NewString()
for _, dev := range devices {
m.allDevices[resource].Insert(dev)
}
m.endpoints[resource] = newStoppedEndpointImpl(resource, make(map[string]pluginapi.Device))
}
return nil
}
Expand Down
46 changes: 42 additions & 4 deletions pkg/kubelet/cm/deviceplugin/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ func TestUpdateCapacity(t *testing.T) {
// Adds three devices for resource1, two healthy and one unhealthy.
// Expects capacity for resource1 to be 2.
resourceName1 := "domain1.com/resource1"
testManager.endpoints[resourceName1] = &endpointImpl{devices: make(map[string]pluginapi.Device)}
e1 := &endpointImpl{devices: make(map[string]pluginapi.Device)}
testManager.endpoints[resourceName1] = e1
callback(resourceName1, devs, []pluginapi.Device{}, []pluginapi.Device{})
capacity, removedResources := testManager.GetCapacity()
resource1Capacity, ok := capacity[v1.ResourceName(resourceName1)]
Expand Down Expand Up @@ -199,7 +200,8 @@ func TestUpdateCapacity(t *testing.T) {

// Tests adding another resource.
resourceName2 := "resource2"
testManager.endpoints[resourceName2] = &endpointImpl{devices: make(map[string]pluginapi.Device)}
e2 := &endpointImpl{devices: make(map[string]pluginapi.Device)}
testManager.endpoints[resourceName2] = e2
callback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{})
capacity, removedResources = testManager.GetCapacity()
as.Equal(2, len(capacity))
Expand All @@ -208,9 +210,9 @@ func TestUpdateCapacity(t *testing.T) {
as.Equal(int64(2), resource2Capacity.Value())
as.Equal(0, len(removedResources))

// Removes resourceName1 endpoint. Verifies testManager.GetCapacity() reports that resourceName1
// Expires resourceName1 endpoint. Verifies testManager.GetCapacity() reports that resourceName1
// is removed from capacity and it no longer exists in allDevices after the call.
delete(testManager.endpoints, resourceName1)
e1.setStopTime(time.Now().Add(-1*endpointStopGracePeriod - time.Duration(10)*time.Second))
capacity, removed := testManager.GetCapacity()
as.Equal([]string{resourceName1}, removed)
_, ok = capacity[v1.ResourceName(resourceName1)]
Expand All @@ -220,6 +222,39 @@ func TestUpdateCapacity(t *testing.T) {
as.Equal(int64(2), val.Value())
_, ok = testManager.allDevices[resourceName1]
as.False(ok)
_, ok = testManager.endpoints[resourceName1]
as.False(ok)
as.Equal(1, len(testManager.endpoints))

// Stops resourceName2 endpoint. Verifies its stopTime is set, allocate
// calls return errors.
e2.stop()
as.False(e2.stopTime.IsZero())
_, err = e2.allocate([]string{"Device1"})
reflect.DeepEqual(err, fmt.Errorf(errEndpointStopped, e2))
capacity, removed = testManager.GetCapacity()
val, ok = capacity[v1.ResourceName(resourceName2)]
as.True(ok)
as.Equal(int64(2), val.Value())
as.Empty(removed)
// Writes and re-reads checkpoints. Verifies we create a stopped endpoint
// for resourceName2, its capacity is set to zero, and we still consider
// it as a DevicePlugin resource. This makes sure any pod that was scheduled
// during the time of propagating capacity change to the scheduler will be
// properly rejected instead of being incorrectly started.
err = testManager.writeCheckpoint()
as.Nil(err)
testManager.allDevices = make(map[string]sets.String)
err = testManager.readCheckpoint()
as.Nil(err)
as.Equal(1, len(testManager.endpoints))
_, ok = testManager.endpoints[resourceName2]
as.True(ok)
capacity, removed = testManager.GetCapacity()
val, ok = capacity[v1.ResourceName(resourceName2)]
as.True(ok)
as.Equal(int64(0), val.Value())
as.Empty(removed)
}

type stringPairType struct {
Expand Down Expand Up @@ -268,6 +303,7 @@ func TestCheckpoint(t *testing.T) {
defer os.RemoveAll(tmpDir)
testManager := &ManagerImpl{
socketdir: tmpDir,
endpoints: make(map[string]endpoint),
allDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
podDevices: make(podDevices),
Expand Down Expand Up @@ -363,6 +399,8 @@ func (m *MockEndpoint) allocate(devs []string) (*pluginapi.AllocateResponse, err
return nil, nil
}

func (m *MockEndpoint) isStopped() bool { return false }
func (m *MockEndpoint) stopGracePeriodExpired() bool { return false }
func TestPodContainerDeviceAllocation(t *testing.T) {
flag.Set("alsologtostderr", fmt.Sprintf("%t", true))
var logLevel string
Expand Down
10 changes: 10 additions & 0 deletions pkg/kubelet/cm/deviceplugin/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package deviceplugin

import (
"time"

"k8s.io/api/core/v1"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
"k8s.io/kubernetes/pkg/kubelet/config"
Expand Down Expand Up @@ -84,6 +86,8 @@ const (
errInvalidResourceName = "the ResourceName %q is invalid"
// errEmptyResourceName is the error raised when the resource name field is empty
errEmptyResourceName = "invalid Empty ResourceName"
// errEndpointStopped indicates that the endpoint has been stopped
errEndpointStopped = "endpoint %v has been stopped"

// errBadSocket is the error raised when the registry socket path is not absolute
errBadSocket = "bad socketPath, must be an absolute path:"
Expand All @@ -94,3 +98,9 @@ const (
// errListAndWatch is the error raised when ListAndWatch ended unsuccessfully
errListAndWatch = "listAndWatch ended unexpectedly for device plugin %s with error %v"
)

// endpointStopGracePeriod indicates the grace period after an endpoint is stopped
// because its device plugin fails. DeviceManager keeps the stopped endpoint in its
// cache during this grace period to cover the time gap for the capacity change to
// take effect.
const endpointStopGracePeriod = time.Duration(5) * time.Minute
4 changes: 4 additions & 0 deletions test/e2e_node/gpu_device_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ var _ = framework.KubeDescribe("NVIDIA GPU Device Plugin [Feature:GPUDevicePlugi

By("Restarting Kubelet and creating another pod")
restartKubelet(f)
framework.WaitForAllNodesSchedulable(f.ClientSet, framework.TestContext.NodeSchedulableTimeout)
Eventually(func() bool {
return framework.NumberOfNVIDIAGPUs(getLocalNode(f)) > 0
}, 10*time.Second, framework.Poll).Should(BeTrue())
p2 := f.PodClient().CreateSync(makeCudaPauseImage())

By("Checking that pods got a different GPU")
Expand Down

0 comments on commit 687841f

Please sign in to comment.