Skip to content
This repository has been archived by the owner on Dec 16, 2021. It is now read-only.

Commit

Permalink
Change informer initialization to read target namespace (#6)
Browse files Browse the repository at this point in the history
  • Loading branch information
andresmgot authored Aug 2, 2018
1 parent 612abcb commit c26169f
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 28 deletions.
1 change: 1 addition & 0 deletions cmd/kafka-trigger-controller/kafka-controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ var rootCmd = &cobra.Command{
}

kafkaTriggerCfg := controller.KafkaTriggerConfig{
KubeCli: kubelessutils.GetClient(),
TriggerClient: kafkaClient,
KubelessClient: kubelessClient,
}
Expand Down
14 changes: 13 additions & 1 deletion kafka-zookeeper.jsonnet
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,21 @@ local crd = [
},
];

local controllerEnv = [
{
name: "KUBELESS_NAMESPACE",
valueFrom: {fieldRef: {fieldPath: "metadata.namespace"}}
},
{
name: "KUBELESS_CONFIG",
value: "kubeless-config"
},
];

local controllerContainer =
container.default("kafka-trigger-controller", "bitnami/kafka-trigger-controller:latest") +
container.imagePullPolicy("IfNotPresent");
container.imagePullPolicy("IfNotPresent") +
container.env(controllerEnv);

local kubelessLabel = {kubeless: "kafka-trigger-controller"};

