Skip to content

Commit

Permalink
Merge pull request #24 from zerodha/perf
Browse files Browse the repository at this point in the history
Add several performance optimisations and fixes to the control flow.
  • Loading branch information
knadh authored Jul 3, 2024
2 parents 3e082b8 + e5c601e commit 43e191d
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 81 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func (f *TestFilter) IsAllowed(msg []byte) bool {
* Copy this plugin code to a directory. `mkdir testfilter && cp sample.go testfilter`
* Build the plugin. `CGO_ENABLED=1 go build -a -ldflags="-s -w" -buildmode=plugin -o testfilter.filter sample.go`
* Change the config.toml to add the filter provider config.
* Run kaf-relay with the filter pluing. `./kaf-relay.bin --mode single --stop-at-end --filter ./testfilter/testfilter.filter`
* Run kaf-relay with the filter plugin. `./kaf-relay.bin --mode single --stop-at-end --filter ./testfilter/testfilter.filter`

## Metrics

Expand Down
2 changes: 2 additions & 0 deletions config.sample.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ source_topic2 = "target_topic2"
[source_pool]
# Kafka client config common to all upstream sources ([[sources]]).
initial_offset = "start"
# Static membership to pin the member for the consumer group for respawn / reconnect and fence other members from connecting using the same id.
instance_id = "client_instance_id"
# Consumer group id.
group_id = "consumer_group"

# Frequency at which source servers are polled for health/lag.
Expand Down
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ require (
github.com/knadh/koanf/providers/posflag v0.1.0
github.com/knadh/koanf/v2 v2.0.1
github.com/spf13/pflag v1.0.5
github.com/twmb/franz-go v1.15.4
github.com/twmb/franz-go/pkg/kmsg v1.7.0
github.com/twmb/franz-go v1.17.0
github.com/twmb/franz-go/pkg/kmsg v1.8.0
)

require (
Expand All @@ -19,15 +19,15 @@ require (
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/mitchellh/reflectwalk v1.0.2 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pierrec/lz4/v4 v4.1.19 // indirect
github.com/pierrec/lz4/v4 v4.1.21 // indirect
github.com/valyala/fastrand v1.1.0 // indirect
github.com/valyala/histogram v1.2.0 // indirect
golang.org/x/sys v0.15.0 // indirect
golang.org/x/sys v0.20.0 // indirect
)

require (
github.com/VictoriaMetrics/metrics v1.23.1
github.com/klauspost/compress v1.16.7 // indirect
github.com/klauspost/compress v1.17.8 // indirect
github.com/twmb/franz-go/pkg/kadm v1.8.1
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
)
24 changes: 12 additions & 12 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY=
github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw=
github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I=
github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE=
github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU=
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/knadh/koanf/maps v0.1.1 h1:G5TjmUh2D7G2YWf5SQQqSiHRJEjaicvU0KpypqB3NIs=
github.com/knadh/koanf/maps v0.1.1/go.mod h1:npD/QZY3V6ghQDdcQzl1W4ICNVTkohC8E73eI2xW4yI=
github.com/knadh/koanf/parsers/toml v0.1.0 h1:S2hLqS4TgWZYj4/7mI5m1CQQcWurxUz6ODgOub/6LCI=
Expand All @@ -24,28 +24,28 @@ github.com/mitchellh/reflectwalk v1.0.2 h1:G2LzWKi524PWgd3mLHV8Y5k7s6XUvT0Gef6zx
github.com/mitchellh/reflectwalk v1.0.2/go.mod h1:mSTlrgnPZtwu0c4WaC2kGObEpuNDbx0jmZXqmk4esnw=
github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8=
github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/pierrec/lz4/v4 v4.1.19 h1:tYLzDnjDXh9qIxSTKHwXwOYmm9d887Y7Y1ZkyXYHAN4=
github.com/pierrec/lz4/v4 v4.1.19/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ=
github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk=
github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
github.com/twmb/franz-go v1.15.4 h1:qBCkHaiutetnrXjAUWA99D9FEcZVMt2AYwkH3vWEQTw=
github.com/twmb/franz-go v1.15.4/go.mod h1:rC18hqNmfo8TMc1kz7CQmHL74PLNF8KVvhflxiiJZCU=
github.com/twmb/franz-go v1.17.0 h1:hawgCx5ejDHkLe6IwAtFWwxi3OU4OztSTl7ZV5rwkYk=
github.com/twmb/franz-go v1.17.0/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM=
github.com/twmb/franz-go/pkg/kadm v1.8.1 h1:SrzL855I7gQTGdMtOYGTHhebs7TPgPN29FPtjusqwlE=
github.com/twmb/franz-go/pkg/kadm v1.8.1/go.mod h1:qUSM7pxoMCU1UNu5H4USE64ODcVmeG9LS96mysv1nu8=
github.com/twmb/franz-go/pkg/kmsg v1.7.0 h1:a457IbvezYfA5UkiBvyV3zj0Is3y1i8EJgqjJYoij2E=
github.com/twmb/franz-go/pkg/kmsg v1.7.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA=
github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU=
github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8=
github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ=
github.com/valyala/histogram v1.2.0/go.mod h1:Hb4kBwb4UxsaNbbbh+RRz8ZR6pdodR57tzWUS3BUzXY=
golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k=
golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4=
golang.org/x/crypto v0.23.0 h1:dIJU/v2J8Mdglj/8rJ6UUOM3Zc9zLZxVZwwxMooUSAI=
golang.org/x/crypto v0.23.0/go.mod h1:CKFgDieR+mRhux2Lsu27y0fO304Db0wZe70UKqHu0v8=
golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc=
golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
6 changes: 6 additions & 0 deletions internal/relay/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,8 +228,14 @@ func retryBackoff(min, max time.Duration) func(int) time.Duration {

// waitTries waits for the given backoff duration to elapse, returning early if the context is done.
func waitTries(ctx context.Context, waitDuration time.Duration) {
if waitDuration == 0 {
return
}

deadline := time.Now().Add(waitDuration)
after := time.NewTimer(time.Until(deadline))
defer after.Stop()

select {
case <-ctx.Done():
return
Expand Down
54 changes: 32 additions & 22 deletions internal/relay/relay.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"log/slog"
"sync"
"time"

"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
Expand Down Expand Up @@ -70,7 +71,6 @@ func NewRelay(cfg RelayCfg, src *SourcePool, target *Target, topics Topics, filt
// Start starts the consumer loop on kafka (A), fetch messages and relays over to kafka (B) using an async
func (re *Relay) Start(globalCtx context.Context) error {
wg := &sync.WaitGroup{}
defer wg.Wait()

// Derive a cancellable context from the global context (which captures kill signals) to use
// for subsequent connections/health tracking/retries etc.
Expand All @@ -95,41 +95,44 @@ func (re *Relay) Start(globalCtx context.Context) error {
re.log.Info("starting producer worker")
go func() {
defer wg.Done()
if err := re.target.Start(ctx); err != nil {
if err := re.target.Start(); err != nil {
re.log.Error("error starting producer worker", "err", err)
}

if ctx.Err() != context.Canceled {
cancel()
}
}()

// Start the consumer group worker by triggering a signal to the relay loop to fetch
// a consumer worker for the initial healthy node.
re.log.Info("starting consumer worker")
re.signalCh <- struct{}{}

wg.Add(1)
// Relay teardown.
go func() {
defer wg.Done()
// Wait till main ctx is cancelled.
<-globalCtx.Done()

// Stop consumer group.
re.source.Close()
}()

// Start the indefinite poll that asks for new connections
// and then consumes messages from them.
if err := re.startPoll(ctx); err != nil {
re.log.Error("error starting consumer worker", "err", err)
}

// Close the target/producer on exit.
re.target.CloseBatchCh()
// Close the producer inlet channel.
close(re.target.inletCh)

// Close producer.
re.target.Close()

wg.Wait()

return nil
}

// Close close the underlying kgo.Client(s)
func (re *Relay) Close() {
re.log.Debug("closing relay consumer, producer...")
re.source.Close()
re.target.Close()
}

// startPoll starts the consumer worker which polls the kafka cluster for messages.
func (re *Relay) startPoll(ctx context.Context) error {
var (
Expand Down Expand Up @@ -246,21 +249,28 @@ func (re *Relay) processMessage(ctx context.Context, rec *kgo.Record) error {
return nil
}

msg := &kgo.Record{
Key: rec.Key,
Value: rec.Value,
Topic: t.TargetTopic,
Partition: rec.Partition,
// Repurpose &kgo.Record and forward it to producer to reduce allocs.
rec.Headers = nil
rec.Timestamp = time.Time{}
rec.Topic = t.TargetTopic
if !t.AutoTargetPartition {
rec.Partition = int32(t.TargetPartition)
}
rec.Attrs = kgo.RecordAttrs{}
rec.ProducerEpoch = 0
rec.ProducerID = 0
rec.LeaderEpoch = 0
rec.Offset = 0
rec.Context = nil

// Queue the message for writing to target.
select {
case <-ctx.Done():
return ctx.Err()
case re.target.GetBatchCh() <- msg:
case re.target.GetBatchCh() <- rec:
default:
re.log.Error("target inlet channel blocked")
re.target.GetBatchCh() <- msg
re.target.GetBatchCh() <- rec
}

return nil
Expand Down
25 changes: 14 additions & 11 deletions internal/relay/source_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ func (sp *SourcePool) SetInitialOffsets(of map[string]map[int32]kgo.Offset) {
// indefinitely long to return based on the config.
func (sp *SourcePool) Get(globalCtx context.Context) (*Server, error) {
retries := 0
loop:
for {
select {
case <-globalCtx.Done():
Expand All @@ -155,6 +156,7 @@ func (sp *SourcePool) Get(globalCtx context.Context) (*Server, error) {
retries++
sp.log.Error("new source connection failed", "id", s.ID, "broker", s.Config.BootstrapBrokers, "error", err, "retries", retries)
waitTries(globalCtx, sp.backoffFn(retries))
continue loop
}

// Cache the current live connection internally.
Expand Down Expand Up @@ -201,18 +203,19 @@ func (sp *SourcePool) GetFetches(s *Server) (kgo.Fetches, error) {
// RecordOffsets records the offsets of the latest fetched records per topic.
// This is used to resume consumption on new connections/reconnections from the source during runtime.
func (sp *SourcePool) RecordOffsets(rec *kgo.Record) {
oMap := make(map[int32]kgo.Offset)
oMap[rec.Partition] = kgo.NewOffset().At(rec.Offset + 1)
if sp.offsets != nil {
if o, ok := sp.offsets[rec.Topic]; ok {
o[rec.Partition] = oMap[rec.Partition]
sp.offsets[rec.Topic] = o
} else {
sp.offsets[rec.Topic] = oMap
}
} else {
if sp.offsets == nil {
sp.offsets = make(map[string]map[int32]kgo.Offset)
sp.offsets[rec.Topic] = oMap
}

if o, ok := sp.offsets[rec.Topic]; ok {
// If the topic already exists, update the offset for the partition.
o[rec.Partition] = kgo.NewOffset().At(rec.Offset + 1)
sp.offsets[rec.Topic] = o
} else {
// If the topic does not exist, create a new map for the topic.
o := make(map[int32]kgo.Offset)
o[rec.Partition] = kgo.NewOffset().At(rec.Offset + 1)
sp.offsets[rec.Topic] = o
}
}

Expand Down
32 changes: 5 additions & 27 deletions internal/relay/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,6 @@ type Target struct {
metrics *metrics.Set
log *slog.Logger

// Map of optional destination topic partitions.
topicPartitions map[string]int32

// Map of target topics and their config.
targetTopics Topics

Expand Down Expand Up @@ -66,34 +63,24 @@ func NewTarget(globalCtx context.Context, cfg TargetCfg, pCfg ProducerCfg, topic
return p, nil
}

// Close closes the kafka client.
// Close removes the producer topics from the &kgo.Client.
func (tg *Target) Close() {
if tg.client != nil {
// prevent blocking on close
tg.client.PurgeTopicsFromProducing()
}
}

// CloseBatchCh closes the Producer batch channel.
func (tg *Target) CloseBatchCh() {
close(tg.inletCh)
}

// GetBatchCh returns the Producer batch channel.
func (tg *Target) GetBatchCh() chan *kgo.Record {
return tg.inletCh
}

// prepareRecord checks if custom topic partition mapping is defined.
// If required, it updates the records partition
func (tg *Target) prepareRecord(rec *kgo.Record) {
if part, ok := tg.topicPartitions[rec.Topic]; ok {
rec.Partition = part
}
}

// Start starts the blocking producer which flushes messages to the target Kafka.
func (tg *Target) Start(ctx context.Context) error {
func (tg *Target) Start() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

tick := time.NewTicker(tg.pCfg.FlushFrequency)
defer tick.Stop()

Expand All @@ -106,13 +93,6 @@ func (tg *Target) Start(ctx context.Context) error {

for {
select {
case <-ctx.Done():
if err := tg.drain(); err != nil {
return err
}

return ctx.Err()

// Queue the message and flush if the batch size is reached.
case msg, ok := <-tg.inletCh:
if !ok {
Expand All @@ -124,7 +104,6 @@ func (tg *Target) Start(ctx context.Context) error {
return nil
}

tg.prepareRecord(msg)
tg.batch = append(tg.batch, msg)
if len(tg.batch) >= tg.pCfg.FlushBatchSize {
if err := tg.flush(ctx); err != nil {
Expand Down Expand Up @@ -250,7 +229,6 @@ outerLoop:
func (tg *Target) drain() error {
now := time.Now()
for rec := range tg.inletCh {
tg.prepareRecord(rec)
tg.batch = append(tg.batch, rec)
}

Expand Down
5 changes: 3 additions & 2 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ func main() {
log.Fatalf("error starting relay controller: %v", err)
}

metrSrv.Shutdown(globalCtx)
relay.Close()
if metrSrv != nil {
metrSrv.Shutdown(globalCtx)
}
lo.Info("bye")
}

0 comments on commit 43e191d

Please sign in to comment.