Skip to content

Commit

Permalink
chore: price-feeder: clean up logs (#587) (#589)
Browse files Browse the repository at this point in the history
* chore: clean up log for kraken

* chore: clean up log for binance

* chore: clean up log for huobi

* chore: clean up log for binance

* chore: clean up log for kraken

* chore: clean up log for okx

* refactor: add clean up price-feeder logs to CHANGELOG

(cherry picked from commit 989ed32)

Co-authored-by: Rafael Tenfen <[email protected]>
  • Loading branch information
mergify[bot] and RafilxTenfen authored Feb 25, 2022
1 parent dbef858 commit 5d2b602
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 39 deletions.
4 changes: 4 additions & 0 deletions price-feeder/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ Ref: https://keepachangelog.com/en/1.0.0/
- [#574](https://github.com/umee-network/umee/pull/574) Stop registering metrics endpoint if telemetry is disabled.
- [#573](https://github.com/umee-network/umee/pull/573) Strengthen CORS settings.

### Refactor

- [#587](https://github.com/umee-network/umee/pull/587) Clean up logs from price feeder providers.

## [v0.1.0](https://github.com/umee-network/umee/releases/tag/price-feeder%2Fv0.1.0) - 2022-02-07

### Features
Expand Down
14 changes: 7 additions & 7 deletions price-feeder/oracle/provider/binance.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func NewBinanceProvider(ctx context.Context, logger zerolog.Logger, pairs ...typ
provider := &BinanceProvider{
wsURL: wsURL,
wsClient: wsConn,
logger: logger.With().Str("module", "oracle").Logger(),
logger: logger.With().Str("provider", "binance").Logger(),
tickers: map[string]BinanceTicker{},
subscribedPairs: pairs,
}
Expand Down Expand Up @@ -118,7 +118,7 @@ func (p *BinanceProvider) messageReceived(messageType int, bz []byte) {
var tickerResp BinanceTicker
if err := json.Unmarshal(bz, &tickerResp); err != nil {
// sometimes it returns other messages which are not ticker responses
p.logger.Err(err).Msg("Binance provider could not unmarshal")
p.logger.Err(err).Msg("could not unmarshal ticker")
return
}

Expand Down Expand Up @@ -163,7 +163,7 @@ func (p *BinanceProvider) handleWebSocketMsgs(ctx context.Context) {
messageType, bz, err := p.wsClient.ReadMessage()
if err != nil {
// if some error occurs continue to try to read the next message
p.logger.Err(err).Msg("Binance provider could not read message")
p.logger.Err(err).Msg("could not read message")
continue
}

Expand All @@ -175,7 +175,7 @@ func (p *BinanceProvider) handleWebSocketMsgs(ctx context.Context) {

case <-reconnectTicker.C:
if err := p.reconnect(); err != nil {
p.logger.Err(err).Msg("binance provider error reconnecting")
p.logger.Err(err).Msg("error reconnecting")
p.keepReconnecting()
}
}
Expand All @@ -192,7 +192,7 @@ func (p *BinanceProvider) handleWebSocketMsgs(ctx context.Context) {
func (p *BinanceProvider) reconnect() error {
p.wsClient.Close()

p.logger.Debug().Msg("binance reconnecting websocket")
p.logger.Debug().Msg("reconnecting websocket")
wsConn, _, err := websocket.DefaultDialer.Dial(p.wsURL.String(), nil)
if err != nil {
return fmt.Errorf("error reconnect to binance websocket: %w", err)
Expand All @@ -210,12 +210,12 @@ func (p *BinanceProvider) keepReconnecting() {

for time := range reconnectTicker.C {
if err := p.reconnect(); err != nil {
p.logger.Err(err).Msgf("binance provider attempted to reconnect %d times at %s", connectionTries, time.String())
p.logger.Err(err).Msgf("attempted to reconnect %d times at %s", connectionTries, time.String())
continue
}

if connectionTries > maxReconnectionTries {
p.logger.Warn().Msgf("binance provider failed to reconnect %d times", connectionTries)
p.logger.Warn().Msgf("failed to reconnect %d times", connectionTries)
}
connectionTries++
return
Expand Down
20 changes: 10 additions & 10 deletions price-feeder/oracle/provider/huobi.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewHuobiProvider(ctx context.Context, logger zerolog.Logger, pairs ...types
provider := &HuobiProvider{
wsURL: wsURL,
wsClient: wsConn,
logger: logger.With().Str("module", "oracle").Logger(),
logger: logger.With().Str("provider", "huobi").Logger(),
tickers: map[string]HuobiTicker{},
subscribedPairs: pairs,
}
Expand Down Expand Up @@ -113,11 +113,11 @@ func (p *HuobiProvider) handleWebSocketMsgs(ctx context.Context) {
if err != nil {
// If some error occurs, check if connection is alive
// and continue to try to read the next message.
p.logger.Err(err).Msg("failed to read message from Huobi provider")
p.logger.Err(err).Msg("failed to read message")
if err := p.ping(); err != nil {
p.logger.Err(err).Msg("failed to send ping")
if err := p.reconnect(); err != nil {
p.logger.Err(err).Msg("error reconnecting to the Huobi provider")
p.logger.Err(err).Msg("error reconnecting")
}
}
continue
Expand All @@ -131,7 +131,7 @@ func (p *HuobiProvider) handleWebSocketMsgs(ctx context.Context) {

case <-reconnectTicker.C:
if err := p.reconnect(); err != nil {
p.logger.Err(err).Msg("error reconnecting to the Huobi provider")
p.logger.Err(err).Msg("error reconnecting")
}
}
}
Expand All @@ -147,7 +147,7 @@ func (p *HuobiProvider) messageReceived(messageType int, bz []byte, reconnectTic

bz, err := decompressGzip(bz)
if err != nil {
p.logger.Err(err).Msg("failed to decompress Huobi gziped message")
p.logger.Err(err).Msg("failed to decompress gziped message")
return
}

Expand All @@ -159,7 +159,7 @@ func (p *HuobiProvider) messageReceived(messageType int, bz []byte, reconnectTic
var tickerResp HuobiTicker
if err := json.Unmarshal(bz, &tickerResp); err != nil {
// sometimes it returns other messages which are not ticker responses
p.logger.Err(err).Msg("failed to unmarshal message from Huobi provider")
p.logger.Err(err).Msg("failed to unmarshal message")
return
}

Expand All @@ -185,14 +185,14 @@ func (p *HuobiProvider) pong(bz []byte, reconnectTicker *time.Ticker) {
}

if err := json.Unmarshal(bz, &heartbeat); err != nil {
p.logger.Err(err).Msg("Huobi provider could not unmarshal heartbeat")
p.logger.Err(err).Msg("could not unmarshal heartbeat")
return
}

if err := p.wsClient.WriteJSON(struct {
Pong uint64 `json:"pong"`
}{Pong: heartbeat.Ping}); err != nil {
p.logger.Err(err).Msg("Huobi provider could not send pong message back")
p.logger.Err(err).Msg("could not send pong message back")
}
}

Expand All @@ -211,10 +211,10 @@ func (p *HuobiProvider) setTickerPair(ticker HuobiTicker) {
func (p *HuobiProvider) reconnect() error {
p.wsClient.Close()

p.logger.Debug().Msg("huobi reconnecting websocket")
p.logger.Debug().Msg("reconnecting websocket")
wsConn, _, err := websocket.DefaultDialer.Dial(p.wsURL.String(), nil)
if err != nil {
return fmt.Errorf("error reconnecting to huobi websocket: %w", err)
return fmt.Errorf("error reconnecting to Huobi websocket: %w", err)
}
p.wsClient = wsConn

Expand Down
32 changes: 16 additions & 16 deletions price-feeder/oracle/provider/kraken.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,13 @@ func NewKrakenProvider(ctx context.Context, logger zerolog.Logger, pairs ...type

wsConn, _, err := websocket.DefaultDialer.Dial(wsURL.String(), nil)
if err != nil {
return nil, fmt.Errorf("error connecting to Kraken websocket: %w", err)
return nil, fmt.Errorf("error connecting to websocket: %w", err)
}

provider := &KrakenProvider{
wsURL: wsURL,
wsClient: wsConn,
logger: logger.With().Str("module", "oracle").Logger(),
logger: logger.With().Str("provider", "kraken").Logger(),
tickers: map[string]TickerPrice{},
subscribedPairs: map[string]types.CurrencyPair{},
}
Expand Down Expand Up @@ -149,7 +149,7 @@ func (p *KrakenProvider) handleWebSocketMsgs(ctx context.Context) {
messageType, bz, err := p.wsClient.ReadMessage()
if err != nil {
// if some error occurs continue to try to read the next message
p.logger.Err(err).Msg("kraken provider could not read message")
p.logger.Err(err).Msg("could not read message")
if err := p.ping(); err != nil {
p.logger.Err(err).Msg("failed to send ping")
p.keepReconnecting()
Expand All @@ -165,7 +165,7 @@ func (p *KrakenProvider) handleWebSocketMsgs(ctx context.Context) {

case <-reconnectTicker.C:
if err := p.reconnect(); err != nil {
p.logger.Err(err).Msg("kraken provider attempted to reconnect")
p.logger.Err(err).Msg("attempted to reconnect")
p.keepReconnecting()
}
}
Expand All @@ -180,7 +180,7 @@ func (p *KrakenProvider) messageReceived(messageType int, bz []byte) {

var krakenEvent KrakenEvent
if err := json.Unmarshal(bz, &krakenEvent); err != nil {
p.logger.Debug().Msg("kraken provider received a message that is not an event")
p.logger.Debug().Msg("received a message that is not an event")
// msg is not an event, it will try to marshal to ticker message
p.messageReceivedTickerPrice(bz)
return
Expand All @@ -202,36 +202,36 @@ func (p *KrakenProvider) messageReceivedTickerPrice(bz []byte) {
// kraken documentation https://docs.kraken.com/websockets/#message-ticker
var tickerMessage []interface{}
if err := json.Unmarshal(bz, &tickerMessage); err != nil {
p.logger.Err(err).Msg("Kraken provider could not unmarshal")
p.logger.Err(err).Msg("could not unmarshal ticker")
return
}

if len(tickerMessage) != 4 {
p.logger.Debug().Msg("Kraken provider sent something different than ticker")
p.logger.Debug().Msg("sent an unexpected structure")
return
}

channelName, ok := tickerMessage[2].(string)
if !ok || channelName != "ticker" {
p.logger.Debug().Msg("Kraken provider sent an unexpected channel name")
p.logger.Debug().Msg("sent an unexpected channel name")
return
}

tickerBz, err := json.Marshal(tickerMessage[1])
if err != nil {
p.logger.Err(err).Msg("Kraken provider could not marshal ticker message")
p.logger.Err(err).Msg("could not marshal ticker message")
return
}

var krakenTicker KrakenTicker
if err := json.Unmarshal(tickerBz, &krakenTicker); err != nil {
p.logger.Err(err).Msg("Kraken provider could not unmarshal ticker")
p.logger.Err(err).Msg("could not unmarshal ticker")
return
}

krakenPair, ok := tickerMessage[3].(string)
if !ok {
p.logger.Debug().Msg("Kraken provider sent an unexpected pair")
p.logger.Debug().Msg("sent an unexpected pair")
return
}

Expand All @@ -240,7 +240,7 @@ func (p *KrakenProvider) messageReceivedTickerPrice(bz []byte) {

tickerPrice, err := krakenTicker.toTickerPrice(currencyPairSymbol)
if err != nil {
p.logger.Err(err).Msg("Kraken provider could not parse kraken ticker to ticker price")
p.logger.Err(err).Msg("could not parse kraken ticker to ticker price")
return
}

Expand Down Expand Up @@ -275,12 +275,12 @@ func (p *KrakenProvider) keepReconnecting() {

for time := range reconnectTicker.C {
if err := p.reconnect(); err != nil {
p.logger.Err(err).Msgf("kraken provider attempted to reconnect %d times at %s", connectionTries, time.String())
p.logger.Err(err).Msgf("attempted to reconnect %d times at %s", connectionTries, time.String())
continue
}

if connectionTries > maxReconnectionTries {
p.logger.Warn().Msgf("kraken provider failed to reconnect %d times", connectionTries)
p.logger.Warn().Msgf("failed to reconnect %d times", connectionTries)
}
connectionTries++
return
Expand All @@ -292,7 +292,7 @@ func (p *KrakenProvider) keepReconnecting() {
func (p *KrakenProvider) messageReceivedSubscriptionStatus(bz []byte) {
var subscriptionStatus KrakenEventSubscriptionStatus
if err := json.Unmarshal(bz, &subscriptionStatus); err != nil {
p.logger.Err(err).Msg("Kraken provider could not unmarshal KrakenEventSubscriptionStatus")
p.logger.Err(err).Msg("provider could not unmarshal KrakenEventSubscriptionStatus")
return
}

Expand All @@ -312,7 +312,7 @@ func (p *KrakenProvider) messageReceivedSubscriptionStatus(bz []byte) {
func (p *KrakenProvider) messageReceivedSystemStatus(bz []byte) {
var systemStatus KrakenEventSystemStatus
if err := json.Unmarshal(bz, &systemStatus); err != nil {
p.logger.Err(err).Msg("Kraken provider could not unmarshal KrakenEventSystemStatus")
p.logger.Err(err).Msg("could not unmarshal event system status")
return
}

Expand Down
12 changes: 6 additions & 6 deletions price-feeder/oracle/provider/okx.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ func NewOkxProvider(ctx context.Context, logger zerolog.Logger, pairs ...types.C
provider := &OkxProvider{
wsURL: wsURL,
wsClient: wsConn,
logger: logger.With().Str("module", "oracle").Logger(),
logger: logger.With().Str("provider", "okx").Logger(),
tickers: map[string]OkxTickerPair{},
reconnectTimer: time.NewTicker(okxPingCheck),
subscribedPairs: pairs,
Expand Down Expand Up @@ -130,9 +130,9 @@ func (p *OkxProvider) handleReceivedTickers(ctx context.Context) {
messageType, bz, err := p.wsClient.ReadMessage()
if err != nil {
// if some error occurs continue to try to read the next message
p.logger.Err(err).Msg("Okx provider could not read message")
p.logger.Err(err).Msg("could not read message")
if err := p.ping(); err != nil {
p.logger.Err(err).Msg("Okx provider could not send ping")
p.logger.Err(err).Msg("could not send ping")
}
continue
}
Expand All @@ -146,7 +146,7 @@ func (p *OkxProvider) handleReceivedTickers(ctx context.Context) {

case <-p.reconnectTimer.C: // reset by the pongHandler
if err := p.reconnect(); err != nil {
p.logger.Err(err).Msg("Okx provider error reconnecting")
p.logger.Err(err).Msg("error reconnecting")
}
}
}
Expand All @@ -160,7 +160,7 @@ func (p *OkxProvider) messageReceived(messageType int, bz []byte) {
var tickerResp OkxTickerResponse
if err := json.Unmarshal(bz, &tickerResp); err != nil {
// sometimes it returns other messages which are not tickerResponses
p.logger.Err(err).Msg("Okx provider could not unmarshal")
p.logger.Err(err).Msg("could not unmarshal")
return
}

Expand Down Expand Up @@ -203,7 +203,7 @@ func (p *OkxProvider) resetReconnectTimer() {
func (p *OkxProvider) reconnect() error {
p.wsClient.Close()

p.logger.Debug().Msg("Okx reconnecting websocket")
p.logger.Debug().Msg("reconnecting websocket")
wsConn, _, err := websocket.DefaultDialer.Dial(p.wsURL.String(), nil)
if err != nil {
return fmt.Errorf("error reconnecting to Okx websocket: %w", err)
Expand Down

0 comments on commit 5d2b602

Please sign in to comment.