Skip to content

Commit

Permalink
Merge branch 'master' of github.com:Bisonai/orakl into i-1574/orakl-s…
Browse files Browse the repository at this point in the history
…entinel-blst-go
  • Loading branch information
jo-bisonai committed Jun 10, 2024
2 parents ac29ed2 + 638f8a6 commit 2286ca9
Show file tree
Hide file tree
Showing 28 changed files with 408 additions and 298 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ jobs:
run: |
if [[ "${{ steps.tag.outputs.service }}" == "cli" || "${{ steps.tag.outputs.service }}" == "fetcher" || "${{ steps.tag.outputs.service }}" == "core" ]]; then
echo "version=$(node -p -e "require('./"${{ steps.tag.outputs.service }}"/package.json').version")" >> $GITHUB_OUTPUT
else
if [[ "${{ steps.tag.outputs.service }}" == "boot-api" ]]; then
else
if [[ "${{ steps.tag.outputs.service }}" == "boot-api" || "${{ steps.tag.outputs.service }}" == "por" ]]; then
version=$(cat ./node/.version)
else
version=$(cat ./${{ steps.tag.outputs.service }}/.version)
Expand Down
3 changes: 1 addition & 2 deletions node/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ APP_PORT=
DATABASE_URL=
REDIS_HOST=
REDIS_PORT=
LISTEN_PORT=

KLAYTN_PROVIDER_URL=
SUBMISSION_PROXY_CONTRACT=
DELEGATOR_URL=
Expand All @@ -15,7 +15,6 @@ FEED_DATA_STREAM_INTERVAL=
# (optional) designate external ip to be used if required
HOST_IP=


# `baobab` or `cypress`, defaults to baobab
CHAIN=

Expand Down
13 changes: 8 additions & 5 deletions node/cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package main
import (
"context"
"os"
"strconv"
"strings"
"sync"

Expand All @@ -28,13 +27,17 @@ func main() {
mb := bus.New(10)
var wg sync.WaitGroup

listenPort, err := strconv.Atoi(os.Getenv("LISTEN_PORT"))
host, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch())
if err != nil {
log.Error().Err(err).Msg("Error parsing LISTEN_PORT")
return
log.Error().Err(err).Msg("Failed to make host")
}

ps, err := libp2pSetup.MakePubsub(ctx, host)
if err != nil {
log.Error().Err(err).Msg("Failed to make pubsub")
}

host, ps, err := libp2pSetup.SetupFromBootApi(ctx, listenPort)
err = libp2pSetup.ConnectThroughBootApi(ctx, host)
if err != nil {
log.Error().Err(err).Msg("Failed to setup libp2p")
select {}
Expand Down
2 changes: 1 addition & 1 deletion node/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ require (
github.com/go-playground/validator v9.31.0+incompatible
github.com/go-playground/validator/v10 v10.18.0
github.com/gofiber/fiber/v2 v2.52.0
github.com/google/uuid v1.5.0
github.com/hashicorp/vault/api v1.12.0
github.com/hashicorp/vault/api/auth/kubernetes v0.6.0
github.com/jackc/pgx/v5 v5.5.3
Expand Down Expand Up @@ -77,7 +78,6 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/pprof v0.0.0-20231023181126-ff6d637d2a7b // indirect
github.com/google/uuid v1.5.0 // indirect
github.com/gorilla/websocket v1.5.0 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
Expand Down
5 changes: 5 additions & 0 deletions node/migrations/boot/000002_update_column.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
DELETE FROM peers;
ALTER TABLE peers DROP COLUMN IF EXISTS url;
ALTER TABLE peers ADD COLUMN IF NOT EXISTS ip TEXT;
ALTER TABLE peers ADD COLUMN IF EXISTS port INT4;
ALTER TABLE peers ADD COLUMN IF EXISTS host_id TEXT;
3 changes: 3 additions & 0 deletions node/migrations/boot/000002_update_column.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
DELETE FROM peers;
ALTER TABLE peers ADD COLUMN IF NOT EXISTS url TEXT;
ALTER TABLE peers DROP COLUMN IF EXISTS ip, DROP COLUMN IF EXISTS port, DROP COLUMN IF EXISTS host_id;
5 changes: 3 additions & 2 deletions node/pkg/aggregator/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func setup(ctx context.Context) (func() error, *TestItems, error) {
}
testItems.admin = admin

h, err := libp2pSetup.MakeHost(10001)
h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch())
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -159,7 +159,8 @@ func aggregatorCleanup(ctx context.Context, admin *fiber.App, app *App) func() e
if err != nil {
return err
}
return nil

return app.Host.Close()
}

}
Expand Down
11 changes: 5 additions & 6 deletions node/pkg/boot/boot.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ import (
"github.com/rs/zerolog/log"
)

