diff --git a/pkg/event-consumers/kafka/kafka-consumer.go b/pkg/event-consumers/kafka/kafka-consumer.go index 0687bd82..5200e3bd 100644 --- a/pkg/event-consumers/kafka/kafka-consumer.go +++ b/pkg/event-consumers/kafka/kafka-consumer.go @@ -94,19 +94,21 @@ func createConsumerProcess(broker, topic, funcName, ns, consumerGroupID string, if more { logrus.Infof("Received Kafka message Partition: %d Offset: %d Key: %s Value: %s ", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value)) logrus.Infof("Sending message %s to function %s", msg, funcName) - req, err := utils.GetHTTPReq(clientset, funcName, ns, "kafkatriggers.kubeless.io", "POST", string(msg.Value)) - if err != nil { - logrus.Errorf("Unable to elaborate request: %v", err) - } else { - //forward msg to function - err = utils.SendMessage(req) + consumer.MarkOffset(msg, "") + go func() { + req, err := utils.GetHTTPReq(clientset, funcName, ns, "kafkatriggers.kubeless.io", "POST", string(msg.Value)) if err != nil { - logrus.Errorf("Failed to send message to function: %v", err) + logrus.Errorf("Unable to elaborate request: %v", err) } else { - logrus.Infof("Message has sent to function %s successfully", funcName) + //forward msg to function + err = utils.SendMessage(req) + if err != nil { + logrus.Errorf("Failed to send message to function: %v", err) + } else { + logrus.Infof("Message has sent to function %s successfully", funcName) + } } - consumer.MarkOffset(msg, "") - } + }() } case ntf, more := <-consumer.Notifications(): if more {