Skip to content

Commit

Permalink
Merge pull request #245 from kubescape/bump
Browse files Browse the repository at this point in the history
optimize memory with pagers
  • Loading branch information
matthyx authored Sep 10, 2024
2 parents c62c243 + 52c31b8 commit 20df76f
Show file tree
Hide file tree
Showing 14 changed files with 185 additions and 235 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pr-merged.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ jobs:
BUILD_PLATFORM: linux/amd64,linux/arm64
GO_VERSION: "1.23"
REQUIRED_TESTS: '[
"vuln_scan",
"vuln_v2_views",
"vuln_scan_trigger_scan_public_registry",
"vuln_scan_trigger_scan_public_registry_excluded",
"vuln_scan_trigger_scan_private_quay_registry",
Expand Down
12 changes: 5 additions & 7 deletions continuousscanning/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
watch "k8s.io/apimachinery/pkg/watch"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/utils/ptr"

armoapi "github.com/armosec/armoapi-go/apis"
Expand All @@ -27,7 +27,7 @@ import (
sets "github.com/deckarep/golang-set/v2"
)

var orphanableWorkloadTypes sets.Set[string] = sets.NewSet[string]("Pod", "ReplicaSet", "Job")
var orphanableWorkloadTypes = sets.NewSet[string]("Pod", "ReplicaSet", "Job")

