Skip to content

Commit

Permalink
(OraklNode) Boot api (#1268)
Browse files Browse the repository at this point in the history
* feat: boot api

* fix: fix mock addr, update live check

* fix: update based on feedback

* fix: update table column name (lib_id -> host_id)
  • Loading branch information
nick-bisonai authored Mar 18, 2024
1 parent 52f1d07 commit 6b22d9e
Show file tree
Hide file tree
Showing 12 changed files with 640 additions and 1 deletion.
5 changes: 4 additions & 1 deletion node/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,7 @@ DELEGATOR_URL=
#required to run klaytn_helper test
#TEST_FEE_PAYER_PK=

PRIVATE_NETWORK_SECRET=
PRIVATE_NETWORK_SECRET=

BOOT_API_PORT=
BOOT_LISTEN_PORT=
1 change: 1 addition & 0 deletions node/migrations/000006_peers.down.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE IF EXISTS peers;
6 changes: 6 additions & 0 deletions node/migrations/000006_peers.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
CREATE TABLE IF NOT EXISTS peers (
id SERIAL PRIMARY KEY,
ip TEXT NOT NULL UNIQUE,
port INTEGER NOT NULL,
host_id TEXT NOT NULL UNIQUE
);
120 changes: 120 additions & 0 deletions node/pkg/boot/boot.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package boot

import (
"context"
"fmt"
"os"
"strconv"
"time"

"bisonai.com/orakl/node/pkg/boot/peer"
"bisonai.com/orakl/node/pkg/boot/utils"
"bisonai.com/orakl/node/pkg/db"
"bisonai.com/orakl/node/pkg/libp2p"
"github.com/gofiber/fiber/v2"
"github.com/rs/zerolog/log"
)

const REFRESH_INTERVAL = 60 * time.Second

func Run(ctx context.Context) error {

log.Debug().Msg("Starting boot server")
app, err := utils.Setup(ctx)
if err != nil {
log.Error().Err(err).Msg("Failed to setup boot server")
return err
}

v1 := app.Group("/api/v1")
v1.Get("/", func(c *fiber.Ctx) error {
return c.SendString("Orakl Node Boot API")
})

peer.Routes(v1)

port := os.Getenv("BOOT_API_PORT")
if port == "" {
port = "8089"
}

err = app.Listen(fmt.Sprintf(":%s", port))
if err != nil {
log.Fatal().Err(err).Msg("Failed to start boot server")
return err
}

refreshTimer := time.NewTimer(REFRESH_INTERVAL)
go func() {
for {
select {
case <-refreshTimer.C:
err := RefreshJob(ctx)
if err != nil {
log.Error().Err(err).Msg("Failed to refresh peers")
}
case <-ctx.Done():
log.Debug().Msg("context cancelled")
refreshTimer.Stop()
return
}
}
}()

return nil

}

func RefreshJob(ctx context.Context) error {
bootPortStr := os.Getenv("BOOT_LISTEN_PORT")
if bootPortStr == "" {
log.Info().Msg("BOOT_PORT not set, defaulting to 10010")
bootPortStr = "10010"
}

bootPort, err := strconv.Atoi(bootPortStr)
if err != nil {
log.Error().Err(err).Msg("Failed to convert BOOT_PORT to int")
bootPort = 10010
}

h, err := libp2p.MakeHost(bootPort)
if err != nil {
log.Error().Err(err).Msg("Failed to make host")
return err
}

peers, err := db.QueryRows[peer.PeerModel](ctx, peer.GetPeer, nil)
if err != nil {
log.Error().Err(err).Msg("Failed to get peers")
return err
}

for _, p := range peers {
connectionUrl := fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", p.Ip, p.Port, p.HostId)
isAlive, liveCheckErr := libp2p.IsHostAlive(ctx, h, connectionUrl)
if liveCheckErr != nil {
log.Error().Err(liveCheckErr).Msg("Failed to check peer")
if liveCheckErr.Error() != "failed to connect to peer" {
continue
}
}
if isAlive {
continue
}

log.Info().Str("peer", connectionUrl).Msg("Peer is not alive")
err = db.QueryWithoutResult(ctx, peer.DeletePeerById, map[string]any{"id": p.Id})
if err != nil {
log.Error().Err(err).Msg("Failed to delete peer")
}
}

err = h.Close()
if err != nil {
log.Error().Err(err).Msg("Failed to close host")
return err
}

return nil
}
86 changes: 86 additions & 0 deletions node/pkg/boot/peer/controller.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package peer

import (
"bisonai.com/orakl/node/pkg/db"
"github.com/go-playground/validator"
"github.com/gofiber/fiber/v2"
"github.com/rs/zerolog/log"
)

type PeerModel struct {
Id int64 `db:"id" json:"id"`
Ip string `db:"ip" json:"ip"`
Port int `db:"port" json:"port"`
HostId string `db:"host_id" json:"host_id"`
}

type PeerInsertModel struct {
Ip string `db:"ip" json:"ip" validate:"required"`
Port int `db:"port" json:"port" validate:"required"`
HostId string `db:"host_id" json:"host_id" validate:"required"`
}

func insert(c *fiber.Ctx) error {
payload := new(PeerInsertModel)
if err := c.BodyParser(payload); err != nil {
log.Error().Err(err).Msg("Failed to parse request")
return c.Status(fiber.StatusBadRequest).SendString("Failed to parse request")
}

validate := validator.New()
if err := validate.Struct(payload); err != nil {
log.Error().Err(err).Msg("Failed to validate request")
return c.Status(fiber.StatusBadRequest).SendString("Failed to validate request")
}

result, err := db.QueryRow[PeerModel](c.Context(), UpsertPeer, map[string]any{
"ip": payload.Ip,
"port": payload.Port,
"host_id": payload.HostId})
if err != nil {
log.Error().Err(err).Msg("Failed to execute insert query")
return c.Status(fiber.StatusInternalServerError).SendString("Failed to execute insert query")
}

return c.JSON(result)
}

func get(c *fiber.Ctx) error {
result, err := db.QueryRows[PeerModel](c.Context(), GetPeer, nil)
if err != nil {
log.Error().Err(err).Msg("Failed to execute get query")
return c.Status(fiber.StatusInternalServerError).SendString("Failed to execute get query")
}
return c.JSON(result)
}

func sync(c *fiber.Ctx) error {
payload := new(PeerInsertModel)
if err := c.BodyParser(payload); err != nil {
log.Error().Err(err).Msg("Failed to parse request")
return c.Status(fiber.StatusBadRequest).SendString("Failed to parse request")
}

validate := validator.New()
if err := validate.Struct(payload); err != nil {
log.Error().Err(err).Msg("Failed to validate request")
return c.Status(fiber.StatusBadRequest).SendString("Failed to validate request")
}

peers, err := db.QueryRows[PeerModel](c.Context(), GetPeer, nil)
if err != nil {
log.Error().Err(err).Msg("Failed to execute get query")
return c.Status(fiber.StatusInternalServerError).SendString("Failed to execute get query")
}

_, err = db.QueryRow[PeerModel](c.Context(), InsertPeer, map[string]any{
"ip": payload.Ip,
"port": payload.Port,
"host_id": payload.HostId})
if err != nil {
log.Error().Err(err).Msg("Failed to execute insert query")
return c.Status(fiber.StatusInternalServerError).SendString("Failed to execute insert query")
}

return c.JSON(peers)
}
14 changes: 14 additions & 0 deletions node/pkg/boot/peer/queries.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package peer

const (
InsertPeer = `INSERT INTO peers (ip, port, host_id) VALUES (@ip, @port, @host_id) RETURNING *;`

UpsertPeer = `
INSERT INTO peers (ip, port, host_id) VALUES (@ip, @port, @host_id)
ON CONFLICT (ip) DO UPDATE SET port = @port, host_id = @host_id RETURNING *;
`

GetPeer = `SELECT * FROM peers;`

DeletePeerById = `DELETE FROM peers WHERE id = @id RETURNING *;`
)
13 changes: 13 additions & 0 deletions node/pkg/boot/peer/route.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package peer

import (
"github.com/gofiber/fiber/v2"
)

func Routes(router fiber.Router) {
peer := router.Group("/peer")

peer.Post("/sync", sync)
peer.Post("", insert)
peer.Get("", get)
}
75 changes: 75 additions & 0 deletions node/pkg/boot/tests/main_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package tests

import (
"context"
"fmt"
"os"
"testing"

"bisonai.com/orakl/node/pkg/boot/peer"
"bisonai.com/orakl/node/pkg/boot/utils"
"bisonai.com/orakl/node/pkg/db"
"github.com/gofiber/fiber/v2"
)

type TestItems struct {
app *fiber.App
tmpData *TmpData
}

type TmpData struct {
peer peer.PeerModel
}

func setup(ctx context.Context) (func() error, *TestItems, error) {
var testItems = new(TestItems)

app, err := utils.Setup(ctx)
if err != nil {
return nil, nil, err
}

testItems.app = app

tmpData, err := insertSampleData(ctx)
if err != nil {
return nil, nil, err
}

testItems.tmpData = tmpData

v1 := app.Group("/api/v1")
peer.Routes(v1)

return bootCleanup(testItems), testItems, nil
}

func insertSampleData(ctx context.Context) (*TmpData, error) {
var tmpData = new(TmpData)

tmpPeer, err := db.QueryRow[peer.PeerModel](ctx, peer.InsertPeer, map[string]any{"ip": "127.0.0.1", "port": 10000, "host_id": "12DGKooWM8vWWqGPWWNCVPqb4tfqGrzx45W257GDBSeYbDSSLabc"})
if err != nil {
return nil, err
}
tmpData.peer = tmpPeer
fmt.Println(tmpPeer)
return tmpData, nil
}

func bootCleanup(testItems *TestItems) func() error {

return func() error {
err := testItems.app.Shutdown()
if err != nil {
return err
}

return db.QueryWithoutResult(context.Background(), peer.DeletePeerById, map[string]any{"id": testItems.tmpData.peer.Id})
}
}

func TestMain(m *testing.M) {
code := m.Run()
db.ClosePool()
os.Exit(code)
}
Loading

0 comments on commit 6b22d9e

Please sign in to comment.