diff --git a/.github/scripts/run-integration-tests.sh b/.github/scripts/run-integration-tests.sh
index 3e4a84f147..5e3972c54e 100755
--- a/.github/scripts/run-integration-tests.sh
+++ b/.github/scripts/run-integration-tests.sh
@@ -2,8 +2,8 @@
 
 set -euxo pipefail
 
-SETUP_ENVTEST_VER=${SETUP_ENVTEST_VER-v0.0.0-20221214170741-69f093833822}
-ENVTEST_K8S_VERSION=${ENVTEST_K8S_VERSION-1.25}
+SETUP_ENVTEST_VER=${SETUP_ENVTEST_VER-v0.0.0-20240115093953-9e6e3b144a69}
+ENVTEST_K8S_VERSION=${ENVTEST_K8S_VERSION-1.28}
 
 # install and prepare setup-envtest
 if ! command -v setup-envtest &> /dev/null
@@ -14,4 +14,4 @@
 KUBEBUILDER_ASSETS=$(setup-envtest use --use-env -p path $ENVTEST_K8S_VERSION)
 export KUBEBUILDER_ASSETS
 # run integration tests
-go test ./integrationtests/...
+go test ./integrationtests/agent/...
diff --git a/integrationtests/agent/suite_test.go b/integrationtests/agent/suite_test.go
index 0b38620eb2..d08be2ed57 100644
--- a/integrationtests/agent/suite_test.go
+++ b/integrationtests/agent/suite_test.go
@@ -83,7 +83,13 @@ var _ = BeforeSuite(func() {
 	Expect(k8sClient).NotTo(BeNil())
 
 	specEnvs = make(map[string]*specEnv, 2)
-	for id, f := range map[string]specResources{"capabilitybundle": capabilityBundleResources, "orphanbundle": orphanBundeResources} {
+	for id, f := range map[string]specResources{
+		"capabilitybundle": capabilityBundleResources,
+		"orphanbundle":     orphanBundeResources,
+		"watchertrigger": func() map[string][]v1alpha1.BundleResource {
+			return nil
+		},
+	} {
 		namespace, err := utils.NewNamespaceName()
 		Expect(err).ToNot(HaveOccurred())
 		fmt.Printf("Creating namespace %s\n", namespace)
diff --git a/integrationtests/agent/watcher_trigger_test.go b/integrationtests/agent/watcher_trigger_test.go
new file mode 100644
index 0000000000..49fbf5202e
--- /dev/null
+++ b/integrationtests/agent/watcher_trigger_test.go
@@ -0,0 +1,192 @@
+package agent
+
+import (
+	"time"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+	appsv1 "k8s.io/api/apps/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/client-go/dynamic"
+	"k8s.io/utils/pointer"
+
+	"github.com/rancher/fleet/internal/cmd/agent/trigger"
+
+	corev1 "k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"sigs.k8s.io/controller-runtime/pkg/client"
+)
+
+var _ = Describe("Watches for deployed resources", Ordered, func() {
+
+	var (
+		env    *specEnv
+		triggr *trigger.Trigger
+	)
+
+	BeforeAll(func() {
+		env = specEnvs["watchertrigger"]
+		triggr = trigger.New(ctx, env.k8sClient.RESTMapper(), dynamic.NewForConfigOrDie(cfg))
+		DeferCleanup(func() {
+			Expect(k8sClient.Delete(ctx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: env.namespace}})).ToNot(HaveOccurred())
+		})
+	})
+
+	registerResource := func(key string, objs ...runtime.Object) *int {
+		var count int
+		Expect(triggr.OnChange(key, env.namespace, func() {
+			count++
+		}, objs...)).ToNot(HaveOccurred())
+		return &count
+	}
+
+	When("watching a deployed configmap", Ordered, func() {
+		createConfigMap := func() *corev1.ConfigMap {
+			cm := corev1.ConfigMap{
+				ObjectMeta: metav1.ObjectMeta{
+					GenerateName: "test-cm-",
+					Namespace:    env.namespace,
+				},
+			}
+
+			err := env.k8sClient.Create(ctx, &cm)
+			Expect(err).ToNot(HaveOccurred())
+			Expect(cm.UID).ToNot(BeEmpty())
+			return &cm
+		}
+
+		var triggerCount *int
+		var cm *corev1.ConfigMap
+		BeforeEach(func() {
+			cm = createConfigMap()
+			triggerCount = registerResource(env.namespace+"/test-configmap", cm)
+			DeferCleanup(func() {
+				Expect(
+					client.IgnoreNotFound(env.k8sClient.Delete(ctx, cm))).
+					NotTo(HaveOccurred())
+			})
+		})
+		It("is not initially triggered", func() {
+			Consistently(func() int {
+				return *triggerCount
+			}).Should(Equal(0))
+		})
+		It("is triggered on deletion", func() {
+			Expect(env.k8sClient.Delete(ctx, cm)).ToNot(HaveOccurred())
+			Eventually(func() int {
+				return *triggerCount
+			}).WithPolling(100 * time.Millisecond).MustPassRepeatedly(3).
+				Should(Equal(1))
+		})
+		It("is always triggered when modified", func() {
+			cm.Data = map[string]string{"foo": "bar"}
+			Expect(env.k8sClient.Update(ctx, cm)).ToNot(HaveOccurred())
+			Eventually(func() int {
+				return *triggerCount
+			}).WithPolling(100 * time.Millisecond).MustPassRepeatedly(3).
+				Should(Equal(1))
+
+			cm.Data = map[string]string{"bar": "baz"}
+			Expect(env.k8sClient.Update(ctx, cm)).ToNot(HaveOccurred())
+			Eventually(func() int {
+				return *triggerCount
+			}).WithPolling(100 * time.Millisecond).MustPassRepeatedly(3).
+				Should(Equal(2))
+		})
+	})
+	When("watching a deployed deployment", Ordered, func() {
+		createDeployment := func() *appsv1.Deployment {
+			deploy := appsv1.Deployment{
+				ObjectMeta: metav1.ObjectMeta{
+					GenerateName: "test-deploy-",
+					Namespace:    env.namespace,
+					Generation:   1,
+				},
+				Spec: appsv1.DeploymentSpec{
+					Replicas: pointer.Int32(0),
+					Selector: &metav1.LabelSelector{
+						MatchLabels: map[string]string{
+							"app": "test",
+						},
+					},
+					Paused: true,
+					Template: corev1.PodTemplateSpec{
+						ObjectMeta: metav1.ObjectMeta{
+							Labels: map[string]string{
+								"app": "test",
+							},
+						},
+						Spec: corev1.PodSpec{
+							Containers: []corev1.Container{
+								{
+									Name:  "test",
+									Image: "test-image",
+								},
+							},
+						},
+					},
+				},
+			}
+
+			err := env.k8sClient.Create(ctx, &deploy)
+			Expect(err).ToNot(HaveOccurred())
+			Expect(deploy.UID).ToNot(BeEmpty())
+			// envtest does not return a complete object, which is needed for the trigger to work
+			deploy.TypeMeta = metav1.TypeMeta{
+				Kind:       "Deployment",
+				APIVersion: "apps/v1",
+			}
+			return &deploy
+		}
+		var triggerCount *int
+		var deploy *appsv1.Deployment
+
+		BeforeEach(func() {
+			deploy = createDeployment()
+			triggerCount = registerResource(env.namespace+"/test-deploy", deploy)
+			DeferCleanup(func() {
+				Expect(
+					client.IgnoreNotFound(env.k8sClient.Delete(ctx, deploy))).
+					NotTo(HaveOccurred())
+			})
+		})
+		It("is not initially triggered", func() {
+			Consistently(func() int {
+				return *triggerCount
+			}).Should(Equal(0))
+		})
+		It("is triggered on deletion", func() {
+			Expect(env.k8sClient.Delete(ctx, deploy)).ToNot(HaveOccurred())
+			Eventually(func() int {
+				return *triggerCount
+			}).WithPolling(100 * time.Millisecond).MustPassRepeatedly(3).
+				Should(Equal(1))
+		})
+		It("is not triggered on status updates", func() {
+			deploy.Status.Conditions = []appsv1.DeploymentCondition{
+				{
+					Type:               appsv1.DeploymentAvailable,
+					Status:             corev1.ConditionFalse,
+					LastUpdateTime:     metav1.Now(),
+					LastTransitionTime: metav1.Now(),
+					Message:            "tests",
+					Reason:             "tests",
+				},
+			}
+			Expect(env.k8sClient.Status().Update(ctx, deploy)).ToNot(HaveOccurred())
+			Consistently(func() int {
+				return *triggerCount
+			}).Should(Equal(0))
+		})
+		It("is triggered on Spec changes", func() {
+			for i := 1; i <= 5; i++ {
+				deploy.Spec.Replicas = pointer.Int32(int32(i))
+				Expect(env.k8sClient.Update(ctx, deploy)).ToNot(HaveOccurred())
+				Eventually(func() int {
+					return *triggerCount
+				}).WithPolling(100 * time.Millisecond).MustPassRepeatedly(3).
+					Should(Equal(i))
+			}
+		})
+	})
+})
diff --git a/internal/cmd/agent/trigger/watcher.go b/internal/cmd/agent/trigger/watcher.go
index 6951d6e315..80088de8b4 100644
--- a/internal/cmd/agent/trigger/watcher.go
+++ b/internal/cmd/agent/trigger/watcher.go
@@ -4,17 +4,19 @@ package trigger
 import (
 	"context"
 	"sync"
+	"sync/atomic"
 	"time"
 
-	"github.com/rancher/fleet/pkg/durations"
 	"github.com/rancher/wrangler/v2/pkg/objectset"
-
 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/dynamic"
+
+	"github.com/rancher/fleet/pkg/durations"
 )
 
 type Trigger struct {
@@ -26,6 +28,11 @@ type Trigger struct {
 	triggers   map[schema.GroupVersionKind]map[objectset.ObjectKey]map[string]func()
 	restMapper meta.RESTMapper
 	client     dynamic.Interface
+
+	// seenGenerations keeps a registry of the object UIDs and the latest observed generation, if any
+	// Uses sync.Map for a safe concurrent usage.
+	// Uses atomic.Int64 as values in order to stick to the first use case described at https://pkg.go.dev/sync#Map
+	seenGenerations sync.Map
 }
 
 func New(ctx context.Context, restMapper meta.RESTMapper, client dynamic.Interface) *Trigger {
@@ -125,10 +132,39 @@ func (t *Trigger) OnChange(key string, defaultNamespace string, trigger func(),
 	return nil
 }
 
-func (t *Trigger) call(gvk schema.GroupVersionKind, key objectset.ObjectKey) {
+func (t *Trigger) storeObjectGeneration(uid types.UID, generation int64) {
+	value := new(atomic.Int64)
+	value.Store(generation)
+	t.seenGenerations.Store(uid, value)
+}
+
+func (t *Trigger) call(gvk schema.GroupVersionKind, obj metav1.Object, deleted bool) {
+	// If this type populates Generation metadata, use it to filter events that didn't modify that field
+	if currentGeneration := obj.GetGeneration(); currentGeneration != 0 {
+		uid := obj.GetUID()
+		// if the object is being deleted, just forget about it and execute the callback
+		if deleted {
+			t.seenGenerations.Delete(uid)
+		} else {
+			// keep a map of UID -> generation, using sync.Map and atomic.Int64 for safe concurrent usage
+			// - sync.Map entries are never modified after created, a pointer is used as value
+			// - using atomic.Int64 as values allows safely comparing and updating the current Generation value
+			if value, ok := t.seenGenerations.Load(uid); !ok {
+				t.storeObjectGeneration(uid, currentGeneration)
+			} else {
+				previous := value.(*atomic.Int64)
+				// Set current generation and retrieve the previous value. if unchanged, do nothing and return early
+				if previousGeneration := previous.Swap(currentGeneration); previousGeneration == currentGeneration {
+					return
+				}
+			}
+		}
+	}
+
 	t.RLock()
 	defer t.RUnlock()
 
+	key := objectset.ObjectKey{Namespace: obj.GetNamespace(), Name: obj.GetName()}
 	for _, f := range t.triggers[gvk][key] {
 		f()
 	}
@@ -176,22 +212,27 @@ type watcher struct {
 }
 
 func (w *watcher) Start(ctx context.Context) {
+	// resourceVersion is used as a checkpoint if the Watch operation is interrupted.
+	// the for loop will resume watching with a non-empty resource version to avoid missing or repeating events
 	resourceVersion := ""
 	for {
 		w.Lock()
 		if w.stopped {
+			// The Watch operation was intentionally stopped, exit the loop
			w.Unlock()
-			break
+			return
 		}
 		w.Unlock()
 
-		time.Sleep(durations.TriggerSleep)
+		// Watch is non-blocking, the response allows consuming the events or stopping
+		// An error may mean the connection could not be established for some reason
 
 		resp, err := w.client.Resource(w.gvr).Watch(ctx, metav1.ListOptions{
 			AllowWatchBookmarks: true,
 			ResourceVersion:     resourceVersion,
 		})
 		if err != nil {
 			resourceVersion = ""
+			time.Sleep(durations.WatchErrorRetrySleep)
 			continue
 		}
@@ -200,21 +241,25 @@ func (w *watcher) Start(ctx context.Context) {
 		w.Unlock()
 
 		for event := range resp.ResultChan() {
-			meta, err := meta.Accessor(event.Object)
-			var key objectset.ObjectKey
-			if err == nil {
-				resourceVersion = meta.GetResourceVersion()
-				key.Name = meta.GetName()
-				key.Namespace = meta.GetNamespace()
+			// Not all events include a Kubernetes object payload (see the event.Event godoc), filter those out.
+			obj, err := meta.Accessor(event.Object)
+			if err != nil {
+				continue
 			}
+			// Store resource version for later resuming if watching is interrupted
+			resourceVersion = obj.GetResourceVersion()
+
 			switch event.Type {
+			// Just initialize the seen generations.
 			case watch.Added:
-				fallthrough
-			case watch.Modified:
-				fallthrough
-			case watch.Deleted:
-				w.t.call(w.gvk, key)
+				if generation := obj.GetGeneration(); generation != 0 {
+					w.t.storeObjectGeneration(obj.GetUID(), generation)
+				}
+			// Only trigger for Modified or Deleted objects.
+			case watch.Modified, watch.Deleted:
+				deleted := event.Type == watch.Deleted
+				w.t.call(w.gvk, obj, deleted)
 			}
 		}
 	}
 }
diff --git a/pkg/durations/durations.go b/pkg/durations/durations.go
index 9eccf74760..3c9009faa5 100644
--- a/pkg/durations/durations.go
+++ b/pkg/durations/durations.go
@@ -24,7 +24,7 @@ const (
 	RestConfigTimeout        = time.Second * 15
 	ServiceTokenSleep        = time.Second * 2
 	TokenClusterEnqueueDelay = time.Second * 2
-	TriggerSleep             = time.Second * 2
+	WatchErrorRetrySleep     = time.Second * 2
 	DefaultCpuPprofPeriod    = time.Minute
 	ReleaseCacheTTL          = time.Minute * 5
 )