Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(OraklNode) Major update to use udp instead of tcp for communication among peers #1571

Merged
merged 6 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion node/cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,17 @@ func main() {
return
}

host, ps, err := libp2pSetup.SetupFromBootApi(ctx, listenPort)
host, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithPort(listenPort))
nick-bisonai marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
log.Error().Err(err).Msg("Failed to make host")
}

ps, err := libp2pSetup.MakePubsub(ctx, host)
if err != nil {
log.Error().Err(err).Msg("Failed to make pubsub")
}

err = libp2pSetup.ConnectThroughBootApi(ctx, host)
if err != nil {
log.Error().Err(err).Msg("Failed to setup libp2p")
select {}
Expand Down
2 changes: 1 addition & 1 deletion node/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/go-playground/validator v9.31.0+incompatible
github.com/go-playground/validator/v10 v10.18.0
github.com/gofiber/fiber/v2 v2.52.0
github.com/google/uuid v1.5.0
github.com/hashicorp/vault/api v1.12.0
github.com/hashicorp/vault/api/auth/kubernetes v0.6.0
github.com/jackc/pgx/v5 v5.5.3
Expand Down Expand Up @@ -77,7 +78,6 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/pprof v0.0.0-20231023181126-ff6d637d2a7b // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
Expand Down
5 changes: 5 additions & 0 deletions node/migrations/boot/000002_update_column.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
DELETE FROM peers;
ALTER TABLE peers DROP COLUMN IF EXISTS url;
ALTER TABLE peers ADD COLUMN IF NOT EXISTS ip TEXT;
ALTER TABLE peers ADD COLUMN IF EXISTS port INT4;
ALTER TABLE peers ADD COLUMN IF EXISTS host_id TEXT;
3 changes: 3 additions & 0 deletions node/migrations/boot/000002_update_column.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
DELETE FROM peers;
ALTER TABLE peers ADD COLUMN IF NOT EXISTS url TEXT;
ALTER TABLE peers DROP COLUMN IF EXISTS ip, DROP COLUMN IF EXISTS port, DROP COLUMN IF EXISTS host_id;
2 changes: 1 addition & 1 deletion node/pkg/aggregator/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func setup(ctx context.Context) (func() error, *TestItems, error) {
}
testItems.admin = admin

h, err := libp2pSetup.MakeHost(10001)
h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithPort(10001))
if err != nil {
return nil, nil, err
}
Expand Down
11 changes: 5 additions & 6 deletions node/pkg/boot/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ import (
"github.com/rs/zerolog/log"
)

const REFRESH_INTERVAL = 60 * time.Second
const REFRESH_INTERVAL = 10 * time.Second

