diff --git a/pkg/event-consumers/kafka/kafka-consumer.go b/pkg/event-consumers/kafka/kafka-consumer.go index 5200e3bd..3cd89090 100644 --- a/pkg/event-consumers/kafka/kafka-consumer.go +++ b/pkg/event-consumers/kafka/kafka-consumer.go @@ -32,6 +32,7 @@ var ( stoppedM map[string](chan struct{}) consumerM map[string]bool brokers string + logLevel string config *cluster.Config ) @@ -40,6 +41,15 @@ func init() { stoppedM = make(map[string](chan struct{})) consumerM = make(map[string]bool) + //logrus initialization + // taking log level from env var + logLevel = os.Getenv("KUBELESS_LOG_LEVEL") + if logLevel == "DEBUG" { + logrus.SetLevel(logrus.DebugLevel) + } else { + logrus.SetLevel(logrus.InfoLevel) + } + // Init config // taking brokers from env var brokers = os.Getenv("KAFKA_BROKERS") @@ -92,8 +102,8 @@ func createConsumerProcess(broker, topic, funcName, ns, consumerGroupID string, select { case msg, more := <-consumer.Messages(): if more { - logrus.Infof("Received Kafka message Partition: %d Offset: %d Key: %s Value: %s ", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) - logrus.Infof("Sending message %s to function %s", msg, funcName) + logrus.Debugf("Received Kafka message Partition: %d Offset: %d Key: %s Value: %s ", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) + logrus.Debugf("Sending message %s to function %s", msg, funcName) consumer.MarkOffset(msg, "") go func() { req, err := utils.GetHTTPReq(clientset, funcName, ns, "kafkatriggers.kubeless.io", "POST", string(msg.Value))