Expand Down
60 changes: 34 additions & 26 deletions pkg/controller/kafka_trigger_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,11 @@ import (

kafkaApi "github.com/kubeless/kafka-trigger/pkg/apis/kubeless/v1beta1"
"github.com/kubeless/kafka-trigger/pkg/client/clientset/versioned"
"github.com/kubeless/kafka-trigger/pkg/client/informers/externalversions"
kafkaInformers "github.com/kubeless/kafka-trigger/pkg/client/informers/externalversions/kubeless/v1beta1"
"github.com/kubeless/kafka-trigger/pkg/event-consumers/kafka"
"github.com/kubeless/kafka-trigger/pkg/utils"
kubelessApi "github.com/kubeless/kubeless/pkg/apis/kubeless/v1beta1"
kubelessversioned "github.com/kubeless/kubeless/pkg/client/clientset/versioned"
kubelessexternalversion "github.com/kubeless/kubeless/pkg/client/informers/externalversions"
kubelessInformers "github.com/kubeless/kubeless/pkg/client/informers/externalversions/kubeless/v1beta1"
kubelessutils "github.com/kubeless/kubeless/pkg/utils"
)
Expand All @@ -52,15 +50,17 @@ const (
// KafkaTriggerController object
type KafkaTriggerController struct {
logger *logrus.Entry
kubelessclient versioned.Interface
kafkaclient versioned.Interface
kubelessclient kubelessversioned.Interface
kubernetesClient kubernetes.Interface
queue workqueue.RateLimitingInterface
kafkaInformer kafkaInformers.KafkaTriggerInformer
functionInformer kubelessInformers.FunctionInformer
kafkaInformer cache.SharedIndexInformer
functionInformer cache.SharedIndexInformer
}

// KafkaTriggerConfig contains config for KafkaTriggerController
type KafkaTriggerConfig struct {
KubeCli kubernetes.Interface
TriggerClient versioned.Interface
KubelessClient kubelessversioned.Interface
}
Expand All @@ -69,13 +69,16 @@ type KafkaTriggerConfig struct {
func NewKafkaTriggerController(cfg KafkaTriggerConfig) *KafkaTriggerController {
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())

kafkaSharedInformers := externalversions.NewSharedInformerFactory(cfg.TriggerClient, 0)
kafkaInformer := kafkaSharedInformers.Kubeless().V1beta1().KafkaTriggers()
config, err := kubelessutils.GetKubelessConfig(cfg.KubeCli, kubelessutils.GetAPIExtensionsClientInCluster())
if err != nil {
logrus.Fatalf("Unable to read the configmap: %s", err)
}

kubelessSharedInformers := kubelessexternalversion.NewSharedInformerFactory(cfg.KubelessClient, 0)
functionInformer := kubelessSharedInformers.Kubeless().V1beta1().Functions()
kafkaInformer := kafkaInformers.NewKafkaTriggerInformer(cfg.TriggerClient, config.Data["functions-namespace"], 0, cache.Indexers{})

kafkaInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
functionInformer := kubelessInformers.NewFunctionInformer(cfg.KubelessClient, config.Data["functions-namespace"], 0, cache.Indexers{})

kafkaInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
Expand All @@ -102,14 +105,15 @@ func NewKafkaTriggerController(cfg KafkaTriggerConfig) *KafkaTriggerController {

controller := KafkaTriggerController{
logger: logrus.WithField("controller", "kafka-trigger-controller"),
kubelessclient: cfg.TriggerClient,
kafkaclient: cfg.TriggerClient,
kubelessclient: cfg.KubelessClient,
kubernetesClient: kubelessutils.GetClient(),
kafkaInformer: kafkaInformer,
functionInformer: functionInformer,
queue: queue,
}

functionInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
functionInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
controller.FunctionAddedDeletedUpdated(obj, false)
},
Expand All @@ -132,8 +136,8 @@ func (c *KafkaTriggerController) Run(stopCh <-chan struct{}) {
c.logger.Info("Starting Kafka Trigger controller")
defer c.logger.Info("Shutting down Kafka Trigger controller")

go c.kafkaInformer.Informer().Run(stopCh)
go c.functionInformer.Informer().Run(stopCh)
go c.kafkaInformer.Run(stopCh)
go c.functionInformer.Run(stopCh)

if !c.waitForCacheSync(stopCh) {
return
Expand All @@ -143,7 +147,7 @@ func (c *KafkaTriggerController) Run(stopCh <-chan struct{}) {
}

func (c *KafkaTriggerController) waitForCacheSync(stopCh <-chan struct{}) bool {
if !cache.WaitForCacheSync(stopCh, c.kafkaInformer.Informer().HasSynced, c.functionInformer.Informer().HasSynced) {
if !cache.WaitForCacheSync(stopCh, c.kafkaInformer.HasSynced, c.functionInformer.HasSynced) {
utilruntime.HandleError(fmt.Errorf("Timed out waiting for caches required for Kafka triggers controller to sync;"))
return false
}
Expand Down Expand Up @@ -188,7 +192,7 @@ func (c *KafkaTriggerController) syncKafkaTrigger(key string) error {
return err
}

obj, exists, err := c.kafkaInformer.Informer().GetIndexer().GetByKey(key)
obj, exists, err := c.kafkaInformer.GetIndexer().GetByKey(key)
if err != nil {
return fmt.Errorf("Error fetching object with key %s from store: %v", key, err)
}
Expand Down Expand Up @@ -219,16 +223,18 @@ func (c *KafkaTriggerController) syncKafkaTrigger(key string) error {
c.logger.Errorf("Failed to convert LabelSelector %v in Kafka Trigger object %s to Selector due to %v: ", triggerObj.Spec.FunctionSelector, key, err)
return err
}
functions, err := c.functionInformer.Lister().Functions(ns).List(funcSelector)
functions, err := c.kubelessclient.KubelessV1beta1().Functions(triggerObj.ObjectMeta.Namespace).List(apimachineryHelpers.ListOptions{
LabelSelector: funcSelector.String(),
})
if err != nil {
c.logger.Errorf("Failed to list associated functions with Kafka trigger %s by selector due to %s: ", key, err)
return err
}
if len(functions) == 0 {
if len(functions.Items) == 0 {
c.logger.Infof("No matching functions found for Kafka trigger %s so marking CRD object for deleteion", key)
}

for _, function := range functions {
for _, function := range functions.Items {
funcName := function.ObjectMeta.Name
err = kafka.DeleteKafkaConsumer(triggerObjName, funcName, ns, topic)
if err != nil {
Expand Down Expand Up @@ -260,16 +266,18 @@ func (c *KafkaTriggerController) syncKafkaTrigger(key string) error {
c.logger.Errorf("Failed to convert LabelSelector %v in Kafka Trigger object %s to Selector due to %v: ", triggerObj.Spec.FunctionSelector, key, err)
return err
}
functions, err := c.functionInformer.Lister().Functions(ns).List(funcSelector)
functions, err := c.kubelessclient.KubelessV1beta1().Functions(triggerObj.ObjectMeta.Namespace).List(apimachineryHelpers.ListOptions{
LabelSelector: funcSelector.String(),
})
if err != nil {
c.logger.Errorf("Failed to list associated functions with Kafka trigger %s by Selector due to %s: ", key, err)
}

if len(functions) == 0 {
if len(functions.Items) == 0 {
c.logger.Infof("No matching functions with selector %v found in namespace %s", funcSelector, ns)
}

for _, function := range functions {
for _, function := range functions.Items {
funcName := function.ObjectMeta.Name
err = kafka.CreateKafkaConsumer(triggerObjName, funcName, ns, topic, c.kubernetesClient)
if err != nil {
Expand Down Expand Up @@ -298,13 +306,13 @@ func (c *KafkaTriggerController) FunctionAddedDeletedUpdated(obj interface{}, de
}

c.logger.Infof("Processing update to function object %s Namespace: %s", functionObj.Name, functionObj.Namespace)
kafkaTriggers, err := c.kafkaInformer.Lister().KafkaTriggers(functionObj.ObjectMeta.Namespace).List(labels.Everything())
kafkaTriggers, err := c.kafkaclient.KubelessV1beta1().KafkaTriggers(functionObj.Namespace).List(apimachineryHelpers.ListOptions{})
if err != nil {
c.logger.Errorf("Failed to get list of Kafka trigger in namespace %s due to %s: ", functionObj.ObjectMeta.Namespace, err)
return
}

for _, triggerObj := range kafkaTriggers {
for _, triggerObj := range kafkaTriggers.Items {
funcSelector, err := apimachineryHelpers.LabelSelectorAsSelector(&triggerObj.Spec.FunctionSelector)
if err != nil {
c.logger.Errorf("Failed to convert LabelSelector to Selector due to %s: ", err)
Expand Down Expand Up @@ -348,7 +356,7 @@ func (c *KafkaTriggerController) kafkaTriggerHasFinalizer(triggercObj *kafkaApi.
func (c *KafkaTriggerController) kafkaTriggerObjAddFinalizer(triggercObj *kafkaApi.KafkaTrigger) error {
triggercObjClone := triggercObj.DeepCopy()
triggercObjClone.ObjectMeta.Finalizers = append(triggercObjClone.ObjectMeta.Finalizers, kafkaTriggerFinalizer)
return utils.UpdateKafkaTriggerCustomResource(c.kubelessclient, triggercObjClone)
return utils.UpdateKafkaTriggerCustomResource(c.kafkaclient, triggercObjClone)
}

func (c *KafkaTriggerController) kafkaTriggerObjRemoveFinalizer(triggercObj *kafkaApi.KafkaTrigger) error {
Expand All @@ -364,7 +372,7 @@ func (c *KafkaTriggerController) kafkaTriggerObjRemoveFinalizer(triggercObj *kaf
newSlice = nil
}
triggercObjClone.ObjectMeta.Finalizers = newSlice
err := utils.UpdateKafkaTriggerCustomResource(c.kubelessclient, triggercObjClone)
err := utils.UpdateKafkaTriggerCustomResource(c.kafkaclient, triggercObjClone)
if err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion script/libtest.bash
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ _wait_for_kubeless_controller_ready() {
}
_wait_for_kubeless_controller_logline() {
local string="${1:?}"
k8s_wait_for_pod_logline "${string}" -n kubeless -l kubeless=controller
k8s_wait_for_pod_logline "${string}" -n kubeless -l kubeless=controller -c kubeless-function-controller
}
wait_for_ingress() {
echo_info "Waiting until Nginx pod is ready ..."
Expand Down

0 comments on commit c26169f

Please sign in to comment.