Skip to content

Commit

Permalink
(OraklNode) Error Sentinel (#1452)
Browse files Browse the repository at this point in the history
* feat: use error declared from `errors/sentinel`

* fix: add missing err

* feat: fix testcodes

* fix: add POR into map, update local dummy contract addr
  • Loading branch information
nick-bisonai authored May 4, 2024
1 parent aa12e5d commit de618b6
Show file tree
Hide file tree
Showing 29 changed files with 384 additions and 189 deletions.
2 changes: 1 addition & 1 deletion node/.env.local
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ SIGNER_PK=


# this address is dummy contract in baobab
SUBMISSION_PROXY_CONTRACT=0xba5208a3b387796773Ecb38524C41eEa3b3541F9
SUBMISSION_PROXY_CONTRACT=0x284E7E442d64108Bd593Ec4b41538dCE5aEdA858

PRIVATE_NETWORK_SECRET=anything

Expand Down
7 changes: 4 additions & 3 deletions node/pkg/admin/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"bisonai.com/orakl/node/pkg/bus"
"bisonai.com/orakl/node/pkg/db"
errorSentinel "bisonai.com/orakl/node/pkg/error"

"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/cors"
Expand All @@ -30,12 +31,12 @@ func Setup(setupInfo SetupInfo) (*fiber.App, error) {
ctx := context.Background()
_, err := db.GetPool(ctx)
if err != nil {
return nil, errors.New("error getting db pool")
return nil, errorSentinel.ErrAdminDbPoolNotFound
}

_, err = db.GetRedisConn(ctx)
if err != nil {
return nil, errors.New("error getting redis conn")
return nil, errorSentinel.ErrAdminRedisConnNotFound
}

app := fiber.New(fiber.Config{
Expand Down Expand Up @@ -116,7 +117,7 @@ func SendMessage(c *fiber.Ctx, to string, command string, args map[string]interf

messageBus, ok := c.Locals("bus").(*bus.MessageBus)
if !ok {
return msg, errors.New("bus is not found, failed to message fetcher")
return msg, errorSentinel.ErrAdminMessageBusNotFound
}

msg = bus.Message{
Expand Down
20 changes: 10 additions & 10 deletions node/pkg/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package aggregator
import (
"context"
"encoding/json"
"fmt"
"sync"

"time"

"bisonai.com/orakl/node/pkg/chain/helper"
errorSentinel "bisonai.com/orakl/node/pkg/error"
"bisonai.com/orakl/node/pkg/raft"
"bisonai.com/orakl/node/pkg/utils/calculator"
pubsub "github.com/libp2p/go-libp2p-pubsub"
Expand All @@ -18,7 +18,7 @@ import (

func NewAggregator(h host.Host, ps *pubsub.PubSub, topicString string, config Config) (*Aggregator, error) {
if h == nil || ps == nil || topicString == "" {
return nil, fmt.Errorf("invalid parameters")
return nil, errorSentinel.ErrAggregatorInvalidInitValue
}

topic, err := ps.Join(topicString)
Expand Down Expand Up @@ -84,7 +84,7 @@ func (n *Aggregator) HandleCustomMessage(ctx context.Context, message raft.Messa
case ProofMsg:
return n.HandleProofMessage(ctx, message)
default:
return fmt.Errorf("unknown message type received in HandleCustomMessage: %v", message.Type)
return errorSentinel.ErrAggregatorUnhandledCustomMessage
}
}

Expand All @@ -98,12 +98,12 @@ func (n *Aggregator) HandleRoundSyncMessage(ctx context.Context, msg raft.Messag

if msg.SentFrom != n.Raft.GetLeader() {
log.Warn().Str("Player", "Aggregator").Msg("round sync message sent from non-leader")
return fmt.Errorf("round sync message sent from non-leader")
return errorSentinel.ErrAggregatorNonLeaderRaftMessage
}

if roundSyncMessage.LeaderID == "" || roundSyncMessage.RoundID == 0 {
log.Error().Str("Player", "Aggregator").Msg("invalid round sync message")
return fmt.Errorf("invalid round sync message: %v", roundSyncMessage)
return errorSentinel.ErrAggregatorInvalidRaftMessage
}

if n.Raft.GetRole() != raft.Leader {
Expand Down Expand Up @@ -148,7 +148,7 @@ func (n *Aggregator) HandleSyncReplyMessage(ctx context.Context, msg raft.Messag

if syncReplyMessage.RoundID == 0 {
log.Error().Str("Player", "Aggregator").Msg("invalid sync reply message")
return fmt.Errorf("invalid sync reply message: %v", syncReplyMessage)
return errorSentinel.ErrAggregatorInvalidRaftMessage
}

n.AggregatorMutex.Lock()
Expand Down Expand Up @@ -189,12 +189,12 @@ func (n *Aggregator) HandleTriggerMessage(ctx context.Context, msg raft.Message)

if triggerMessage.RoundID == 0 {
log.Error().Str("Player", "Aggregator").Msg("invalid trigger message")
return fmt.Errorf("invalid trigger message: %v", triggerMessage)
return errorSentinel.ErrAggregatorInvalidRaftMessage
}

if msg.SentFrom != n.Raft.GetLeader() {
log.Warn().Str("Player", "Aggregator").Msg("trigger message sent from non-leader")
return fmt.Errorf("trigger message sent from non-leader")
return errorSentinel.ErrAggregatorNonLeaderRaftMessage
}
defer delete(n.PreparedLocalAggregates, triggerMessage.RoundID)
return n.PublishPriceDataMessage(triggerMessage.RoundID, n.PreparedLocalAggregates[triggerMessage.RoundID])
Expand All @@ -210,7 +210,7 @@ func (n *Aggregator) HandlePriceDataMessage(ctx context.Context, msg raft.Messag

if priceDataMessage.RoundID == 0 {
log.Error().Str("Player", "Aggregator").Msg("invalid price data message")
return fmt.Errorf("invalid price data message: %v", priceDataMessage)
return errorSentinel.ErrAggregatorInvalidRaftMessage
}

n.AggregatorMutex.Lock()
Expand Down Expand Up @@ -258,7 +258,7 @@ func (n *Aggregator) HandleProofMessage(ctx context.Context, msg raft.Message) e

if proofMessage.RoundID == 0 {
log.Error().Str("Player", "Aggregator").Msg("invalid proof message")
return fmt.Errorf("invalid proof message: %v", proofMessage)
return errorSentinel.ErrAggregatorInvalidRaftMessage
}

n.AggregatorMutex.Lock()
Expand Down
16 changes: 8 additions & 8 deletions node/pkg/aggregator/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ package aggregator

import (
"context"
"errors"
"math/rand"
"strconv"
"time"

"bisonai.com/orakl/node/pkg/bus"
"bisonai.com/orakl/node/pkg/db"

errorSentinel "bisonai.com/orakl/node/pkg/error"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/rs/zerolog/log"
Expand Down Expand Up @@ -91,7 +91,7 @@ func (a *App) getConfigs(ctx context.Context) ([]Config, error) {

func (a *App) startAggregator(ctx context.Context, aggregator *Aggregator) error {
if aggregator == nil {
return errors.New("aggregator not found")
return errorSentinel.ErrAggregatorNotFound
}

log.Debug().Str("Player", "Aggregator").Str("name", aggregator.Name).Msg("starting aggregator")
Expand All @@ -113,7 +113,7 @@ func (a *App) startAggregator(ctx context.Context, aggregator *Aggregator) error
func (a *App) startAggregatorById(ctx context.Context, id int32) error {
aggregator, ok := a.Aggregators[id]
if !ok {
return errors.New("aggregator not found")
return errorSentinel.ErrAggregatorNotFound
}
return a.startAggregator(ctx, aggregator)
}
Expand All @@ -139,7 +139,7 @@ func (a *App) stopAggregator(aggregator *Aggregator) error {
return nil
}
if aggregator.nodeCancel == nil {
return errors.New("aggregator cancel function not found")
return errorSentinel.ErrAggregatorCancelNotFound
}
aggregator.nodeCancel()
aggregator.isRunning = false
Expand All @@ -150,7 +150,7 @@ func (a *App) stopAggregator(aggregator *Aggregator) error {
func (a *App) stopAggregatorById(id int32) error {
aggregator, ok := a.Aggregators[id]
if !ok {
return errors.New("aggregator not found")
return errorSentinel.ErrAggregatorNotFound
}
return a.stopAggregator(aggregator)
}
Expand Down Expand Up @@ -195,7 +195,7 @@ func (a *App) getAggregatorByName(name string) (*Aggregator, error) {
return aggregator, nil
}
}
return nil, errors.New("aggregator not found")
return nil, errorSentinel.ErrAggregatorNotFound
}

func (a *App) handleMessage(ctx context.Context, msg bus.Message) {
Expand All @@ -210,7 +210,7 @@ func (a *App) handleMessage(ctx context.Context, msg bus.Message) {
}

if msg.From != bus.ADMIN {
bus.HandleMessageError(errors.New("non-admin"), msg, "aggregator received message from non-admin")
bus.HandleMessageError(errorSentinel.ErrBusNonAdmin, msg, "aggregator received message from non-admin")
return
}

Expand Down Expand Up @@ -281,7 +281,7 @@ func (a *App) handleMessage(ctx context.Context, msg bus.Message) {
}
msg.Response <- bus.MessageResponse{Success: true}
default:
bus.HandleMessageError(errors.New("unknown-command"), msg, "aggregator received unknown command")
bus.HandleMessageError(errorSentinel.ErrBusUnknownCommand, msg, "aggregator received unknown command")
return
}
}
7 changes: 3 additions & 4 deletions node/pkg/aggregator/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,11 @@ package aggregator
import (
"bytes"
"context"
"fmt"
"strconv"
"strings"
"time"

"bisonai.com/orakl/node/pkg/db"
errorSentinel "bisonai.com/orakl/node/pkg/error"
"github.com/rs/zerolog/log"
)

Expand Down Expand Up @@ -48,7 +47,7 @@ func InsertGlobalAggregate(ctx context.Context, configId int32, value int64, rou
}

if len(errs) > 0 {
return fmt.Errorf(strings.Join(errs, "; "))
return errorSentinel.ErrAggregatorGlobalAggregateInsertion
}

return nil
Expand Down Expand Up @@ -80,7 +79,7 @@ func InsertProof(ctx context.Context, configId int32, round int32, proofs [][]by
}

if len(errs) > 0 {
return fmt.Errorf(strings.Join(errs, "; "))
return errorSentinel.ErrAggregatorProofInsertion
}

return nil
Expand Down
4 changes: 3 additions & 1 deletion node/pkg/boot/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@ package boot

import (
"context"
"errors"
"fmt"
"os"
"time"

"bisonai.com/orakl/node/pkg/boot/peer"
"bisonai.com/orakl/node/pkg/boot/utils"
"bisonai.com/orakl/node/pkg/db"
errorSentinel "bisonai.com/orakl/node/pkg/error"
libp2p_setup "bisonai.com/orakl/node/pkg/libp2p/setup"
libp2p_utils "bisonai.com/orakl/node/pkg/libp2p/utils"
"github.com/gofiber/fiber/v2"
Expand Down Expand Up @@ -93,7 +95,7 @@ func RefreshJob(ctx context.Context) error {
isAlive, liveCheckErr := libp2p_utils.IsHostAlive(ctx, h, connectionUrl)
if liveCheckErr != nil {
log.Error().Err(liveCheckErr).Msg("Failed to check peer")
if liveCheckErr.Error() != "failed to connect to peer" {
if !errors.Is(liveCheckErr, errorSentinel.ErrLibP2pFailToConnectPeer) {
continue
}
}
Expand Down
3 changes: 2 additions & 1 deletion node/pkg/boot/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"

"bisonai.com/orakl/node/pkg/db"
errorSentinel "bisonai.com/orakl/node/pkg/error"
"github.com/gofiber/fiber/v2"
"github.com/gofiber/fiber/v2/middleware/cors"
"github.com/gofiber/fiber/v2/middleware/recover"
Expand All @@ -19,7 +20,7 @@ func Setup(ctx context.Context) (*fiber.App, error) {
_, err := db.GetPool(ctx)
if err != nil {
log.Error().Err(err).Msg("error getting db pool")
return nil, errors.New("error getting db pool")
return nil, errorSentinel.ErrBootAPIDbPoolNotFound
}

app := fiber.New(fiber.Config{
Expand Down
22 changes: 11 additions & 11 deletions node/pkg/bus/bus.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package bus

import (
"errors"
"strconv"

errorSentinel "bisonai.com/orakl/node/pkg/error"
"github.com/rs/zerolog/log"
)

Expand Down Expand Up @@ -47,30 +47,30 @@ func (mb *MessageBus) Subscribe(id string) <-chan Message {
func (mb *MessageBus) Publish(msg Message) error {
ch, ok := mb.channels[msg.To]
if !ok {
return errors.New("channel not found")
return errorSentinel.ErrBusChannelNotFound
}
select {
case ch <- msg:
return nil
default:
return errors.New("failed to send message to channel")
return errorSentinel.ErrBusMsgPublishFail
}
}

func ParseInt64MsgParam(msg Message, param string) (int64, error) {
rawId, ok := msg.Content.Args[param]
if !ok {
return 0, errors.New("param not found in message")
return 0, errorSentinel.ErrBusParamNotFound
}

idPayload, ok := rawId.(string)
if !ok {
return 0, errors.New("failed to convert adapter id to string")
return 0, errorSentinel.ErrBusConvertParamFail
}

id, err := strconv.ParseInt(idPayload, 10, 64)
if err != nil {
return 0, errors.New("failed to parse adapterId")
return 0, errorSentinel.ErrBusParseParamFail
}

return id, nil
Expand All @@ -79,17 +79,17 @@ func ParseInt64MsgParam(msg Message, param string) (int64, error) {
func ParseInt32MsgParam(msg Message, param string) (int32, error) {
rawId, ok := msg.Content.Args[param]
if !ok {
return 0, errors.New("param not found in message")
return 0, errorSentinel.ErrBusParamNotFound
}

idPayload, ok := rawId.(string)
if !ok {
return 0, errors.New("failed to convert adapter id to string")
return 0, errorSentinel.ErrBusConvertParamFail
}

id, err := strconv.ParseInt(idPayload, 10, 32)
if err != nil {
return 0, errors.New("failed to parse adapterId")
return 0, errorSentinel.ErrBusParseParamFail
}

return int32(id), nil
Expand All @@ -98,12 +98,12 @@ func ParseInt32MsgParam(msg Message, param string) (int32, error) {
func ParseStringMsgParam(msg Message, param string) (string, error) {
raw, ok := msg.Content.Args[param]
if !ok {
return "", errors.New("param not found in message")
return "", errorSentinel.ErrBusParamNotFound
}

payload, ok := raw.(string)
if !ok {
return "", errors.New("failed to convert param to string")
return "", errorSentinel.ErrBusConvertParamFail
}

return payload, nil
Expand Down
Loading

0 comments on commit de618b6

Please sign in to comment.