Skip to content

Commit

Permalink
(OraklNode) Libp2p reconnect attempt after disconnection (#1799)
Browse files Browse the repository at this point in the history
* feat: reconnection support

* feat: prevent duplicate url inserts

* fix: remove unused features

* fix: simplify subscription code

* feat: admin function to for `sync` and `peerCount`

* fix: add go task, rename bus message type

* fix: add minor log and update test

* fix: update based on feedback

* fix: update based on feedback

* feat: handle default
  • Loading branch information
nick-bisonai authored Jul 15, 2024
1 parent 979dc78 commit 7403efd
Show file tree
Hide file tree
Showing 16 changed files with 411 additions and 8 deletions.
15 changes: 13 additions & 2 deletions node/cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"bisonai.com/orakl/node/pkg/aggregator"
"bisonai.com/orakl/node/pkg/bus"
"bisonai.com/orakl/node/pkg/fetcher"
"bisonai.com/orakl/node/pkg/libp2p/helper"
libp2pSetup "bisonai.com/orakl/node/pkg/libp2p/setup"
"bisonai.com/orakl/node/pkg/reporter"
"bisonai.com/orakl/node/pkg/utils/retrier"
Expand Down Expand Up @@ -80,7 +81,6 @@ func main() {
wg.Add(1)
go func() {
defer wg.Done()

a := aggregator.New(mb, host, ps)
aggregatorErr := a.Run(ctx)
if aggregatorErr != nil {
Expand All @@ -93,7 +93,6 @@ func main() {
wg.Add(1)
go func() {
defer wg.Done()

r := reporter.New(mb, host, ps)
reporterErr := r.Run(ctx)
if reporterErr != nil {
Expand All @@ -103,6 +102,18 @@ func main() {
}()
log.Info().Msg("Reporter started")

wg.Add(1)
go func() {
defer wg.Done()
helperApp := helper.New(mb, host)
libp2pHelperErr := helperApp.Run(ctx)
if libp2pHelperErr != nil {
log.Error().Err(libp2pHelperErr).Msg("Failed to start libp2p helper")
os.Exit(1)
}
}()
log.Info().Msg("libp2p helper started")

wg.Wait()
}

Expand Down
1 change: 1 addition & 0 deletions node/migrations/boot/000003_add_url_constraints.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE peers DROP CONSTRAINT IF EXISTS unique_url;
2 changes: 2 additions & 0 deletions node/migrations/boot/000003_add_url_constraints.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
ALTER TABLE peers DROP CONSTRAINT IF EXISTS unique_url;
ALTER TABLE peers ADD CONSTRAINT unique_url UNIQUE (url);
2 changes: 2 additions & 0 deletions node/pkg/admin/admin.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"bisonai.com/orakl/node/pkg/admin/config"
"bisonai.com/orakl/node/pkg/admin/feed"
"bisonai.com/orakl/node/pkg/admin/fetcher"
"bisonai.com/orakl/node/pkg/admin/host"
"bisonai.com/orakl/node/pkg/admin/providerUrl"
"bisonai.com/orakl/node/pkg/admin/proxy"
"bisonai.com/orakl/node/pkg/admin/reporter"
Expand Down Expand Up @@ -44,6 +45,7 @@ func Run(bus *bus.MessageBus) error {
wallet.Routes(v1)
providerUrl.Routes(v1)
config.Routes(v1)
host.Routes(v1)

port := os.Getenv("APP_PORT")
if port == "" {
Expand Down
38 changes: 38 additions & 0 deletions node/pkg/admin/host/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package host

import (
"bisonai.com/orakl/node/pkg/admin/utils"
"bisonai.com/orakl/node/pkg/bus"
"github.com/gofiber/fiber/v2"
"github.com/rs/zerolog/log"
)

func getPeerCount(c *fiber.Ctx) error {
msg, err := utils.SendMessage(c, bus.LIBP2P, bus.GET_PEER_COUNT, nil)
if err != nil {
log.Error().Err(err).Str("Player", "Admin").Msg("failed to send message to libp2p helper")
return c.Status(fiber.StatusInternalServerError).SendString("failed to get peer count: " + err.Error())
}
resp := <-msg.Response
if !resp.Success {
log.Error().Str("Player", "Admin").Msg("failed to get peer count: " + resp.Args["error"].(string))
return c.Status(fiber.StatusInternalServerError).SendString("failed to get peer count: " + resp.Args["error"].(string))
}

return c.JSON(resp.Args)
}

func sync(c *fiber.Ctx) error {
msg, err := utils.SendMessage(c, bus.LIBP2P, bus.LIBP2P_SYNC, nil)
if err != nil {
log.Error().Err(err).Str("Player", "Admin").Msg("failed to send message to libp2p helper")
return c.Status(fiber.StatusInternalServerError).SendString("failed to sync libp2p host: " + err.Error())
}
resp := <-msg.Response
if !resp.Success {
log.Error().Str("Player", "Admin").Msg("failed to sync libp2p host")
return c.Status(fiber.StatusInternalServerError).SendString("failed to sync libp2p host: " + resp.Args["error"].(string))
}

return c.SendString("libp2p synced")
}
12 changes: 12 additions & 0 deletions node/pkg/admin/host/route.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package host

import (
"github.com/gofiber/fiber/v2"
)

func Routes(router fiber.Router) {
host := router.Group("/host")

host.Get("/peercount", getPeerCount)
host.Post("/sync", sync)
}
49 changes: 49 additions & 0 deletions node/pkg/admin/tests/host_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
//nolint:all
package tests

import (
"context"

"testing"

"bisonai.com/orakl/node/pkg/bus"
"github.com/stretchr/testify/assert"
)

func TestGetPeerCount(t *testing.T) {
ctx := context.Background()
cleanup, testItems, err := setup(ctx)
if err != nil {
t.Fatalf("error setting up test: %v", err)
}
defer cleanup()

channel := testItems.mb.Subscribe(bus.LIBP2P)
waitForMessageWithResponse(t, channel, bus.ADMIN, bus.LIBP2P, bus.GET_PEER_COUNT, map[string]any{"Count": 1})

result, err := GetRequest[struct{ Count int }](testItems.app, "/api/v1/host/peercount", nil)
if err != nil {
t.Fatalf("error getting peercount: %v", err)
}

assert.Equal(t, 1, result.Count)
}

func TestSync(t *testing.T) {
ctx := context.Background()
cleanup, testItems, err := setup(ctx)
if err != nil {
t.Fatalf("error setting up test: %v", err)
}
defer cleanup()

channel := testItems.mb.Subscribe(bus.LIBP2P)
waitForMessage(t, channel, bus.ADMIN, bus.LIBP2P, bus.LIBP2P_SYNC)

result, err := RawPostRequest(testItems.app, "/api/v1/host/sync", nil)
if err != nil {
t.Fatalf("error sync libp2p host: %v", err)
}

assert.Equal(t, string(result), "libp2p synced")
}
2 changes: 2 additions & 0 deletions node/pkg/admin/tests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"bisonai.com/orakl/node/pkg/admin/config"
"bisonai.com/orakl/node/pkg/admin/feed"
"bisonai.com/orakl/node/pkg/admin/fetcher"
"bisonai.com/orakl/node/pkg/admin/host"
"bisonai.com/orakl/node/pkg/admin/providerUrl"
"bisonai.com/orakl/node/pkg/admin/proxy"
"bisonai.com/orakl/node/pkg/admin/reporter"
Expand Down Expand Up @@ -67,6 +68,7 @@ func setup(ctx context.Context) (func() error, *TestItems, error) {
reporter.Routes(v1)
providerUrl.Routes(v1)
config.Routes(v1)
host.Routes(v1)
return adminCleanup(testItems), testItems, nil
}

Expand Down
15 changes: 15 additions & 0 deletions node/pkg/admin/tests/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,3 +158,18 @@ func waitForMessage(t *testing.T, channel <-chan bus.Message, from, to, command
}
}()
}

func waitForMessageWithResponse(t *testing.T, channel <-chan bus.Message, from, to, command string, resp map[string]any) {
go func() {
select {
case msg := <-channel:
if msg.From != from || msg.To != to || msg.Content.Command != command {
t.Errorf("unexpected message: %v", msg)
}
msg.Response <- bus.MessageResponse{Success: true, Args: resp}
case <-time.After(5 * time.Second):
t.Errorf("no message received on channel")
}
}()

}
2 changes: 1 addition & 1 deletion node/pkg/boot/peer/queries.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package peer

const (
InsertPeer = `INSERT INTO peers (url) VALUES (@url) RETURNING *;`
InsertPeer = `INSERT INTO peers (url) VALUES (@url) ON CONFLICT (url) DO NOTHING RETURNING *;`

GetPeer = `SELECT * FROM peers;`

Expand Down
4 changes: 4 additions & 0 deletions node/pkg/bus/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ const (
FETCHER = "fetcher"
AGGREGATOR = "aggregator"
REPORTER = "reporter"
LIBP2P = "libp2p"

// Modular Monolith pkg commands, please follow {verb}_{noun} pattern for both variable name and value
START_FETCHER_APP = "start_fetcher_app"
Expand All @@ -31,4 +32,7 @@ const (
ACTIVATE_REPORTER = "activate_reporter"
DEACTIVATE_REPORTER = "deactivate_reporter"
REFRESH_REPORTER = "refresh_reporter"

GET_PEER_COUNT = "get_peer_count"
LIBP2P_SYNC = "libp2p_sync"
)
119 changes: 119 additions & 0 deletions node/pkg/libp2p/helper/app.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
package helper

import (
"context"
"fmt"
"time"

"bisonai.com/orakl/node/pkg/bus"
errorSentinel "bisonai.com/orakl/node/pkg/error"
"bisonai.com/orakl/node/pkg/libp2p/setup"
"github.com/libp2p/go-libp2p/core/event"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/rs/zerolog/log"
)

type App struct {
Host host.Host
Bus *bus.MessageBus
}

func New(bus *bus.MessageBus, h host.Host) *App {
return &App{
Bus: bus,
Host: h,
}
}

func (a *App) Run(ctx context.Context) error {
defer a.subscribe(ctx)

sub, err := a.Host.EventBus().Subscribe(new(event.EvtPeerConnectednessChanged))
if err != nil {
return fmt.Errorf("event subscription failed: %w", err)
}

a.subscribeLibp2pEvent(ctx, sub)
return nil
}

func (a *App) subscribe(ctx context.Context) {
log.Debug().Str("Player", "Libp2pHelper").Msg("subscribing to libp2pHelper topics")
channel := a.Bus.Subscribe(bus.LIBP2P)
go func() {
log.Debug().Str("Player", "Libp2pHelper").Msg("starting libp2p subscription goroutine")
for {
select {
case msg := <-channel:
log.Debug().
Str("Player", "Libp2pHelper").
Str("from", msg.From).
Str("to", msg.To).
Str("command", msg.Content.Command).
Msg("libp2p received bus message")
go a.handleMessage(ctx, msg)
case <-ctx.Done():
log.Debug().Str("Player", "Libp2pHelper").Msg("stopping libp2pHelper subscription goroutine")
return
}
}
}()
}

func (a *App) handleMessage(ctx context.Context, msg bus.Message) {
if msg.To != bus.LIBP2P {
log.Debug().Str("Player", "Libp2pHelper").Msg("message not for libp2pHelper")
return
}
if msg.From != bus.ADMIN {
bus.HandleMessageError(errorSentinel.ErrBusNonAdmin, msg, "libp2pHelper received message from non-admin")
}

switch msg.Content.Command {
case bus.GET_PEER_COUNT:
log.Debug().Str("Player", "Libp2pHelper").Msg("get peer count msg received")
peerCount := len(a.Host.Network().Peers())
msg.Response <- bus.MessageResponse{Success: true, Args: map[string]any{"Count": peerCount}}
case bus.LIBP2P_SYNC:
log.Debug().Str("Player", "Libp2pHelper").Msg("libp2p sync msg received")
err := setup.ConnectThroughBootApi(ctx, a.Host)
if err != nil {
bus.HandleMessageError(err, msg, "failed to sync through boot api")
return
}
msg.Response <- bus.MessageResponse{Success: true}
default:
bus.HandleMessageError(errorSentinel.ErrBusUnknownCommand, msg, "libp2p helper received unknown command")
return
}
}

func (a *App) subscribeLibp2pEvent(ctx context.Context, sub event.Subscription) {
go func() {
for {
select {
case <-ctx.Done():
sub.Close()
return
case e := <-sub.Out():
a.handleDisconnectEvent(ctx, e)
}
}
}()
}

func (a *App) handleDisconnectEvent(ctx context.Context, e interface{}) {
log.Info().Str("Player", "Libp2pHelper").Msg("Disconnect event catched, triggering resync")
evt := e.(event.EvtPeerConnectednessChanged)
if evt.Connectedness == network.NotConnected {
for i := 1; i < 4; i++ {
// do not attempt immediate resync, but give some time
time.Sleep(time.Duration(i) * time.Minute)
err := setup.ConnectThroughBootApi(ctx, a.Host)
if err != nil {
log.Error().Err(err).Str("Player", "Libp2pHelper").Msg("Error occurred on boot API sync")
}
}
}
}
11 changes: 11 additions & 0 deletions node/pkg/libp2p/setup/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,17 @@ func ConnectThroughBootApi(ctx context.Context, h host.Host) error {
continue
}

alreadyConnected := false
for _, p := range h.Network().Peers() {
if info.ID == p {
alreadyConnected = true
break
}
}
if alreadyConnected {
continue
}

err = retrier.Retry(func() error {
return h.Connect(ctx, *info)
}, 5, 1*time.Second, 5*time.Second)
Expand Down
Loading

0 comments on commit 7403efd

Please sign in to comment.