From 0e9b19df54d8f273d296971dc92c5120e50312b3 Mon Sep 17 00:00:00 2001 From: Andrew Davis <1709934+Savid@users.noreply.github.com> Date: Fri, 3 Mar 2023 12:10:48 +1000 Subject: [PATCH] fix(mimicry): peer stopping --- pkg/mimicry/coordinator/xatu/peer/peer.go | 32 ++++++++++++++++++----- pkg/mimicry/coordinator/xatu/xatu.go | 14 ++++++++-- 2 files changed, 38 insertions(+), 8 deletions(-) diff --git a/pkg/mimicry/coordinator/xatu/peer/peer.go b/pkg/mimicry/coordinator/xatu/peer/peer.go index 6be74903..530d82d9 100644 --- a/pkg/mimicry/coordinator/xatu/peer/peer.go +++ b/pkg/mimicry/coordinator/xatu/peer/peer.go @@ -2,6 +2,8 @@ package xatu import ( "context" + "errors" + "sync" "time" "github.com/avast/retry-go/v4" @@ -19,6 +21,7 @@ type Peer struct { retryDelay time.Duration stopped bool + mu sync.Mutex Record *xatu.CoordinatedNodeRecord } @@ -44,6 +47,18 @@ func (p *Peer) Start(ctx context.Context) error { _ = retry.Do( func() error { + p.mu.Lock() + + // return unrecoverable error if peer has been stopped + if p.stopped { + p.log.Debug("peer stopped") + p.mu.Unlock() + + return retry.Unrecoverable(errors.New("peer stopped")) + } + + p.mu.Unlock() + peer, err := execution.New(ctx, p.log, p.Record.NodeRecord, p.handlers, p.cache) if err != nil { return err @@ -70,17 +85,16 @@ func (p *Peer) Start(ctx context.Context) error { return response }, retry.Attempts(0), - // TODO: currently a bug when Attempts(0) is set, this will never be called - // https://github.com/avast/retry-go/issues/66 - // also looks like this leaks goroutines - // retry.RetryIf(func(err error) bool { - // return !p.stopped - // }), retry.DelayType(func(n uint, err error, config *retry.Config) time.Duration { + p.mu.Lock() + defer p.mu.Unlock() + p.log.WithError(err).Debug("peer failed") if !p.stopped { p.Record.ConnectionAttempts++ + } else { + return 0 } return p.retryDelay @@ -92,11 +106,17 @@ func (p *Peer) Start(ctx context.Context) error { } func (p *Peer) Stop() error { + p.mu.Lock() + defer p.mu.Unlock() + p.stopped = true return nil } func (p *Peer) RetryDelay(delay time.Duration) { + p.mu.Lock() + defer p.mu.Unlock() + p.retryDelay = delay } diff --git a/pkg/mimicry/coordinator/xatu/xatu.go b/pkg/mimicry/coordinator/xatu/xatu.go index f718269b..64cee159 100644 --- a/pkg/mimicry/coordinator/xatu/xatu.go +++ b/pkg/mimicry/coordinator/xatu/xatu.go @@ -3,6 +3,7 @@ package xatu import ( "context" "errors" + "sync" "time" "github.com/ethpandaops/xatu/pkg/mimicry/coordinator/cache" @@ -22,7 +23,9 @@ type Xatu struct { cache *cache.SharedCache coordinator *xatuCoordinator.Coordinator - peers map[string]*xatuPeer.Peer + + mu sync.Mutex + peers map[string]*xatuPeer.Peer metrics *Metrics } @@ -48,6 +51,7 @@ func New(name string, config *xatuCoordinator.Config, handlers *handler.Peer, lo log: log, cache: cache.NewSharedCache(), coordinator: coordinator, + mu: sync.Mutex{}, peers: make(map[string]*xatuPeer.Peer), metrics: NewMetrics("xatu_mimicry_coordinator_xatu"), }, nil @@ -81,6 +85,9 @@ func (x *Xatu) startCrons(ctx context.Context) error { c := gocron.NewScheduler(time.Local) if _, err := c.Every("5s").Do(func() { + x.mu.Lock() + defer x.mu.Unlock() + connectedPeers := 0 connectionAttempts := 0 for _, peer := range x.peers { @@ -96,7 +103,10 @@ func (x *Xatu) startCrons(ctx context.Context) error { return err } - if _, err := c.Every("15s").Do(func() { + if _, err := c.Every("5m").Do(func() { + x.mu.Lock() + defer x.mu.Unlock() + var records []*xatupb.CoordinatedNodeRecord for _, peer := range x.peers { records = append(records, peer.Record)