Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add jitter and resync to polling #3151

Merged
merged 3 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions charts/fleet/templates/deployment_gitjob.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ spec:
- name: NO_PROXY
value: {{ $.Values.noProxy }}
{{- end }}
{{- if $.Values.gitops.syncPeriod }}
- name: GITREPO_SYNC_PERIOD
value: {{ quote $.Values.gitops.syncPeriod }}
{{- end }}
{{- if $.Values.controller.reconciler.workers.gitrepo }}
- name: GITREPO_RECONCILER_WORKERS
value: {{ quote $.Values.controller.reconciler.workers.gitrepo }}
Expand Down
3 changes: 3 additions & 0 deletions charts/fleet/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ priorityClassName: ""

gitops:
enabled: true
# syncPeriod is used to pick up polling for lost gitrepo events.
# It should be larger than the largest gitrepo pollinginterval.
syncPeriod: 2h

metrics:
enabled: true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ func (r *BundleDeploymentReconciler) SetupWithManager(mgr ctrl.Manager) error {
WithEventFilter(
// we do not trigger for status changes
predicate.Or(
// Note: These predicates prevent cache
// syncPeriod from triggering reconcile, since
// cache sync is an Update event.
predicate.GenerationChangedPredicate{},
predicate.AnnotationChangedPredicate{},
predicate.LabelChangedPredicate{},
Expand Down
19 changes: 16 additions & 3 deletions internal/cmd/controller/gitops/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ import (
)

var (
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
zopts *zap.Options
scheme = runtime.NewScheme()
setupLog = ctrl.Log.WithName("setup")
zopts *zap.Options
defaultSyncPeriod = 10 * time.Hour
)

func init() {
Expand Down Expand Up @@ -94,6 +95,14 @@ func (g *GitOperator) Run(cmd *cobra.Command, args []string) error {
shardIDSuffix = fmt.Sprintf("-%s", g.ShardID)
}

syncPeriod := defaultSyncPeriod
if d := os.Getenv("GITREPO_SYNC_PERIOD"); d != "" {
syncPeriod, err = time.ParseDuration(d)
if err != nil {
return err
}
}

mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{
Scheme: scheme,
Metrics: g.setupMetrics(),
Expand All @@ -103,6 +112,10 @@ func (g *GitOperator) Run(cmd *cobra.Command, args []string) error {
LeaseDuration: leaderOpts.LeaseDuration,
RenewDeadline: leaderOpts.RenewDeadline,
RetryPeriod: leaderOpts.RetryPeriod,
// resync to pick up lost gitrepos
Cache: cache.Options{
SyncPeriod: &syncPeriod,
},
})

if err != nil {
Expand Down
15 changes: 14 additions & 1 deletion internal/cmd/controller/gitops/reconciler/gitjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"math/rand/v2"
"os"
"reflect"
"sort"
Expand Down Expand Up @@ -102,8 +103,9 @@ func (r *GitJobReconciler) SetupWithManager(mgr ctrl.Manager) error {
return ctrl.NewControllerManagedBy(mgr).
For(&v1alpha1.GitRepo{},
builder.WithPredicates(
// do not trigger for GitRepo status changes (except for commit changes)
// do not trigger for GitRepo status changes (except for commit changes and cache sync)
predicate.Or(
TypedResourceVersionUnchangedPredicate[client.Object]{},
aruiz14 marked this conversation as resolved.
Show resolved Hide resolved
predicate.GenerationChangedPredicate{},
predicate.AnnotationChangedPredicate{},
predicate.LabelChangedPredicate{},
Expand Down Expand Up @@ -189,6 +191,7 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
result := reconcile.Result{}
if repoPolled {
result = reconcile.Result{RequeueAfter: getPollingIntervalDuration(gitrepo)}
result.RequeueAfter = addJitter(result.RequeueAfter)
}

res, err := r.manageGitJob(ctx, logger, gitrepo, oldCommit, repoPolled, result)
Expand All @@ -208,6 +211,16 @@ func (r *GitJobReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
return result, nil
}

// addJitter to the requeue time to avoid thundering herd
// generate a random number between -10% and +10% of the duration
func addJitter(d time.Duration) time.Duration {
if d <= 0 {
return d
}

return d + time.Duration(rand.Int64N(int64(d)/10)) // nolint:gosec // gosec G404 false positive, not used for crypto
}

// manageGitJob is responsible for creating, updating and deleting the GitJob and setting the GitRepo's status accordingly
func (r *GitJobReconciler) manageGitJob(ctx context.Context, logger logr.Logger, gitrepo *v1alpha1.GitRepo, oldCommit string, repoPolled bool, oldResult reconcile.Result) (reconcile.Result, error) {
name := types.NamespacedName{Namespace: gitrepo.Namespace, Name: gitrepo.Name}
Expand Down
51 changes: 51 additions & 0 deletions internal/cmd/controller/gitops/reconciler/predicate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package reconciler

import (
"reflect"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// TypedResourceVersionUnchangedPredicate implements a update predicate to
// allow syncPeriod to trigger the reconciler
type TypedResourceVersionUnchangedPredicate[T metav1.Object] struct {
predicate.TypedFuncs[T]
}

func isNil(arg any) bool {
if v := reflect.ValueOf(arg); !v.IsValid() || ((v.Kind() == reflect.Ptr ||
v.Kind() == reflect.Interface ||
v.Kind() == reflect.Slice ||
v.Kind() == reflect.Map ||
v.Kind() == reflect.Chan ||
v.Kind() == reflect.Func) && v.IsNil()) {
return true
}
return false
}

func (TypedResourceVersionUnchangedPredicate[T]) Create(e event.CreateEvent) bool {
return false
}

func (TypedResourceVersionUnchangedPredicate[T]) Delete(e event.DeleteEvent) bool {
return false
}

// Update implements default UpdateEvent filter for validating resource version change.
func (TypedResourceVersionUnchangedPredicate[T]) Update(e event.TypedUpdateEvent[T]) bool {
if isNil(e.ObjectOld) {
return false
}
if isNil(e.ObjectNew) {
return false
}

return e.ObjectNew.GetResourceVersion() == e.ObjectOld.GetResourceVersion()
}

func (TypedResourceVersionUnchangedPredicate[T]) Generic(e event.GenericEvent) bool {
return false
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ func (r *HelmAppReconciler) SetupWithManager(mgr ctrl.Manager) error {
For(&fleet.HelmApp{},
builder.WithPredicates(
predicate.Or(
// Note: These predicates prevent cache
// syncPeriod from triggering reconcile, since
// cache sync is an Update event.
predicate.GenerationChangedPredicate{},
predicate.AnnotationChangedPredicate{},
predicate.LabelChangedPredicate{},
Expand Down
3 changes: 3 additions & 0 deletions internal/cmd/controller/reconciler/config_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ func (r *ConfigReconciler) SetupWithManager(mgr ctrl.Manager) error {
object.GetName() == config.ManagerConfigName
}),
predicate.Or(
// Note: These predicates prevent cache
// syncPeriod from triggering reconcile, since
// cache sync is an Update event.
predicate.ResourceVersionChangedPredicate{},
predicate.GenerationChangedPredicate{},
predicate.AnnotationChangedPredicate{},
Expand Down
3 changes: 3 additions & 0 deletions internal/cmd/controller/reconciler/imagescan_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,9 @@ func (r *ImageScanReconciler) SetupWithManager(mgr ctrl.Manager) error {
predicate.And(
sharding.FilterByShardID(r.ShardID),
predicate.Or(
// Note: These predicates prevent cache
// syncPeriod from triggering reconcile, since
// cache sync is an Update event.
predicate.GenerationChangedPredicate{},
predicate.AnnotationChangedPredicate{},
predicate.LabelChangedPredicate{},
Expand Down
Loading