
[v0.9] Reduce BundleDeployment triggering on deployed resources updates #2031

Merged · 13 commits · Jan 17, 2024
6 changes: 3 additions & 3 deletions .github/scripts/run-integration-tests.sh
@@ -2,8 +2,8 @@

set -euxo pipefail

SETUP_ENVTEST_VER=${SETUP_ENVTEST_VER-v0.0.0-20221214170741-69f093833822}
ENVTEST_K8S_VERSION=${ENVTEST_K8S_VERSION-1.25}
SETUP_ENVTEST_VER=${SETUP_ENVTEST_VER-v0.0.0-20240115093953-9e6e3b144a69}
ENVTEST_K8S_VERSION=${ENVTEST_K8S_VERSION-1.28}

# install and prepare setup-envtest
if ! command -v setup-envtest &> /dev/null
@@ -14,4 +14,4 @@ KUBEBUILDER_ASSETS=$(setup-envtest use --use-env -p path $ENVTEST_K8S_VERSION)
export KUBEBUILDER_ASSETS

# run integration tests
go test ./integrationtests/...
go test ./integrationtests/agent/...
8 changes: 7 additions & 1 deletion integrationtests/agent/suite_test.go
@@ -83,7 +83,13 @@ var _ = BeforeSuite(func() {
Expect(k8sClient).NotTo(BeNil())

specEnvs = make(map[string]*specEnv, 2)
for id, f := range map[string]specResources{"capabilitybundle": capabilityBundleResources, "orphanbundle": orphanBundeResources} {
for id, f := range map[string]specResources{
"capabilitybundle": capabilityBundleResources,
"orphanbundle": orphanBundeResources,
"watchertrigger": func() map[string][]v1alpha1.BundleResource {
return nil
},
} {
namespace, err := utils.NewNamespaceName()
Expect(err).ToNot(HaveOccurred())
fmt.Printf("Creating namespace %s\n", namespace)
192 changes: 192 additions & 0 deletions integrationtests/agent/watcher_trigger_test.go
@@ -0,0 +1,192 @@
package agent

import (
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
appsv1 "k8s.io/api/apps/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/dynamic"
"k8s.io/utils/pointer"

"github.com/rancher/fleet/internal/cmd/agent/trigger"

corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/client"
)

var _ = Describe("Watches for deployed resources", Ordered, func() {

var (
env *specEnv
triggr *trigger.Trigger
)

BeforeAll(func() {
env = specEnvs["watchertrigger"]
triggr = trigger.New(ctx, env.k8sClient.RESTMapper(), dynamic.NewForConfigOrDie(cfg))
DeferCleanup(func() {
Expect(k8sClient.Delete(ctx, &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: env.namespace}})).ToNot(HaveOccurred())
})
})

registerResource := func(key string, objs ...runtime.Object) *int {
var count int
Expect(triggr.OnChange(key, env.namespace, func() {
count++
}, objs...)).ToNot(HaveOccurred())
return &count
}

When("watching a deployed configmap", Ordered, func() {
createConfigMap := func() *corev1.ConfigMap {
cm := corev1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test-cm-",
Namespace: env.namespace,
},
}

err := env.k8sClient.Create(ctx, &cm)
Expect(err).ToNot(HaveOccurred())
Expect(cm.UID).ToNot(BeEmpty())
return &cm
}

var triggerCount *int
var cm *corev1.ConfigMap
BeforeEach(func() {
cm = createConfigMap()
triggerCount = registerResource(env.namespace+"/test-configmap", cm)
DeferCleanup(func() {
Expect(
client.IgnoreNotFound(env.k8sClient.Delete(ctx, cm))).
NotTo(HaveOccurred())
})
})
It("is not initially triggered", func() {
Consistently(func() int {
return *triggerCount
}).Should(Equal(0))
})
It("is triggered on deletion", func() {
Expect(env.k8sClient.Delete(ctx, cm)).ToNot(HaveOccurred())
Eventually(func() int {
return *triggerCount
}).WithPolling(100 * time.Millisecond).MustPassRepeatedly(3).
Should(Equal(1))
})
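		// ConfigMaps do not populate metadata.generation, so the trigger cannot
		// filter updates by generation and fires on every modification.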
It("is always triggered when modified", func() {
cm.Data = map[string]string{"foo": "bar"}
Expect(env.k8sClient.Update(ctx, cm)).ToNot(HaveOccurred())
Eventually(func() int {
return *triggerCount
}).WithPolling(100 * time.Millisecond).MustPassRepeatedly(3).
Should(Equal(1))

cm.Data = map[string]string{"bar": "baz"}
Expect(env.k8sClient.Update(ctx, cm)).ToNot(HaveOccurred())
Eventually(func() int {
return *triggerCount
}).WithPolling(100 * time.Millisecond).MustPassRepeatedly(3).
Should(Equal(2))
})
})
When("watching a deployed deployment", Ordered, func() {
createDeployment := func() *appsv1.Deployment {
deploy := appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
GenerateName: "test-deploy-",
Namespace: env.namespace,
Generation: 1,
},
Spec: appsv1.DeploymentSpec{
Replicas: pointer.Int32(0),
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "test",
},
},
Paused: true,
Template: corev1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "test",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: "test",
Image: "test-image",
},
},
},
},
},
}

err := env.k8sClient.Create(ctx, &deploy)
Expect(err).ToNot(HaveOccurred())
Expect(deploy.UID).ToNot(BeEmpty())
// envtest does not return a complete object, which is needed for the trigger to work
deploy.TypeMeta = metav1.TypeMeta{
Kind: "Deployment",
APIVersion: "apps/v1",
}
return &deploy
}
var triggerCount *int
var deploy *appsv1.Deployment

BeforeEach(func() {
deploy = createDeployment()
triggerCount = registerResource(env.namespace+"/test-deploy", deploy)
DeferCleanup(func() {
Expect(
client.IgnoreNotFound(env.k8sClient.Delete(ctx, deploy))).
NotTo(HaveOccurred())
})
})
It("is not initially triggered", func() {
Consistently(func() int {
return *triggerCount
}).Should(Equal(0))
})
It("is triggered on deletion", func() {
Expect(env.k8sClient.Delete(ctx, deploy)).ToNot(HaveOccurred())
Eventually(func() int {
return *triggerCount
}).WithPolling(100 * time.Millisecond).MustPassRepeatedly(3).
Should(Equal(1))
})
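		// Status updates do not increment metadata.generation, so the trigger's
		// generation filter drops them.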
It("is not triggered on status updates", func() {
deploy.Status.Conditions = []appsv1.DeploymentCondition{
{
Type: appsv1.DeploymentAvailable,
Status: corev1.ConditionFalse,
LastUpdateTime: metav1.Now(),
LastTransitionTime: metav1.Now(),
Message: "tests",
Reason: "tests",
},
}
Expect(env.k8sClient.Status().Update(ctx, deploy)).ToNot(HaveOccurred())
Consistently(func() int {
return *triggerCount
}).Should(Equal(0))
})
It("is triggered on Spec changes", func() {
for i := 1; i <= 5; i++ {
deploy.Spec.Replicas = pointer.Int32(int32(i))
Expect(env.k8sClient.Update(ctx, deploy)).ToNot(HaveOccurred())
Eventually(func() int {
return *triggerCount
}).WithPolling(100 * time.Millisecond).MustPassRepeatedly(3).
Should(Equal(i))
}
})
})
})
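Taken together, these specs pin down the new contract: objects without a metadata.generation (such as ConfigMaps) re-trigger on every update, while objects that populate it (such as Deployments) only re-trigger on deletion or spec changes. Below is a minimal sketch of how a caller might register deployed objects against that contract, using only the trigger.New and OnChange signatures visible in this PR; registerDeployed and enqueue are illustrative names, not part of the change.

```go
// Sketch only: assumes the trigger.New and OnChange signatures shown in this
// PR; registerDeployed and enqueue are hypothetical helpers for illustration.
package sketch

import (
	"context"

	"k8s.io/apimachinery/pkg/api/meta"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/dynamic"

	"github.com/rancher/fleet/internal/cmd/agent/trigger"
)

// registerDeployed registers previously deployed objects with a Trigger so
// that enqueue runs when any of them is deleted or changes.
func registerDeployed(
	ctx context.Context,
	rm meta.RESTMapper,
	dyn dynamic.Interface,
	key, namespace string,
	enqueue func(string),
	deployedObjs ...runtime.Object,
) error {
	t := trigger.New(ctx, rm, dyn)
	return t.OnChange(key, namespace, func() {
		// For objects that populate metadata.generation, status-only updates
		// are filtered out; generation-less objects fire on every update.
		enqueue(key)
	}, deployedObjs...)
}
```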
77 changes: 61 additions & 16 deletions internal/cmd/agent/trigger/watcher.go
@@ -4,17 +4,19 @@ package trigger
import (
"context"
"sync"
"sync/atomic"
"time"

"github.com/rancher/fleet/pkg/durations"
"github.com/rancher/wrangler/v2/pkg/objectset"

"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"

"github.com/rancher/fleet/pkg/durations"
)

type Trigger struct {
@@ -26,6 +28,11 @@ type Trigger struct {
triggers map[schema.GroupVersionKind]map[objectset.ObjectKey]map[string]func()
restMapper meta.RESTMapper
client dynamic.Interface

// seenGenerations keeps a registry of the object UIDs and the latest observed generation, if any
// Uses sync.Map for safe concurrent usage.
// Uses atomic.Int64 as values in order to stick to the first use case described at https://pkg.go.dev/sync#Map
seenGenerations sync.Map
}

func New(ctx context.Context, restMapper meta.RESTMapper, client dynamic.Interface) *Trigger {
@@ -125,10 +132,39 @@ func (t *Trigger) OnChange(key string, defaultNamespace string, trigger func(),
return nil
}

func (t *Trigger) call(gvk schema.GroupVersionKind, key objectset.ObjectKey) {
func (t *Trigger) storeObjectGeneration(uid types.UID, generation int64) {
value := new(atomic.Int64)
value.Store(generation)
t.seenGenerations.Store(uid, value)
}

func (t *Trigger) call(gvk schema.GroupVersionKind, obj metav1.Object, deleted bool) {
// If this type populates Generation metadata, use it to filter events that didn't modify that field
if currentGeneration := obj.GetGeneration(); currentGeneration != 0 {
uid := obj.GetUID()
// if the object is being deleted, just forget about it and execute the callback
if deleted {
t.seenGenerations.Delete(uid)
} else {
// keep a map of UID -> generation, using sync.Map and atomic.Int64 for safe concurrent usage
// - sync.Map entries are never modified after creation; a pointer is used as the value
// - using atomic.Int64 as values allows safely comparing and updating the current Generation value
if value, ok := t.seenGenerations.Load(uid); !ok {
t.storeObjectGeneration(uid, currentGeneration)
} else {
previous := value.(*atomic.Int64)
// Set the current generation and retrieve the previous value. If unchanged, do nothing and return early
if previousGeneration := previous.Swap(currentGeneration); previousGeneration == currentGeneration {
return
}
}
}
}

t.RLock()
defer t.RUnlock()

key := objectset.ObjectKey{Namespace: obj.GetNamespace(), Name: obj.GetName()}
for _, f := range t.triggers[gvk][key] {
f()
}
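The UID bookkeeping above follows the first use case called out in the sync.Map documentation: each entry is written once with a *atomic.Int64 value, and later events only swap the stored counter. The following is a self-contained sketch of that pattern, for illustration only; it uses LoadOrStore and plain string keys rather than the Load/store helper and types.UID used in call.

```go
// Standalone illustration of the seenGenerations pattern; not code from this
// PR. It uses LoadOrStore and string keys instead of types.UID.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// generationChanged records the latest generation seen for uid and reports
// whether it differs from the previously recorded one. A first sighting
// counts as changed, mirroring how call() falls through to the callbacks
// when a UID is not yet in the map.
func generationChanged(seen *sync.Map, uid string, generation int64) bool {
	v := new(atomic.Int64)
	v.Store(generation)
	if existing, loaded := seen.LoadOrStore(uid, v); loaded {
		// Entry already present: atomically swap in the new generation and
		// compare it against the previous one.
		return existing.(*atomic.Int64).Swap(generation) != generation
	}
	return true
}

func main() {
	var seen sync.Map
	fmt.Println(generationChanged(&seen, "uid-1", 1)) // true: first observation
	fmt.Println(generationChanged(&seen, "uid-1", 1)) // false: e.g. a status-only update
	fmt.Println(generationChanged(&seen, "uid-1", 2)) // true: the spec change bumped the generation
}
```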
@@ -176,22 +212,27 @@ type watcher struct {
}

func (w *watcher) Start(ctx context.Context) {
// resourceVersion is used as a checkpoint if the Watch operation is interrupted.
// the for loop will resume watching with a non-empty resource version to avoid missing or repeating events
resourceVersion := ""
for {
w.Lock()
if w.stopped {
// The Watch operation was intentionally stopped, exit the loop
w.Unlock()
break
return
}
w.Unlock()

time.Sleep(durations.TriggerSleep)
// Watch is non-blocking, the response allows consuming the events or stopping
// An error may mean the connection could not be established for some reason
resp, err := w.client.Resource(w.gvr).Watch(ctx, metav1.ListOptions{
AllowWatchBookmarks: true,
ResourceVersion: resourceVersion,
})
if err != nil {
resourceVersion = ""
time.Sleep(durations.WatchErrorRetrySleep)
continue
}

@@ -200,21 +241,25 @@
w.Unlock()

for event := range resp.ResultChan() {
meta, err := meta.Accessor(event.Object)
var key objectset.ObjectKey
if err == nil {
resourceVersion = meta.GetResourceVersion()
key.Name = meta.GetName()
key.Namespace = meta.GetNamespace()
// Not all events include a Kubernetes object payload (see the event.Event godoc), filter those out.
obj, err := meta.Accessor(event.Object)
if err != nil {
continue
}

// Store resource version for later resuming if watching is interrupted
resourceVersion = obj.GetResourceVersion()

switch event.Type {
// Just initialize the seen generations.
case watch.Added:
fallthrough
case watch.Modified:
fallthrough
case watch.Deleted:
w.t.call(w.gvk, key)
if generation := obj.GetGeneration(); generation != 0 {
w.t.storeObjectGeneration(obj.GetUID(), generation)
}
// Only trigger for Modified or Deleted objects.
case watch.Modified, watch.Deleted:
deleted := event.Type == watch.Deleted
w.t.call(w.gvk, obj, deleted)
}
}
}
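Two details of the reworked Start loop are easy to miss in the diff: the last observed resourceVersion is carried across reconnects so events are neither missed nor replayed, and it is cleared when the watch cannot be established so the next attempt starts from a fresh view. The sketch below strips that resume loop down for illustration only; the stop flag, the trigger bookkeeping and the durations.WatchErrorRetrySleep constant are omitted, with a literal two-second sleep standing in for the latter.

```go
// Stripped-down illustration of the watch resume loop in Start(); not the
// PR's code. A literal sleep stands in for durations.WatchErrorRetrySleep.
package sketch

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
	"k8s.io/client-go/dynamic"
)

// watchLoop keeps a watch open on ri, resuming from the last observed
// resourceVersion when the event channel closes and falling back to a fresh
// watch (empty resourceVersion) whenever the watch cannot be established.
func watchLoop(ctx context.Context, ri dynamic.ResourceInterface, handle func(watch.Event)) {
	resourceVersion := ""
	for ctx.Err() == nil {
		w, err := ri.Watch(ctx, metav1.ListOptions{
			AllowWatchBookmarks: true,
			ResourceVersion:     resourceVersion,
		})
		if err != nil {
			// Could not (re)establish the watch; drop the checkpoint and retry.
			resourceVersion = ""
			time.Sleep(2 * time.Second)
			continue
		}
		for event := range w.ResultChan() {
			if obj, err := meta.Accessor(event.Object); err == nil {
				resourceVersion = obj.GetResourceVersion()
			}
			handle(event)
		}
	}
}
```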
2 changes: 1 addition & 1 deletion pkg/durations/durations.go
@@ -24,7 +24,7 @@
RestConfigTimeout = time.Second * 15
ServiceTokenSleep = time.Second * 2
TokenClusterEnqueueDelay = time.Second * 2
TriggerSleep = time.Second * 2
WatchErrorRetrySleep = time.Second * 2
DefaultCpuPprofPeriod = time.Minute
ReleaseCacheTTL = time.Minute * 5
)