Merge pull request #246 from kubescape/fix
use same cooldownqueue as node-agent and synchronizer
matthyx authored Sep 23, 2024
2 parents 20df76f + 73bd4e8 commit 0b44cb2
Showing 8 changed files with 137 additions and 184 deletions.
81 changes: 0 additions & 81 deletions continuousscanning/cooldownqueue.go

This file was deleted.

14 changes: 7 additions & 7 deletions continuousscanning/service.go
@@ -2,8 +2,8 @@ package continuousscanning
 
 import (
 	"context"
-	"time"
 
+	"github.com/kubescape/operator/watcher"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/dynamic"
@@ -19,7 +19,7 @@ type ContinuousScanningService struct {
 	workDone      chan struct{}
 	k8sdynamic    dynamic.Interface
 	eventHandlers []EventHandler
-	eventQueue    *cooldownQueue
+	eventQueue    *watcher.CooldownQueue
 }
 
 func (s *ContinuousScanningService) listen(ctx context.Context) <-chan armoapi.Command {
@@ -34,8 +34,8 @@ func (s *ContinuousScanningService) listen(ctx context.Context) <-chan armoapi.C
 	wp.Run(ctx, resourceEventsCh)
 	logger.L().Info("ran watch pool")
 
-	go func(shutdownCh <-chan struct{}, resourceEventsCh <-chan watch.Event, out *cooldownQueue) {
-		defer out.Stop(ctx)
+	go func(shutdownCh <-chan struct{}, resourceEventsCh <-chan watch.Event, out *watcher.CooldownQueue) {
+		defer out.Stop()
 
 		for {
 			select {
@@ -44,7 +44,7 @@ func (s *ContinuousScanningService) listen(ctx context.Context) <-chan armoapi.C
 					"got event from channel",
 					helpers.Interface("event", e),
 				)
-				out.Enqueue(ctx, e)
+				out.Enqueue(e)
 			case <-shutdownCh:
 				return
 			}
@@ -100,9 +100,9 @@ func (s *ContinuousScanningService) Stop() {
 	<-s.workDone
 }
 
-func NewContinuousScanningService(client dynamic.Interface, tl TargetLoader, queueSize int, sameEventCooldown time.Duration, h ...EventHandler) *ContinuousScanningService {
+func NewContinuousScanningService(client dynamic.Interface, tl TargetLoader, h ...EventHandler) *ContinuousScanningService {
 	doneCh := make(chan struct{})
-	eventQueue := NewCooldownQueue(queueSize, sameEventCooldown)
+	eventQueue := watcher.NewCooldownQueue()
 	workDone := make(chan struct{})
 
 	return &ContinuousScanningService{
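For callers, the visible effect of this change is a simpler constructor: the queue size and per-event cooldown are no longer parameters, because the queue now comes from watcher.NewCooldownQueue with its own fixed expiration. The following wiring sketch is not part of the commit; it only illustrates the new call shape, and dynClient, loader, and handlers stand for values the caller already has:

import (
	"context"

	cs "github.com/kubescape/operator/continuousscanning"
	"k8s.io/client-go/dynamic"
)

// startScanning shows the post-change call shape: handlers are passed variadically,
// and the cooldown queue is created inside NewContinuousScanningService.
func startScanning(ctx context.Context, dynClient dynamic.Interface, loader cs.TargetLoader, handlers ...cs.EventHandler) {
	svc := cs.NewContinuousScanningService(dynClient, loader, handlers...)
	svc.Launch(ctx)
	// later: svc.Stop() waits for in-flight work before returning
}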
4 changes: 2 additions & 2 deletions continuousscanning/service_test.go
@@ -151,7 +151,7 @@ func TestAddEventHandler(t *testing.T) {
 	tl := NewTargetLoader(f)
 	// We use the spy handler later to verify if it's been called
 	spyH := &spyHandler{called: false, wg: resourcesCreatedWg, mx: &sync.RWMutex{}}
-	css := NewContinuousScanningService(dynClient, tl, DefaultQueueSize, DefaultTTL, spyH)
+	css := NewContinuousScanningService(dynClient, tl, spyH)
 	css.Launch(ctx)
 
 	// Create Pods to be listened
@@ -265,7 +265,7 @@ func TestContinuousScanningService(t *testing.T) {
 	triggeringHandler := NewTriggeringHandler(wp, operatorConfig)
 	stubFetcher := &stubFetcher{podMatchRules}
 	loader := NewTargetLoader(stubFetcher)
-	css := NewContinuousScanningService(dynClient, loader, DefaultQueueSize, DefaultTTL, triggeringHandler)
+	css := NewContinuousScanningService(dynClient, loader, triggeringHandler)
 	css.Launch(ctx)
 
 	// Create Pods to be listened
3 changes: 1 addition & 2 deletions main.go
@@ -22,7 +22,6 @@ import (
 	rulebindingcachev1 "github.com/kubescape/operator/admission/rulebinding/cache"
 	"github.com/kubescape/operator/admission/webhook"
 	"github.com/kubescape/operator/config"
-	cs "github.com/kubescape/operator/continuousscanning"
 	"github.com/kubescape/operator/mainhandler"
 	"github.com/kubescape/operator/notificationhandler"
 	"github.com/kubescape/operator/objectcache"
@@ -150,7 +149,7 @@ func main() {
 
 	if operatorConfig.ContinuousScanEnabled() {
 		go func(mh *mainhandler.MainHandler) {
-			err := mh.SetupContinuousScanning(ctx, cs.DefaultQueueSize, cfg.EventDeduplicationInterval)
+			err := mh.SetupContinuousScanning(ctx)
 			logger.L().Info("set up cont scanning service")
 			if err != nil {
 				logger.L().Ctx(ctx).Fatal(err.Error(), helpers.Error(err))
12 changes: 6 additions & 6 deletions mainhandler/handlerequests.go
@@ -114,7 +114,7 @@ func NewActionHandler(config config.IConfig, k8sAPI *k8sinterface.KubernetesApi,
 }
 
 // SetupContinuousScanning sets up the continuous cluster scanning function
-func (mainHandler *MainHandler) SetupContinuousScanning(ctx context.Context, queueSize int, eventCooldown time.Duration) error {
+func (mainHandler *MainHandler) SetupContinuousScanning(ctx context.Context) error {
 	ksStorageClient, err := kssc.NewForConfig(k8sinterface.GetK8sConfig())
 	if err != nil {
 		logger.L().Ctx(ctx).Fatal(fmt.Sprintf("Unable to initialize the storage client: %v", err))
@@ -133,7 +133,7 @@ func (mainHandler *MainHandler) SetupContinuousScanning(ctx context.Context, que
 	loader := cs.NewTargetLoader(fetcher)
 
 	dynClient := mainHandler.k8sAPI.DynamicClient
-	svc := cs.NewContinuousScanningService(dynClient, loader, queueSize, eventCooldown, triggeringHandler, deletingHandler)
+	svc := cs.NewContinuousScanningService(dynClient, loader, triggeringHandler, deletingHandler)
 	svc.Launch(ctx)
 
 	return nil
@@ -150,7 +150,7 @@ func (mainHandler *MainHandler) HandleWatchers(ctx context.Context) {
 	if err != nil {
 		logger.L().Ctx(ctx).Fatal(fmt.Sprintf("Unable to initialize the storage client: %v", err))
 	}
-	eventQueue := watcher.NewCooldownQueue(watcher.DefaultQueueSize, watcher.DefaultTTL)
+	eventQueue := watcher.NewCooldownQueue()
 	watchHandler := watcher.NewWatchHandler(ctx, mainHandler.config, mainHandler.k8sAPI, ksStorageClient, eventQueue)
 
 	// wait for the kubevuln component to be ready
@@ -433,16 +433,16 @@ func (mainHandler *MainHandler) HandleImageScanningScopedRequest(ctx context.Con
 				CommandName: apis.TypeScanImages,
 				Args: map[string]interface{}{
 					utils.ArgsContainerData: containerData,
-					utils.ArgsPod:           &pod,
+					utils.ArgsPod:           pod,
 				},
 			}
 
 			// send specific command to the channel
 			newSessionObj := utils.NewSessionObj(ctx, mainHandler.config, cmd, "Websocket", sessionObj.Reporter.GetJobID(), "", 1)
 
-			logger.L().Info("triggering", helpers.String("id", newSessionObj.Command.GetID()), helpers.String("slug", s), helpers.String("containerName", containerData.ContainerName), helpers.String("imageTag", containerData.ImageTag), helpers.String("imageID", containerData.ImageID))
+			logger.L().Info("triggering scan image", helpers.String("id", newSessionObj.Command.GetID()), helpers.String("slug", s), helpers.String("containerName", containerData.ContainerName), helpers.String("imageTag", containerData.ImageTag), helpers.String("imageID", containerData.ImageID))
 			if err := mainHandler.HandleSingleRequest(ctx, newSessionObj); err != nil {
-				logger.L().Info("failed to complete action", helpers.String("id", newSessionObj.Command.GetID()), helpers.String("slug", s), helpers.String("containerName", containerData.ContainerName), helpers.String("imageTag", containerData.ImageTag), helpers.String("imageID", containerData.ImageID))
+				logger.L().Info("failed to complete action", helpers.Error(err), helpers.String("id", newSessionObj.Command.GetID()), helpers.String("slug", s), helpers.String("containerName", containerData.ContainerName), helpers.String("imageTag", containerData.ImageTag), helpers.String("imageID", containerData.ImageID))
 				newSessionObj.Reporter.SendError(err, mainHandler.sendReport, true)
 				continue
 			}
65 changes: 35 additions & 30 deletions watcher/cooldownqueue.go
@@ -1,76 +1,81 @@
 package watcher
 
 import (
+	"strings"
+	"sync"
 	"time"
 
-	lru "github.com/hashicorp/golang-lru/v2/expirable"
-	v1 "k8s.io/api/core/v1"
+	"istio.io/pkg/cache"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/watch"
 )
 
 const (
-	// Default size for the cooldown queue
-	DefaultQueueSize = 512
-	// Default TTL for events put in the queue
-	DefaultTTL = 1 * time.Second
+	defaultExpiration = 5 * time.Second
+	evictionInterval  = 1 * time.Second
 )
 
 // CooldownQueue is a queue that lets clients put events into it with a cooldown
 //
-// When a client puts an event into a queue, it forwards the event to its
-// output channel and starts a cooldown for this event. If a client attempts to
-// put the same event into the queue while the cooldown is running, the queue
-// will silently drop the event. When the cooldown resets and a client puts the
-// same event into the queue, it will be forwarded to the output channel
+// When a client puts an event into a queue, it waits for a cooldown period before
+// the event is forwarded to the consumer. If an event for the same key is put into the queue
+// again before the cooldown period is over, the event is overridden and the cooldown period is reset.
 type CooldownQueue struct {
-	seenEvents *lru.LRU[string, bool]
+	closed     bool
+	mu         sync.Mutex  // mutex for closed
+	chanMu     *sync.Mutex // mutex for innerChan
+	seenEvents cache.ExpiringCache
 	innerChan  chan watch.Event
 	ResultChan <-chan watch.Event
-	closed     bool
 }
 
 // NewCooldownQueue returns a new Cooldown Queue
-func NewCooldownQueue(size int, cooldown time.Duration) *CooldownQueue {
-	cache := lru.NewLRU[string, bool](size, nil, cooldown)
+func NewCooldownQueue() *CooldownQueue {
 	events := make(chan watch.Event)
+	chanMu := sync.Mutex{}
+	callback := func(key, value any) {
+		chanMu.Lock()
+		defer chanMu.Unlock()
+		events <- value.(watch.Event)
+	}
+	c := cache.NewTTLWithCallback(defaultExpiration, evictionInterval, callback)
 	return &CooldownQueue{
-		seenEvents: cache,
+		chanMu:     &chanMu,
+		seenEvents: c,
 		innerChan:  events,
 		ResultChan: events,
 	}
 }
 
 // makeEventKey creates a unique key for an event from a watcher
 func makeEventKey(e watch.Event) string {
-	object, ok := e.Object.(*v1.Pod)
-	if !ok {
-		return ""
-	}
-	eventKey := string(e.Type) + "-" + string(object.GetUID())
-	return eventKey
+	gvk := e.Object.GetObjectKind().GroupVersionKind()
+	meta := e.Object.(metav1.Object)
+	return strings.Join([]string{gvk.Group, gvk.Version, gvk.Kind, meta.GetNamespace(), meta.GetName()}, "/")
 }
 
 func (q *CooldownQueue) Closed() bool {
+	q.mu.Lock()
+	defer q.mu.Unlock()
 	return q.closed
 }
 
 // Enqueue enqueues an event in the Cooldown Queue
 func (q *CooldownQueue) Enqueue(e watch.Event) {
+	q.mu.Lock()
+	defer q.mu.Unlock()
 	if q.closed {
 		return
 	}
 	eventKey := makeEventKey(e)
-	_, exists := q.seenEvents.Get(eventKey)
-	if exists {
-		return
-	}
-	go func() {
-		q.innerChan <- e
-	}()
-	q.seenEvents.Add(eventKey, true)
+	q.seenEvents.Set(eventKey, e)
 }
 
 func (q *CooldownQueue) Stop() {
+	q.chanMu.Lock()
+	defer q.chanMu.Unlock()
+	q.mu.Lock()
+	defer q.mu.Unlock()
 	q.closed = true
 	close(q.innerChan)
 }
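To make the new semantics concrete, here is a usage sketch against the API above (not part of this commit): the event key is group/version/kind/namespace/name, so a burst of events for the same object collapses into a single forwarded event on ResultChan once the 5-second expiration fires. The Pod, its namespace and name, and the 7-second sleep are illustrative assumptions, not values taken from the repository.

package main

import (
	"fmt"
	"time"

	"github.com/kubescape/operator/watcher"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/watch"
)

func main() {
	q := watcher.NewCooldownQueue()

	// Consumer: events only appear on ResultChan after their cooldown expires.
	done := make(chan struct{})
	go func() {
		defer close(done)
		for e := range q.ResultChan {
			pod := e.Object.(*corev1.Pod)
			fmt.Println("forwarded:", e.Type, pod.Namespace+"/"+pod.Name)
		}
	}()

	// Producer: three events for the same Pod share one cache key
	// (the key ignores the event type), so only the last one is forwarded.
	pod := &corev1.Pod{ObjectMeta: metav1.ObjectMeta{Namespace: "default", Name: "demo"}}
	q.Enqueue(watch.Event{Type: watch.Added, Object: pod})
	q.Enqueue(watch.Event{Type: watch.Modified, Object: pod})
	q.Enqueue(watch.Event{Type: watch.Modified, Object: pod})

	// Wait past defaultExpiration (5s) plus evictionInterval (1s) so the entry is
	// evicted and its callback pushes the event onto ResultChan, then shut down.
	time.Sleep(7 * time.Second)
	q.Stop()
	<-done
}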
(2 more changed files not shown)
