From dbef8586b4d030fe6386d6b3b17500dfc6cc2dd3 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Fri, 25 Feb 2022 14:01:11 -0800 Subject: [PATCH] feat: Kraken provider to WS (#580) (#583) --- price-feeder/CHANGELOG.md | 1 + price-feeder/oracle/oracle.go | 6 +- price-feeder/oracle/provider/binance.go | 22 +- price-feeder/oracle/provider/binance_test.go | 2 +- price-feeder/oracle/provider/huobi.go | 5 +- price-feeder/oracle/provider/huobi_test.go | 2 +- price-feeder/oracle/provider/kraken.go | 409 ++++++++++++++++--- price-feeder/oracle/provider/kraken_test.go | 152 +++---- price-feeder/oracle/provider/okx.go | 4 +- price-feeder/oracle/provider/okx_test.go | 2 +- price-feeder/oracle/provider/provider.go | 7 +- 11 files changed, 434 insertions(+), 178 deletions(-) diff --git a/price-feeder/CHANGELOG.md b/price-feeder/CHANGELOG.md index 20bc68c8be..e894e8538f 100644 --- a/price-feeder/CHANGELOG.md +++ b/price-feeder/CHANGELOG.md @@ -53,6 +53,7 @@ Ref: https://keepachangelog.com/en/1.0.0/ - [#502](https://github.com/umee-network/umee/pull/502) Faulty provider detection: discard prices that are not within 2𝜎 of others. - [#551](https://github.com/umee-network/umee/pull/551) Update Binance provider to use WebSocket. - [#569](https://github.com/umee-network/umee/pull/569) Update Huobi provider to use WebSocket. +- [#580](https://github.com/umee-network/umee/pull/580) Update Kraken provider to use WebSocket. ### Bug Fixes diff --git a/price-feeder/oracle/oracle.go b/price-feeder/oracle/oracle.go index 7576a01c7b..620c9100d9 100644 --- a/price-feeder/oracle/oracle.go +++ b/price-feeder/oracle/oracle.go @@ -286,7 +286,11 @@ func (o *Oracle) getOrSetProvider(ctx context.Context, providerName string) (pro priceProvider = binanceProvider case config.ProviderKraken: - priceProvider = provider.NewKrakenProvider() + krakenProvider, err := provider.NewKrakenProvider(ctx, o.logger, o.providerPairs[config.ProviderKraken]...) + if err != nil { + return nil, err + } + priceProvider = krakenProvider case config.ProviderOsmosis: priceProvider = provider.NewOsmosisProvider() diff --git a/price-feeder/oracle/provider/binance.go b/price-feeder/oracle/provider/binance.go index 9471256fa3..5790e5050b 100644 --- a/price-feeder/oracle/provider/binance.go +++ b/price-feeder/oracle/provider/binance.go @@ -15,10 +15,8 @@ import ( ) const ( - binanceHost = "stream.binance.com:9443" - binancePath = "/ws/umeestream" - binanceConnectionTime = time.Hour * 23 // should be < 24 - binanceReconnectTime = time.Minute * 15 + binanceHost = "stream.binance.com:9443" + binancePath = "/ws/umeestream" ) var _ Provider = (*BinanceProvider)(nil) @@ -106,7 +104,7 @@ func (p *BinanceProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[stri func (p *BinanceProvider) getTickerPrice(key string) (TickerPrice, error) { ticker, ok := p.tickers[key] if !ok { - return TickerPrice{}, fmt.Errorf("failed to get ticker price for %s", key) + return TickerPrice{}, fmt.Errorf("binance provider failed to get ticker price for %s", key) } return ticker.toTickerPrice() @@ -154,7 +152,7 @@ func (p *BinanceProvider) subscribeTickers(cps ...types.CurrencyPair) error { } func (p *BinanceProvider) handleWebSocketMsgs(ctx context.Context) { - reconnectTicker := time.NewTicker(binanceConnectionTime) + reconnectTicker := time.NewTicker(defaultMaxConnectionTime) defer reconnectTicker.Stop() for { @@ -204,16 +202,22 @@ func (p *BinanceProvider) reconnect() error { return p.subscribeTickers(p.subscribedPairs...) } -// keepReconnecting keeps trying to reconnect if an error occurs in recconnect +// keepReconnecting keeps trying to reconnect if an error occurs in recconnect. func (p *BinanceProvider) keepReconnecting() { - reconnectTicker := time.NewTicker(binanceConnectionTime) + reconnectTicker := time.NewTicker(defaultReconnectTime) defer reconnectTicker.Stop() + connectionTries := 1 for time := range reconnectTicker.C { if err := p.reconnect(); err != nil { - p.logger.Err(err).Msg("binance provider error recconecting at " + time.String()) + p.logger.Err(err).Msgf("binance provider attempted to reconnect %d times at %s", connectionTries, time.String()) continue } + + if connectionTries > maxReconnectionTries { + p.logger.Warn().Msgf("binance provider failed to reconnect %d times", connectionTries) + } + connectionTries++ return } } diff --git a/price-feeder/oracle/provider/binance_test.go b/price-feeder/oracle/provider/binance_test.go index 3bb0e70367..cf08637cbc 100644 --- a/price-feeder/oracle/provider/binance_test.go +++ b/price-feeder/oracle/provider/binance_test.go @@ -68,7 +68,7 @@ func TestBinanceProvider_GetTickerPrices(t *testing.T) { t.Run("invalid_request_invalid_ticker", func(t *testing.T) { prices, err := p.GetTickerPrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) require.Error(t, err) - require.Equal(t, "failed to get ticker price for FOOBAR", err.Error()) + require.Equal(t, "binance provider failed to get ticker price for FOOBAR", err.Error()) require.Nil(t, prices) }) } diff --git a/price-feeder/oracle/provider/huobi.go b/price-feeder/oracle/provider/huobi.go index ab812e5004..a50817d5da 100644 --- a/price-feeder/oracle/provider/huobi.go +++ b/price-feeder/oracle/provider/huobi.go @@ -60,6 +60,7 @@ type ( } ) +// NewHuobiProvider returns a new Huobi provider with the WS connection and msg handler. func NewHuobiProvider(ctx context.Context, logger zerolog.Logger, pairs ...types.CurrencyPair) (*HuobiProvider, error) { wsURL := url.URL{ Scheme: "wss", @@ -220,7 +221,7 @@ func (p *HuobiProvider) reconnect() error { return p.subscribeTickers(p.subscribedPairs...) } -// GetTickerPrices returns the tickerPrices based on the saved map +// GetTickerPrices returns the tickerPrices based on the saved map. func (p *HuobiProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[string]TickerPrice, error) { tickerPrices := make(map[string]TickerPrice, len(pairs)) @@ -238,7 +239,7 @@ func (p *HuobiProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[string func (p *HuobiProvider) getTickerPrice(cp types.CurrencyPair) (TickerPrice, error) { ticker, ok := p.tickers[getChannelTicker(cp)] if !ok { - return TickerPrice{}, fmt.Errorf("failed to get ticker price for %s", cp.String()) + return TickerPrice{}, fmt.Errorf("huobi provider failed to get ticker price for %s", cp.String()) } return ticker.toTickerPrice() diff --git a/price-feeder/oracle/provider/huobi_test.go b/price-feeder/oracle/provider/huobi_test.go index 62f3dc1c63..c3f0a87e07 100644 --- a/price-feeder/oracle/provider/huobi_test.go +++ b/price-feeder/oracle/provider/huobi_test.go @@ -75,7 +75,7 @@ func TestHuobiProvider_GetTickerPrices(t *testing.T) { t.Run("invalid_request_invalid_ticker", func(t *testing.T) { prices, err := p.GetTickerPrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) require.Error(t, err) - require.Equal(t, "failed to get ticker price for FOOBAR", err.Error()) + require.Equal(t, "huobi provider failed to get ticker price for FOOBAR", err.Error()) require.Nil(t, prices) }) } diff --git a/price-feeder/oracle/provider/kraken.go b/price-feeder/oracle/provider/kraken.go index 13fc7c9785..7d732d5548 100644 --- a/price-feeder/oracle/provider/kraken.go +++ b/price-feeder/oracle/provider/kraken.go @@ -1,20 +1,23 @@ package provider import ( + "context" "encoding/json" "fmt" - "io/ioutil" - "net/http" + "net/url" "strings" + "sync" "time" - sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/gorilla/websocket" + "github.com/rs/zerolog" "github.com/umee-network/umee/price-feeder/oracle/types" ) const ( - krakenBaseURL = "https://api.kraken.com" - krakenTickerEndpoint = "/0/public/Ticker" + krakenHost = "ws.kraken.com" + krakenEventSystemStatus = "systemStatus" + krakenEventSubscriptionStatus = "subscriptionStatus" ) var _ Provider = (*KrakenProvider)(nil) @@ -23,100 +26,376 @@ type ( // KrakenProvider defines an Oracle provider implemented by the Kraken public // API. // - // REF: https://docs.kraken.com/rest/ + // REF: https://docs.kraken.com/websockets/#overview KrakenProvider struct { - baseURL string - client *http.Client + wsURL url.URL + wsClient *websocket.Conn + logger zerolog.Logger + mtx sync.Mutex + tickers map[string]TickerPrice // Symbol => TickerPrice + subscribedPairs map[string]types.CurrencyPair // Symbol => types.CurrencyPair } - // KrakenTickerPair defines the structure returned from Kraken for a ticker query. - // - // Note, we only care about 'c', which is the last trade closed [, ] - // and 'v', the volume. - KrakenTickerPair struct { - C []string `json:"c"` - V []string `json:"v"` + // KrakenTicker ticker price response from Kraken ticker channel. + // https://docs.kraken.com/websockets/#message-ticker + KrakenTicker struct { + C []string `json:"c"` // Close with Price in the first position + V []string `json:"v"` // Volume with the value over last 24 hours in the second position + } + + // KrakenSubscriptionMsg Msg to subscribe to all the pairs at once. + KrakenSubscriptionMsg struct { + Event string `json:"event"` // subscribe/unsubscribe + Pair []string `json:"pair"` // Array of currency pairs ex.: "BTC/USDT", + Subscription KrakenSubscriptionChannel `json:"subscription"` // subscription object + } + + // KrakenSubscriptionChannel Msg with the channel name to be subscribed. + KrakenSubscriptionChannel struct { + Name string `json:"name"` // channel to be subscribed ex.: ticker + } + + // KrakenEvent wraps the possible events from the provider. + KrakenEvent struct { + Event string `json:"event"` // events from kraken ex.: systemStatus | subscriptionStatus + } + + // KrakenEventSystemStatus parse the systemStatus event message. + KrakenEventSystemStatus struct { + Status string `json:"status"` // online|maintenance|cancel_only|limit_only|post_only } - // KrakenTickerResponse defines the response structure of a Kraken ticker request. - // The response may contain one or more tickers. - KrakenTickerResponse struct { - Error []interface{} - Result map[string]KrakenTickerPair + // KrakenEventSubscriptionStatus parse the subscriptionStatus event message. + KrakenEventSubscriptionStatus struct { + Status string `json:"status"` // subscribed|unsubscribed|error + Pair string `json:"pair"` // Pair symbol base/quote ex.: "XBT/USD" + ErrorMessage string `json:"errorMessage"` // error description } ) -func NewKrakenProvider() *KrakenProvider { - return &KrakenProvider{ - baseURL: krakenBaseURL, - client: newDefaultHTTPClient(), +// NewKrakenProvider returns a new Kraken provider with the WS connection and msg handler. +func NewKrakenProvider(ctx context.Context, logger zerolog.Logger, pairs ...types.CurrencyPair) (*KrakenProvider, error) { + wsURL := url.URL{ + Scheme: "wss", + Host: krakenHost, + } + + wsConn, _, err := websocket.DefaultDialer.Dial(wsURL.String(), nil) + if err != nil { + return nil, fmt.Errorf("error connecting to Kraken websocket: %w", err) + } + + provider := &KrakenProvider{ + wsURL: wsURL, + wsClient: wsConn, + logger: logger.With().Str("module", "oracle").Logger(), + tickers: map[string]TickerPrice{}, + subscribedPairs: map[string]types.CurrencyPair{}, + } + + if err := provider.SubscribeTickers(pairs...); err != nil { + return nil, err + } + + go provider.handleWebSocketMsgs(ctx) + + return provider, nil +} + +// GetTickerPrices returns the tickerPrices based on the saved map. +func (p *KrakenProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[string]TickerPrice, error) { + tickerPrices := make(map[string]TickerPrice, len(pairs)) + + for _, cp := range pairs { + key := cp.String() + tickerPrice, ok := p.tickers[key] + if !ok { + return nil, fmt.Errorf("kraken provider failed to get ticker price for %s", key) + } + tickerPrices[key] = tickerPrice + } + + return tickerPrices, nil +} + +// SubscribeTickers subscribe to all currency pairs and +// add the new ones into the provider subscribed pairs. +func (p *KrakenProvider) SubscribeTickers(cps ...types.CurrencyPair) error { + pairs := make([]string, len(cps)) + + for i, cp := range cps { + pairs[i] = currencyPairToKrakenPair(cp) } + + if err := p.subscribePairs(pairs...); err != nil { + return err + } + + p.setSubscribedPairs(cps...) + return nil } -func NewKrakenProviderWithTimeout(timeout time.Duration) *KrakenProvider { - return &KrakenProvider{ - baseURL: krakenBaseURL, - client: newHTTPClientWithTimeout(timeout), +// handleWebSocketMsgs receive all the messages from the provider +// and controls to reconnect the web socket. +func (p *KrakenProvider) handleWebSocketMsgs(ctx context.Context) { + reconnectTicker := time.NewTicker(defaultMaxConnectionTime) + defer reconnectTicker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-time.After(defaultReadNewWSMessage): + messageType, bz, err := p.wsClient.ReadMessage() + if err != nil { + // if some error occurs continue to try to read the next message + p.logger.Err(err).Msg("kraken provider could not read message") + if err := p.ping(); err != nil { + p.logger.Err(err).Msg("failed to send ping") + p.keepReconnecting() + } + continue + } + + if len(bz) == 0 { + continue + } + + p.messageReceived(messageType, bz) + + case <-reconnectTicker.C: + if err := p.reconnect(); err != nil { + p.logger.Err(err).Msg("kraken provider attempted to reconnect") + p.keepReconnecting() + } + } } } -func (p KrakenProvider) GetTickerPrices(pairs ...types.CurrencyPair) (map[string]TickerPrice, error) { - tickers := make([]string, len(pairs)) - for i, cp := range pairs { - tickers[i] = cp.String() +// messageReceived handles any message sent by the provider. +func (p *KrakenProvider) messageReceived(messageType int, bz []byte) { + if messageType != websocket.TextMessage { + return } - path := fmt.Sprintf("%s%s?pair=%s", p.baseURL, krakenTickerEndpoint, strings.Join(tickers, ",")) + var krakenEvent KrakenEvent + if err := json.Unmarshal(bz, &krakenEvent); err != nil { + p.logger.Debug().Msg("kraken provider received a message that is not an event") + // msg is not an event, it will try to marshal to ticker message + p.messageReceivedTickerPrice(bz) + return + } - resp, err := p.client.Get(path) - if err != nil { - return nil, fmt.Errorf("failed to make Kraken request: %w", err) + switch krakenEvent.Event { + case krakenEventSystemStatus: + p.messageReceivedSystemStatus(bz) + return + case krakenEventSubscriptionStatus: + p.messageReceivedSubscriptionStatus(bz) + return + } +} + +// messageReceivedTickerPrice handles the ticker price msg. +func (p *KrakenProvider) messageReceivedTickerPrice(bz []byte) { + // the provider response is an array with different types at each index + // kraken documentation https://docs.kraken.com/websockets/#message-ticker + var tickerMessage []interface{} + if err := json.Unmarshal(bz, &tickerMessage); err != nil { + p.logger.Err(err).Msg("Kraken provider could not unmarshal") + return + } + + if len(tickerMessage) != 4 { + p.logger.Debug().Msg("Kraken provider sent something different than ticker") + return } - defer resp.Body.Close() + channelName, ok := tickerMessage[2].(string) + if !ok || channelName != "ticker" { + p.logger.Debug().Msg("Kraken provider sent an unexpected channel name") + return + } - bz, err := ioutil.ReadAll(resp.Body) + tickerBz, err := json.Marshal(tickerMessage[1]) if err != nil { - return nil, fmt.Errorf("failed to read Kraken response body: %w", err) + p.logger.Err(err).Msg("Kraken provider could not marshal ticker message") + return + } + + var krakenTicker KrakenTicker + if err := json.Unmarshal(tickerBz, &krakenTicker); err != nil { + p.logger.Err(err).Msg("Kraken provider could not unmarshal ticker") + return } - var tickerResp KrakenTickerResponse - if err := json.Unmarshal(bz, &tickerResp); err != nil { - return nil, fmt.Errorf("failed to unmarshal Kraken response body: %w", err) + krakenPair, ok := tickerMessage[3].(string) + if !ok { + p.logger.Debug().Msg("Kraken provider sent an unexpected pair") + return } - if len(tickerResp.Error) != 0 { - return nil, fmt.Errorf("received unexpected error from Kraken response: %v", tickerResp.Error) + krakenPair = normalizeKrakenBTCPair(krakenPair) + currencyPairSymbol := krakenPairToCurrencyPairSymbol(krakenPair) + + tickerPrice, err := krakenTicker.toTickerPrice(currencyPairSymbol) + if err != nil { + p.logger.Err(err).Msg("Kraken provider could not parse kraken ticker to ticker price") + return + } + + p.setTickerPair(currencyPairSymbol, tickerPrice) +} + +// reconnect closes the last WS connection and create a new one. +func (p *KrakenProvider) reconnect() error { + p.wsClient.Close() + + wsConn, _, err := websocket.DefaultDialer.Dial(p.wsURL.String(), nil) + if err != nil { + return fmt.Errorf("error connecting to Kraken websocket: %w", err) } + p.wsClient = wsConn - if len(tickers) != len(tickerResp.Result) { - return nil, fmt.Errorf( - "received unexpected number of tickers; expected: %d, got: %d", - len(tickers), len(tickerResp.Result), - ) + pairs := make([]string, len(p.subscribedPairs)) + iterator := 0 + for _, cp := range p.subscribedPairs { + pairs[iterator] = currencyPairToKrakenPair(cp) + iterator++ } - tickerPrices := make(map[string]TickerPrice, len(tickers)) - for _, t := range tickers { - // TODO: We may need to transform 't' prior to looking it up in the response - // as Kraken may represent currencies differently. - pair, ok := tickerResp.Result[t] - if !ok { - return nil, fmt.Errorf("failed to find ticker in Kraken response: %s", t) - } + return p.subscribePairs(pairs...) +} + +// keepReconnecting keeps trying to reconnect if an error occurs in recconnect. +func (p *KrakenProvider) keepReconnecting() { + reconnectTicker := time.NewTicker(defaultReconnectTime) + defer reconnectTicker.Stop() + connectionTries := 1 - price, err := sdk.NewDecFromStr(pair.C[0]) - if err != nil { - return nil, fmt.Errorf("failed to parse Kraken price (%s) for %s", pair.C[0], t) + for time := range reconnectTicker.C { + if err := p.reconnect(); err != nil { + p.logger.Err(err).Msgf("kraken provider attempted to reconnect %d times at %s", connectionTries, time.String()) + continue } - volume, err := sdk.NewDecFromStr(pair.V[1]) - if err != nil { - return nil, fmt.Errorf("failed to parse Kraken volume (%s) for %s", pair.V[1], t) + if connectionTries > maxReconnectionTries { + p.logger.Warn().Msgf("kraken provider failed to reconnect %d times", connectionTries) } + connectionTries++ + return + } +} - tickerPrices[t] = TickerPrice{Price: price, Volume: volume} +// messageReceivedSubscriptionStatus handle the subscription status message +// sent by the provider. +func (p *KrakenProvider) messageReceivedSubscriptionStatus(bz []byte) { + var subscriptionStatus KrakenEventSubscriptionStatus + if err := json.Unmarshal(bz, &subscriptionStatus); err != nil { + p.logger.Err(err).Msg("Kraken provider could not unmarshal KrakenEventSubscriptionStatus") + return } - return tickerPrices, nil + switch subscriptionStatus.Status { + case "error": + p.logger.Error().Msg(subscriptionStatus.ErrorMessage) + p.removeSubscribedTickers(krakenPairToCurrencyPairSymbol(subscriptionStatus.Pair)) + return + case "unsubscribed": + p.logger.Debug().Msgf("ticker %s was unsubscribed", subscriptionStatus.Pair) + p.removeSubscribedTickers(krakenPairToCurrencyPairSymbol(subscriptionStatus.Pair)) + return + } +} + +// messageReceivedSystemStatus handle the system status and try to reconnect if it is not online. +func (p *KrakenProvider) messageReceivedSystemStatus(bz []byte) { + var systemStatus KrakenEventSystemStatus + if err := json.Unmarshal(bz, &systemStatus); err != nil { + p.logger.Err(err).Msg("Kraken provider could not unmarshal KrakenEventSystemStatus") + return + } + + if strings.EqualFold(systemStatus.Status, "online") { + return + } + + p.keepReconnecting() +} + +// setTickerPair sets an ticker to the map thread safe by the mutex. +func (p *KrakenProvider) setTickerPair(symbol string, ticker TickerPrice) { + p.mtx.Lock() + defer p.mtx.Unlock() + p.tickers[symbol] = ticker +} + +// ping to check websocket connection. +func (p *KrakenProvider) ping() error { + return p.wsClient.WriteMessage(websocket.PingMessage, ping) +} + +// subscribePairs write the subscription msg to the provider. +func (p *KrakenProvider) subscribePairs(pairs ...string) error { + subsMsg := newKrakenSubscriptionMsg(pairs...) + return p.wsClient.WriteJSON(subsMsg) +} + +// setSubscribedPairs sets N currency pairs to the map of subscribed pairs. +func (p *KrakenProvider) setSubscribedPairs(cps ...types.CurrencyPair) { + p.mtx.Lock() + defer p.mtx.Unlock() + + for _, cp := range cps { + p.subscribedPairs[cp.String()] = cp + } +} + +// removeSubscribedTickers delete N pairs from the subscribed map. +func (p *KrakenProvider) removeSubscribedTickers(tickerSymbols ...string) { + p.mtx.Lock() + defer p.mtx.Unlock() + + for _, tickerSymbol := range tickerSymbols { + delete(p.subscribedPairs, tickerSymbol) + } +} + +// toTickerPrice return a TickerPrice based on the KrakenTicker. +func (ticker KrakenTicker) toTickerPrice(symbol string) (TickerPrice, error) { + if len(ticker.C) != 2 || len(ticker.V) != 2 { + return TickerPrice{}, fmt.Errorf("error converting KrakenTicker to TickerPrice") + } + // ticker.C has the Price in the first position + // ticker.V has the totla Value over last 24 hours in the second position + return newTickerPrice("Kraken", symbol, ticker.C[0], ticker.V[1]) +} + +// newKrakenSubscriptionMsg returns a new subscription Msg. +func newKrakenSubscriptionMsg(pairs ...string) KrakenSubscriptionMsg { + return KrakenSubscriptionMsg{ + Event: "subscribe", + Pair: pairs, + Subscription: KrakenSubscriptionChannel{ + Name: "ticker", + }, + } +} + +// krakenPairToCurrencyPairSymbol receives a kraken pair formated +// ex.: ATOM/USDT and return currencyPair Symbol ATOMUSDT. +func krakenPairToCurrencyPairSymbol(krakenPair string) string { + return strings.Replace(krakenPair, "/", "", -1) +} + +// currencyPairToKrakenPair receives a currency pair +// and return kraken ticker symbol ATOM/USDT. +func currencyPairToKrakenPair(cp types.CurrencyPair) string { + return strings.ToUpper(cp.Base + "/" + cp.Quote) +} + +// normalizeKrakenBTCPair changes XBT pairs to BTC, +// since other providers list bitcoin as BTC +func normalizeKrakenBTCPair(ticker string) string { + return strings.Replace(ticker, "XBT", "BTC", 1) } diff --git a/price-feeder/oracle/provider/kraken_test.go b/price-feeder/oracle/provider/kraken_test.go index 46423ef161..d58816b6a7 100644 --- a/price-feeder/oracle/provider/kraken_test.go +++ b/price-feeder/oracle/provider/kraken_test.go @@ -1,127 +1,91 @@ package provider import ( - "net/http" - "net/http/httptest" + "context" "testing" sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/rs/zerolog" "github.com/stretchr/testify/require" "github.com/umee-network/umee/price-feeder/oracle/types" ) func TestKrakenProvider_GetTickerPrices(t *testing.T) { - p := NewKrakenProvider() + p, err := NewKrakenProvider(context.TODO(), zerolog.Nop(), types.CurrencyPair{Base: "BTC", Quote: "USDT"}) + require.NoError(t, err) t.Run("valid_request_single_ticker", func(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { - require.Equal(t, "/0/public/Ticker?pair=ATOMUSD", req.URL.String()) - resp := `{ - "error": [], - "result": { - "ATOMUSD": { - "c": ["35.0872000", "0.32546988"], - "v": ["1920.83610601", "7954.00219674"] - } - } - } - ` - rw.Write([]byte(resp)) - })) - defer server.Close() - - p.client = server.Client() - p.baseURL = server.URL - - prices, err := p.GetTickerPrices(types.CurrencyPair{Base: "ATOM", Quote: "USD"}) + lastPrice := sdk.MustNewDecFromStr("34.69000000") + volume := sdk.MustNewDecFromStr("2396974.02000000") + + tickerMap := map[string]TickerPrice{} + tickerMap["ATOMUSDT"] = TickerPrice{ + Price: lastPrice, + Volume: volume, + } + + p.tickers = tickerMap + + prices, err := p.GetTickerPrices(types.CurrencyPair{Base: "ATOM", Quote: "USDT"}) require.NoError(t, err) require.Len(t, prices, 1) - require.Equal(t, sdk.MustNewDecFromStr("35.0872"), prices["ATOMUSD"].Price) - require.Equal(t, sdk.MustNewDecFromStr("7954.00219674"), prices["ATOMUSD"].Volume) + require.Equal(t, lastPrice, prices["ATOMUSDT"].Price) + require.Equal(t, volume, prices["ATOMUSDT"].Volume) }) t.Run("valid_request_multi_ticker", func(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { - require.Equal(t, "/0/public/Ticker?pair=ATOMUSD,XXBTZUSD", req.URL.String()) - resp := `{ - "error": [], - "result": { - "ATOMUSD": { - "c": ["35.0872000", "0.32546988"], - "v": ["1920.83610601", "7954.00219674"] - }, - "XXBTZUSD": { - "c": ["63339.40000", "0.00010000"], - "v": ["1920.83610601", "7954.00219674"] - } - } - } - ` - rw.Write([]byte(resp)) - })) - defer server.Close() - - p.client = server.Client() - p.baseURL = server.URL - + lastPriceAtom := sdk.MustNewDecFromStr("34.69000000") + lastPriceLuna := sdk.MustNewDecFromStr("41.35000000") + volume := sdk.MustNewDecFromStr("2396974.02000000") + + tickerMap := map[string]TickerPrice{} + tickerMap["ATOMUSDT"] = TickerPrice{ + Price: lastPriceAtom, + Volume: volume, + } + + tickerMap["LUNAUSDT"] = TickerPrice{ + Price: lastPriceLuna, + Volume: volume, + } + + p.tickers = tickerMap prices, err := p.GetTickerPrices( - types.CurrencyPair{Base: "ATOM", Quote: "USD"}, - types.CurrencyPair{Base: "XXBTZ", Quote: "USD"}, + types.CurrencyPair{Base: "ATOM", Quote: "USDT"}, + types.CurrencyPair{Base: "LUNA", Quote: "USDT"}, ) require.NoError(t, err) require.Len(t, prices, 2) - require.Equal(t, sdk.MustNewDecFromStr("35.0872"), prices["ATOMUSD"].Price) - require.Equal(t, sdk.MustNewDecFromStr("7954.00219674"), prices["ATOMUSD"].Volume) - require.Equal(t, sdk.MustNewDecFromStr("63339.40000"), prices["XXBTZUSD"].Price) - require.Equal(t, sdk.MustNewDecFromStr("7954.00219674"), prices["XXBTZUSD"].Volume) - }) - - t.Run("invalid_request_bad_response", func(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { - require.Equal(t, "/0/public/Ticker?pair=ATOMUSD", req.URL.String()) - rw.Write([]byte(`FOO`)) - })) - defer server.Close() - - p.client = server.Client() - p.baseURL = server.URL - - prices, err := p.GetTickerPrices(types.CurrencyPair{Base: "ATOM", Quote: "USD"}) - require.Error(t, err) - require.Nil(t, prices) + require.Equal(t, lastPriceAtom, prices["ATOMUSDT"].Price) + require.Equal(t, volume, prices["ATOMUSDT"].Volume) + require.Equal(t, lastPriceLuna, prices["LUNAUSDT"].Price) + require.Equal(t, volume, prices["LUNAUSDT"].Volume) }) t.Run("invalid_request_invalid_ticker", func(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { - require.Equal(t, "/0/public/Ticker?pair=FOOBAR", req.URL.String()) - resp := `{ - "error": ["EQuery:Unknown asset pair"] - } - ` - rw.Write([]byte(resp)) - })) - defer server.Close() - - p.client = server.Client() - p.baseURL = server.URL - prices, err := p.GetTickerPrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) require.Error(t, err) + require.Equal(t, "kraken provider failed to get ticker price for FOOBAR", err.Error()) require.Nil(t, prices) }) +} + +func TestKrakenPairToCurrencyPairSymbol(t *testing.T) { + cp := types.CurrencyPair{Base: "ATOM", Quote: "USDT"} + currencyPairSymbol := krakenPairToCurrencyPairSymbol("ATOM/USDT") + require.Equal(t, cp.String(), currencyPairSymbol) +} - t.Run("check_redirect", func(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(rw http.ResponseWriter, r *http.Request) { - http.Redirect(rw, r, p.baseURL, http.StatusTemporaryRedirect) - })) - defer server.Close() +func TestKrakenCurrencyPairToKrakenPair(t *testing.T) { + cp := types.CurrencyPair{Base: "ATOM", Quote: "USDT"} + krakenSymbol := currencyPairToKrakenPair(cp) + require.Equal(t, krakenSymbol, "ATOM/USDT") +} - server.Client().CheckRedirect = preventRedirect - p.client = server.Client() - p.baseURL = server.URL +func TestNormalizeKrakenBTCPair(t *testing.T) { + btcSymbol := normalizeKrakenBTCPair("XBT/USDT") + require.Equal(t, btcSymbol, "BTC/USDT") - prices, err := p.GetTickerPrices(types.CurrencyPair{Base: "ATOM", Quote: "USDT"}) - require.Error(t, err) - require.Nil(t, prices) - }) + atomSymbol := normalizeKrakenBTCPair("ATOM/USDT") + require.Equal(t, atomSymbol, "ATOM/USDT") } diff --git a/price-feeder/oracle/provider/okx.go b/price-feeder/oracle/provider/okx.go index 5e76961ef1..4fab072c6e 100644 --- a/price-feeder/oracle/provider/okx.go +++ b/price-feeder/oracle/provider/okx.go @@ -115,7 +115,7 @@ func (p *OkxProvider) getTickerPrice(cp types.CurrencyPair) (TickerPrice, error) instrumentId := getInstrumentId(cp) tickerPair, ok := p.tickers[instrumentId] if !ok { - return TickerPrice{}, fmt.Errorf("failed to get ticker price for %s", instrumentId) + return TickerPrice{}, fmt.Errorf("okx provider failed to get ticker price for %s", instrumentId) } return tickerPair.toTickerPrice() @@ -141,6 +141,7 @@ func (p *OkxProvider) handleReceivedTickers(ctx context.Context) { continue } + p.resetReconnectTimer() p.messageReceived(messageType, bz) case <-p.reconnectTimer.C: // reset by the pongHandler @@ -163,7 +164,6 @@ func (p *OkxProvider) messageReceived(messageType int, bz []byte) { return } - p.resetReconnectTimer() for _, tickerPair := range tickerResp.Data { p.setTickerPair(tickerPair) } diff --git a/price-feeder/oracle/provider/okx_test.go b/price-feeder/oracle/provider/okx_test.go index 20d77cb361..904838bffc 100644 --- a/price-feeder/oracle/provider/okx_test.go +++ b/price-feeder/oracle/provider/okx_test.go @@ -68,7 +68,7 @@ func TestOkxProvider_GetTickerPrices(t *testing.T) { t.Run("invalid_request_invalid_ticker", func(t *testing.T) { prices, err := p.GetTickerPrices(types.CurrencyPair{Base: "FOO", Quote: "BAR"}) require.Error(t, err) - require.Equal(t, "failed to get ticker price for FOO-BAR", err.Error()) + require.Equal(t, "okx provider failed to get ticker price for FOO-BAR", err.Error()) require.Nil(t, prices) }) } diff --git a/price-feeder/oracle/provider/provider.go b/price-feeder/oracle/provider/provider.go index fff9b0943a..b437683ebc 100644 --- a/price-feeder/oracle/provider/provider.go +++ b/price-feeder/oracle/provider/provider.go @@ -10,8 +10,11 @@ import ( ) const ( - defaultTimeout = 10 * time.Second - defaultReadNewWSMessage = 50 * time.Millisecond + defaultTimeout = 10 * time.Second + defaultReadNewWSMessage = 50 * time.Millisecond + defaultMaxConnectionTime = time.Hour * 23 // should be < 24h + defaultReconnectTime = time.Minute * 20 + maxReconnectionTries = 3 ) var (