diff --git a/pkg/kubelet/cm/deviceplugin/endpoint.go b/pkg/kubelet/cm/deviceplugin/endpoint.go
index 29feaf52852e0..baebeb172a236 100644
--- a/pkg/kubelet/cm/deviceplugin/endpoint.go
+++ b/pkg/kubelet/cm/deviceplugin/endpoint.go
@@ -38,6 +38,8 @@ type endpoint interface {
 	allocate(devs []string) (*pluginapi.AllocateResponse, error)
 	getDevices() []pluginapi.Device
 	callback(resourceName string, added, updated, deleted []pluginapi.Device)
+	isStopped() bool
+	stopGracePeriodExpired() bool
 }
 
 type endpointImpl struct {
@@ -46,6 +48,7 @@ type endpointImpl struct {
 
 	socketPath   string
 	resourceName string
+	stopTime     time.Time
 
 	devices map[string]pluginapi.Device
 	mutex   sync.Mutex
@@ -54,6 +57,7 @@ type endpointImpl struct {
 }
 
 // newEndpoint creates a new endpoint for the given resourceName.
+// This is to be used during normal device plugin registration.
 func newEndpointImpl(socketPath, resourceName string, devices map[string]pluginapi.Device, callback monitorCallback) (*endpointImpl, error) {
 	client, c, err := dial(socketPath)
 	if err != nil {
@@ -73,6 +77,16 @@ func newEndpointImpl(socketPath, resourceName string, devices map[string]pluginapi.Device, callback monitorCallback) (*endpointImpl, error) {
 	}, nil
 }
 
+// newStoppedEndpointImpl creates a new endpoint for the given resourceName with stopTime set.
+// This is to be used during Kubelet restart, before the actual device plugin re-registers.
+func newStoppedEndpointImpl(resourceName string, devices map[string]pluginapi.Device) *endpointImpl {
+	return &endpointImpl{
+		resourceName: resourceName,
+		devices:      devices,
+		stopTime:     time.Now(),
+	}
+}
+
 func (e *endpointImpl) callback(resourceName string, added, updated, deleted []pluginapi.Device) {
 	e.cb(resourceName, added, updated, deleted)
 }
@@ -171,15 +185,42 @@ func (e *endpointImpl) run() {
 	}
 }
 
+func (e *endpointImpl) isStopped() bool {
+	e.mutex.Lock()
+	defer e.mutex.Unlock()
+	return !e.stopTime.IsZero()
+}
+
+func (e *endpointImpl) stopGracePeriodExpired() bool {
+	e.mutex.Lock()
+	defer e.mutex.Unlock()
+	return !e.stopTime.IsZero() && time.Since(e.stopTime) > endpointStopGracePeriod
+}
+
+// used for testing only
+func (e *endpointImpl) setStopTime(t time.Time) {
+	e.mutex.Lock()
+	defer e.mutex.Unlock()
+	e.stopTime = t
+}
+
 // allocate issues Allocate gRPC call to the device plugin.
 func (e *endpointImpl) allocate(devs []string) (*pluginapi.AllocateResponse, error) {
+	if e.isStopped() {
+		return nil, fmt.Errorf(errEndpointStopped, e)
+	}
 	return e.client.Allocate(context.Background(), &pluginapi.AllocateRequest{
 		DevicesIDs: devs,
 	})
 }
 
 func (e *endpointImpl) stop() {
-	e.clientConn.Close()
+	e.mutex.Lock()
+	defer e.mutex.Unlock()
+	if e.clientConn != nil {
+		e.clientConn.Close()
+	}
+	e.stopTime = time.Now()
}
 
 // dial establishes the gRPC communication with the registered device plugin.
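
The stop-time bookkeeping above amounts to a small mutex-guarded state machine: a zero stopTime means "running", a non-zero stopTime means "stopped", and expiry is stopTime plus a grace period. A minimal standalone sketch of that pattern follows; it is not the Kubelet code itself — the fakeEndpoint type and the plain error are illustrative stand-ins for endpointImpl and errEndpointStopped.

package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

// stopGracePeriod mirrors endpointStopGracePeriod from the patch; the
// five-minute value matches the constant the patch introduces.
const stopGracePeriod = 5 * time.Minute

// fakeEndpoint is a hypothetical stand-in for endpointImpl that keeps only
// the stop-time bookkeeping relevant here.
type fakeEndpoint struct {
	mutex    sync.Mutex
	stopTime time.Time
}

// stop records when the endpoint went away; the zero time means "running".
func (e *fakeEndpoint) stop() {
	e.mutex.Lock()
	defer e.mutex.Unlock()
	e.stopTime = time.Now()
}

func (e *fakeEndpoint) isStopped() bool {
	e.mutex.Lock()
	defer e.mutex.Unlock()
	return !e.stopTime.IsZero()
}

func (e *fakeEndpoint) stopGracePeriodExpired() bool {
	e.mutex.Lock()
	defer e.mutex.Unlock()
	return !e.stopTime.IsZero() && time.Since(e.stopTime) > stopGracePeriod
}

// allocate refuses to issue calls against a stopped endpoint, just as the
// patched endpointImpl.allocate does before its Allocate gRPC call.
func (e *fakeEndpoint) allocate() error {
	if e.isStopped() {
		return errors.New("endpoint has been stopped")
	}
	return nil // a real endpoint would issue the Allocate gRPC call here
}

func main() {
	e := &fakeEndpoint{}
	fmt.Println(e.allocate()) // <nil>: endpoint still running
	e.stop()
	fmt.Println(e.allocate())               // endpoint has been stopped
	fmt.Println(e.stopGracePeriodExpired()) // false: stopped, but within grace period
}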
diff --git a/pkg/kubelet/cm/deviceplugin/manager.go b/pkg/kubelet/cm/deviceplugin/manager.go
index db28e36d5d602..3de8a9ba6351d 100644
--- a/pkg/kubelet/cm/deviceplugin/manager.go
+++ b/pkg/kubelet/cm/deviceplugin/manager.go
@@ -376,12 +376,20 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
 func (m *ManagerImpl) GetCapacity() (v1.ResourceList, []string) {
 	needsUpdateCheckpoint := false
 	var capacity = v1.ResourceList{}
-	var deletedResources []string
+	deletedResources := sets.NewString()
 	m.mutex.Lock()
 	for resourceName, devices := range m.allDevices {
-		if _, ok := m.endpoints[resourceName]; !ok {
+		e, ok := m.endpoints[resourceName]
+		if (ok && e.stopGracePeriodExpired()) || !ok {
+			// The resources contained in endpoints and allDevices
+			// should always be consistent. Otherwise, we run the risk
+			// of failing to garbage collect non-existing resources or devices.
+			if !ok {
+				glog.Errorf("unexpected: allDevices and endpoints are out of sync")
+			}
+			delete(m.endpoints, resourceName)
 			delete(m.allDevices, resourceName)
-			deletedResources = append(deletedResources, resourceName)
+			deletedResources.Insert(resourceName)
 			needsUpdateCheckpoint = true
 		} else {
 			capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
@@ -391,7 +399,7 @@ func (m *ManagerImpl) GetCapacity() (v1.ResourceList, []string) {
 	if needsUpdateCheckpoint {
 		m.writeCheckpoint()
 	}
-	return capacity, deletedResources
+	return capacity, deletedResources.UnsortedList()
 }
 
 // checkpointData struct is used to store pod to device allocation information
@@ -440,11 +448,11 @@ func (m *ManagerImpl) readCheckpoint() error {
 	defer m.mutex.Unlock()
 	m.podDevices.fromCheckpointData(data.PodDeviceEntries)
 	m.allocatedDevices = m.podDevices.devices()
-	for resource, devices := range data.RegisteredDevices {
+	for resource := range data.RegisteredDevices {
+		// During startup, create an empty allDevices entry so that the resource
+		// capacity stays zero until the corresponding device plugin re-registers.
 		m.allDevices[resource] = sets.NewString()
-		for _, dev := range devices {
-			m.allDevices[resource].Insert(dev)
-		}
+		m.endpoints[resource] = newStoppedEndpointImpl(resource, make(map[string]pluginapi.Device))
	}
	return nil
 }
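
The core of the GetCapacity change is the eviction rule: a resource is garbage collected once its endpoint is either missing (a bug, hence the error log) or has been stopped for longer than the grace period; a stopped-but-unexpired endpoint still reports its (possibly zero) capacity. Below is a condensed sketch of just that rule, with a hypothetical resourceState map standing in for the manager's endpoints/allDevices fields and int counts in place of v1.ResourceList.

package main

import (
	"fmt"
	"time"
)

const gracePeriod = 5 * time.Minute

// resourceState is a hypothetical stand-in for the manager's per-resource entry.
type resourceState struct {
	stopped time.Time // zero while the plugin is healthy
	devices int
}

// capacityAndDeleted mirrors the shape of GetCapacity: healthy or
// recently-stopped resources report their device count; resources whose
// endpoint has been gone past the grace period are evicted and reported
// as deleted.
func capacityAndDeleted(resources map[string]resourceState) (map[string]int, []string) {
	capacity := map[string]int{}
	var deleted []string
	for name, st := range resources {
		if !st.stopped.IsZero() && time.Since(st.stopped) > gracePeriod {
			delete(resources, name) // deleting during range is safe in Go
			deleted = append(deleted, name)
			continue
		}
		capacity[name] = st.devices
	}
	return capacity, deleted
}

func main() {
	now := time.Now()
	resources := map[string]resourceState{
		"vendor.com/gpu":  {devices: 2},                                      // healthy
		"vendor.com/fpga": {devices: 0, stopped: now.Add(-time.Minute)},      // within grace period
		"vendor.com/nic":  {devices: 4, stopped: now.Add(-10 * time.Minute)}, // expired
	}
	capacity, deleted := capacityAndDeleted(resources)
	fmt.Println(capacity) // map[vendor.com/fpga:0 vendor.com/gpu:2]
	fmt.Println(deleted)  // [vendor.com/nic]
}

The same reasoning explains the readCheckpoint change: after a Kubelet restart, each checkpointed resource gets a stopped endpoint and zero capacity, so it stays a known device-plugin resource (and pods requesting it are rejected rather than silently started) until the plugin re-registers or the grace period expires.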
diff --git a/pkg/kubelet/cm/deviceplugin/manager_test.go b/pkg/kubelet/cm/deviceplugin/manager_test.go
index a11b791f25225..523ec43a5aa51 100644
--- a/pkg/kubelet/cm/deviceplugin/manager_test.go
+++ b/pkg/kubelet/cm/deviceplugin/manager_test.go
@@ -163,7 +163,8 @@ func TestUpdateCapacity(t *testing.T) {
 	// Adds three devices for resource1, two healthy and one unhealthy.
 	// Expects capacity for resource1 to be 2.
 	resourceName1 := "domain1.com/resource1"
-	testManager.endpoints[resourceName1] = &endpointImpl{devices: make(map[string]pluginapi.Device)}
+	e1 := &endpointImpl{devices: make(map[string]pluginapi.Device)}
+	testManager.endpoints[resourceName1] = e1
 	callback(resourceName1, devs, []pluginapi.Device{}, []pluginapi.Device{})
 	capacity, removedResources := testManager.GetCapacity()
 	resource1Capacity, ok := capacity[v1.ResourceName(resourceName1)]
@@ -199,7 +200,8 @@ func TestUpdateCapacity(t *testing.T) {
 
 	// Tests adding another resource.
 	resourceName2 := "resource2"
-	testManager.endpoints[resourceName2] = &endpointImpl{devices: make(map[string]pluginapi.Device)}
+	e2 := &endpointImpl{devices: make(map[string]pluginapi.Device)}
+	testManager.endpoints[resourceName2] = e2
 	callback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{})
 	capacity, removedResources = testManager.GetCapacity()
 	as.Equal(2, len(capacity))
@@ -208,9 +210,9 @@ func TestUpdateCapacity(t *testing.T) {
 	as.Equal(int64(2), resource2Capacity.Value())
 	as.Equal(0, len(removedResources))
 
-	// Removes resourceName1 endpoint. Verifies testManager.GetCapacity() reports that resourceName1
+	// Expires resourceName1 endpoint. Verifies testManager.GetCapacity() reports that resourceName1
 	// is removed from capacity and it no longer exists in allDevices after the call.
-	delete(testManager.endpoints, resourceName1)
+	e1.setStopTime(time.Now().Add(-1*endpointStopGracePeriod - time.Duration(10)*time.Second))
 	capacity, removed := testManager.GetCapacity()
 	as.Equal([]string{resourceName1}, removed)
 	_, ok = capacity[v1.ResourceName(resourceName1)]
@@ -220,6 +222,39 @@ func TestUpdateCapacity(t *testing.T) {
 	as.Equal(int64(2), val.Value())
 	_, ok = testManager.allDevices[resourceName1]
 	as.False(ok)
+	_, ok = testManager.endpoints[resourceName1]
+	as.False(ok)
+	as.Equal(1, len(testManager.endpoints))
+
+	// Stops resourceName2 endpoint. Verifies its stopTime is set and that
+	// allocate calls return errors.
+	e2.stop()
+	as.False(e2.stopTime.IsZero())
+	_, err = e2.allocate([]string{"Device1"})
+	as.True(reflect.DeepEqual(err, fmt.Errorf(errEndpointStopped, e2)))
+	capacity, removed = testManager.GetCapacity()
+	val, ok = capacity[v1.ResourceName(resourceName2)]
+	as.True(ok)
+	as.Equal(int64(2), val.Value())
+	as.Empty(removed)
+	// Writes and re-reads checkpoints. Verifies we create a stopped endpoint
+	// for resourceName2, its capacity is set to zero, and we still consider
+	// it a DevicePlugin resource. This makes sure any pod that was scheduled
+	// while the capacity change was propagating to the scheduler will be
+	// properly rejected instead of being incorrectly started.
+	err = testManager.writeCheckpoint()
+	as.Nil(err)
+	testManager.allDevices = make(map[string]sets.String)
+	err = testManager.readCheckpoint()
+	as.Nil(err)
+	as.Equal(1, len(testManager.endpoints))
+	_, ok = testManager.endpoints[resourceName2]
+	as.True(ok)
+	capacity, removed = testManager.GetCapacity()
+	val, ok = capacity[v1.ResourceName(resourceName2)]
+	as.True(ok)
+	as.Equal(int64(0), val.Value())
+	as.Empty(removed)
 }
 
 type stringPairType struct {
@@ -268,6 +303,7 @@ func TestCheckpoint(t *testing.T) {
 	defer os.RemoveAll(tmpDir)
 	testManager := &ManagerImpl{
 		socketdir:        tmpDir,
+		endpoints:        make(map[string]endpoint),
 		allDevices:       make(map[string]sets.String),
 		allocatedDevices: make(map[string]sets.String),
 		podDevices:       make(podDevices),
@@ -363,6 +399,8 @@ func (m *MockEndpoint) allocate(devs []string) (*pluginapi.AllocateResponse, error) {
 	return nil, nil
 }
 
+func (m *MockEndpoint) isStopped() bool { return false }
+func (m *MockEndpoint) stopGracePeriodExpired() bool { return false }
 func TestPodContainerDeviceAllocation(t *testing.T) {
 	flag.Set("alsologtostderr", fmt.Sprintf("%t", true))
 	var logLevel string
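
The setStopTime helper exists so the test can back-date a stop rather than sleep through the five-minute grace period. A minimal sketch of that trick in isolation — the clockEndpoint type is hypothetical, not the test file's actual helpers:

package main

import (
	"fmt"
	"time"
)

const gracePeriod = 5 * time.Minute

type clockEndpoint struct{ stopTime time.Time }

func (e *clockEndpoint) expired() bool {
	return !e.stopTime.IsZero() && time.Since(e.stopTime) > gracePeriod
}

func main() {
	e := &clockEndpoint{}

	// A fresh stop is within the grace period.
	e.stopTime = time.Now()
	fmt.Println(e.expired()) // false

	// Back-dating the stop past the grace period, as the test does via
	// setStopTime, makes expiry observable immediately, without sleeping.
	e.stopTime = time.Now().Add(-gracePeriod - 10*time.Second)
	fmt.Println(e.expired()) // true
}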
diff --git a/pkg/kubelet/cm/deviceplugin/types.go b/pkg/kubelet/cm/deviceplugin/types.go
index c4465a8be4cb1..0ece8ab267f12 100644
--- a/pkg/kubelet/cm/deviceplugin/types.go
+++ b/pkg/kubelet/cm/deviceplugin/types.go
@@ -17,6 +17,8 @@ limitations under the License.
 package deviceplugin
 
 import (
+	"time"
+
 	"k8s.io/api/core/v1"
 	pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
 	"k8s.io/kubernetes/pkg/kubelet/config"
@@ -84,6 +86,8 @@ const (
 	errInvalidResourceName = "the ResourceName %q is invalid"
 	// errEmptyResourceName is the error raised when the resource name field is empty
 	errEmptyResourceName = "invalid Empty ResourceName"
+	// errEndpointStopped indicates that the endpoint has been stopped
+	errEndpointStopped = "endpoint %v has been stopped"
 
 	// errBadSocket is the error raised when the registry socket path is not absolute
 	errBadSocket = "bad socketPath, must be an absolute path:"
@@ -94,3 +98,9 @@ const (
 	// errListAndWatch is the error raised when ListAndWatch ended unsuccessfully
 	errListAndWatch = "listAndWatch ended unexpectedly for device plugin %s with error %v"
 )
+
+// endpointStopGracePeriod indicates the grace period after an endpoint is stopped
+// because its device plugin fails. DeviceManager keeps the stopped endpoint in its
+// cache during this grace period to cover the time gap for the capacity change to
+// take effect.
+const endpointStopGracePeriod = time.Duration(5) * time.Minute
\ No newline at end of file
diff --git a/test/e2e_node/gpu_device_plugin.go b/test/e2e_node/gpu_device_plugin.go
index d2a52c3749ebe..b7263ac0b9e4a 100644
--- a/test/e2e_node/gpu_device_plugin.go
+++ b/test/e2e_node/gpu_device_plugin.go
@@ -103,6 +103,10 @@ var _ = framework.KubeDescribe("NVIDIA GPU Device Plugin [Feature:GPUDevicePlugin]", func() {
 
 		By("Restarting Kubelet and creating another pod")
 		restartKubelet(f)
+		framework.WaitForAllNodesSchedulable(f.ClientSet, framework.TestContext.NodeSchedulableTimeout)
+		Eventually(func() bool {
+			return framework.NumberOfNVIDIAGPUs(getLocalNode(f)) > 0
+		}, 10*time.Second, framework.Poll).Should(BeTrue())
 		p2 := f.PodClient().CreateSync(makeCudaPauseImage())
 
 		By("Checking that pods got a different GPU")
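
The e2e change polls node capacity because, after a Kubelet restart, the device plugin must re-register before GPU capacity climbs back above zero; creating the second pod before that would race the re-registration. The Eventually block is Gomega's retry loop; a dependency-free sketch of the same wait-for-capacity idea follows, with a hypothetical gpuCount probe standing in for framework.NumberOfNVIDIAGPUs(getLocalNode(f)).

package main

import (
	"errors"
	"fmt"
	"time"
)

// waitFor polls cond every interval until it returns true or timeout elapses,
// mirroring what Gomega's Eventually does in the e2e test above.
func waitFor(cond func() bool, timeout, interval time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		if cond() {
			return nil
		}
		time.Sleep(interval)
	}
	return errors.New("condition not met before timeout")
}

func main() {
	// gpuCount is a hypothetical probe: here it pretends the device plugin
	// re-registers after a few polls, as it would after a Kubelet restart.
	calls := 0
	gpuCount := func() int {
		calls++
		if calls > 3 {
			return 1
		}
		return 0
	}

	err := waitFor(func() bool { return gpuCount() > 0 }, 10*time.Second, 100*time.Millisecond)
	fmt.Println(err) // <nil>: capacity recovered before the timeout
}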