type EventHandler interface {
Handle(ctx context.Context, e watch.Event) error
Expand Down Expand Up @@ -97,7 +97,7 @@ func unstructuredToScanObject(uObject *unstructured.Unstructured) (*objectsenvel
}

func triggerScanFor(ctx context.Context, uObject *unstructured.Unstructured, isDelete bool, wp *ants.PoolWithFunc, clusterConfig config.IConfig) error {
logger.L().Ctx(ctx).Info(
logger.L().Info(
"triggering scan",
helpers.String("kind", uObject.GetKind()),
helpers.String("name", uObject.GetName()),
Expand All @@ -122,7 +122,6 @@ func (h *poolInvokerHandler) Handle(ctx context.Context, e watch.Event) error {
if e.Type != watch.Added && e.Type != watch.Modified {
return nil
}
isDelete := false

uObject, err := eventToUnstructured(e)
if err != nil {
Expand All @@ -134,7 +133,7 @@ func (h *poolInvokerHandler) Handle(ctx context.Context, e watch.Event) error {
return nil
}

return triggerScanFor(ctx, uObject, isDelete, h.wp, h.clusterConfig)
return triggerScanFor(ctx, uObject, false, h.wp, h.clusterConfig)
}

func NewTriggeringHandler(wp *ants.PoolWithFunc, clusterConfig config.IConfig) EventHandler {
Expand Down Expand Up @@ -220,7 +219,6 @@ func (h *deletedCleanerHandler) Handle(ctx context.Context, e watch.Event) error
if e.Type != watch.Deleted {
return nil
}
isDelete := true

uObject, err := eventToUnstructured(e)
if err != nil {
Expand All @@ -237,6 +235,6 @@ func (h *deletedCleanerHandler) Handle(ctx context.Context, e watch.Event) error
logger.L().Ctx(ctx).Error("failed to delete CRDs", helpers.Error(err))
}

err = triggerScanFor(ctx, uObject, isDelete, h.wp, h.clusterConfig)
err = triggerScanFor(ctx, uObject, true, h.wp, h.clusterConfig)
return err
}
10 changes: 5 additions & 5 deletions continuousscanning/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
watch "k8s.io/apimachinery/pkg/watch"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"

armoapi "github.com/armosec/armoapi-go/apis"
Expand All @@ -29,18 +29,18 @@ func (s *ContinuousScanningService) listen(ctx context.Context) <-chan armoapi.C
resourceEventsCh := make(chan watch.Event, 100)

gvrs := s.tl.LoadGVRs(ctx)
logger.L().Ctx(ctx).Info("fetched gvrs", helpers.Interface("gvrs", gvrs))
logger.L().Info("fetched gvrs", helpers.Interface("gvrs", gvrs))
wp, _ := NewWatchPool(ctx, s.k8sdynamic, gvrs, listOpts)
wp.Run(ctx, resourceEventsCh)
logger.L().Ctx(ctx).Info("ran watch pool")
logger.L().Info("ran watch pool")

go func(shutdownCh <-chan struct{}, resourceEventsCh <-chan watch.Event, out *cooldownQueue) {
defer out.Stop(ctx)

for {
select {
case e := <-resourceEventsCh:
logger.L().Ctx(ctx).Debug(
logger.L().Debug(
"got event from channel",
helpers.Interface("event", e),
)
Expand All @@ -57,7 +57,7 @@ func (s *ContinuousScanningService) listen(ctx context.Context) <-chan armoapi.C

func (s *ContinuousScanningService) work(ctx context.Context) {
for e := range s.eventQueue.ResultChan {
logger.L().Ctx(ctx).Debug(
logger.L().Debug(
"got an event to process",
helpers.Interface("event", e),
)
Expand Down
18 changes: 9 additions & 9 deletions continuousscanning/watchbuilder.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,8 @@ func (w *SelfHealingWatch) Run(ctx context.Context, readyWg *sync.WaitGroup, out
return ctx.Err()
default:
gvr := helpers.String("gvr", w.gvr.String())
logger.L().Ctx(ctx).Debug("creating watch for GVR", gvr)
watch, err := w.makeWatchFunc(ctx, w.client, w.gvr, w.opts)
logger.L().Debug("creating watch for GVR", gvr)
watchFunc, err := w.makeWatchFunc(ctx, w.client, w.gvr, w.opts)
if err != nil {
logger.L().Ctx(ctx).Warning(
"got error when creating a watch for gvr",
Expand All @@ -76,8 +76,8 @@ func (w *SelfHealingWatch) Run(ctx context.Context, readyWg *sync.WaitGroup, out
)
continue
}
logger.L().Ctx(ctx).Debug("watch created\n")
w.currWatch = watch
logger.L().Debug("watch created\n")
w.currWatch = watchFunc

// Watch is considered ready once it is successfully acquired
// Signal we are done only the first time because
Expand All @@ -97,7 +97,7 @@ type WatchPool struct {
}

func (wp *WatchPool) Run(ctx context.Context, out chan<- watch.Event) {
logger.L().Ctx(ctx).Info("Watch pool: starting")
logger.L().Info("Watch pool: starting")

wg := &sync.WaitGroup{}
for idx := range wp.pool {
Expand All @@ -106,17 +106,17 @@ func (wp *WatchPool) Run(ctx context.Context, out chan<- watch.Event) {
}
wg.Wait()

logger.L().Ctx(ctx).Info("Watch pool: started ok")
logger.L().Info("Watch pool: started ok")
}

func NewWatchPool(ctx context.Context, client dynamic.Interface, gvrs []schema.GroupVersionResource, opts metav1.ListOptions) (*WatchPool, error) {
func NewWatchPool(_ context.Context, client dynamic.Interface, gvrs []schema.GroupVersionResource, opts metav1.ListOptions) (*WatchPool, error) {
watches := make([]*SelfHealingWatch, len(gvrs))

for idx := range gvrs {
gvr := gvrs[idx]
watch := NewSelfHealingWatch(client, gvr, opts)
selfHealingWatch := NewSelfHealingWatch(client, gvr, opts)

watches[idx] = watch
watches[idx] = selfHealingWatch
}

pool := &WatchPool{pool: watches}
Expand Down
9 changes: 4 additions & 5 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (

"github.com/armosec/utils-k8s-go/probes"
beUtils "github.com/kubescape/backend/pkg/utils"
logger "github.com/kubescape/go-logger"
"github.com/kubescape/go-logger"

"github.com/kubescape/operator/servicehandler"
)
Expand Down Expand Up @@ -107,12 +107,12 @@ func main() {
restclient.SetDefaultWarningHandler(restclient.NoWarnings{})

kubernetesCache := objectcache.NewKubernetesCache(k8sApi)

// Creating the ObjectCache using KubernetesCache
objectCache := objectcache.NewObjectCache(kubernetesCache)

if components.ServiceScanConfig.Enabled {
logger.L().Ctx(ctx).Info("service discovery enabeld and started with interval: ", helpers.String("interval", components.ServiceScanConfig.Interval.String()))
logger.L().Info("service discovery enabled and started with interval: ", helpers.String("interval", components.ServiceScanConfig.Interval.String()))
go servicehandler.DiscoveryServiceHandler(ctx, k8sApi, components.ServiceScanConfig.Interval)
}

Expand Down Expand Up @@ -151,7 +151,7 @@ func main() {
if operatorConfig.ContinuousScanEnabled() {
go func(mh *mainhandler.MainHandler) {
err := mh.SetupContinuousScanning(ctx, cs.DefaultQueueSize, cfg.EventDeduplicationInterval)
logger.L().Ctx(ctx).Info("set up cont scanning service")
logger.L().Info("set up cont scanning service")
if err != nil {
logger.L().Ctx(ctx).Fatal(err.Error(), helpers.Error(err))
}
Expand All @@ -178,7 +178,6 @@ func main() {
ruleBindingNotify := make(chan rulebindingmanager.RuleBindingNotify, 100)
ruleBindingCache.AddNotifier(&ruleBindingNotify)


admissionController := webhook.New(addr, "/etc/certs/tls.crt", "/etc/certs/tls.key", runtime.NewScheme(), webhook.NewAdmissionValidator(k8sApi, objectCache, exporter, ruleBindingCache), ruleBindingCache)
// Start HTTP REST server for webhook
go func() {
Expand Down
Loading

0 comments on commit 20df76f

Please sign in to comment.