Skip to content

Commit

Permalink
Merge pull request #90 from ethpandaops/fix/mimicry-leak
Browse files Browse the repository at this point in the history
fix(mimicry): peer stop leak
  • Loading branch information
Savid authored Mar 3, 2023
2 parents 27763f1 + 0e9b19d commit 822f35e
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 8 deletions.
32 changes: 26 additions & 6 deletions pkg/mimicry/coordinator/xatu/peer/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package xatu

import (
"context"
"errors"
"sync"
"time"

"github.com/avast/retry-go/v4"
Expand All @@ -19,6 +21,7 @@ type Peer struct {
retryDelay time.Duration

stopped bool
mu sync.Mutex

Record *xatu.CoordinatedNodeRecord
}
Expand All @@ -44,6 +47,18 @@ func (p *Peer) Start(ctx context.Context) error {

_ = retry.Do(
func() error {
p.mu.Lock()

// return unrecoverable error if peer has been stopped
if p.stopped {
p.log.Debug("peer stopped")
p.mu.Unlock()

return retry.Unrecoverable(errors.New("peer stopped"))
}

p.mu.Unlock()

peer, err := execution.New(ctx, p.log, p.Record.NodeRecord, p.handlers, p.cache)
if err != nil {
return err
Expand All @@ -70,17 +85,16 @@ func (p *Peer) Start(ctx context.Context) error {
return response
},
retry.Attempts(0),
// TODO: currently a bug when Attempts(0) is set, this will never be called
// https://github.com/avast/retry-go/issues/66
// also looks like this leaks goroutines
// retry.RetryIf(func(err error) bool {
// return !p.stopped
// }),
retry.DelayType(func(n uint, err error, config *retry.Config) time.Duration {
p.mu.Lock()
defer p.mu.Unlock()

p.log.WithError(err).Debug("peer failed")

if !p.stopped {
p.Record.ConnectionAttempts++
} else {
return 0
}

return p.retryDelay
Expand All @@ -92,11 +106,17 @@ func (p *Peer) Start(ctx context.Context) error {
}

// Stop flags the peer as stopped.
//
// The flag is written while holding p.mu, the same mutex the retry loop in
// Start takes before reading it; once set, the next retry attempt returns an
// unrecoverable "peer stopped" error instead of dialing again.
//
// Stop always returns nil.
func (p *Peer) Stop() error {
	p.mu.Lock()
	p.stopped = true
	p.mu.Unlock()

	return nil
}

// RetryDelay replaces the delay that the retry loop in Start returns from its
// delay callback between connection attempts.
//
// The field is written while holding p.mu, which the delay callback also
// holds when it reads the value.
func (p *Peer) RetryDelay(delay time.Duration) {
	p.mu.Lock()
	p.retryDelay = delay
	p.mu.Unlock()
}
14 changes: 12 additions & 2 deletions pkg/mimicry/coordinator/xatu/xatu.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package xatu
import (
"context"
"errors"
"sync"
"time"

"github.com/ethpandaops/xatu/pkg/mimicry/coordinator/cache"
Expand All @@ -22,7 +23,9 @@ type Xatu struct {

cache *cache.SharedCache
coordinator *xatuCoordinator.Coordinator
peers map[string]*xatuPeer.Peer

mu sync.Mutex
peers map[string]*xatuPeer.Peer

metrics *Metrics
}
Expand All @@ -48,6 +51,7 @@ func New(name string, config *xatuCoordinator.Config, handlers *handler.Peer, lo
log: log,
cache: cache.NewSharedCache(),
coordinator: coordinator,
mu: sync.Mutex{},
peers: make(map[string]*xatuPeer.Peer),
metrics: NewMetrics("xatu_mimicry_coordinator_xatu"),
}, nil
Expand Down Expand Up @@ -81,6 +85,9 @@ func (x *Xatu) startCrons(ctx context.Context) error {
c := gocron.NewScheduler(time.Local)

if _, err := c.Every("5s").Do(func() {
x.mu.Lock()
defer x.mu.Unlock()

connectedPeers := 0
connectionAttempts := 0
for _, peer := range x.peers {
Expand All @@ -96,7 +103,10 @@ func (x *Xatu) startCrons(ctx context.Context) error {
return err
}

if _, err := c.Every("15s").Do(func() {
if _, err := c.Every("5m").Do(func() {
x.mu.Lock()
defer x.mu.Unlock()

var records []*xatupb.CoordinatedNodeRecord
for _, peer := range x.peers {
records = append(records, peer.Record)
Expand Down

0 comments on commit 822f35e

Please sign in to comment.