From 6685293de766f6730f53ac24a7b0d3ca2cb89064 Mon Sep 17 00:00:00 2001 From: Matthias Bertschy Date: Wed, 4 Sep 2024 14:48:04 +0200 Subject: [PATCH 1/5] fix logger calls with context Signed-off-by: Matthias Bertschy --- continuousscanning/handlers.go | 12 +++++------- continuousscanning/service.go | 10 +++++----- continuousscanning/watchbuilder.go | 18 +++++++++--------- main.go | 9 ++++----- mainhandler/vulnscan.go | 6 +++--- restapihandler/triggeraction.go | 2 +- watcher/filteredsbomwatcher.go | 2 +- 7 files changed, 28 insertions(+), 31 deletions(-) diff --git a/continuousscanning/handlers.go b/continuousscanning/handlers.go index 7dfa46e0..70f63501 100644 --- a/continuousscanning/handlers.go +++ b/continuousscanning/handlers.go @@ -6,7 +6,7 @@ import ( "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" - watch "k8s.io/apimachinery/pkg/watch" + "k8s.io/apimachinery/pkg/watch" "k8s.io/utils/ptr" armoapi "github.com/armosec/armoapi-go/apis" @@ -27,7 +27,7 @@ import ( sets "github.com/deckarep/golang-set/v2" ) -var orphanableWorkloadTypes sets.Set[string] = sets.NewSet[string]("Pod", "ReplicaSet", "Job") +var orphanableWorkloadTypes = sets.NewSet[string]("Pod", "ReplicaSet", "Job") type EventHandler interface { Handle(ctx context.Context, e watch.Event) error @@ -97,7 +97,7 @@ func unstructuredToScanObject(uObject *unstructured.Unstructured) (*objectsenvel } func triggerScanFor(ctx context.Context, uObject *unstructured.Unstructured, isDelete bool, wp *ants.PoolWithFunc, clusterConfig config.IConfig) error { - logger.L().Ctx(ctx).Info( + logger.L().Info( "triggering scan", helpers.String("kind", uObject.GetKind()), helpers.String("name", uObject.GetName()), @@ -122,7 +122,6 @@ func (h *poolInvokerHandler) Handle(ctx context.Context, e watch.Event) error { if e.Type != watch.Added && e.Type != watch.Modified { return nil } - isDelete := false uObject, err := eventToUnstructured(e) if err != nil { @@ -134,7 +133,7 @@ func (h *poolInvokerHandler) Handle(ctx context.Context, e watch.Event) error { return nil } - return triggerScanFor(ctx, uObject, isDelete, h.wp, h.clusterConfig) + return triggerScanFor(ctx, uObject, false, h.wp, h.clusterConfig) } func NewTriggeringHandler(wp *ants.PoolWithFunc, clusterConfig config.IConfig) EventHandler { @@ -220,7 +219,6 @@ func (h *deletedCleanerHandler) Handle(ctx context.Context, e watch.Event) error if e.Type != watch.Deleted { return nil } - isDelete := true uObject, err := eventToUnstructured(e) if err != nil { @@ -237,6 +235,6 @@ func (h *deletedCleanerHandler) Handle(ctx context.Context, e watch.Event) error logger.L().Ctx(ctx).Error("failed to delete CRDs", helpers.Error(err)) } - err = triggerScanFor(ctx, uObject, isDelete, h.wp, h.clusterConfig) + err = triggerScanFor(ctx, uObject, true, h.wp, h.clusterConfig) return err } diff --git a/continuousscanning/service.go b/continuousscanning/service.go index 7e69d8c6..bdad22da 100644 --- a/continuousscanning/service.go +++ b/continuousscanning/service.go @@ -5,7 +5,7 @@ import ( "time" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - watch "k8s.io/apimachinery/pkg/watch" + "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/dynamic" armoapi "github.com/armosec/armoapi-go/apis" @@ -29,10 +29,10 @@ func (s *ContinuousScanningService) listen(ctx context.Context) <-chan armoapi.C resourceEventsCh := make(chan watch.Event, 100) gvrs := s.tl.LoadGVRs(ctx) - logger.L().Ctx(ctx).Info("fetched gvrs", helpers.Interface("gvrs", gvrs)) + logger.L().Info("fetched gvrs", 
helpers.Interface("gvrs", gvrs)) wp, _ := NewWatchPool(ctx, s.k8sdynamic, gvrs, listOpts) wp.Run(ctx, resourceEventsCh) - logger.L().Ctx(ctx).Info("ran watch pool") + logger.L().Info("ran watch pool") go func(shutdownCh <-chan struct{}, resourceEventsCh <-chan watch.Event, out *cooldownQueue) { defer out.Stop(ctx) @@ -40,7 +40,7 @@ func (s *ContinuousScanningService) listen(ctx context.Context) <-chan armoapi.C for { select { case e := <-resourceEventsCh: - logger.L().Ctx(ctx).Debug( + logger.L().Debug( "got event from channel", helpers.Interface("event", e), ) @@ -57,7 +57,7 @@ func (s *ContinuousScanningService) listen(ctx context.Context) <-chan armoapi.C func (s *ContinuousScanningService) work(ctx context.Context) { for e := range s.eventQueue.ResultChan { - logger.L().Ctx(ctx).Debug( + logger.L().Debug( "got an event to process", helpers.Interface("event", e), ) diff --git a/continuousscanning/watchbuilder.go b/continuousscanning/watchbuilder.go index 743dba67..d67b3859 100644 --- a/continuousscanning/watchbuilder.go +++ b/continuousscanning/watchbuilder.go @@ -66,8 +66,8 @@ func (w *SelfHealingWatch) Run(ctx context.Context, readyWg *sync.WaitGroup, out return ctx.Err() default: gvr := helpers.String("gvr", w.gvr.String()) - logger.L().Ctx(ctx).Debug("creating watch for GVR", gvr) - watch, err := w.makeWatchFunc(ctx, w.client, w.gvr, w.opts) + logger.L().Debug("creating watch for GVR", gvr) + watchFunc, err := w.makeWatchFunc(ctx, w.client, w.gvr, w.opts) if err != nil { logger.L().Ctx(ctx).Warning( "got error when creating a watch for gvr", @@ -76,8 +76,8 @@ func (w *SelfHealingWatch) Run(ctx context.Context, readyWg *sync.WaitGroup, out ) continue } - logger.L().Ctx(ctx).Debug("watch created\n") - w.currWatch = watch + logger.L().Debug("watch created\n") + w.currWatch = watchFunc // Watch is considered ready once it is successfully acquired // Signal we are done only the first time because @@ -97,7 +97,7 @@ type WatchPool struct { } func (wp *WatchPool) Run(ctx context.Context, out chan<- watch.Event) { - logger.L().Ctx(ctx).Info("Watch pool: starting") + logger.L().Info("Watch pool: starting") wg := &sync.WaitGroup{} for idx := range wp.pool { @@ -106,17 +106,17 @@ func (wp *WatchPool) Run(ctx context.Context, out chan<- watch.Event) { } wg.Wait() - logger.L().Ctx(ctx).Info("Watch pool: started ok") + logger.L().Info("Watch pool: started ok") } -func NewWatchPool(ctx context.Context, client dynamic.Interface, gvrs []schema.GroupVersionResource, opts metav1.ListOptions) (*WatchPool, error) { +func NewWatchPool(_ context.Context, client dynamic.Interface, gvrs []schema.GroupVersionResource, opts metav1.ListOptions) (*WatchPool, error) { watches := make([]*SelfHealingWatch, len(gvrs)) for idx := range gvrs { gvr := gvrs[idx] - watch := NewSelfHealingWatch(client, gvr, opts) + selfHealingWatch := NewSelfHealingWatch(client, gvr, opts) - watches[idx] = watch + watches[idx] = selfHealingWatch } pool := &WatchPool{pool: watches} diff --git a/main.go b/main.go index 17ad2537..e3ffc725 100644 --- a/main.go +++ b/main.go @@ -33,7 +33,7 @@ import ( "github.com/armosec/utils-k8s-go/probes" beUtils "github.com/kubescape/backend/pkg/utils" - logger "github.com/kubescape/go-logger" + "github.com/kubescape/go-logger" "github.com/kubescape/operator/servicehandler" ) @@ -107,12 +107,12 @@ func main() { restclient.SetDefaultWarningHandler(restclient.NoWarnings{}) kubernetesCache := objectcache.NewKubernetesCache(k8sApi) - + // Creating the ObjectCache using KubernetesCache objectCache := 
objectcache.NewObjectCache(kubernetesCache) if components.ServiceScanConfig.Enabled { - logger.L().Ctx(ctx).Info("service discovery enabeld and started with interval: ", helpers.String("interval", components.ServiceScanConfig.Interval.String())) + logger.L().Info("service discovery enabled and started with interval: ", helpers.String("interval", components.ServiceScanConfig.Interval.String())) go servicehandler.DiscoveryServiceHandler(ctx, k8sApi, components.ServiceScanConfig.Interval) } @@ -151,7 +151,7 @@ func main() { if operatorConfig.ContinuousScanEnabled() { go func(mh *mainhandler.MainHandler) { err := mh.SetupContinuousScanning(ctx, cs.DefaultQueueSize, cfg.EventDeduplicationInterval) - logger.L().Ctx(ctx).Info("set up cont scanning service") + logger.L().Info("set up cont scanning service") if err != nil { logger.L().Ctx(ctx).Fatal(err.Error(), helpers.Error(err)) } @@ -178,7 +178,6 @@ func main() { ruleBindingNotify := make(chan rulebindingmanager.RuleBindingNotify, 100) ruleBindingCache.AddNotifier(&ruleBindingNotify) - admissionController := webhook.New(addr, "/etc/certs/tls.crt", "/etc/certs/tls.key", runtime.NewScheme(), webhook.NewAdmissionValidator(k8sApi, objectCache, exporter, ruleBindingCache), ruleBindingCache) // Start HTTP REST server for webhook go func() { diff --git a/mainhandler/vulnscan.go b/mainhandler/vulnscan.go index 1e8f052e..bc3be548 100644 --- a/mainhandler/vulnscan.go +++ b/mainhandler/vulnscan.go @@ -23,7 +23,7 @@ import ( corev1 "k8s.io/api/core/v1" "k8s.io/utils/strings/slices" - uuid "github.com/google/uuid" + "github.com/google/uuid" "github.com/armosec/armoapi-go/apis" apitypes "github.com/armosec/armoapi-go/armotypes" @@ -481,7 +481,7 @@ func sendWorkloadWithCredentials(ctx context.Context, scanUrl *url.URL, command imageScanCommand, ok := command.(*apis.WebsocketScanCommand) instanceID := "NOT_A_WEBSOCKET_SCAN_COMMAND" if !ok { - logger.L().Ctx(ctx).Debug("Not an image scan command") + logger.L().Debug("Not an image scan command") } else { instanceID = *imageScanCommand.InstanceID } @@ -490,7 +490,7 @@ func sendWorkloadWithCredentials(ctx context.Context, scanUrl *url.URL, command return fmt.Errorf("failed to marshal websocketScanCommand with err %v", err) } if command.GetWlid() == "" { - logger.L().Ctx(ctx).Debug(fmt.Sprintf("sending scan command to kubevuln: %s", string(jsonScannerC))) + logger.L().Debug(fmt.Sprintf("sending scan command to kubevuln: %s", string(jsonScannerC))) } creds := command.GetCreds() diff --git a/restapihandler/triggeraction.go b/restapihandler/triggeraction.go index 43f272d2..5896261a 100644 --- a/restapihandler/triggeraction.go +++ b/restapihandler/triggeraction.go @@ -67,7 +67,7 @@ func (resthandler *HTTPHandler) ActionRequest(w http.ResponseWriter, r *http.Req w.WriteHeader(http.StatusInternalServerError) bErr, _ := json.Marshal(err) w.Write(bErr) - logger.L().Ctx(context.Background()).Fatal("recover in ActionRequest", helpers.Interface("error", err)) + logger.L().Fatal("recover in ActionRequest", helpers.Interface("error", err)) } }() diff --git a/watcher/filteredsbomwatcher.go b/watcher/filteredsbomwatcher.go index c8535f87..2f1097e8 100644 --- a/watcher/filteredsbomwatcher.go +++ b/watcher/filteredsbomwatcher.go @@ -117,7 +117,7 @@ func (wh *WatchHandler) HandleSBOMFilteredEvents(sfEvents <-chan watch.Event, pr containerData, err := wh.getContainerDataFilteredSBOM(obj) if err != nil { - logger.L().Ctx(context.TODO()).Error("failed to get container data from filtered SBOM", + logger.L().Error("failed to get 
container data from filtered SBOM", helpers.String("name", obj.ObjectMeta.Name), helpers.String("namespace", obj.ObjectMeta.Namespace), helpers.Interface("annotations", obj.ObjectMeta.Annotations), From de152fe6109d6de386493c871a62c5e8f8cf5491 Mon Sep 17 00:00:00 2001 From: Matthias Bertschy Date: Wed, 4 Sep 2024 15:17:31 +0200 Subject: [PATCH 2/5] rewrite service discovery with pagers Signed-off-by: Matthias Bertschy --- servicehandler/servicediscovery.go | 106 +++++++++--------------- servicehandler/servicediscovery_test.go | 41 ++++++++- 2 files changed, 79 insertions(+), 68 deletions(-) diff --git a/servicehandler/servicediscovery.go b/servicehandler/servicediscovery.go index 962ab0c0..496abe83 100644 --- a/servicehandler/servicediscovery.go +++ b/servicehandler/servicediscovery.go @@ -2,11 +2,13 @@ package servicehandler import ( "context" - "slices" + "fmt" "sync" "time" - logger "github.com/kubescape/go-logger" + mapset "github.com/deckarep/golang-set/v2" + "github.com/kubescape/go-logger" + "github.com/kubescape/go-logger/helpers" "github.com/kubescape/k8s-interface/k8sinterface" "github.com/panjf2000/ants/v2" v1 "k8s.io/api/core/v1" @@ -16,6 +18,7 @@ import ( "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/dynamic" "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/pager" ) const ( @@ -28,7 +31,7 @@ const ( workerNum = int(20) ) -var protocolFilter = []string{"UDP"} +var protocolFilter = mapset.NewSet("UDP") var serviceListOptions = metav1.ListOptions{ FieldSelector: "metadata.namespace!=kube-system", @@ -40,29 +43,25 @@ var ServiceScanSchema = schema.GroupVersionResource{ Resource: resource, } -func deleteServices(ctx context.Context, client dynamic.NamespaceableResourceInterface, currentServicesMetadata []metadata) { +func deleteServices(ctx context.Context, client dynamic.NamespaceableResourceInterface, currentServices mapset.Set[string]) { // get all services from the current cycle and compare them with the current CRDs - authServices, err := client.List(ctx, metav1.ListOptions{}) - if err != nil { - logger.L().Ctx(ctx).Error(err.Error()) - return - } - - for _, service := range authServices.Items { - crdMetadata := metadata{ - name: service.GetName(), - namespace: service.GetNamespace(), - } - - if !slices.Contains(currentServicesMetadata, crdMetadata) { + if err := pager.New(func(ctx context.Context, opts metav1.ListOptions) (k8sruntime.Object, error) { + return client.List(ctx, opts) + }).EachListItem(ctx, metav1.ListOptions{}, func(obj k8sruntime.Object) error { + service := obj.(*unstructured.Unstructured) + if !currentServices.Contains(service.GetNamespace() + "/" + service.GetName()) { err := client.Namespace(service.GetNamespace()).Delete(ctx, service.GetName(), metav1.DeleteOptions{}) if err != nil { - logger.L().Ctx(ctx).Error(err.Error()) - continue + logger.L().Ctx(ctx).Error("failed to delete service", helpers.Error(err), helpers.String("namespace", service.GetNamespace()), helpers.String("name", service.GetName())) + } else { + logger.L().Debug("Authentication Service " + service.GetName() + " in namespace " + service.GetNamespace() + " deleted") } - logger.L().Ctx(ctx).Info("Authentication Service " + service.GetName() + " in namespace " + service.GetNamespace() + " deleted") } + return nil + }); err != nil { + logger.L().Ctx(ctx).Error(err.Error()) + return } } @@ -83,7 +82,7 @@ type spec struct { ports []Port } -func (sra serviceAuthentication) unstructured() (*unstructured.Unstructured, error) { +func (sra *serviceAuthentication) 
unstructured() (*unstructured.Unstructured, error) { a, err := k8sruntime.DefaultUnstructuredConverter.ToUnstructured(&sra) if err != nil { logger.L().Error(err.Error()) @@ -111,7 +110,7 @@ func (sra *serviceAuthentication) serviceScan(ctx context.Context, client dynami for idx := range sra.spec.ports { pr := &sra.spec.ports[idx] - if slices.Contains(protocolFilter, string(pr.protocol)) { + if protocolFilter.Contains(pr.protocol) { continue } @@ -124,43 +123,7 @@ func (sra *serviceAuthentication) serviceScan(ctx context.Context, client dynami return sra.applyCrd(ctx, client) } -func getClusterServices(ctx context.Context, regularClient kubernetes.Interface) (*v1.ServiceList, error) { - services, err := regularClient.CoreV1().Services("").List(ctx, serviceListOptions) - if err != nil { - logger.L().Ctx(ctx).Error(err.Error()) - return nil, err - } - return services, nil -} - -func serviceExtractor(ctx context.Context, regularClient kubernetes.Interface) ([]serviceAuthentication, []metadata) { - // get a list of all services in the cluster - services, err := getClusterServices(ctx, regularClient) - if err != nil { - return []serviceAuthentication{}, []metadata{} - } - - currentServiceList := make([]serviceAuthentication, 0, len(services.Items)) - metadataList := make([]metadata, 0, len(services.Items)) - for _, service := range services.Items { - sra := serviceAuthentication{} - sra.kind = kind - sra.apiVersion = apiVersion - sra.metadata.name = service.Name - sra.metadata.namespace = service.Namespace - sra.spec.clusterIP = service.Spec.ClusterIP - sra.spec.ports = K8sPortsTranslator(service.Spec.Ports) - - currentServiceList = append(currentServiceList, sra) - metadataList = append(metadataList, sra.metadata) - } - return currentServiceList, metadataList - -} - func discoveryService(ctx context.Context, regularClient kubernetes.Interface, dynamicClient dynamic.Interface) error { - serviceList, metadataList := serviceExtractor(ctx, regularClient) - scanWg := sync.WaitGroup{} p, err := ants.NewPoolWithFunc(workerNum, func(i interface{}) { defer scanWg.Done() @@ -173,23 +136,34 @@ func discoveryService(ctx context.Context, regularClient kubernetes.Interface, d logger.L().Ctx(ctx).Error(scanErr.Error()) } }) - if err != nil { - return err + return fmt.Errorf("failed to create a pool of workers: %w", err) } - for _, sra := range serviceList { + currentServices := mapset.NewSet[string]() + _ = pager.New(func(ctx context.Context, opts metav1.ListOptions) (k8sruntime.Object, error) { + return regularClient.CoreV1().Services("").List(ctx, opts) + }).EachListItem(ctx, serviceListOptions, func(obj k8sruntime.Object) error { scanWg.Add(1) + service := obj.(*v1.Service) + sra := serviceAuthentication{} + sra.kind = kind + sra.apiVersion = apiVersion + sra.metadata.name = service.Name + sra.metadata.namespace = service.Namespace + sra.spec.clusterIP = service.Spec.ClusterIP + sra.spec.ports = K8sPortsTranslator(service.Spec.Ports) + currentServices.Add(service.Namespace + "/" + service.Name) err := p.Invoke(sra) if err != nil { logger.L().Ctx(ctx).Error(err.Error()) - continue } - } + return nil + }) scanWg.Wait() p.Release() - deleteServices(ctx, dynamicClient.Resource(ServiceScanSchema), metadataList) + deleteServices(ctx, dynamicClient.Resource(ServiceScanSchema), currentServices) return nil } @@ -198,12 +172,12 @@ func DiscoveryServiceHandler(ctx context.Context, kubeClient *k8sinterface.Kuber regularClient := kubeClient.KubernetesClient for { - logger.L().Ctx(ctx).Info("starting a new service 
discovery handling") + logger.L().Info("starting a new service discovery handling") err := discoveryService(ctx, regularClient, dynamicClient) if err != nil { logger.L().Ctx(ctx).Error(err.Error()) } else { - logger.L().Ctx(ctx).Info("finished service discovery cycle") + logger.L().Info("finished service discovery cycle") } time.Sleep(interval) diff --git a/servicehandler/servicediscovery_test.go b/servicehandler/servicediscovery_test.go index 925e54ae..86eba518 100644 --- a/servicehandler/servicediscovery_test.go +++ b/servicehandler/servicediscovery_test.go @@ -2,8 +2,10 @@ package servicehandler import ( "context" + "slices" "testing" + "github.com/kubescape/go-logger" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" v1 "k8s.io/api/core/v1" @@ -13,6 +15,7 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" dynamicFake "k8s.io/client-go/dynamic/fake" + "k8s.io/client-go/kubernetes" kubernetesFake "k8s.io/client-go/kubernetes/fake" "k8s.io/utils/ptr" ) @@ -113,7 +116,7 @@ func Test_translate(t *testing.T) { func TestDiscoveryServiceHandler(t *testing.T) { //write a component test that creates fake client and test the service discovery and see if it creates a crd //and if it deletes the crd - //IMPORTANT: fake cilent doesnt have an Apply option like the real client so we need to create the crd and check if it exists -it will blog errors but will pass + //IMPORTANT: fake client doesn't have an Apply option like the real client so we need to create the crd and check if it exists -it will blog errors but will pass testCases := []struct { name string services []runtime.Object @@ -297,7 +300,7 @@ func TestDiscoveryServiceHandler(t *testing.T) { ServiceScanSchema: "ServiceScanList", }, ) - inObjects := append(tc.services, tc.delete...) + inObjects := slices.Concat(tc.services, tc.delete) regClient := kubernetesFake.NewSimpleClientset(inObjects...) 
var crds *unstructured.UnstructuredList @@ -331,3 +334,37 @@ func TestDiscoveryServiceHandler(t *testing.T) { } } + +func getClusterServices(ctx context.Context, regularClient kubernetes.Interface) (*v1.ServiceList, error) { + services, err := regularClient.CoreV1().Services("").List(ctx, serviceListOptions) + if err != nil { + logger.L().Ctx(ctx).Error(err.Error()) + return nil, err + } + return services, nil +} + +func serviceExtractor(ctx context.Context, regularClient kubernetes.Interface) ([]serviceAuthentication, []metadata) { + // get a list of all services in the cluster + services, err := getClusterServices(ctx, regularClient) + if err != nil { + return []serviceAuthentication{}, []metadata{} + } + + currentServiceList := make([]serviceAuthentication, 0, len(services.Items)) + metadataList := make([]metadata, 0, len(services.Items)) + for _, service := range services.Items { + sra := serviceAuthentication{} + sra.kind = kind + sra.apiVersion = apiVersion + sra.metadata.name = service.Name + sra.metadata.namespace = service.Namespace + sra.spec.clusterIP = service.Spec.ClusterIP + sra.spec.ports = K8sPortsTranslator(service.Spec.Ports) + + currentServiceList = append(currentServiceList, sra) + metadataList = append(metadataList, sra.metadata) + } + return currentServiceList, metadataList + +} From e5d19de134de7228861870c6056afc8c3cc09b4c Mon Sep 17 00:00:00 2001 From: Matthias Bertschy Date: Fri, 6 Sep 2024 14:13:07 +0200 Subject: [PATCH 3/5] rewrite request handlers with pagers Signed-off-by: Matthias Bertschy --- mainhandler/handlerequests.go | 104 +++++++++++++---------------- mainhandler/handlerequestsutils.go | 48 +------------ 2 files changed, 48 insertions(+), 104 deletions(-) diff --git a/mainhandler/handlerequests.go b/mainhandler/handlerequests.go index 6463a54f..79a68c93 100644 --- a/mainhandler/handlerequests.go +++ b/mainhandler/handlerequests.go @@ -9,10 +9,11 @@ import ( "github.com/kubescape/backend/pkg/versioncheck" "github.com/kubescape/k8s-interface/workloadinterface" - "k8s.io/apimachinery/pkg/runtime" - core1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/pager" "github.com/kubescape/go-logger" "github.com/kubescape/go-logger/helpers" @@ -153,7 +154,7 @@ func (mainHandler *MainHandler) HandleWatchers(ctx context.Context) { watchHandler := watcher.NewWatchHandler(ctx, mainHandler.config, mainHandler.k8sAPI, ksStorageClient, eventQueue) // wait for the kubevuln component to be ready - logger.L().Ctx(ctx).Info("Waiting for vuln scan to be ready") + logger.L().Info("Waiting for vuln scan to be ready") waitFunc := isActionNeedToWait(apis.Command{CommandName: apis.TypeScanImages}) waitFunc(mainHandler.config) @@ -162,7 +163,7 @@ func (mainHandler *MainHandler) HandleWatchers(ctx context.Context) { go watchHandler.SBOMFilteredWatch(ctx, mainHandler.eventWorkerPool) } -func (h *MainHandler) StartContinuousScanning(ctx context.Context) error { +func (h *MainHandler) StartContinuousScanning(_ context.Context) error { return nil } @@ -201,7 +202,7 @@ func (mainHandler *MainHandler) handleRequest(j utils.Job) { sessionObj.Reporter.SendError(err, mainHandler.sendReport, true) } else { sessionObj.Reporter.SendStatus(systemreports.JobDone, mainHandler.sendReport) - logger.L().Ctx(ctx).Info("action completed successfully", helpers.String("command", string(sessionObj.Command.CommandName)), helpers.String("wlid", sessionObj.Command.GetID())) + 
logger.L().Info("action completed successfully", helpers.String("command", string(sessionObj.Command.CommandName)), helpers.String("wlid", sessionObj.Command.GetID())) } } span.End() @@ -270,8 +271,8 @@ func (mainHandler *MainHandler) HandleScopedRequest(ctx context.Context, session return } - labels := sessionObj.Command.GetLabels() - fields := sessionObj.Command.GetFieldSelector() + podLabels := sessionObj.Command.GetLabels() + fieldSelector := sessionObj.Command.GetFieldSelector() namespaces, err := mainHandler.getNamespaces(ctx, sessionObj) if err != nil { logger.L().Ctx(ctx).Error("failed to list namespaces", helpers.Error(err)) @@ -279,30 +280,37 @@ func (mainHandler *MainHandler) HandleScopedRequest(ctx context.Context, session return } - info := fmt.Sprintf("%s: id: '%s', namespaces: '%v', labels: '%v', fieldSelector: '%v'", sessionObj.Command.CommandName, sessionObj.Command.GetID(), namespaces, labels, fields) + info := fmt.Sprintf("%s: id: '%s', namespaces: '%v', labels: '%v', fieldSelector: '%v'", sessionObj.Command.CommandName, sessionObj.Command.GetID(), namespaces, podLabels, fieldSelector) logger.L().Info(info) sessionObj.Reporter.SendDetails(info, mainHandler.sendReport) + listOptions := metav1.ListOptions{} + if len(podLabels) > 0 { + set := labels.Set(podLabels) + listOptions.LabelSelector = k8sinterface.SelectorToString(set) + } + if len(fieldSelector) > 0 { + set := labels.Set(fieldSelector) + listOptions.FieldSelector = k8sinterface.SelectorToString(set) + } + for _, ns := range namespaces { if mainHandler.config.SkipNamespace(ns) { continue } - ids, errs := mainHandler.getIDs(ns, labels, fields, []string{"pods"}) - for i := range errs { - logger.L().Ctx(ctx).Warning(errs[i].Error()) - sessionObj.Reporter.SendError(errs[i], mainHandler.sendReport, true) - } sessionObj.Reporter.SendStatus(systemreports.JobSuccess, mainHandler.sendReport) - logger.L().Info(fmt.Sprintf("ids found: '%v'", ids)) - - for i := range ids { + if err := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { + return mainHandler.k8sAPI.KubernetesClient.CoreV1().Pods(ns).List(ctx, opts) + }).EachListItem(ctx, listOptions, func(obj runtime.Object) error { + pod := obj.(*core1.Pod) + podId := pkgwlid.GetWLID(mainHandler.config.ClusterName(), pod.GetNamespace(), "pod", pod.GetName()) cmd := sessionObj.Command.DeepCopy() var err error - if pkgwlid.IsWlid(ids[i]) { - cmd.Wlid = ids[i] + if pkgwlid.IsWlid(podId) { + cmd.Wlid = podId err = pkgwlid.IsWlidValid(cmd.Wlid) } else { err = fmt.Errorf("unknown id") @@ -319,17 +327,21 @@ func (mainHandler *MainHandler) HandleScopedRequest(ctx context.Context, session err := fmt.Errorf("invalid: %s, id: '%s'", err.Error(), newSessionObj.Command.GetID()) logger.L().Ctx(ctx).Error(err.Error()) sessionObj.Reporter.SendError(err, mainHandler.sendReport, true) - continue + return nil } logger.L().Info("triggering", helpers.String("id", newSessionObj.Command.GetID())) if err := mainHandler.HandleSingleRequest(ctx, newSessionObj); err != nil { logger.L().Ctx(ctx).Error("failed to complete action", helpers.String("command", string(sessionObj.Command.CommandName)), helpers.String("wlid", sessionObj.Command.GetID()), helpers.Error(err)) sessionObj.Reporter.SendError(err, mainHandler.sendReport, true) - continue + return nil } sessionObj.Reporter.SendStatus(systemreports.JobDone, mainHandler.sendReport) - logger.L().Ctx(ctx).Info("action completed successfully", helpers.String("command", string(sessionObj.Command.CommandName)), 
helpers.String("wlid", sessionObj.Command.GetID())) + logger.L().Info("action completed successfully", helpers.String("command", string(sessionObj.Command.CommandName)), helpers.String("wlid", sessionObj.Command.GetID())) + return nil + }); err != nil { + logger.L().Ctx(ctx).Warning(err.Error()) + sessionObj.Reporter.SendError(err, mainHandler.sendReport, true) } } } @@ -370,17 +382,13 @@ func (mainHandler *MainHandler) HandleImageScanningScopedRequest(ctx context.Con if mainHandler.config.SkipNamespace(ns) { continue } - pods, err := mainHandler.k8sAPI.KubernetesClient.CoreV1().Pods(ns).List(ctx, listOptions) - if err != nil { - logger.L().Ctx(ctx).Error("failed to list pods", helpers.String("namespace", ns), helpers.Error(err)) - sessionObj.Reporter.SendError(err, mainHandler.sendReport, true) - continue - } - for i := range pods.Items { - pod := pods.Items[i] + if err := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { + return mainHandler.k8sAPI.KubernetesClient.CoreV1().Pods(ns).List(ctx, opts) + }).EachListItem(ctx, listOptions, func(obj runtime.Object) error { + pod := obj.(*core1.Pod) if pod.Status.Phase != core1.PodRunning { // skip non-running pods, for some reason the list includes non-running pods - continue + return nil } // need to set APIVersion and Kind before unstructured conversion, preparing for instanceID extraction pod.APIVersion = "v1" @@ -390,19 +398,19 @@ func (mainHandler *MainHandler) HandleImageScanningScopedRequest(ctx context.Con unstructuredObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&pod) if err != nil { logger.L().Ctx(ctx).Error("failed to convert pod to unstructured", helpers.String("pod", pod.GetName()), helpers.String("namespace", pod.GetNamespace()), helpers.Error(err)) - continue + return nil } wl := workloadinterface.NewWorkloadObj(unstructuredObj) instanceIDs, err := instanceidhandlerv1.GenerateInstanceID(wl) if err != nil { logger.L().Ctx(ctx).Error("failed to generate instance ID for pod", helpers.String("pod", pod.GetName()), helpers.String("namespace", pod.GetNamespace()), helpers.Error(err)) - continue + return nil } // for naked pods, only handle if pod is older than guard time if !k8sinterface.WorkloadHasParent(wl) && time.Now().Before(pod.CreationTimestamp.Add(mainHandler.config.GuardTime())) { logger.L().Debug("naked pod younger than guard time detected, skipping scan", helpers.String("pod", pod.GetName()), helpers.String("namespace", pod.GetNamespace()), helpers.String("creationTimestamp", pod.CreationTimestamp.String())) - continue + return nil } for _, instanceID := range instanceIDs { @@ -413,7 +421,7 @@ func (mainHandler *MainHandler) HandleImageScanningScopedRequest(ctx context.Con } // get container data - containerData, err := utils.PodToContainerData(mainHandler.k8sAPI, &pod, instanceID, mainHandler.config.ClusterName()) + containerData, err := utils.PodToContainerData(mainHandler.k8sAPI, pod, instanceID, mainHandler.config.ClusterName()) if err != nil { // if pod is not running, we can't get the image id continue @@ -442,33 +450,13 @@ func (mainHandler *MainHandler) HandleImageScanningScopedRequest(ctx context.Con logger.L().Info("action completed successfully", helpers.String("id", newSessionObj.Command.GetID()), helpers.String("slug", s), helpers.String("containerName", containerData.ContainerName), helpers.String("imageTag", containerData.ImageTag), helpers.String("imageID", containerData.ImageID)) slugs[s] = true } - } - } -} - -func (mainHandler *MainHandler) 
getIDs(namespace string, labels, fields map[string]string, resources []string) ([]string, []error) { - var ids []string - var errs []error - for _, resource := range resources { - workloads, err := mainHandler.listWorkloads(namespace, resource, labels, fields) - if err != nil { - errs = append(errs, err) - } - if len(workloads) == 0 { + return nil + }); err != nil { + logger.L().Ctx(ctx).Error("failed to list pods", helpers.String("namespace", ns), helpers.Error(err)) + sessionObj.Reporter.SendError(err, mainHandler.sendReport, true) continue } - w, e := mainHandler.getResourcesIDs(workloads) - if len(e) != 0 { - errs = append(errs, e...) - } - if len(w) == 0 { - err := fmt.Errorf("resource: '%s', failed to calculate workloadIDs. namespace: '%s', labels: '%v'", resource, namespace, labels) - errs = append(errs, err) - } - ids = append(ids, w...) } - - return ids, errs } func (mainHandler *MainHandler) getNamespaces(ctx context.Context, sessionObj *utils.SessionObj) ([]string, error) { diff --git a/mainhandler/handlerequestsutils.go b/mainhandler/handlerequestsutils.go index 74a61b14..55728d11 100644 --- a/mainhandler/handlerequestsutils.go +++ b/mainhandler/handlerequestsutils.go @@ -4,57 +4,13 @@ import ( "fmt" "time" - "github.com/kubescape/go-logger" - "github.com/kubescape/operator/config" - "github.com/kubescape/operator/utils" - "github.com/armosec/armoapi-go/apis" "github.com/armosec/utils-go/httputils" "github.com/armosec/utils-k8s-go/probes" - pkgwlid "github.com/armosec/utils-k8s-go/wlid" - - "github.com/kubescape/k8s-interface/k8sinterface" + "github.com/kubescape/go-logger" + "github.com/kubescape/operator/config" ) -func (mainHandler *MainHandler) listWorkloads(namespace string, resource string, labels, fields map[string]string) ([]k8sinterface.IWorkload, error) { - groupVersionResource, err := k8sinterface.GetGroupVersionResource(resource) - if err != nil { - return nil, err - } - res, err := mainHandler.k8sAPI.ListWorkloads(&groupVersionResource, namespace, labels, fields) - if err != nil { - return res, err - } - return res, nil -} -func (mainHandler *MainHandler) getResourcesIDs(workloads []k8sinterface.IWorkload) ([]string, []error) { - errs := []error{} - idMap := make(map[string]interface{}) - for i := range workloads { - switch workloads[i].GetKind() { - case "Namespace": - idMap[pkgwlid.GetWLID(mainHandler.config.ClusterName(), workloads[i].GetName(), "namespace", workloads[i].GetName())] = true - default: - // find wlid - kind, name, err := mainHandler.k8sAPI.CalculateWorkloadParentRecursive(workloads[i]) - if err != nil { - errs = append(errs, fmt.Errorf("CalculateWorkloadParentRecursive: namespace: %s, pod name: %s, error: %s", workloads[i].GetNamespace(), workloads[i].GetName(), err.Error())) - } - - // skip cronjobs - if kind == "CronJob" { - continue - } - - wlid := pkgwlid.GetWLID(mainHandler.config.ClusterName(), workloads[i].GetNamespace(), kind, name) - if wlid != "" { - idMap[wlid] = true - } - } - } - return utils.MapToString(idMap), errs -} - func notWaitAtAll(_ config.IConfig) { } From 70bfc8b15f9c2fec172076fa1d5506d2cef7010c Mon Sep 17 00:00:00 2001 From: Matthias Bertschy Date: Fri, 6 Sep 2024 15:09:34 +0200 Subject: [PATCH 4/5] rewrite getRegistryScanSecrets with pagers Signed-off-by: Matthias Bertschy --- mainhandler/imageregistryhandler.go | 29 +++++++++++++++-------------- mainhandler/vulnscan.go | 3 ++- mainhandler/vulnscan_test.go | 28 ++++++++++++---------------- 3 files changed, 29 insertions(+), 31 deletions(-) diff --git 
a/mainhandler/imageregistryhandler.go b/mainhandler/imageregistryhandler.go index b24965b6..45bbeaf4 100644 --- a/mainhandler/imageregistryhandler.go +++ b/mainhandler/imageregistryhandler.go @@ -12,9 +12,12 @@ import ( "github.com/aws/aws-sdk-go/aws" "github.com/kubescape/go-logger" "github.com/kubescape/go-logger/helpers" + "github.com/kubescape/k8s-interface/workloadinterface" "github.com/kubescape/operator/config" "github.com/kubescape/operator/utils" "github.com/mitchellh/mapstructure" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/client-go/tools/pager" regCommon "github.com/armosec/registryx/common" regInterfaces "github.com/armosec/registryx/interfaces" @@ -806,13 +809,7 @@ func (registryScan *registryScan) setRegistryInfoFromConfigMap(registryInfo *arm registryInfo.Exclude = registryConfig.Exclude } -// KubernetesApiSecrets is an interface for getting workloads from k8s api -type IWorkloadsGetter interface { - GetWorkload(namespace, kind, name string) (k8sinterface.IWorkload, error) - ListWorkloads2(namespace, kind string) ([]k8sinterface.IWorkload, error) -} - -func getRegistryScanSecrets(k8sAPI IWorkloadsGetter, namespace, secretName string) ([]k8sinterface.IWorkload, error) { +func getRegistryScanSecrets(k8sAPI *k8sinterface.KubernetesApi, namespace, secretName string) ([]k8sinterface.IWorkload, error) { if secretName != "" { secret, err := k8sAPI.GetWorkload(namespace, "Secret", secretName) if err == nil && secret != nil { @@ -822,15 +819,19 @@ func getRegistryScanSecrets(k8sAPI IWorkloadsGetter, namespace, secretName strin // when secret name is not provided, we will try to find all secrets starting with kubescape-registry-scan var registryScanSecrets []k8sinterface.IWorkload - all, err := k8sAPI.ListWorkloads2(namespace, "Secret") - if err == nil { - for _, secret := range all { - if strings.HasPrefix(secret.GetName(), armotypes.RegistryScanSecretName) { - registryScanSecrets = append(registryScanSecrets, secret) + err := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { + return k8sAPI.KubernetesClient.CoreV1().Secrets(namespace).List(ctx, opts) + }).EachListItem(k8sAPI.Context, metav1.ListOptions{}, func(obj runtime.Object) error { + secret := obj.(*corev1.Secret) + if strings.HasPrefix(secret.GetName(), registryScanConfigmap) { + unstructuredObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(secret) + if err == nil { + wl := workloadinterface.NewWorkloadObj(unstructuredObj) + registryScanSecrets = append(registryScanSecrets, wl) } } - } - + return nil + }) return registryScanSecrets, err } diff --git a/mainhandler/vulnscan.go b/mainhandler/vulnscan.go index bc3be548..4f06f4f6 100644 --- a/mainhandler/vulnscan.go +++ b/mainhandler/vulnscan.go @@ -14,6 +14,7 @@ import ( "github.com/kubescape/backend/pkg/server/v1/systemreports" "github.com/kubescape/go-logger" "github.com/kubescape/go-logger/helpers" + "github.com/kubescape/k8s-interface/k8sinterface" "github.com/kubescape/operator/config" "github.com/kubescape/operator/utils" "go.opentelemetry.io/otel" @@ -406,7 +407,7 @@ type ImageScanConfig struct { authConfigs []dockerregistry.AuthConfig } -func getImageScanConfig(k8sAPI IWorkloadsGetter, namespace string, pod *corev1.Pod, imageTag string) (*ImageScanConfig, error) { +func getImageScanConfig(k8sAPI *k8sinterface.KubernetesApi, namespace string, pod *corev1.Pod, imageTag string) (*ImageScanConfig, error) { imageScanConfig := ImageScanConfig{} registryName := getRegistryNameFromImageTag(imageTag) logger.L().Debug("parsed 
registry name from image tag", helpers.String("registryName", registryName), helpers.String("imageTag", imageTag)) diff --git a/mainhandler/vulnscan_test.go b/mainhandler/vulnscan_test.go index 7c873278..f6e6d1c7 100644 --- a/mainhandler/vulnscan_test.go +++ b/mainhandler/vulnscan_test.go @@ -1,33 +1,23 @@ package mainhandler import ( + "context" + "encoding/json" "testing" _ "embed" dockerregistry "github.com/docker/docker/api/types/registry" "github.com/kubescape/k8s-interface/k8sinterface" - "github.com/kubescape/k8s-interface/workloadinterface" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + k8sfake "k8s.io/client-go/kubernetes/fake" ) //go:embed testdata/vulnscan/registry-secret.json var registrySecret []byte -type WorkloadsGetterMock struct{} - -func (mock *WorkloadsGetterMock) GetWorkload(namespace, kind, name string) (k8sinterface.IWorkload, error) { - wl, err := workloadinterface.NewWorkload(registrySecret) - if err != nil { - panic(err) - } - return wl, nil -} -func (mock *WorkloadsGetterMock) ListWorkloads2(namespace, kind string) ([]k8sinterface.IWorkload, error) { - wl, _ := mock.GetWorkload(namespace, kind, "") - return []k8sinterface.IWorkload{wl}, nil -} - func Test_ActionHandler_getImageScanConfig(t *testing.T) { expectedAuthConfigs := []dockerregistry.AuthConfig{ { @@ -42,7 +32,13 @@ func Test_ActionHandler_getImageScanConfig(t *testing.T) { }, } - k8sApiMock := &WorkloadsGetterMock{} + var secret *corev1.Secret + require.NoError(t, json.Unmarshal(registrySecret, &secret)) + + k8sApiMock := &k8sinterface.KubernetesApi{ + Context: context.TODO(), + KubernetesClient: k8sfake.NewSimpleClientset(secret), + } res, err := getImageScanConfig(k8sApiMock, "", nil, "nginx:latest") // no registry treated as docker.io assert.NoError(t, err) From 52c31b860eb533a2e9cc98f024c3f718054b7673 Mon Sep 17 00:00:00 2001 From: Matthias Bertschy Date: Tue, 10 Sep 2024 08:48:38 +0200 Subject: [PATCH 5/5] replace outdated vuln_scan system test Signed-off-by: Matthias Bertschy --- .github/workflows/pr-merged.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pr-merged.yaml b/.github/workflows/pr-merged.yaml index ea380fa8..5fda21a0 100644 --- a/.github/workflows/pr-merged.yaml +++ b/.github/workflows/pr-merged.yaml @@ -37,7 +37,7 @@ jobs: BUILD_PLATFORM: linux/amd64,linux/arm64 GO_VERSION: "1.23" REQUIRED_TESTS: '[ - "vuln_scan", + "vuln_v2_views", "vuln_scan_trigger_scan_public_registry", "vuln_scan_trigger_scan_public_registry_excluded", "vuln_scan_trigger_scan_private_quay_registry",
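
Patches 2 through 4 above repeatedly replace one-shot List calls with the client-go pager, which walks large result sets page by page using the API server's continue token instead of holding the full list in memory. The sketch below is only an illustration of that pattern, not code from these commits: the package name, the listPodsPaged helper and the choice of the Pods resource are assumptions made for the example.

// Minimal sketch of the k8s.io/client-go/tools/pager pattern adopted in this series.
// Assumptions: package name, helper name and the Pods resource are illustrative only.
package example

import (
	"context"
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/pager"
)

// listPodsPaged streams every Pod in the given namespace through the callback,
// one API page at a time; the pager handles the continue tokens internally.
func listPodsPaged(ctx context.Context, client kubernetes.Interface, namespace string) error {
	p := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
		// The page function issues the actual List call with the paging options filled in.
		return client.CoreV1().Pods(namespace).List(ctx, opts)
	})
	return p.EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
		pod := obj.(*v1.Pod)
		fmt.Println(pod.Namespace + "/" + pod.Name)
		return nil
	})
}

The same shape appears in deleteServices, discoveryService, HandleScopedRequest, HandleImageScanningScopedRequest and getRegistryScanSecrets in the diffs above, each supplying its own list call and per-item callback.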