const REFRESH_INTERVAL = 60 * time.Second
const REFRESH_INTERVAL = 10 * time.Second

func Run(ctx context.Context) error {

log.Debug().Msg("Starting boot server")
app, err := utils.Setup(ctx)
if err != nil {
Expand Down Expand Up @@ -84,15 +83,15 @@ func RefreshJob(ctx context.Context) error {
return nil
}

h, err := libp2pSetup.MakeHost(0)
h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithPort(0))
if err != nil {
log.Error().Err(err).Msg("Failed to make host")
return err
}

for _, p := range peers {
connectionUrl := fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", p.Ip, p.Port, p.HostId)
isAlive, liveCheckErr := libp2pUtils.IsHostAlive(ctx, h, connectionUrl)

isAlive, liveCheckErr := libp2pUtils.IsHostAlive(ctx, h, p.Url)
if liveCheckErr != nil {
log.Error().Err(liveCheckErr).Msg("Failed to check peer")
if !errors.Is(liveCheckErr, errorSentinel.ErrLibP2pFailToConnectPeer) {
Expand All @@ -103,7 +102,7 @@ func RefreshJob(ctx context.Context) error {
continue
}

log.Info().Str("peer", connectionUrl).Msg("Peer is not alive")
log.Info().Str("peer", p.Url).Msg("Peer is not alive")
err = db.QueryWithoutResult(ctx, peer.DeletePeerById, map[string]any{"id": p.ID})
if err != nil {
log.Error().Err(err).Msg("Failed to delete peer")
Expand Down
46 changes: 15 additions & 31 deletions node/pkg/boot/peer/controller.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package peer

import (
"fmt"

"bisonai.com/orakl/node/pkg/db"
libp2pSetup "bisonai.com/orakl/node/pkg/libp2p/setup"
libp2pUtils "bisonai.com/orakl/node/pkg/libp2p/utils"
Expand All @@ -12,16 +10,12 @@ import (
)

type PeerModel struct {
ID int64 `db:"id" json:"id"`
Ip string `db:"ip" json:"ip"`
Port int `db:"port" json:"port"`
HostId string `db:"host_id" json:"host_id"`
ID int64 `db:"id" json:"id"`
Url string `db:"url" json:"url"`
}

type PeerInsertModel struct {
Ip string `db:"ip" json:"ip" validate:"required"`
Port int `db:"port" json:"port" validate:"required"`
HostId string `db:"host_id" json:"host_id" validate:"required"`
Url string `db:"url" json:"url" validate:"required"`
}

func insert(c *fiber.Ctx) error {
Expand All @@ -38,9 +32,7 @@ func insert(c *fiber.Ctx) error {
}

result, err := db.QueryRow[PeerModel](c.Context(), InsertPeer, map[string]any{
"ip": payload.Ip,
"port": payload.Port,
"host_id": payload.HostId})
"url": payload.Url})
if err != nil {
log.Error().Err(err).Msg("Failed to execute insert query")
return c.Status(fiber.StatusInternalServerError).SendString("Failed to execute insert query")
Expand All @@ -67,29 +59,29 @@ func sync(c *fiber.Ctx) error {

validate := validator.New()
if err := validate.Struct(payload); err != nil {
log.Error().Err(err).Msg("Failed to validate request")
log.Error().Any("payload", payload).Err(err).Msg("Failed to validate request")
return c.Status(fiber.StatusBadRequest).SendString("Failed to validate request")
}

h, err := libp2pSetup.MakeHost(0)
h, err := libp2pSetup.NewHost(c.Context(), libp2pSetup.WithHolePunch(), libp2pSetup.WithPort(0))
if err != nil {
log.Error().Err(err).Msg("Failed to make host")
return c.Status(fiber.StatusInternalServerError).SendString("Failed to make host")
}
defer func() {
closeErr := h.Close()
if closeErr != nil {
log.Error().Err(closeErr).Msg("Failed to close host")
}
}()

connectionUrl := fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", payload.Ip, payload.Port, payload.HostId)
isAlive, err := libp2pUtils.IsHostAlive(c.Context(), h, connectionUrl)
isAlive, err := libp2pUtils.IsHostAlive(c.Context(), h, payload.Url)
if err != nil {
log.Error().Err(err).Msg("Failed to check peer")
return c.Status(fiber.StatusInternalServerError).SendString("Failed to check peer")
}
if !isAlive {
log.Info().Str("peer", connectionUrl).Msg("invalid peer")
err = h.Close()
if err != nil {
log.Error().Err(err).Msg("Failed to close host")
}
log.Info().Str("peer", connectionUrl).Msg("invalid peer")
log.Info().Str("peer", payload.Url).Msg("invalid peer")
return c.Status(fiber.StatusBadRequest).SendString("invalid peer")
}

Expand All @@ -100,19 +92,11 @@ func sync(c *fiber.Ctx) error {
}

_, err = db.QueryRow[PeerModel](c.Context(), InsertPeer, map[string]any{
"ip": payload.Ip,
"port": payload.Port,
"host_id": payload.HostId})
"url": payload.Url})
if err != nil {
log.Error().Err(err).Msg("Failed to execute insert query")
return c.Status(fiber.StatusInternalServerError).SendString("Failed to execute insert query")
}

err = h.Close()
if err != nil {
log.Error().Err(err).Msg("Failed to close host")
return c.Status(fiber.StatusInternalServerError).SendString("Failed to close host")
}

return c.JSON(peers)
}
2 changes: 1 addition & 1 deletion node/pkg/boot/peer/queries.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package peer

const (
InsertPeer = `INSERT INTO peers (ip, port, host_id) VALUES (@ip, @port, @host_id) RETURNING *;`
InsertPeer = `INSERT INTO peers (url) VALUES (@url) RETURNING *;`

GetPeer = `SELECT * FROM peers;`

Expand Down
2 changes: 1 addition & 1 deletion node/pkg/boot/tests/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func setup(ctx context.Context) (func() error, *TestItems, error) {
func insertSampleData(ctx context.Context) (*TmpData, error) {
var tmpData = new(TmpData)

tmpPeer, err := db.QueryRow[peer.PeerModel](ctx, peer.InsertPeer, map[string]any{"ip": "127.0.0.1", "port": 10000, "host_id": "12DGKooWM8vWWqGPWWNCVPqb4tfqGrzx45W257GDBSeYbDSSLabc"})
tmpPeer, err := db.QueryRow[peer.PeerModel](ctx, peer.InsertPeer, map[string]any{"url": "/ip4/100.78.175.63/tcp/10002/p2p/12D3KooWERrdEepSi8HPRNsfjj3Nd7XcxV9SJcHdovpPLyYUtuch"})
if err != nil {
return nil, err
}
Expand Down
59 changes: 13 additions & 46 deletions node/pkg/boot/tests/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package tests

import (
"context"
"strconv"
"strings"
"testing"

adminTests "bisonai.com/orakl/node/pkg/admin/tests"
Expand All @@ -14,8 +12,6 @@ import (
libp2pSetup "bisonai.com/orakl/node/pkg/libp2p/setup"
libp2pUtils "bisonai.com/orakl/node/pkg/libp2p/utils"

_peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/stretchr/testify/assert"
)

Expand All @@ -28,9 +24,7 @@ func TestPeerInsert(t *testing.T) {
defer cleanup()

mockPeer1 := peer.PeerInsertModel{
Ip: "127.0.0.2",
Port: 10002,
HostId: "12DGKooWM8vWWqGPWWNCVPqb4tfqGrzx45W257GDBSeYbDSSLdef",
Url: "/ip4/100.78.175.63/udp/10002/quic-v1/p2p/12D3KooWLT1Pp1EN1G4waBShMWgr67acueYfnrWMtkpsUbAt59Lj",
}

readResultBefore, err := adminTests.GetRequest[[]peer.PeerModel](testItems.app, "/api/v1/peer", nil)
Expand All @@ -42,7 +36,7 @@ func TestPeerInsert(t *testing.T) {
if err != nil {
t.Fatalf("error inserting peer: %v", err)
}
assert.Equal(t, insertResult.Ip, mockPeer1.Ip)
assert.Equal(t, insertResult.Url, mockPeer1.Url)

readResultAfter, err := adminTests.GetRequest[[]peer.PeerModel](testItems.app, "/api/v1/peer", nil)
if err != nil {
Expand Down Expand Up @@ -81,35 +75,31 @@ func TestSync(t *testing.T) {
}
defer cleanup()

mockHost1, err := libp2pSetup.MakeHost(0)
mockHost1, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch())
if err != nil {
t.Fatalf("error making host: %v", err)
}

mockHost2, err := libp2pSetup.MakeHost(0)
mockHost2, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch())
if err != nil {
t.Fatalf("error making host: %v", err)
}

ip1, port1, hostId1, err := libp2pUtils.ExtractPayloadFromHost(mockHost1)
url1, err := libp2pUtils.ExtractConnectionUrl(mockHost1)
if err != nil {
t.Fatalf("error extracting payload from host: %v", err)
}
ip2, port2, hostId2, err := libp2pUtils.ExtractPayloadFromHost(mockHost2)
url2, err := libp2pUtils.ExtractConnectionUrl(mockHost2)
if err != nil {
t.Fatalf("error extracting payload from host: %v", err)
}

mockPeer1 := peer.PeerInsertModel{
Ip: ip1,
Port: port1,
HostId: hostId1,
Url: url1,
}

mockPeer2 := peer.PeerInsertModel{
Ip: ip2,
Port: port2,
HostId: hostId2,
Url: url2,
}

syncResult, err := adminTests.PostRequest[[]peer.PeerModel](testItems.app, "/api/v1/peer/sync", mockPeer1)
Expand Down Expand Up @@ -141,41 +131,18 @@ func TestRefresh(t *testing.T) {
}
defer cleanup()

h, err := libp2pSetup.MakeHost(10011)
h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithPort(10010))
if err != nil {
t.Fatalf("error making host: %v", err)
}

pi := _peer.AddrInfo{
ID: h.ID(),
Addrs: h.Addrs(),
}

var addr multiaddr.Multiaddr
for _, a := range pi.Addrs {
if strings.Contains(a.String(), "127.0.0.1") {
continue
}
addr = a
break
}

splitted := strings.Split(addr.String(), "/")
if len(splitted) < 5 {
t.Fatalf("error splitting address: %v", splitted)
}
ip := splitted[2]
port := splitted[4]

portInt, err := strconv.Atoi(port)
url, err := libp2pUtils.ExtractConnectionUrl(h)
if err != nil {
t.Fatalf("error converting port to int: %v", err)
t.Fatalf("error extracting payload from host: %v", err)
}

res, err := adminTests.PostRequest[peer.PeerModel](testItems.app, "/api/v1/peer", peer.PeerInsertModel{
Ip: ip,
Port: portInt,
HostId: h.ID().String(),
Url: url,
})
if err != nil {
t.Fatalf("error inserting peer: %v", err)
Expand All @@ -186,7 +153,7 @@ func TestRefresh(t *testing.T) {
t.Fatalf("error getting peers before: %v", err)
}

assert.Equal(t, res.Ip, ip, "expected to have the same ip")
assert.Equal(t, res.Url, url, "expected to have the same url")

err = boot.RefreshJob(ctx)
if err != nil {
Expand Down
Loading

0 comments on commit 2286ca9

Please sign in to comment.