Skip to content

Commit

Permalink
feat: Kraken provider to WS (#580) (#583)
Browse files Browse the repository at this point in the history
  • Loading branch information
mergify[bot] authored Feb 25, 2022
1 parent d596237 commit dbef858
Show file tree
Hide file tree
Showing 11 changed files with 434 additions and 178 deletions.
1 change: 1 addition & 0 deletions price-feeder/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ Ref: https://keepachangelog.com/en/1.0.0/
- [#502](https://github.com/umee-network/umee/pull/502) Faulty provider detection: discard prices that are not within 2𝜎 of others.
- [#551](https://github.com/umee-network/umee/pull/551) Update Binance provider to use WebSocket.
- [#569](https://github.com/umee-network/umee/pull/569) Update Huobi provider to use WebSocket.
- [#580](https://github.com/umee-network/umee/pull/580) Update Kraken provider to use WebSocket.

### Bug Fixes

Expand Down
6 changes: 5 additions & 1 deletion price-feeder/oracle/oracle.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,11 @@ func (o *Oracle) getOrSetProvider(ctx context.Context, providerName string) (pro
priceProvider = binanceProvider

case config.ProviderKraken:
priceProvider = provider.NewKrakenProvider()
krakenProvider, err := provider.NewKrakenProvider(ctx, o.logger, o.providerPairs[config.ProviderKraken]...)
if err != nil {
return nil, err
}
priceProvider = krakenProvider

case config.ProviderOsmosis:
priceProvider = provider.NewOsmosisProvider()
Expand Down
22 changes: 13 additions & 9 deletions price-feeder/oracle/provider/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@ import (
)

const (
binanceHost = "stream.binance.com:9443"
binancePath = "/ws/umeestream"
binanceConnectionTime = time.Hour * 23 // should be < 24
binanceReconnectTime = time.Minute * 15
binanceHost = "stream.binance.com:9443"
binancePath = "/ws/umeestream"
)

var _ Provider = (*BinanceProvider)(nil)
Expand Down Expand Up @@ -106,7 +104,7 @@ func (p *BinanceProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[stri
func (p *BinanceProvider) getTickerPrice(key string) (TickerPrice, error) {
ticker, ok := p.tickers[key]
if !ok {
return TickerPrice{}, fmt.Errorf("failed to get ticker price for %s", key)
return TickerPrice{}, fmt.Errorf("binance provider failed to get ticker price for %s", key)
}

return ticker.toTickerPrice()
Expand Down Expand Up @@ -154,7 +152,7 @@ func (p *BinanceProvider) subscribeTickers(cps ...types.CurrencyPair) error {
}

func (p *BinanceProvider) handleWebSocketMsgs(ctx context.Context) {
reconnectTicker := time.NewTicker(binanceConnectionTime)
reconnectTicker := time.NewTicker(defaultMaxConnectionTime)
defer reconnectTicker.Stop()

for {
Expand Down Expand Up @@ -204,16 +202,22 @@ func (p *BinanceProvider) reconnect() error {
return p.subscribeTickers(p.subscribedPairs...)
}

// keepReconnecting keeps trying to reconnect if an error occurs in recconnect
// keepReconnecting keeps trying to reconnect if an error occurs in recconnect.
func (p *BinanceProvider) keepReconnecting() {
reconnectTicker := time.NewTicker(binanceConnectionTime)
reconnectTicker := time.NewTicker(defaultReconnectTime)
defer reconnectTicker.Stop()
connectionTries := 1

for time := range reconnectTicker.C {
if err := p.reconnect(); err != nil {
p.logger.Err(err).Msg("binance provider error recconecting at " + time.String())
p.logger.Err(err).Msgf("binance provider attempted to reconnect %d times at %s", connectionTries, time.String())
continue
}

if connectionTries > maxReconnectionTries {
p.logger.Warn().Msgf("binance provider failed to reconnect %d times", connectionTries)
}
connectionTries++
return
}
}
Expand Down
2 changes: 1 addition & 1 deletion price-feeder/oracle/provider/binance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func TestBinanceProvider_GetTickerPrices(t *testing.T) {
t.Run("invalid_request_invalid_ticker", func(t *testing.T) {
prices, err := p.GetTickerPrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"})
require.Error(t, err)
require.Equal(t, "failed to get ticker price for FOOBAR", err.Error())
require.Equal(t, "binance provider failed to get ticker price for FOOBAR", err.Error())
require.Nil(t, prices)
})
}
5 changes: 3 additions & 2 deletions price-feeder/oracle/provider/huobi.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type (
}
)

// NewHuobiProvider returns a new Huobi provider with the WS connection and msg handler.
func NewHuobiProvider(ctx context.Context, logger zerolog.Logger, pairs ...types.CurrencyPair) (*HuobiProvider, error) {
wsURL := url.URL{
Scheme: "wss",
Expand Down Expand Up @@ -220,7 +221,7 @@ func (p *HuobiProvider) reconnect() error {
return p.subscribeTickers(p.subscribedPairs...)
}

// GetTickerPrices returns the tickerPrices based on the saved map
// GetTickerPrices returns the tickerPrices based on the saved map.
func (p *HuobiProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[string]TickerPrice, error) {
tickerPrices := make(map[string]TickerPrice, len(pairs))

Expand All @@ -238,7 +239,7 @@ func (p *HuobiProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[string
func (p *HuobiProvider) getTickerPrice(cp types.CurrencyPair) (TickerPrice, error) {
ticker, ok := p.tickers[getChannelTicker(cp)]
if !ok {
return TickerPrice{}, fmt.Errorf("failed to get ticker price for %s", cp.String())
return TickerPrice{}, fmt.Errorf("huobi provider failed to get ticker price for %s", cp.String())
}

return ticker.toTickerPrice()
Expand Down
2 changes: 1 addition & 1 deletion price-feeder/oracle/provider/huobi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func TestHuobiProvider_GetTickerPrices(t *testing.T) {
t.Run("invalid_request_invalid_ticker", func(t *testing.T) {
prices, err := p.GetTickerPrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"})
require.Error(t, err)
require.Equal(t, "failed to get ticker price for FOOBAR", err.Error())
require.Equal(t, "huobi provider failed to get ticker price for FOOBAR", err.Error())
require.Nil(t, prices)
})
}
Loading

0 comments on commit dbef858

Please sign in to comment.