Skip to content
This repository has been archived by the owner on Dec 16, 2021. It is now read-only.

Commit

Permalink
Add backoff (#33)
Browse files Browse the repository at this point in the history
* Add exponential backoff when consumer function fails

* Tidy modules

* Add BACKOFF_INTERVAL
  • Loading branch information
sepetrov authored Oct 13, 2020
1 parent 96d7cd9 commit 997d5f6
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 12 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ go 1.12

require (
github.com/Shopify/sarama v1.27.0
github.com/cenkalti/backoff/v4 v4.0.2
github.com/coreos/prometheus-operator v0.0.0-20171201110357-197eb012d973
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYU
github.com/aws/aws-sdk-go v1.16.26/go.mod h1:KmX6BPdI08NWTb3/sm4ZGu5ShLoqVDhKgpiN924inxo=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/bradfitz/go-smtpd v0.0.0-20170404230938-deb6d6237625/go.mod h1:HYsPBTaaSFSlLx/70C2HPIMNZpVV8+vt/A+FMnYP11g=
github.com/cenkalti/backoff/v4 v4.0.2 h1:JIufpQLbh4DkbQoii76ItQIUFzevQSqOLZca4eamEDs=
github.com/cenkalti/backoff/v4 v4.0.2/go.mod h1:eEew/i+1Q6OrCDZh3WiXYv3+nJwBASZ8Bog/87DQnVg=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/prometheus-operator v0.0.0-20171201110357-197eb012d973 h1:7a78CgFQnnKoQomLoxGgKMaUp7QO9amd/IrifrECbmY=
Expand Down
65 changes: 53 additions & 12 deletions pkg/event-consumers/kafka/kafka-consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,20 +22,23 @@ import (
"strconv"
"strings"
"sync"
"time"

"github.com/Shopify/sarama"
backoff "github.com/cenkalti/backoff/v4"
"github.com/sirupsen/logrus"
"k8s.io/client-go/kubernetes"

"github.com/kubeless/kafka-trigger/pkg/utils"
)

// Package-level bookkeeping for the running Kafka consumers.
//
// NOTE(review): this span was captured from a unified diff and contains BOTH
// the pre-change and the post-change declaration groups (stopM, stoppedM,
// consumerM and brokers appear twice), so it does not compile as shown. The
// second group — the one adding maxBackOff (parsed from BACKOFF_INTERVAL in
// init) — is the committed code; drop the first group when restoring the file.
var (
stopM map[string]chan struct{}
stoppedM map[string]chan struct{}
consumerM map[string]bool
brokers string
config *sarama.Config
stopM map[string]chan struct{}
stoppedM map[string]chan struct{}
consumerM map[string]bool
brokers string
maxBackOff time.Duration
config *sarama.Config
)

const clientID = "kubeless-kafka-trigger-controller"
Expand All @@ -58,6 +61,14 @@ func init() {
brokers = defaultBrokers
}

if s := os.Getenv("BACKOFF_INTERVAL"); len(s) > 0 {
if d, err := time.ParseDuration(s); err == nil {
maxBackOff = d
} else {
logrus.Errorf("Failed to parse maximum back off interval BACKOFF_INTERVAL: %v", err)
}
}

config = sarama.NewConfig()
config.ClientID = clientID
config.Version = sarama.V0_10_2_0 // Min supported version for consumer groups.
Expand Down Expand Up @@ -101,7 +112,7 @@ func createConsumerProcess(topic, funcName, ns, consumerGroupID string, clientse
}

ready := make(chan struct{})
consumer := NewConsumer(funcName, funcPort, ns, clientset, ready)
consumer := NewConsumer(funcName, funcPort, ns, clientset, ready, maxBackOff)
errchan := group.Errors()

go func() {
Expand Down Expand Up @@ -182,16 +193,18 @@ type Consumer struct {
ns string
clientset kubernetes.Interface
ready chan struct{}
backoff time.Duration
}

// NewConsumer returns new consumer.
func NewConsumer(funcName string, funcPort int, ns string, clientset kubernetes.Interface, ready chan struct{}) *Consumer {
// NewConsumer builds a Consumer that delivers claimed Kafka records to the
// function funcName (port funcPort) in namespace ns, closes ready once the
// session is set up, and waits at most backoff between failed deliveries.
func NewConsumer(funcName string, funcPort int, ns string, clientset kubernetes.Interface, ready chan struct{}, backoff time.Duration) *Consumer {
	c := &Consumer{}
	c.funcName = funcName
	c.funcPort = funcPort
	c.ns = ns
	c.clientset = clientset
	c.ready = ready
	c.backoff = backoff
	return c
}

Expand All @@ -214,20 +227,48 @@ func (c *Consumer) Cleanup(sarama.ConsumerGroupSession) error {

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
//
// NOTE(review): this span was captured from a unified diff and interleaves the
// deleted pre-change lines (the if/else around SendMessage) with the added
// post-change lines (the backoff handling). As shown it is NOT compilable —
// the `} else {` branch is never closed and MarkMessage appears twice. Keep
// only the post-change side when restoring the file.
func (c *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
// Back-off policy shared across the whole claim; no-op when c.backoff <= 0.
b := getBackOff(c.backoff)

for msg := range claim.Messages() {
// Build the HTTP POST that forwards the record's value to the function.
req, err := utils.GetHTTPReq(c.funcName, c.funcPort, msg.Topic, c.ns, kafkatriggersNamespace, "POST", string(msg.Value))
if err != nil {
logrus.Errorf("Unable to elaborate request (namespace = %v function = %v topic = %v partition = %v offset = %v): %v", c.ns, c.funcName, msg.Topic, msg.Partition, msg.Offset, err)
continue
}

// --- pre-change (deleted) lines begin ---
if err = utils.SendMessage(req); err != nil {
logrus.Errorf("Failed to send message (namespace = %v function = %v topic = %v partition = %v offset = %v): %v", c.ns, c.funcName, msg.Topic, msg.Partition, msg.Offset, err)
} else {
logrus.Infof("Message sent successfully (namespace = %v function = %v topic = %v partition = %v offset = %v)", c.ns, c.funcName, msg.Topic, msg.Partition, msg.Offset)
// --- post-change (added) lines begin ---
err = utils.SendMessage(req)
// The offset is marked even when delivery failed, so the record is not
// redelivered — at-most-once semantics. TODO confirm this is intended.
session.MarkMessage(msg, "")

if err != nil {
// Sleep for the next back-off interval before touching further records.
d := b.NextBackOff()
logrus.Errorf("Failed to send message (namespace = %v function = %v topic = %v partition = %v offset = %v): %v: backing off for %v", c.ns, c.funcName, msg.Topic, msg.Partition, msg.Offset, err, d)
time.Sleep(d)
continue
}

session.MarkMessage(msg, "")
logrus.Infof("Message sent successfully (namespace = %v function = %v topic = %v partition = %v offset = %v)", c.ns, c.funcName, msg.Topic, msg.Partition, msg.Offset)
// A successful delivery rewinds the back-off to its initial interval.
b.Reset()
}
return nil
}

// backOff is the minimal back-off contract ConsumeClaim relies on:
// NextBackOff yields the duration to wait before the next attempt, and
// Reset restores the policy's initial state after a successful delivery.
// Satisfied by *backoff.ExponentialBackOff and by noopBackOff.
type backOff interface {
NextBackOff() time.Duration
Reset()
}

// noopBackOff disables backing off entirely: every NextBackOff call reports
// a zero wait, so callers proceed immediately.
type noopBackOff struct{}

// NextBackOff always returns a zero duration.
func (noopBackOff) NextBackOff() time.Duration {
	return 0
}

// Reset does nothing; there is no state to restore.
func (noopBackOff) Reset() {
}

// getBackOff returns the back-off policy used between failed deliveries.
// A non-positive maxBackOff disables backing off (noopBackOff); otherwise
// an exponential back-off whose intervals never exceed maxBackOff is
// returned.
func getBackOff(maxBackOff time.Duration) backOff {
	if maxBackOff <= 0 {
		return noopBackOff{}
	}

	b := backoff.NewExponentialBackOff()
	b.MaxElapsedTime = 0 // ... so that b.NextBackOff() never returns backoff.Stop.
	b.MaxInterval = maxBackOff
	// The library does not clamp InitialInterval (default 500ms) to
	// MaxInterval, so without this the very first retry could wait longer
	// than the configured BACKOFF_INTERVAL cap.
	if b.InitialInterval > maxBackOff {
		b.InitialInterval = maxBackOff
	}
	return b
}

0 comments on commit 997d5f6

Please sign in to comment.