From 1bb8def214302a6bd2de7c4aacae32f77f3ce459 Mon Sep 17 00:00:00 2001 From: nick Date: Sat, 8 Jun 2024 17:16:32 +0900 Subject: [PATCH 1/6] wip --- node/cmd/node/main.go | 12 +- node/go.mod | 2 +- .../boot/000002_update_column.down.sql | 5 + .../boot/000002_update_column.up.sql | 3 + node/pkg/aggregator/main_test.go | 2 +- node/pkg/boot/boot.go | 11 +- node/pkg/boot/peer/controller.go | 41 +++--- node/pkg/boot/peer/queries.go | 2 +- node/pkg/boot/tests/main_test.go | 2 +- node/pkg/boot/tests/peer_test.go | 59 ++------- node/pkg/libp2p/setup/setup.go | 111 +++------------- node/pkg/libp2p/setup/type.go | 119 ++++++++++++++++++ node/pkg/libp2p/tests/libp2p_test.go | 44 +------ node/pkg/libp2p/utils/utils.go | 36 ++---- node/pkg/reporter/main_test.go | 2 +- node/script/test_connection/main.go | 15 ++- .../test_fetcher_and_aggregator/main.go | 17 ++- node/script/test_raft/main.go | 16 ++- 18 files changed, 248 insertions(+), 251 deletions(-) create mode 100644 node/migrations/boot/000002_update_column.down.sql create mode 100644 node/migrations/boot/000002_update_column.up.sql create mode 100644 node/pkg/libp2p/setup/type.go diff --git a/node/cmd/node/main.go b/node/cmd/node/main.go index 186afa209..b90788744 100644 --- a/node/cmd/node/main.go +++ b/node/cmd/node/main.go @@ -34,7 +34,17 @@ func main() { return } - host, ps, err := libp2pSetup.SetupFromBootApi(ctx, listenPort) + host, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithPort(listenPort)) + if err != nil { + log.Error().Err(err).Msg("Failed to make host") + } + + ps, err := libp2pSetup.MakePubsub(ctx, host) + if err != nil { + log.Error().Err(err).Msg("Failed to make pubsub") + } + + err = libp2pSetup.ConnectThroughBootApi(ctx, host) if err != nil { log.Error().Err(err).Msg("Failed to setup libp2p") select {} diff --git a/node/go.mod b/node/go.mod index 6acfe989b..e8344871b 100644 --- a/node/go.mod +++ b/node/go.mod @@ -7,6 +7,7 @@ require ( github.com/go-playground/validator v9.31.0+incompatible github.com/go-playground/validator/v10 v10.18.0 github.com/gofiber/fiber/v2 v2.52.0 + github.com/google/uuid v1.5.0 github.com/hashicorp/vault/api v1.12.0 github.com/hashicorp/vault/api/auth/kubernetes v0.6.0 github.com/jackc/pgx/v5 v5.5.3 @@ -77,7 +78,6 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/google/gopacket v1.1.19 // indirect github.com/google/pprof v0.0.0-20231023181126-ff6d637d2a7b // indirect - github.com/google/uuid v1.5.0 // indirect github.com/gorilla/websocket v1.5.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect diff --git a/node/migrations/boot/000002_update_column.down.sql b/node/migrations/boot/000002_update_column.down.sql new file mode 100644 index 000000000..efc413616 --- /dev/null +++ b/node/migrations/boot/000002_update_column.down.sql @@ -0,0 +1,5 @@ +DELETE FROM peers; +ALTER TABLE peers DROP COLUMN IF EXISTS url; +ALTER TABLE peers ADD COLUMN IF NOT EXISTS ip TEXT; +ALTER TABLE peers ADD COLUMN IF EXISTS port INT4; +ALTER TABLE peers ADD COLUMN IF EXISTS host_id TEXT; \ No newline at end of file diff --git a/node/migrations/boot/000002_update_column.up.sql b/node/migrations/boot/000002_update_column.up.sql new file mode 100644 index 000000000..e567b3017 --- /dev/null +++ b/node/migrations/boot/000002_update_column.up.sql @@ -0,0 +1,3 @@ +DELETE FROM peers; +ALTER TABLE peers ADD COLUMN IF NOT EXISTS url TEXT; +ALTER TABLE peers DROP COLUMN IF EXISTS ip, DROP COLUMN IF EXISTS port, DROP COLUMN IF EXISTS host_id; \ No newline at end of file diff --git a/node/pkg/aggregator/main_test.go b/node/pkg/aggregator/main_test.go index 57481e800..22aaaf992 100644 --- a/node/pkg/aggregator/main_test.go +++ b/node/pkg/aggregator/main_test.go @@ -57,7 +57,7 @@ func setup(ctx context.Context) (func() error, *TestItems, error) { } testItems.admin = admin - h, err := libp2pSetup.MakeHost(10001) + h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithPort(10001)) if err != nil { return nil, nil, err } diff --git a/node/pkg/boot/boot.go b/node/pkg/boot/boot.go index 3f37577f9..df57cd945 100644 --- a/node/pkg/boot/boot.go +++ b/node/pkg/boot/boot.go @@ -17,10 +17,9 @@ import ( "github.com/rs/zerolog/log" ) -const REFRESH_INTERVAL = 60 * time.Second +const REFRESH_INTERVAL = 10 * time.Second func Run(ctx context.Context) error { - log.Debug().Msg("Starting boot server") app, err := utils.Setup(ctx) if err != nil { @@ -84,15 +83,15 @@ func RefreshJob(ctx context.Context) error { return nil } - h, err := libp2pSetup.MakeHost(0) + h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithQuic()) if err != nil { log.Error().Err(err).Msg("Failed to make host") return err } for _, p := range peers { - connectionUrl := fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", p.Ip, p.Port, p.HostId) - isAlive, liveCheckErr := libp2pUtils.IsHostAlive(ctx, h, connectionUrl) + + isAlive, liveCheckErr := libp2pUtils.IsHostAlive(ctx, h, p.Url) if liveCheckErr != nil { log.Error().Err(liveCheckErr).Msg("Failed to check peer") if !errors.Is(liveCheckErr, errorSentinel.ErrLibP2pFailToConnectPeer) { @@ -103,7 +102,7 @@ func RefreshJob(ctx context.Context) error { continue } - log.Info().Str("peer", connectionUrl).Msg("Peer is not alive") + log.Info().Str("peer", p.Url).Msg("Peer is not alive") err = db.QueryWithoutResult(ctx, peer.DeletePeerById, map[string]any{"id": p.ID}) if err != nil { log.Error().Err(err).Msg("Failed to delete peer") diff --git a/node/pkg/boot/peer/controller.go b/node/pkg/boot/peer/controller.go index 17c233c7e..d93fbcf26 100644 --- a/node/pkg/boot/peer/controller.go +++ b/node/pkg/boot/peer/controller.go @@ -1,8 +1,6 @@ package peer import ( - "fmt" - "bisonai.com/orakl/node/pkg/db" libp2pSetup "bisonai.com/orakl/node/pkg/libp2p/setup" libp2pUtils "bisonai.com/orakl/node/pkg/libp2p/utils" @@ -12,16 +10,12 @@ import ( ) type PeerModel struct { - ID int64 `db:"id" json:"id"` - Ip string `db:"ip" json:"ip"` - Port int `db:"port" json:"port"` - HostId string `db:"host_id" json:"host_id"` + ID int64 `db:"id" json:"id"` + Url string `db:"url" json:"url"` } type PeerInsertModel struct { - Ip string `db:"ip" json:"ip" validate:"required"` - Port int `db:"port" json:"port" validate:"required"` - HostId string `db:"host_id" json:"host_id" validate:"required"` + Url string `db:"url" json:"url" validate:"required"` } func insert(c *fiber.Ctx) error { @@ -38,9 +32,7 @@ func insert(c *fiber.Ctx) error { } result, err := db.QueryRow[PeerModel](c.Context(), InsertPeer, map[string]any{ - "ip": payload.Ip, - "port": payload.Port, - "host_id": payload.HostId}) + "url": payload.Url}) if err != nil { log.Error().Err(err).Msg("Failed to execute insert query") return c.Status(fiber.StatusInternalServerError).SendString("Failed to execute insert query") @@ -71,25 +63,30 @@ func sync(c *fiber.Ctx) error { return c.Status(fiber.StatusBadRequest).SendString("Failed to validate request") } - h, err := libp2pSetup.MakeHost(0) + h, err := libp2pSetup.NewHost(c.Context(), libp2pSetup.WithHolePunch(), libp2pSetup.WithQuic()) if err != nil { log.Error().Err(err).Msg("Failed to make host") return c.Status(fiber.StatusInternalServerError).SendString("Failed to make host") } + defer func() { + err := h.Close() + if err != nil { + log.Error().Err(err).Msg("Failed to close host") + } + }() - connectionUrl := fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", payload.Ip, payload.Port, payload.HostId) - isAlive, err := libp2pUtils.IsHostAlive(c.Context(), h, connectionUrl) + isAlive, err := libp2pUtils.IsHostAlive(c.Context(), h, payload.Url) if err != nil { log.Error().Err(err).Msg("Failed to check peer") return c.Status(fiber.StatusInternalServerError).SendString("Failed to check peer") } if !isAlive { - log.Info().Str("peer", connectionUrl).Msg("invalid peer") + log.Info().Str("peer", payload.Url).Msg("invalid peer") err = h.Close() if err != nil { log.Error().Err(err).Msg("Failed to close host") } - log.Info().Str("peer", connectionUrl).Msg("invalid peer") + log.Info().Str("peer", payload.Url).Msg("invalid peer") return c.Status(fiber.StatusBadRequest).SendString("invalid peer") } @@ -100,19 +97,11 @@ func sync(c *fiber.Ctx) error { } _, err = db.QueryRow[PeerModel](c.Context(), InsertPeer, map[string]any{ - "ip": payload.Ip, - "port": payload.Port, - "host_id": payload.HostId}) + "url": payload.Url}) if err != nil { log.Error().Err(err).Msg("Failed to execute insert query") return c.Status(fiber.StatusInternalServerError).SendString("Failed to execute insert query") } - err = h.Close() - if err != nil { - log.Error().Err(err).Msg("Failed to close host") - return c.Status(fiber.StatusInternalServerError).SendString("Failed to close host") - } - return c.JSON(peers) } diff --git a/node/pkg/boot/peer/queries.go b/node/pkg/boot/peer/queries.go index f9b416d5d..320ce09bd 100644 --- a/node/pkg/boot/peer/queries.go +++ b/node/pkg/boot/peer/queries.go @@ -1,7 +1,7 @@ package peer const ( - InsertPeer = `INSERT INTO peers (ip, port, host_id) VALUES (@ip, @port, @host_id) RETURNING *;` + InsertPeer = `INSERT INTO peers (url) VALUES (@url) RETURNING *;` GetPeer = `SELECT * FROM peers;` diff --git a/node/pkg/boot/tests/main_test.go b/node/pkg/boot/tests/main_test.go index ddf461994..93c0bddfe 100644 --- a/node/pkg/boot/tests/main_test.go +++ b/node/pkg/boot/tests/main_test.go @@ -46,7 +46,7 @@ func setup(ctx context.Context) (func() error, *TestItems, error) { func insertSampleData(ctx context.Context) (*TmpData, error) { var tmpData = new(TmpData) - tmpPeer, err := db.QueryRow[peer.PeerModel](ctx, peer.InsertPeer, map[string]any{"ip": "127.0.0.1", "port": 10000, "host_id": "12DGKooWM8vWWqGPWWNCVPqb4tfqGrzx45W257GDBSeYbDSSLabc"}) + tmpPeer, err := db.QueryRow[peer.PeerModel](ctx, peer.InsertPeer, map[string]any{"url": "/ip4/100.78.175.63/tcp/10002/p2p/12D3KooWERrdEepSi8HPRNsfjj3Nd7XcxV9SJcHdovpPLyYUtuch"}) if err != nil { return nil, err } diff --git a/node/pkg/boot/tests/peer_test.go b/node/pkg/boot/tests/peer_test.go index 1da86ebee..3ec3a97aa 100644 --- a/node/pkg/boot/tests/peer_test.go +++ b/node/pkg/boot/tests/peer_test.go @@ -3,8 +3,6 @@ package tests import ( "context" - "strconv" - "strings" "testing" adminTests "bisonai.com/orakl/node/pkg/admin/tests" @@ -14,8 +12,6 @@ import ( libp2pSetup "bisonai.com/orakl/node/pkg/libp2p/setup" libp2pUtils "bisonai.com/orakl/node/pkg/libp2p/utils" - _peer "github.com/libp2p/go-libp2p/core/peer" - "github.com/multiformats/go-multiaddr" "github.com/stretchr/testify/assert" ) @@ -28,9 +24,7 @@ func TestPeerInsert(t *testing.T) { defer cleanup() mockPeer1 := peer.PeerInsertModel{ - Ip: "127.0.0.2", - Port: 10002, - HostId: "12DGKooWM8vWWqGPWWNCVPqb4tfqGrzx45W257GDBSeYbDSSLdef", + Url: "/ip4/100.78.175.63/udp/10002/quic-v1/p2p/12D3KooWLT1Pp1EN1G4waBShMWgr67acueYfnrWMtkpsUbAt59Lj", } readResultBefore, err := adminTests.GetRequest[[]peer.PeerModel](testItems.app, "/api/v1/peer", nil) @@ -42,7 +36,7 @@ func TestPeerInsert(t *testing.T) { if err != nil { t.Fatalf("error inserting peer: %v", err) } - assert.Equal(t, insertResult.Ip, mockPeer1.Ip) + assert.Equal(t, insertResult.Url, mockPeer1.Url) readResultAfter, err := adminTests.GetRequest[[]peer.PeerModel](testItems.app, "/api/v1/peer", nil) if err != nil { @@ -81,35 +75,31 @@ func TestSync(t *testing.T) { } defer cleanup() - mockHost1, err := libp2pSetup.MakeHost(0) + mockHost1, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithQuic()) if err != nil { t.Fatalf("error making host: %v", err) } - mockHost2, err := libp2pSetup.MakeHost(0) + mockHost2, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithQuic()) if err != nil { t.Fatalf("error making host: %v", err) } - ip1, port1, hostId1, err := libp2pUtils.ExtractPayloadFromHost(mockHost1) + url1, err := libp2pUtils.ExtractConnectionUrl(mockHost1) if err != nil { t.Fatalf("error extracting payload from host: %v", err) } - ip2, port2, hostId2, err := libp2pUtils.ExtractPayloadFromHost(mockHost2) + url2, err := libp2pUtils.ExtractConnectionUrl(mockHost2) if err != nil { t.Fatalf("error extracting payload from host: %v", err) } mockPeer1 := peer.PeerInsertModel{ - Ip: ip1, - Port: port1, - HostId: hostId1, + Url: url1, } mockPeer2 := peer.PeerInsertModel{ - Ip: ip2, - Port: port2, - HostId: hostId2, + Url: url2, } syncResult, err := adminTests.PostRequest[[]peer.PeerModel](testItems.app, "/api/v1/peer/sync", mockPeer1) @@ -141,41 +131,18 @@ func TestRefresh(t *testing.T) { } defer cleanup() - h, err := libp2pSetup.MakeHost(10011) + h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithQuic(), libp2pSetup.WithPort(10010)) if err != nil { t.Fatalf("error making host: %v", err) } - pi := _peer.AddrInfo{ - ID: h.ID(), - Addrs: h.Addrs(), - } - - var addr multiaddr.Multiaddr - for _, a := range pi.Addrs { - if strings.Contains(a.String(), "127.0.0.1") { - continue - } - addr = a - break - } - - splitted := strings.Split(addr.String(), "/") - if len(splitted) < 5 { - t.Fatalf("error splitting address: %v", splitted) - } - ip := splitted[2] - port := splitted[4] - - portInt, err := strconv.Atoi(port) + url, err := libp2pUtils.ExtractConnectionUrl(h) if err != nil { - t.Fatalf("error converting port to int: %v", err) + t.Fatalf("error extracting payload from host: %v", err) } res, err := adminTests.PostRequest[peer.PeerModel](testItems.app, "/api/v1/peer", peer.PeerInsertModel{ - Ip: ip, - Port: portInt, - HostId: h.ID().String(), + Url: url, }) if err != nil { t.Fatalf("error inserting peer: %v", err) @@ -186,7 +153,7 @@ func TestRefresh(t *testing.T) { t.Fatalf("error getting peers before: %v", err) } - assert.Equal(t, res.Ip, ip, "expected to have the same ip") + assert.Equal(t, res.Url, url, "expected to have the same url") err = boot.RefreshJob(ctx) if err != nil { diff --git a/node/pkg/libp2p/setup/setup.go b/node/pkg/libp2p/setup/setup.go index c053b31d3..1ed298b52 100644 --- a/node/pkg/libp2p/setup/setup.go +++ b/node/pkg/libp2p/setup/setup.go @@ -2,52 +2,24 @@ package setup import ( "context" - "crypto/rand" - "fmt" "os" - "crypto/sha256" - "strings" - "bisonai.com/orakl/node/pkg/libp2p/utils" - "bisonai.com/orakl/node/pkg/secrets" "bisonai.com/orakl/node/pkg/utils/request" - "github.com/libp2p/go-libp2p" - pubsub "github.com/libp2p/go-libp2p-pubsub" - "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/pnet" "github.com/rs/zerolog/log" - - "github.com/multiformats/go-multiaddr" ) type BootPeerModel struct { - ID int64 `db:"id" json:"id"` - Ip string `db:"ip" json:"ip"` - Port int `db:"port" json:"port"` - HostId string `db:"host_id" json:"host_id"` + ID int64 `db:"id" json:"id"` + Url string `db:"url" json:"url"` } -func SetupFromBootApi(ctx context.Context, port int) (host.Host, *pubsub.PubSub, error) { - host, err := MakeHost(port) - if err != nil { - log.Error().Err(err).Msg("Error making host") - return nil, nil, err - } - - ps, err := MakePubsub(ctx, host) +func ConnectThroughBootApi(ctx context.Context, h host.Host) error { + url, err := utils.ExtractConnectionUrl(h) if err != nil { - log.Error().Err(err).Msg("Error making pubsub") - return nil, nil, err - } - - ip, port, hostId, err := utils.ExtractPayloadFromHost(host) - if err != nil { - log.Error().Err(err).Msg("Error extracting payload from host") - return nil, nil, err + return err } apiEndpoint := os.Getenv("BOOT_API_URL") @@ -56,82 +28,29 @@ func SetupFromBootApi(ctx context.Context, port int) (host.Host, *pubsub.PubSub, apiEndpoint = "http://localhost:8089" } - log.Info().Str("ip", ip).Int("port", port).Str("host_id", hostId).Str("api_endpoint", apiEndpoint).Msg("connecting to boot API") + log.Info().Str("url", url).Str("api_endpoint", apiEndpoint).Msg("connecting to boot API") + dbPeers, err := request.UrlRequest[[]BootPeerModel](apiEndpoint+"/api/v1/peer/sync", "POST", map[string]any{ - "ip": ip, - "port": port, - "host_id": hostId, + "url": url, }, nil, "") if err != nil { - log.Error().Err(err).Msg("Error getting peers from boot API") - return nil, nil, err + log.Error().Err(err).Msg("Error connecting to boot API") + return err } for _, dbPeer := range dbPeers { - peerAddr := fmt.Sprintf("/ip4/%s/tcp/%d/p2p/%s", dbPeer.Ip, dbPeer.Port, dbPeer.HostId) - peerMultiAddr, err := multiaddr.NewMultiaddr(peerAddr) + info, err := utils.ConnectionUrl2AddrInfo(dbPeer.Url) if err != nil { - log.Error().Err(err).Msg("Error creating multiaddr: " + peerAddr) + log.Error().Err(err).Msg("error parsing peer url: " + dbPeer.Url) continue } - peerInfo, err := peer.AddrInfoFromP2pAddr(peerMultiAddr) + err = h.Connect(ctx, *info) if err != nil { - log.Error().Err(err).Msg("Error getting AddrInfo from p2p address: " + peerAddr) + log.Error().Err(err).Msg("error connecting to peer: " + dbPeer.Url) continue } - - err = host.Connect(ctx, *peerInfo) - if err != nil { - log.Error().Err(err).Msg("error connecting to peer: " + peerAddr) - continue - } - } - - return host, ps, nil -} - -func MakeHost(listenPort int) (host.Host, error) { - r := rand.Reader - - log.Debug().Msg("generating private key") - priv, _, err := crypto.GenerateKeyPairWithReader(crypto.Ed25519, 2048, r) - if err != nil { - return nil, err - } - log.Debug().Msg("generating libp2p options") - - return makeHost(listenPort, priv) -} - -func makeHost(listenPort int, priv crypto.PrivKey) (host.Host, error) { - opts := []libp2p.Option{ - libp2p.Identity(priv), - } - - if listenPort != 0 { - opts = append(opts, libp2p.ListenAddrStrings(fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", listenPort))) - } - - // opts = append(opts, libp2p.NATPortMap()) - - opts = append(opts, libp2p.EnableHolePunching()) - - secretString := secrets.GetSecret("PRIVATE_NETWORK_SECRET") - if secretString != "" { - hash := sha256.Sum256([]byte(secretString)) - protector := pnet.PSK(hash[:]) - opts = append(opts, libp2p.PrivateNetwork(protector)) - } - - return libp2p.New(opts...) -} - -func MakePubsub(ctx context.Context, host host.Host) (*pubsub.PubSub, error) { - log.Debug().Msg("creating pubsub instance") - var basePeerFilter pubsub.PeerFilter = func(pid peer.ID, topic string) bool { - return strings.HasPrefix(pid.String(), "12D") } - return pubsub.NewGossipSub(ctx, host, pubsub.WithPeerFilter(basePeerFilter)) + return nil } diff --git a/node/pkg/libp2p/setup/type.go b/node/pkg/libp2p/setup/type.go new file mode 100644 index 000000000..a29d1c9b2 --- /dev/null +++ b/node/pkg/libp2p/setup/type.go @@ -0,0 +1,119 @@ +package setup + +import ( + "context" + "crypto/rand" + "crypto/sha256" + "fmt" + "os" + "strconv" + + "bisonai.com/orakl/node/pkg/secrets" + + "github.com/libp2p/go-libp2p" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/libp2p/go-libp2p/core/crypto" + "github.com/libp2p/go-libp2p/core/host" + "github.com/libp2p/go-libp2p/core/pnet" + "github.com/rs/zerolog/log" +) + +type HostConfig struct { + Port int + PrivateKey crypto.PrivKey + SecretString string + HolePunch bool + Quic bool +} + +type HostOption func(*HostConfig) + +func WithPort(port int) HostOption { + return func(hc *HostConfig) { + hc.Port = port + } +} + +func WithPrivateKey(priv crypto.PrivKey) HostOption { + return func(hc *HostConfig) { + hc.PrivateKey = priv + } +} + +func WithSecretString(secretString string) HostOption { + return func(hc *HostConfig) { + hc.SecretString = secretString + } +} + +func WithHolePunch() HostOption { + return func(hc *HostConfig) { + hc.HolePunch = true + } +} + +func WithQuic() HostOption { + return func(hc *HostConfig) { + hc.Quic = true + } +} + +func NewHost(ctx context.Context, opts ...HostOption) (host.Host, error) { + defaultPort := 0 + defaultPortStr := os.Getenv("LISTEN_PORT") + if defaultPortStr != "" { + tmp, err := strconv.Atoi(defaultPortStr) + if err != nil { + defaultPort = tmp + } + } + + config := &HostConfig{ + Port: defaultPort, + PrivateKey: nil, + SecretString: secrets.GetSecret("PRIVATE_NETWORK_SECRET"), + HolePunch: false, + Quic: false, + } + for _, opt := range opts { + opt(config) + } + + if config.PrivateKey == nil { + priv, _, err := crypto.GenerateEd25519Key(rand.Reader) + if err != nil { + return nil, err + } + config.PrivateKey = priv + } + + listenStr := "" + if config.Quic { + listenStr = fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic-v1", config.Port) + } else { + listenStr = fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", config.Port) + } + + libp2pOpts := []libp2p.Option{ + libp2p.Identity(config.PrivateKey), + libp2p.ListenAddrStrings(listenStr), + } + + if config.SecretString != "" { + hash := sha256.Sum256([]byte(config.SecretString)) + protector := pnet.PSK(hash[:]) + libp2pOpts = append(libp2pOpts, libp2p.PrivateNetwork(protector)) + } + + if config.HolePunch { + libp2pOpts = append(libp2pOpts, libp2p.EnableHolePunching()) + } + + h, err := libp2p.New(libp2pOpts...) + return h, err +} + +func MakePubsub(ctx context.Context, host host.Host) (*pubsub.PubSub, error) { + log.Debug().Msg("creating pubsub instance") + return pubsub.NewGossipSub(ctx, host) +} diff --git a/node/pkg/libp2p/tests/libp2p_test.go b/node/pkg/libp2p/tests/libp2p_test.go index 2afe0b137..e5faebd27 100644 --- a/node/pkg/libp2p/tests/libp2p_test.go +++ b/node/pkg/libp2p/tests/libp2p_test.go @@ -5,17 +5,12 @@ import ( "context" "testing" - "bisonai.com/orakl/node/pkg/boot" - "bisonai.com/orakl/node/pkg/db" "bisonai.com/orakl/node/pkg/libp2p/setup" "bisonai.com/orakl/node/pkg/libp2p/utils" - "github.com/libp2p/go-libp2p/core/network" - "github.com/rs/zerolog" - "github.com/stretchr/testify/assert" ) func TestMakeHost(t *testing.T) { - h, err := setup.MakeHost(10001) + h, err := setup.NewHost(context.Background(), setup.WithHolePunch(), setup.WithPort(10001)) if err != nil { t.Errorf("Failed to make host: %v", err) } @@ -23,7 +18,7 @@ func TestMakeHost(t *testing.T) { } func TestMakePubsub(t *testing.T) { - h, err := setup.MakeHost(10001) + h, err := setup.NewHost(context.Background(), setup.WithHolePunch(), setup.WithPort(10001)) if err != nil { t.Fatalf("Failed to make host: %v", err) } @@ -33,11 +28,10 @@ func TestMakePubsub(t *testing.T) { if err != nil { t.Errorf("Failed to make pubsub: %v", err) } - } func TestGetHostAddress(t *testing.T) { - h, err := setup.MakeHost(10001) + h, err := setup.NewHost(context.Background(), setup.WithHolePunch(), setup.WithPort(10001)) if err != nil { t.Fatalf("Failed to make host: %v", err) } @@ -47,35 +41,3 @@ func TestGetHostAddress(t *testing.T) { t.Errorf("Failed to get host address: %v", err) } } - -func TestSetupFromBootApi(t *testing.T) { - t.Skip() - ctx := context.Background() - go func() { - err := boot.Run(ctx) - if err != nil { - zerolog.Ctx(ctx).Error().Err(err).Msg("Failed to start boot server") - } - }() - - h1, _, err := setup.SetupFromBootApi(ctx, 10001) - if err != nil { - t.Errorf("Failed to setup from boot api: %v", err) - } - defer h1.Close() - - h2, _, err := setup.SetupFromBootApi(ctx, 10002) - if err != nil { - t.Errorf("Failed to setup from boot api: %v", err) - } - defer h2.Close() - - assert.Equal(t, network.Connected, h1.Network().Connectedness(h2.ID())) - assert.Equal(t, network.Connected, h2.Network().Connectedness(h1.ID())) - - // cleanup db - err = db.QueryWithoutResult(ctx, "DELETE FROM peers;", nil) - if err != nil { - t.Fatalf("error cleaning up test: %v", err) - } -} diff --git a/node/pkg/libp2p/utils/utils.go b/node/pkg/libp2p/utils/utils.go index e57395116..d0751a11f 100644 --- a/node/pkg/libp2p/utils/utils.go +++ b/node/pkg/libp2p/utils/utils.go @@ -3,8 +3,6 @@ package utils import ( "context" "fmt" - "os" - "strconv" "time" "strings" @@ -44,12 +42,7 @@ func GetHostAddress(host host.Host) (string, error) { } func IsHostAlive(ctx context.Context, h host.Host, addr string) (bool, error) { - maddr, err := multiaddr.NewMultiaddr(addr) - if err != nil { - return false, err - } - - info, err := peer.AddrInfoFromP2pAddr(maddr) + info, err := ConnectionUrl2AddrInfo(addr) if err != nil { return false, err } @@ -76,7 +69,7 @@ func IsHostAlive(ctx context.Context, h host.Host, addr string) (bool, error) { return true, nil } -func ExtractPayloadFromHost(h host.Host) (ip string, port int, host_id string, err error) { +func ExtractConnectionUrl(h host.Host) (string, error) { var addr multiaddr.Multiaddr for _, a := range h.Addrs() { if strings.Contains(a.String(), "127.0.0.1") { @@ -88,25 +81,22 @@ func ExtractPayloadFromHost(h host.Host) (ip string, port int, host_id string, e if addr == nil { log.Error().Msg("host has no non-local addresses") - return "", 0, "", errorSentinel.ErrLibP2pEmptyNonLocalAddress + return "", errorSentinel.ErrLibP2pEmptyNonLocalAddress } - splitted := strings.Split(addr.String(), "/") - if len(splitted) < 5 { - log.Error().Msg("error splitting address") - return "", 0, "", errorSentinel.ErrLibP2pAddressSplitFail - } - ip = splitted[2] - rawPort := splitted[4] - port, err = strconv.Atoi(rawPort) + return addr.String() + "/p2p/" + h.ID().String(), nil +} + +func ConnectionUrl2AddrInfo(url string) (*peer.AddrInfo, error) { + peerMultiAddr, err := multiaddr.NewMultiaddr(url) if err != nil { - log.Error().Err(err).Msg("error converting port to int") - return "", 0, "", err + return nil, err } - if os.Getenv("HOST_IP") != "" { - ip = os.Getenv("HOST_IP") + info, err := peer.AddrInfoFromP2pAddr(peerMultiAddr) + if err != nil { + return nil, err } - return ip, port, h.ID().String(), nil + return info, nil } diff --git a/node/pkg/reporter/main_test.go b/node/pkg/reporter/main_test.go index 190b2e6b9..1d4184c3c 100644 --- a/node/pkg/reporter/main_test.go +++ b/node/pkg/reporter/main_test.go @@ -119,7 +119,7 @@ func setup(ctx context.Context) (func() error, *TestItems, error) { testItems.admin = admin - h, err := libp2pSetup.MakeHost(10001) + h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithPort(10001)) if err != nil { return nil, nil, err } diff --git a/node/script/test_connection/main.go b/node/script/test_connection/main.go index 652ffb2f6..72d42cdc5 100644 --- a/node/script/test_connection/main.go +++ b/node/script/test_connection/main.go @@ -22,9 +22,20 @@ func main() { } startTime := time.Now() - _, ps, err := libp2pSetup.SetupFromBootApi(ctx, *port) + + h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithPort(*port)) + if err != nil { + log.Fatal().Err(err).Msg("Failed to make host") + } + + ps, err := libp2pSetup.MakePubsub(ctx, h) + if err != nil { + log.Fatal().Err(err).Msg("Failed to make pubsub") + } + + err = libp2pSetup.ConnectThroughBootApi(ctx, h) if err != nil { - log.Fatal().Err(err).Msg("Failed to setup libp2p") + log.Fatal().Err(err).Msg("Failed to connect through boot api") } topic, err := ps.Join(topicString) diff --git a/node/script/test_fetcher_and_aggregator/main.go b/node/script/test_fetcher_and_aggregator/main.go index 30e5851ac..98198543e 100644 --- a/node/script/test_fetcher_and_aggregator/main.go +++ b/node/script/test_fetcher_and_aggregator/main.go @@ -73,11 +73,24 @@ func main() { return } - host, ps, err := libp2pSetup.SetupFromBootApi(ctx, listenPort) + host, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithPort(listenPort)) if err != nil { - log.Error().Err(err).Msg("Failed to setup libp2p") + log.Error().Err(err).Msg("Failed to make host") return } + + ps, err := libp2pSetup.MakePubsub(ctx, host) + if err != nil { + log.Error().Err(err).Msg("Failed to make pubsub") + return + } + + err = libp2pSetup.ConnectThroughBootApi(ctx, host) + if err != nil { + log.Error().Err(err).Msg("Failed to connect through boot api") + return + } + a := aggregator.New(mb, host, ps) err = a.Run(ctx) if err != nil { diff --git a/node/script/test_raft/main.go b/node/script/test_raft/main.go index 9856df6f3..4176ac790 100644 --- a/node/script/test_raft/main.go +++ b/node/script/test_raft/main.go @@ -20,9 +20,19 @@ func main() { flag.Parse() - host, ps, err := libp2pSetup.SetupFromBootApi(ctx, *port) + host, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithPort(*port)) if err != nil { - log.Fatal().Err(err).Msg("Failed to setup libp2p") + log.Fatal().Err(err).Msg("Failed to make host") + } + + ps, err := libp2pSetup.MakePubsub(ctx, host) + if err != nil { + log.Fatal().Err(err).Msg("Failed to make pubsub") + } + + err = libp2pSetup.ConnectThroughBootApi(ctx, host) + if err != nil { + log.Fatal().Err(err).Msg("Failed to connect through boot api") } log.Debug().Msg("connecting to topic string") @@ -34,7 +44,7 @@ func main() { log.Debug().Msg("connected to topic string") log.Debug().Msg("creating raft node") - node := raft.NewRaftNode(host, ps, topic, 100, 5*time.Second) + node := raft.NewRaftNode(host, ps, topic, 100, 1*time.Second) node.LeaderJob = func() error { log.Debug().Int("subscribers", node.SubscribersCount()).Int("Term", node.GetCurrentTerm()).Msg("Leader job") node.IncreaseTerm() From f7594d77666e25178bd63486a0f07ff5ae904e9f Mon Sep 17 00:00:00 2001 From: nick Date: Sat, 8 Jun 2024 17:19:46 +0900 Subject: [PATCH 2/6] fix: vet err --- node/pkg/boot/peer/controller.go | 6 +++--- node/pkg/libp2p/utils/utils.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/node/pkg/boot/peer/controller.go b/node/pkg/boot/peer/controller.go index d93fbcf26..20f6fd21a 100644 --- a/node/pkg/boot/peer/controller.go +++ b/node/pkg/boot/peer/controller.go @@ -69,9 +69,9 @@ func sync(c *fiber.Ctx) error { return c.Status(fiber.StatusInternalServerError).SendString("Failed to make host") } defer func() { - err := h.Close() - if err != nil { - log.Error().Err(err).Msg("Failed to close host") + closeErr := h.Close() + if closeErr != nil { + log.Error().Err(closeErr).Msg("Failed to close host") } }() diff --git a/node/pkg/libp2p/utils/utils.go b/node/pkg/libp2p/utils/utils.go index d0751a11f..c1c58a1c1 100644 --- a/node/pkg/libp2p/utils/utils.go +++ b/node/pkg/libp2p/utils/utils.go @@ -47,9 +47,9 @@ func IsHostAlive(ctx context.Context, h host.Host, addr string) (bool, error) { return false, err } + log.Info().Str("addr", addr).Str("peer", info.ID.String()).Msg("checking peer alive") lastErr := retrier.Retry( func() error { - log.Info().Str("addr", addr).Str("peer", info.ID.String()).Msg("checking peer alive") return h.Connect(ctx, *info) }, 3, From 633b9e67f7b694af27408c15296161908a8b7c2a Mon Sep 17 00:00:00 2001 From: nick Date: Sat, 8 Jun 2024 17:40:49 +0900 Subject: [PATCH 3/6] fix: update based on feedbacks --- node/pkg/boot/peer/controller.go | 5 ----- node/pkg/libp2p/setup/type.go | 2 +- 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/node/pkg/boot/peer/controller.go b/node/pkg/boot/peer/controller.go index 20f6fd21a..b396749d2 100644 --- a/node/pkg/boot/peer/controller.go +++ b/node/pkg/boot/peer/controller.go @@ -81,11 +81,6 @@ func sync(c *fiber.Ctx) error { return c.Status(fiber.StatusInternalServerError).SendString("Failed to check peer") } if !isAlive { - log.Info().Str("peer", payload.Url).Msg("invalid peer") - err = h.Close() - if err != nil { - log.Error().Err(err).Msg("Failed to close host") - } log.Info().Str("peer", payload.Url).Msg("invalid peer") return c.Status(fiber.StatusBadRequest).SendString("invalid peer") } diff --git a/node/pkg/libp2p/setup/type.go b/node/pkg/libp2p/setup/type.go index a29d1c9b2..f310f4c47 100644 --- a/node/pkg/libp2p/setup/type.go +++ b/node/pkg/libp2p/setup/type.go @@ -63,7 +63,7 @@ func NewHost(ctx context.Context, opts ...HostOption) (host.Host, error) { defaultPortStr := os.Getenv("LISTEN_PORT") if defaultPortStr != "" { tmp, err := strconv.Atoi(defaultPortStr) - if err != nil { + if err == nil { defaultPort = tmp } } From 4e65405b5aa5d3cb29194ad1f04ee2bb266e5ebb Mon Sep 17 00:00:00 2001 From: nick Date: Sat, 8 Jun 2024 18:07:21 +0900 Subject: [PATCH 4/6] fix: remove boot api property for testing purpose --- node/pkg/boot/boot.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/pkg/boot/boot.go b/node/pkg/boot/boot.go index df57cd945..2ba963f95 100644 --- a/node/pkg/boot/boot.go +++ b/node/pkg/boot/boot.go @@ -83,7 +83,7 @@ func RefreshJob(ctx context.Context) error { return nil } - h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithQuic()) + h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithPort(0)) if err != nil { log.Error().Err(err).Msg("Failed to make host") return err From 0ced283a981ef7a3932ad93537e6b2efc9a58fc7 Mon Sep 17 00:00:00 2001 From: nick Date: Sat, 8 Jun 2024 20:02:32 +0900 Subject: [PATCH 5/6] feat: boot api follow previous behavior --- node/pkg/boot/peer/controller.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/pkg/boot/peer/controller.go b/node/pkg/boot/peer/controller.go index b396749d2..9189ede56 100644 --- a/node/pkg/boot/peer/controller.go +++ b/node/pkg/boot/peer/controller.go @@ -63,7 +63,7 @@ func sync(c *fiber.Ctx) error { return c.Status(fiber.StatusBadRequest).SendString("Failed to validate request") } - h, err := libp2pSetup.NewHost(c.Context(), libp2pSetup.WithHolePunch(), libp2pSetup.WithQuic()) + h, err := libp2pSetup.NewHost(c.Context(), libp2pSetup.WithHolePunch(), libp2pSetup.WithPort(0)) if err != nil { log.Error().Err(err).Msg("Failed to make host") return c.Status(fiber.StatusInternalServerError).SendString("Failed to make host") From c4c829867811869aa69230d4316a194f4868e2a9 Mon Sep 17 00:00:00 2001 From: Nick <148735107+nick-bisonai@users.noreply.github.com> Date: Mon, 10 Jun 2024 10:22:15 +0900 Subject: [PATCH 6/6] (OraklNode) Track peers manually (#1572) * feat: implement peer count into raft pkg * fix: remove unused property * test: add cases to test based on feedback * (OraklNode) Default to udp - quic (#1573) * feat: default to udp - quic * test: prevent possible duplicate port err * test: fix port error * test: fix test err * fix: remove listen port, add missing `HOST_IP` reference * fix: rollback listen port * fix: update based on feedback --- node/.env.example | 1 - node/cmd/node/main.go | 3 +- node/pkg/aggregator/main_test.go | 5 ++-- node/pkg/boot/tests/peer_test.go | 6 ++-- node/pkg/libp2p/setup/setup.go | 9 ++++++ node/pkg/libp2p/setup/type.go | 27 ++++++------------ node/pkg/libp2p/tests/libp2p_test.go | 28 +++++++++++++++++-- node/pkg/libp2p/utils/utils.go | 9 ++++++ node/pkg/raft/accessors.go | 8 +----- node/pkg/raft/raft.go | 41 +++++++++++++++++++++++++++- node/pkg/raft/types.go | 7 +++++ node/pkg/reporter/main_test.go | 4 +-- node/pkg/utils/set/set.go | 26 ++++++++++++++++++ node/pkg/utils/tests/set_test.go | 37 +++++++++++++++++++++++++ 14 files changed, 171 insertions(+), 40 deletions(-) create mode 100644 node/pkg/utils/set/set.go create mode 100644 node/pkg/utils/tests/set_test.go diff --git a/node/.env.example b/node/.env.example index 44fc4ea75..3bea38abb 100644 --- a/node/.env.example +++ b/node/.env.example @@ -15,7 +15,6 @@ FEED_DATA_STREAM_INTERVAL= # (optional) designate external ip to be used if required HOST_IP= - # `baobab` or `cypress`, defaults to baobab CHAIN= diff --git a/node/cmd/node/main.go b/node/cmd/node/main.go index b90788744..32fa7affe 100644 --- a/node/cmd/node/main.go +++ b/node/cmd/node/main.go @@ -30,8 +30,7 @@ func main() { listenPort, err := strconv.Atoi(os.Getenv("LISTEN_PORT")) if err != nil { - log.Error().Err(err).Msg("Error parsing LISTEN_PORT") - return + log.Warn().Msg("LISTEN_PORT missing, using random port for libp2p") } host, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithPort(listenPort)) diff --git a/node/pkg/aggregator/main_test.go b/node/pkg/aggregator/main_test.go index 22aaaf992..2d1c50faf 100644 --- a/node/pkg/aggregator/main_test.go +++ b/node/pkg/aggregator/main_test.go @@ -57,7 +57,7 @@ func setup(ctx context.Context) (func() error, *TestItems, error) { } testItems.admin = admin - h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithPort(10001)) + h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch()) if err != nil { return nil, nil, err } @@ -159,7 +159,8 @@ func aggregatorCleanup(ctx context.Context, admin *fiber.App, app *App) func() e if err != nil { return err } - return nil + + return app.Host.Close() } } diff --git a/node/pkg/boot/tests/peer_test.go b/node/pkg/boot/tests/peer_test.go index 3ec3a97aa..3f9cfe5db 100644 --- a/node/pkg/boot/tests/peer_test.go +++ b/node/pkg/boot/tests/peer_test.go @@ -75,12 +75,12 @@ func TestSync(t *testing.T) { } defer cleanup() - mockHost1, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithQuic()) + mockHost1, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch()) if err != nil { t.Fatalf("error making host: %v", err) } - mockHost2, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithQuic()) + mockHost2, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch()) if err != nil { t.Fatalf("error making host: %v", err) } @@ -131,7 +131,7 @@ func TestRefresh(t *testing.T) { } defer cleanup() - h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithQuic(), libp2pSetup.WithPort(10010)) + h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithPort(10010)) if err != nil { t.Fatalf("error making host: %v", err) } diff --git a/node/pkg/libp2p/setup/setup.go b/node/pkg/libp2p/setup/setup.go index 1ed298b52..32da638d4 100644 --- a/node/pkg/libp2p/setup/setup.go +++ b/node/pkg/libp2p/setup/setup.go @@ -22,6 +22,15 @@ func ConnectThroughBootApi(ctx context.Context, h host.Host) error { return err } + externalIp := os.Getenv("HOST_IP") + if externalIp != "" { + url, err = utils.ReplaceIpFromUrl(url, externalIp) + if err != nil { + log.Error().Err(err).Msg("failed to replace ip") + return err + } + } + apiEndpoint := os.Getenv("BOOT_API_URL") if apiEndpoint == "" { log.Info().Msg("boot api endpoint not set, using default url: http://localhost:8089") diff --git a/node/pkg/libp2p/setup/type.go b/node/pkg/libp2p/setup/type.go index f310f4c47..da9bcffeb 100644 --- a/node/pkg/libp2p/setup/type.go +++ b/node/pkg/libp2p/setup/type.go @@ -5,8 +5,6 @@ import ( "crypto/rand" "crypto/sha256" "fmt" - "os" - "strconv" "bisonai.com/orakl/node/pkg/secrets" @@ -23,7 +21,7 @@ type HostConfig struct { PrivateKey crypto.PrivKey SecretString string HolePunch bool - Quic bool + Tcp bool } type HostOption func(*HostConfig) @@ -52,28 +50,19 @@ func WithHolePunch() HostOption { } } -func WithQuic() HostOption { +func WithTcp() HostOption { return func(hc *HostConfig) { - hc.Quic = true + hc.Tcp = true } } func NewHost(ctx context.Context, opts ...HostOption) (host.Host, error) { - defaultPort := 0 - defaultPortStr := os.Getenv("LISTEN_PORT") - if defaultPortStr != "" { - tmp, err := strconv.Atoi(defaultPortStr) - if err == nil { - defaultPort = tmp - } - } - config := &HostConfig{ - Port: defaultPort, + Port: 0, PrivateKey: nil, SecretString: secrets.GetSecret("PRIVATE_NETWORK_SECRET"), HolePunch: false, - Quic: false, + Tcp: false, } for _, opt := range opts { opt(config) @@ -88,10 +77,10 @@ func NewHost(ctx context.Context, opts ...HostOption) (host.Host, error) { } listenStr := "" - if config.Quic { - listenStr = fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic-v1", config.Port) - } else { + if config.Tcp { listenStr = fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", config.Port) + } else { + listenStr = fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic-v1", config.Port) } libp2pOpts := []libp2p.Option{ diff --git a/node/pkg/libp2p/tests/libp2p_test.go b/node/pkg/libp2p/tests/libp2p_test.go index e5faebd27..c22f443d2 100644 --- a/node/pkg/libp2p/tests/libp2p_test.go +++ b/node/pkg/libp2p/tests/libp2p_test.go @@ -10,7 +10,7 @@ import ( ) func TestMakeHost(t *testing.T) { - h, err := setup.NewHost(context.Background(), setup.WithHolePunch(), setup.WithPort(10001)) + h, err := setup.NewHost(context.Background(), setup.WithHolePunch()) if err != nil { t.Errorf("Failed to make host: %v", err) } @@ -18,7 +18,7 @@ func TestMakeHost(t *testing.T) { } func TestMakePubsub(t *testing.T) { - h, err := setup.NewHost(context.Background(), setup.WithHolePunch(), setup.WithPort(10001)) + h, err := setup.NewHost(context.Background(), setup.WithHolePunch()) if err != nil { t.Fatalf("Failed to make host: %v", err) } @@ -31,7 +31,7 @@ func TestMakePubsub(t *testing.T) { } func TestGetHostAddress(t *testing.T) { - h, err := setup.NewHost(context.Background(), setup.WithHolePunch(), setup.WithPort(10001)) + h, err := setup.NewHost(context.Background(), setup.WithHolePunch()) if err != nil { t.Fatalf("Failed to make host: %v", err) } @@ -41,3 +41,25 @@ func TestGetHostAddress(t *testing.T) { t.Errorf("Failed to get host address: %v", err) } } + +func TestReplaceIp(t *testing.T) { + h, err := setup.NewHost(context.Background(), setup.WithHolePunch()) + if err != nil { + t.Fatalf("Failed to make host: %v", err) + } + defer h.Close() + + url, err := utils.ExtractConnectionUrl(h) + if err != nil { + t.Fatalf("Failed to extract connection url: %v", err) + } + + result, err := utils.ReplaceIpFromUrl(url, "127.0.0.1") + if err != nil { + t.Errorf("Failed to replace ip: %v", err) + } + + if url == result { + t.Errorf("Failed to replace ip: %v", err) + } +} diff --git a/node/pkg/libp2p/utils/utils.go b/node/pkg/libp2p/utils/utils.go index c1c58a1c1..c02443993 100644 --- a/node/pkg/libp2p/utils/utils.go +++ b/node/pkg/libp2p/utils/utils.go @@ -100,3 +100,12 @@ func ConnectionUrl2AddrInfo(url string) (*peer.AddrInfo, error) { return info, nil } + +func ReplaceIpFromUrl(url string, ip string) (string, error) { + parts := strings.Split(url, "/") + if len(parts) < 5 || parts[1] != "ip4" { + return "", fmt.Errorf("invalid URL format") + } + parts[2] = ip + return strings.Join(parts, "/"), nil +} diff --git a/node/pkg/raft/accessors.go b/node/pkg/raft/accessors.go index 79def5022..686476041 100644 --- a/node/pkg/raft/accessors.go +++ b/node/pkg/raft/accessors.go @@ -1,7 +1,5 @@ package raft -import "github.com/libp2p/go-libp2p/core/peer" - func (r *Raft) IncreaseTerm() { r.Mutex.Lock() defer r.Mutex.Unlock() @@ -75,11 +73,7 @@ func (r *Raft) UpdateVotedFor(votedFor string) { } func (r *Raft) SubscribersCount() int { - return len(r.Subscribers()) -} - -func (r *Raft) Subscribers() []peer.ID { - return r.Ps.ListPeers(r.Topic.String()) + return r.Peers.Size() } func (r *Raft) GetHostId() string { diff --git a/node/pkg/raft/raft.go b/node/pkg/raft/raft.go index 6bf7d8181..b376d4287 100644 --- a/node/pkg/raft/raft.go +++ b/node/pkg/raft/raft.go @@ -10,6 +10,7 @@ import ( "github.com/rs/zerolog/log" errorSentinel "bisonai.com/orakl/node/pkg/error" + "bisonai.com/orakl/node/pkg/utils/set" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" ) @@ -40,6 +41,9 @@ func NewRaftNode( HeartbeatTimeout: HEARTBEAT_TIMEOUT, LeaderJobTimeout: leaderJobTimeout, + + PrevPeers: *set.NewSet[string](), + Peers: *set.NewSet[string](), } return r } @@ -104,12 +108,17 @@ func (r *Raft) handleMessage(ctx context.Context, msg Message) error { return r.handleRequestVote(msg) case ReplyVote: return r.handleReplyVote(ctx, msg) + case ReplyHeartbeat: + return r.handleReplyHeartbeat(msg) default: return r.HandleCustomMessage(ctx, msg) } } func (r *Raft) handleHeartbeat(msg Message) error { + r.Peers = r.PrevPeers + r.PrevPeers = *set.NewSet[string]() + if msg.SentFrom == r.GetHostId() { return nil } @@ -151,7 +160,7 @@ func (r *Raft) handleHeartbeat(msg Message) error { r.UpdateLeader(heartbeatMessage.LeaderID) } - return nil + return r.sendReplyHeartbeat() } func (r *Raft) handleRequestVote(msg Message) error { @@ -216,6 +225,17 @@ func (r *Raft) handleReplyVote(ctx context.Context, msg Message) error { return nil } +func (r *Raft) handleReplyHeartbeat(msg Message) error { + var replyHeartbeatMessage ReplyHeartbeatMessage + err := json.Unmarshal(msg.Data, &replyHeartbeatMessage) + if err != nil { + return err + } + + r.PrevPeers.Add(msg.SentFrom) + return nil +} + // publishing messages func (r *Raft) PublishMessage(msg Message) error { @@ -227,6 +247,7 @@ func (r *Raft) PublishMessage(msg Message) error { } func (r *Raft) sendHeartbeat() error { + heartbeatMessage := HeartbeatMessage{ LeaderID: r.GetHostId(), Term: r.GetCurrentTerm(), @@ -250,6 +271,24 @@ func (r *Raft) sendHeartbeat() error { return nil } +func (r *Raft) sendReplyHeartbeat() error { + replyHeartbeatMessage := ReplyHeartbeatMessage{} + marshalledReplyHeartbeatMsg, err := json.Marshal(replyHeartbeatMessage) + if err != nil { + return err + } + message := Message{ + Type: ReplyHeartbeat, + SentFrom: r.GetHostId(), + Data: json.RawMessage(marshalledReplyHeartbeatMsg), + } + err = r.PublishMessage(message) + if err != nil { + return err + } + return nil +} + func (r *Raft) sendReplyVote(to string, voteGranted bool) error { replyVoteMessage := ReplyRequestVoteMessage{ VoteGranted: voteGranted, diff --git a/node/pkg/raft/types.go b/node/pkg/raft/types.go index 6fbe4e723..efbcf1a1a 100644 --- a/node/pkg/raft/types.go +++ b/node/pkg/raft/types.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "bisonai.com/orakl/node/pkg/utils/set" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" ) @@ -20,6 +21,7 @@ const ( ReplyVote MessageType = "replyVote" AppendEntries MessageType = "appendEntries" ReplyAppendEntries MessageType = "replyAppendEntries" + ReplyHeartbeat MessageType = "replyHeartbeat" Leader RoleType = "leader" Candidate RoleType = "candidate" @@ -41,6 +43,8 @@ type HeartbeatMessage struct { Term int `json:"term"` } +type ReplyHeartbeatMessage struct{} + type ReplyRequestVoteMessage struct { VoteGranted bool `json:"voteGranted"` LeaderID string `json:"leaderID"` @@ -68,4 +72,7 @@ type Raft struct { LeaderJobTicker *time.Ticker HandleCustomMessage func(context.Context, Message) error LeaderJob func() error + + PrevPeers set.Set[string] + Peers set.Set[string] } diff --git a/node/pkg/reporter/main_test.go b/node/pkg/reporter/main_test.go index 1d4184c3c..7817308ba 100644 --- a/node/pkg/reporter/main_test.go +++ b/node/pkg/reporter/main_test.go @@ -119,7 +119,7 @@ func setup(ctx context.Context) (func() error, *TestItems, error) { testItems.admin = admin - h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithPort(10001)) + h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch()) if err != nil { return nil, nil, err } @@ -171,7 +171,7 @@ func reporterCleanup(ctx context.Context, admin *fiber.App, app *App) func() err if err != nil { return err } - return nil + return app.Host.Close() } } diff --git a/node/pkg/utils/set/set.go b/node/pkg/utils/set/set.go new file mode 100644 index 000000000..3d708acb7 --- /dev/null +++ b/node/pkg/utils/set/set.go @@ -0,0 +1,26 @@ +package set + +type Set[T comparable] struct { + data map[T]struct{} +} + +func NewSet[T comparable]() *Set[T] { + return &Set[T]{data: make(map[T]struct{})} +} + +func (s *Set[T]) Add(element T) { + s.data[element] = struct{}{} +} + +func (s *Set[T]) Remove(element T) { + delete(s.data, element) +} + +func (s *Set[T]) Contains(element T) bool { + _, exists := s.data[element] + return exists +} + +func (s *Set[T]) Size() int { + return len(s.data) +} diff --git a/node/pkg/utils/tests/set_test.go b/node/pkg/utils/tests/set_test.go new file mode 100644 index 000000000..6b482f408 --- /dev/null +++ b/node/pkg/utils/tests/set_test.go @@ -0,0 +1,37 @@ +package tests + +import ( + "testing" + + "bisonai.com/orakl/node/pkg/utils/set" + "github.com/stretchr/testify/assert" +) + +func TestSetBasicOperations(t *testing.T) { + s := set.NewSet[int]() + assert.Equal(t, 0, s.Size()) + + s.Add(1) + assert.Equal(t, 1, s.Size()) + assert.True(t, s.Contains(1)) + assert.False(t, s.Contains(2)) + + s.Remove(1) + assert.Equal(t, 0, s.Size()) + assert.False(t, s.Contains(1)) +} + +func TestSetAddDuplicates(t *testing.T) { + s := set.NewSet[int]() + s.Add(1) + s.Add(1) // Attempt to add duplicate + assert.Equal(t, 1, s.Size(), "Set should not allow duplicates") + assert.True(t, s.Contains(1)) +} + +func TestSetOperationsOnEmpty(t *testing.T) { + s := set.NewSet[int]() + assert.False(t, s.Contains(1), "Empty set should not contain any element") + s.Remove(1) // Attempt to remove from empty set + assert.Equal(t, 0, s.Size(), "Size should remain 0 after remove operation on empty set") +}