func Run(ctx context.Context) error {

log.Debug().Msg("Starting boot server")
app, err := utils.Setup(ctx)
if err != nil {
Expand Down Expand Up @@ -84,15 +83,15 @@ func RefreshJob(ctx context.Context) error {
return nil
}

h, err := libp2pSetup.MakeHost(0)
h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithPort(0))
if err != nil {
log.Error().Err(err).Msg("Failed to make host")
return err
}

for _, p := range peers {
connectionUrl := fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", p.Ip, p.Port, p.HostId)
isAlive, liveCheckErr := libp2pUtils.IsHostAlive(ctx, h, connectionUrl)

isAlive, liveCheckErr := libp2pUtils.IsHostAlive(ctx, h, p.Url)
if liveCheckErr != nil {
log.Error().Err(liveCheckErr).Msg("Failed to check peer")
if !errors.Is(liveCheckErr, errorSentinel.ErrLibP2pFailToConnectPeer) {
Expand All @@ -103,7 +102,7 @@ func RefreshJob(ctx context.Context) error {
continue
}

log.Info().Str("peer", connectionUrl).Msg("Peer is not alive")
log.Info().Str("peer", p.Url).Msg("Peer is not alive")
err = db.QueryWithoutResult(ctx, peer.DeletePeerById, map[string]any{"id": p.ID})
if err != nil {
log.Error().Err(err).Msg("Failed to delete peer")
Expand Down
44 changes: 14 additions & 30 deletions node/pkg/boot/peer/controller.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package peer

import (
"fmt"

"bisonai.com/orakl/node/pkg/db"
libp2pSetup "bisonai.com/orakl/node/pkg/libp2p/setup"
libp2pUtils "bisonai.com/orakl/node/pkg/libp2p/utils"
Expand All @@ -12,16 +10,12 @@ import (
)

type PeerModel struct {
ID int64 `db:"id" json:"id"`
Ip string `db:"ip" json:"ip"`
Port int `db:"port" json:"port"`
HostId string `db:"host_id" json:"host_id"`
ID int64 `db:"id" json:"id"`
Url string `db:"url" json:"url"`
}

type PeerInsertModel struct {
Ip string `db:"ip" json:"ip" validate:"required"`
Port int `db:"port" json:"port" validate:"required"`
HostId string `db:"host_id" json:"host_id" validate:"required"`
Url string `db:"url" json:"url" validate:"required"`
}

func insert(c *fiber.Ctx) error {
Expand All @@ -38,9 +32,7 @@ func insert(c *fiber.Ctx) error {
}

result, err := db.QueryRow[PeerModel](c.Context(), InsertPeer, map[string]any{
"ip": payload.Ip,
"port": payload.Port,
"host_id": payload.HostId})
"url": payload.Url})
if err != nil {
log.Error().Err(err).Msg("Failed to execute insert query")
return c.Status(fiber.StatusInternalServerError).SendString("Failed to execute insert query")
Expand Down Expand Up @@ -71,25 +63,25 @@ func sync(c *fiber.Ctx) error {
return c.Status(fiber.StatusBadRequest).SendString("Failed to validate request")
}

h, err := libp2pSetup.MakeHost(0)
h, err := libp2pSetup.NewHost(c.Context(), libp2pSetup.WithHolePunch(), libp2pSetup.WithPort(0))
if err != nil {
log.Error().Err(err).Msg("Failed to make host")
return c.Status(fiber.StatusInternalServerError).SendString("Failed to make host")
}
defer func() {
closeErr := h.Close()
if closeErr != nil {
log.Error().Err(closeErr).Msg("Failed to close host")
}
}()

connectionUrl := fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", payload.Ip, payload.Port, payload.HostId)
isAlive, err := libp2pUtils.IsHostAlive(c.Context(), h, connectionUrl)
isAlive, err := libp2pUtils.IsHostAlive(c.Context(), h, payload.Url)
if err != nil {
log.Error().Err(err).Msg("Failed to check peer")
return c.Status(fiber.StatusInternalServerError).SendString("Failed to check peer")
}
if !isAlive {
log.Info().Str("peer", connectionUrl).Msg("invalid peer")
err = h.Close()
if err != nil {
log.Error().Err(err).Msg("Failed to close host")
}
log.Info().Str("peer", connectionUrl).Msg("invalid peer")
log.Info().Str("peer", payload.Url).Msg("invalid peer")
return c.Status(fiber.StatusBadRequest).SendString("invalid peer")
}

Expand All @@ -100,19 +92,11 @@ func sync(c *fiber.Ctx) error {
}

_, err = db.QueryRow[PeerModel](c.Context(), InsertPeer, map[string]any{
"ip": payload.Ip,
"port": payload.Port,
"host_id": payload.HostId})
"url": payload.Url})
if err != nil {
log.Error().Err(err).Msg("Failed to execute insert query")
return c.Status(fiber.StatusInternalServerError).SendString("Failed to execute insert query")
}

err = h.Close()
if err != nil {
log.Error().Err(err).Msg("Failed to close host")
return c.Status(fiber.StatusInternalServerError).SendString("Failed to close host")
}

return c.JSON(peers)
}
2 changes: 1 addition & 1 deletion node/pkg/boot/peer/queries.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package peer

const (
InsertPeer = `INSERT INTO peers (ip, port, host_id) VALUES (@ip, @port, @host_id) RETURNING *;`
InsertPeer = `INSERT INTO peers (url) VALUES (@url) RETURNING *;`

GetPeer = `SELECT * FROM peers;`

Expand Down
2 changes: 1 addition & 1 deletion node/pkg/boot/tests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func setup(ctx context.Context) (func() error, *TestItems, error) {
func insertSampleData(ctx context.Context) (*TmpData, error) {
var tmpData = new(TmpData)

tmpPeer, err := db.QueryRow[peer.PeerModel](ctx, peer.InsertPeer, map[string]any{"ip": "127.0.0.1", "port": 10000, "host_id": "12DGKooWM8vWWqGPWWNCVPqb4tfqGrzx45W257GDBSeYbDSSLabc"})
tmpPeer, err := db.QueryRow[peer.PeerModel](ctx, peer.InsertPeer, map[string]any{"url": "/ip4/100.78.175.63/tcp/10002/p2p/12D3KooWERrdEepSi8HPRNsfjj3Nd7XcxV9SJcHdovpPLyYUtuch"})
if err != nil {
return nil, err
}
Expand Down
59 changes: 13 additions & 46 deletions node/pkg/boot/tests/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package tests

import (
"context"
"strconv"
"strings"
"testing"

adminTests "bisonai.com/orakl/node/pkg/admin/tests"
Expand All @@ -14,8 +12,6 @@ import (
libp2pSetup "bisonai.com/orakl/node/pkg/libp2p/setup"
libp2pUtils "bisonai.com/orakl/node/pkg/libp2p/utils"

_peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/assert"
)

Expand All @@ -28,9 +24,7 @@ func TestPeerInsert(t *testing.T) {
defer cleanup()

mockPeer1 := peer.PeerInsertModel{
Ip: "127.0.0.2",
Port: 10002,
HostId: "12DGKooWM8vWWqGPWWNCVPqb4tfqGrzx45W257GDBSeYbDSSLdef",
Url: "/ip4/100.78.175.63/udp/10002/quic-v1/p2p/12D3KooWLT1Pp1EN1G4waBShMWgr67acueYfnrWMtkpsUbAt59Lj",
}

readResultBefore, err := adminTests.GetRequest[[]peer.PeerModel](testItems.app, "/api/v1/peer", nil)
Expand All @@ -42,7 +36,7 @@ func TestPeerInsert(t *testing.T) {
if err != nil {
t.Fatalf("error inserting peer: %v", err)
}
assert.Equal(t, insertResult.Ip, mockPeer1.Ip)
assert.Equal(t, insertResult.Url, mockPeer1.Url)

readResultAfter, err := adminTests.GetRequest[[]peer.PeerModel](testItems.app, "/api/v1/peer", nil)
if err != nil {
Expand Down Expand Up @@ -81,35 +75,31 @@ func TestSync(t *testing.T) {
}
defer cleanup()

mockHost1, err := libp2pSetup.MakeHost(0)
mockHost1, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithQuic())
nick-bisonai marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
t.Fatalf("error making host: %v", err)
}

mockHost2, err := libp2pSetup.MakeHost(0)
mockHost2, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithQuic())
if err != nil {
t.Fatalf("error making host: %v", err)
}

ip1, port1, hostId1, err := libp2pUtils.ExtractPayloadFromHost(mockHost1)
url1, err := libp2pUtils.ExtractConnectionUrl(mockHost1)
if err != nil {
t.Fatalf("error extracting payload from host: %v", err)
}
ip2, port2, hostId2, err := libp2pUtils.ExtractPayloadFromHost(mockHost2)
url2, err := libp2pUtils.ExtractConnectionUrl(mockHost2)
if err != nil {
t.Fatalf("error extracting payload from host: %v", err)
}

mockPeer1 := peer.PeerInsertModel{
Ip: ip1,
Port: port1,
HostId: hostId1,
Url: url1,
}

mockPeer2 := peer.PeerInsertModel{
Ip: ip2,
Port: port2,
HostId: hostId2,
Url: url2,
}

syncResult, err := adminTests.PostRequest[[]peer.PeerModel](testItems.app, "/api/v1/peer/sync", mockPeer1)
Expand Down Expand Up @@ -141,41 +131,18 @@ func TestRefresh(t *testing.T) {
}
defer cleanup()

h, err := libp2pSetup.MakeHost(10011)
h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithQuic(), libp2pSetup.WithPort(10010))
if err != nil {
t.Fatalf("error making host: %v", err)
}

pi := _peer.AddrInfo{
ID: h.ID(),
Addrs: h.Addrs(),
}

var addr multiaddr.Multiaddr
for _, a := range pi.Addrs {
if strings.Contains(a.String(), "127.0.0.1") {
continue
}
addr = a
break
}

splitted := strings.Split(addr.String(), "/")
if len(splitted) < 5 {
t.Fatalf("error splitting address: %v", splitted)
}
ip := splitted[2]
port := splitted[4]

portInt, err := strconv.Atoi(port)
url, err := libp2pUtils.ExtractConnectionUrl(h)
if err != nil {
t.Fatalf("error converting port to int: %v", err)
t.Fatalf("error extracting payload from host: %v", err)
}

res, err := adminTests.PostRequest[peer.PeerModel](testItems.app, "/api/v1/peer", peer.PeerInsertModel{
Ip: ip,
Port: portInt,
HostId: h.ID().String(),
Url: url,
})
if err != nil {
t.Fatalf("error inserting peer: %v", err)
Expand All @@ -186,7 +153,7 @@ func TestRefresh(t *testing.T) {
t.Fatalf("error getting peers before: %v", err)
}

assert.Equal(t, res.Ip, ip, "expected to have the same ip")
assert.Equal(t, res.Url, url, "expected to have the same url")

err = boot.RefreshJob(ctx)
if err != nil {
Expand Down
Loading
Loading