Skip to content

Commit

Permalink
FIX DATA RACE
Browse files Browse the repository at this point in the history
  • Loading branch information
swift1337 committed Jan 10, 2025
1 parent 849bdde commit c1c0078
Showing 1 changed file with 25 additions and 13 deletions.
38 changes: 25 additions & 13 deletions zetaclient/zetacore/client_subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package zetacore
import (
"context"

"cosmossdk.io/errors"
ctypes "github.com/cometbft/cometbft/types"

"github.com/zeta-chain/node/pkg/fanout"
Expand All @@ -13,7 +14,7 @@ import (
func (c *Client) NewBlockSubscriber(ctx context.Context) (chan ctypes.EventDataNewBlock, error) {
blockSubscriber, err := c.resolveBlockSubscriber()
if err != nil {
return nil, err
return nil, errors.Wrap(err, "unable to resolve block subscriber")

Check warning on line 17 in zetaclient/zetacore/client_subscriptions.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore/client_subscriptions.go#L17

Added line #L17 was not covered by tests
}

// we need a "proxy" chan instead of directly returning blockSubscriber.Add()
Expand All @@ -26,6 +27,9 @@ func (c *Client) NewBlockSubscriber(ctx context.Context) (chan ctypes.EventDataN
for {
select {
case <-ctx.Done():
// fixme: MEMORY LEAK: this might be dangerous because the consumer is not closed.
// Fanout will spawn "zombie" goroutines to push to the chan, but nobody is reading from it,
// Will be addressed in future orchestrator V2 PRs (not urgent as of now)

Check warning on line 32 in zetaclient/zetacore/client_subscriptions.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore/client_subscriptions.go#L30-L32

Added lines #L30 - L32 were not covered by tests
return
case block := <-consumer:
blocksChan <- block
Expand All @@ -40,18 +44,24 @@ func (c *Client) NewBlockSubscriber(ctx context.Context) (chan ctypes.EventDataN
// or subscribes to it for the first time.
func (c *Client) resolveBlockSubscriber() (*fanout.FanOut[ctypes.EventDataNewBlock], error) {
// noop
if blocksFanout, ok := c.getBlockFanoutChan(); ok {
if blocksFanout := c.blockFanOutThreadSafe(); blocksFanout != nil {
c.logger.Info().Msg("Resolved existing block subscriber")
return blocksFanout, nil
}

Check warning on line 50 in zetaclient/zetacore/client_subscriptions.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore/client_subscriptions.go#L48-L50

Added lines #L48 - L50 were not covered by tests

// we need this lock to prevent 2 Subscribe calls at the same time
c.mu.Lock()
defer c.mu.Unlock()

c.logger.Info().Msg("Subscribing to block events")

// Subscribe to comet bft events
eventsChan, err := c.cometBFTClient.Subscribe(context.Background(), "", ctypes.EventQueryNewBlock.String())
if err != nil {
return nil, err
return nil, errors.Wrap(err, "unable to subscribe to new block events")
}

Check warning on line 62 in zetaclient/zetacore/client_subscriptions.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore/client_subscriptions.go#L61-L62

Added lines #L61 - L62 were not covered by tests

c.logger.Info().Msg("Subscribed to new block events")
c.logger.Info().Msg("Subscribed to block events")

// Create block chan
blockChan := make(chan ctypes.EventDataNewBlock)
Expand All @@ -65,23 +75,25 @@ func (c *Client) resolveBlockSubscriber() (*fanout.FanOut[ctypes.EventDataNewBlo
continue

Check warning on line 75 in zetaclient/zetacore/client_subscriptions.go

View check run for this annotation

Codecov / codecov/patch

zetaclient/zetacore/client_subscriptions.go#L74-L75

Added lines #L74 - L75 were not covered by tests
}

c.logger.Info().Int64("height", newBlockEvent.Block.Height).Msg("Received new block event")

blockChan <- newBlockEvent
}
}()

// Create a fanout
// It allows a "global" chan (i.e. blockChan) to stream to multiple consumers independently.
c.mu.Lock()
defer c.mu.Unlock()
c.blocksFanout = fanout.New[ctypes.EventDataNewBlock](blockChan, fanout.DefaultBuffer)
fo := fanout.New[ctypes.EventDataNewBlock](blockChan, fanout.DefaultBuffer)
fo.Start()

c.blocksFanout.Start()
c.blocksFanout = fo

return c.blocksFanout, nil
return fo, nil
}

func (c *Client) getBlockFanoutChan() (*fanout.FanOut[ctypes.EventDataNewBlock], bool) {
c.mu.RLock()
defer c.mu.RUnlock()
return c.blocksFanout, c.blocksFanout != nil
func (c *Client) blockFanOutThreadSafe() *fanout.FanOut[ctypes.EventDataNewBlock] {
c.mu.Lock()
defer c.mu.Unlock()

return c.blocksFanout
}

0 comments on commit c1c0078

Please sign in to comment.