Skip to content
This repository has been archived by the owner on Dec 16, 2021. It is now read-only.

Commit

Permalink
Add kafka topic to the kubeless func request headers (#29)
Browse files Browse the repository at this point in the history
* Add kafka topic to the kubeless func request headers

* Update NodeJS in tests

Co-authored-by: Jonathan Whitaker <[email protected]>
Co-authored-by: Andres Martinez Gotor <[email protected]>
  • Loading branch information
3 people authored Jul 15, 2020
1 parent 33dd9dc commit 1418a10
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 6 deletions.
2 changes: 1 addition & 1 deletion examples/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ pubsub-python36-verify:

pubsub-nodejs:
kubeless topic create s3-nodejs || true
kubeless function deploy pubsub-nodejs --runtime nodejs6 --handler pubsub-nodejs.handler --from-file nodejs/hellowithdata.js
kubeless function deploy pubsub-nodejs --runtime nodejs12 --handler pubsub-nodejs.handler --from-file nodejs/hellowithdata.js
kubeless trigger kafka create pubsub-nodejs --function-selector created-by=kubeless,function=pubsub-nodejs --trigger-topic s3-nodejs

pubsub-nodejs-verify:
Expand Down
2 changes: 1 addition & 1 deletion pkg/event-consumers/kafka/kafka-consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func createConsumerProcess(brokers, topic, funcName, ns, consumerGroupID string,
logrus.Debugf("Sending message %v to function %s", msg, funcName)
consumer.MarkOffset(msg, "")
go func() {
req, err := utils.GetHTTPReq(clientset, funcName, ns, "kafkatriggers.kubeless.io", "POST", string(msg.Value))
req, err := utils.GetHTTPReq(clientset, funcName, topic, ns, "kafkatriggers.kubeless.io", "POST", string(msg.Value))
if err != nil {
logrus.Errorf("Unable to elaborate request: %v", err)
} else {
Expand Down
3 changes: 2 additions & 1 deletion pkg/utils/event_sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func GetFunctionPort(clientset kubernetes.Interface, namespace, functionName str
}

// GetHTTPReq returns the http request object that can be used to send a event with payload to function service
func GetHTTPReq(clientset kubernetes.Interface, funcName, namespace, eventNamespace, method, body string) (*http.Request, error) {
func GetHTTPReq(clientset kubernetes.Interface, funcName, kafkaTopic, namespace, eventNamespace, method, body string) (*http.Request, error) {

funcPort, err := GetFunctionPort(clientset, namespace, funcName)
if err != nil {
Expand All @@ -65,6 +65,7 @@ func GetHTTPReq(clientset kubernetes.Interface, funcName, namespace, eventNamesp
req.Header.Add("event-id", eventID)
req.Header.Add("event-time", timestamp.String())
req.Header.Add("event-namespace", eventNamespace)
req.Header.Add("event-topic", kafkaTopic)
if IsJSON(body) {
req.Header.Add("Content-Type", "application/json")
req.Header.Add("event-type", "application/json")
Expand Down
12 changes: 9 additions & 3 deletions pkg/utils/event_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
"io/ioutil"
"testing"

"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes/fake"
)
Expand All @@ -40,7 +40,7 @@ func TestGetHTTPRequest(t *testing.T) {
},
}
clientset := fake.NewSimpleClientset(&svc)
req, err := GetHTTPReq(clientset, "foo", "myns", "kafkatriggers.kubeless.io", "POST", "my msg")
req, err := GetHTTPReq(clientset, "foo", "mytopic", "myns", "kafkatriggers.kubeless.io", "POST", "my msg")
if err != nil {
t.Errorf("Unexpected error %v", err)
}
Expand Down Expand Up @@ -72,6 +72,9 @@ func TestGetHTTPRequest(t *testing.T) {
if req.Header.Get("event-namespace") != "kafkatriggers.kubeless.io" {
t.Errorf("Unexpected event-type %s", req.Header.Get("event-type"))
}
if req.Header.Get("event-topic") != "mytopic" {
t.Errorf("Unexpected event-topic %s", req.Header.Get("event-topic"))
}
}

func TestGetJSONHTTPRequest(t *testing.T) {
Expand All @@ -90,7 +93,7 @@ func TestGetJSONHTTPRequest(t *testing.T) {
},
}
clientset := fake.NewSimpleClientset(&svc)
req, err := GetHTTPReq(clientset, "foo", "myns", "kafkatriggers.kubeless.io", "POST", `{"hello": "world"}`)
req, err := GetHTTPReq(clientset, "foo", "mytopic", "myns", "kafkatriggers.kubeless.io", "POST", `{"hello": "world"}`)
if err != nil {
t.Errorf("Unexpected error %v", err)
}
Expand All @@ -100,6 +103,9 @@ func TestGetJSONHTTPRequest(t *testing.T) {
if req.Header.Get("event-type") != "application/json" {
t.Errorf("Unexpected event-type %s", req.Header.Get("event-type"))
}
if req.Header.Get("event-topic") != "mytopic" {
t.Errorf("Unexpected event-topic %s", req.Header.Get("event-topic"))
}
}

func TestIsJSON(t *testing.T) {
Expand Down

0 comments on commit 1418a10

Please sign in to comment.