Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(OraklNode) Track peers manually #1572

Merged
merged 4 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion node/.env.example
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ FEED_DATA_STREAM_INTERVAL=
# (optional) designate external ip to be used if required
HOST_IP=


# `baobab` or `cypress`, defaults to baobab
CHAIN=

Expand Down
3 changes: 1 addition & 2 deletions node/cmd/node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ func main() {

listenPort, err := strconv.Atoi(os.Getenv("LISTEN_PORT"))
if err != nil {
log.Error().Err(err).Msg("Error parsing LISTEN_PORT")
return
log.Warn().Msg("LISTEN_PORT missing, using random port for libp2p")
}

host, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithPort(listenPort))
Expand Down
5 changes: 3 additions & 2 deletions node/pkg/aggregator/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func setup(ctx context.Context) (func() error, *TestItems, error) {
}
testItems.admin = admin

h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithPort(10001))
h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch())
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -159,7 +159,8 @@ func aggregatorCleanup(ctx context.Context, admin *fiber.App, app *App) func() e
if err != nil {
return err
}
return nil

return app.Host.Close()
}

}
Expand Down
6 changes: 3 additions & 3 deletions node/pkg/boot/tests/peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,12 @@ func TestSync(t *testing.T) {
}
defer cleanup()

mockHost1, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithQuic())
mockHost1, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch())
if err != nil {
t.Fatalf("error making host: %v", err)
}

mockHost2, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithQuic())
mockHost2, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch())
if err != nil {
t.Fatalf("error making host: %v", err)
}
Expand Down Expand Up @@ -131,7 +131,7 @@ func TestRefresh(t *testing.T) {
}
defer cleanup()

h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithQuic(), libp2pSetup.WithPort(10010))
h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithPort(10010))
if err != nil {
t.Fatalf("error making host: %v", err)
}
Expand Down
9 changes: 9 additions & 0 deletions node/pkg/libp2p/setup/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,15 @@ func ConnectThroughBootApi(ctx context.Context, h host.Host) error {
return err
}

externalIp := os.Getenv("HOST_IP")
if externalIp != "" {
url, err = utils.ReplaceIpFromUrl(url, externalIp)
if err != nil {
log.Error().Err(err).Msg("failed to replace ip")
return err
}
}

apiEndpoint := os.Getenv("BOOT_API_URL")
if apiEndpoint == "" {
log.Info().Msg("boot api endpoint not set, using default url: http://localhost:8089")
Expand Down
27 changes: 8 additions & 19 deletions node/pkg/libp2p/setup/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"crypto/rand"
"crypto/sha256"
"fmt"
"os"
"strconv"

"bisonai.com/orakl/node/pkg/secrets"

Expand All @@ -23,7 +21,7 @@ type HostConfig struct {
PrivateKey crypto.PrivKey
SecretString string
HolePunch bool
Quic bool
Tcp bool
}

type HostOption func(*HostConfig)
Expand Down Expand Up @@ -52,28 +50,19 @@ func WithHolePunch() HostOption {
}
}

func WithQuic() HostOption {
func WithTcp() HostOption {
return func(hc *HostConfig) {
hc.Quic = true
hc.Tcp = true
}
}

func NewHost(ctx context.Context, opts ...HostOption) (host.Host, error) {
defaultPort := 0
defaultPortStr := os.Getenv("LISTEN_PORT")
if defaultPortStr != "" {
tmp, err := strconv.Atoi(defaultPortStr)
if err == nil {
defaultPort = tmp
}
}

config := &HostConfig{
Port: defaultPort,
Port: 0,
PrivateKey: nil,
SecretString: secrets.GetSecret("PRIVATE_NETWORK_SECRET"),
HolePunch: false,
Quic: false,
Tcp: false,
}
for _, opt := range opts {
opt(config)
Expand All @@ -88,10 +77,10 @@ func NewHost(ctx context.Context, opts ...HostOption) (host.Host, error) {
}

listenStr := ""
if config.Quic {
listenStr = fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic-v1", config.Port)
} else {
if config.Tcp {
listenStr = fmt.Sprintf("/ip4/0.0.0.0/tcp/%d", config.Port)
} else {
listenStr = fmt.Sprintf("/ip4/0.0.0.0/udp/%d/quic-v1", config.Port)
}

libp2pOpts := []libp2p.Option{
Expand Down
28 changes: 25 additions & 3 deletions node/pkg/libp2p/tests/libp2p_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@ import (
)

func TestMakeHost(t *testing.T) {
h, err := setup.NewHost(context.Background(), setup.WithHolePunch(), setup.WithPort(10001))
h, err := setup.NewHost(context.Background(), setup.WithHolePunch())
if err != nil {
t.Errorf("Failed to make host: %v", err)
}
defer h.Close()
}

func TestMakePubsub(t *testing.T) {
h, err := setup.NewHost(context.Background(), setup.WithHolePunch(), setup.WithPort(10001))
h, err := setup.NewHost(context.Background(), setup.WithHolePunch())
if err != nil {
t.Fatalf("Failed to make host: %v", err)
}
Expand All @@ -31,7 +31,7 @@ func TestMakePubsub(t *testing.T) {
}

func TestGetHostAddress(t *testing.T) {
h, err := setup.NewHost(context.Background(), setup.WithHolePunch(), setup.WithPort(10001))
h, err := setup.NewHost(context.Background(), setup.WithHolePunch())
if err != nil {
t.Fatalf("Failed to make host: %v", err)
}
Expand All @@ -41,3 +41,25 @@ func TestGetHostAddress(t *testing.T) {
t.Errorf("Failed to get host address: %v", err)
}
}

func TestReplaceIp(t *testing.T) {
h, err := setup.NewHost(context.Background(), setup.WithHolePunch())
if err != nil {
t.Fatalf("Failed to make host: %v", err)
}
defer h.Close()

url, err := utils.ExtractConnectionUrl(h)
if err != nil {
t.Fatalf("Failed to extract connection url: %v", err)
}

result, err := utils.ReplaceIpFromUrl(url, "127.0.0.1")
if err != nil {
t.Errorf("Failed to replace ip: %v", err)
}

if url == result {
t.Errorf("Failed to replace ip: %v", err)
}
}
9 changes: 9 additions & 0 deletions node/pkg/libp2p/utils/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,3 +100,12 @@ func ConnectionUrl2AddrInfo(url string) (*peer.AddrInfo, error) {

return info, nil
}

func ReplaceIpFromUrl(url string, ip string) (string, error) {
parts := strings.Split(url, "/")
if len(parts) < 5 || parts[1] != "ip4" {
return "", fmt.Errorf("invalid URL format")
}
parts[2] = ip
return strings.Join(parts, "/"), nil
}
8 changes: 1 addition & 7 deletions node/pkg/raft/accessors.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package raft

import "github.com/libp2p/go-libp2p/core/peer"

func (r *Raft) IncreaseTerm() {
r.Mutex.Lock()
defer r.Mutex.Unlock()
Expand Down Expand Up @@ -75,11 +73,7 @@ func (r *Raft) UpdateVotedFor(votedFor string) {
}

func (r *Raft) SubscribersCount() int {
return len(r.Subscribers())
}

func (r *Raft) Subscribers() []peer.ID {
return r.Ps.ListPeers(r.Topic.String())
return r.Peers.Size()
nick-bisonai marked this conversation as resolved.
Show resolved Hide resolved
}

func (r *Raft) GetHostId() string {
Expand Down
41 changes: 40 additions & 1 deletion node/pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/rs/zerolog/log"

errorSentinel "bisonai.com/orakl/node/pkg/error"
"bisonai.com/orakl/node/pkg/utils/set"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
)
Expand Down Expand Up @@ -40,6 +41,9 @@ func NewRaftNode(
HeartbeatTimeout: HEARTBEAT_TIMEOUT,

LeaderJobTimeout: leaderJobTimeout,

PrevPeers: *set.NewSet[string](),
Peers: *set.NewSet[string](),
}
return r
}
Expand Down Expand Up @@ -104,12 +108,17 @@ func (r *Raft) handleMessage(ctx context.Context, msg Message) error {
return r.handleRequestVote(msg)
case ReplyVote:
return r.handleReplyVote(ctx, msg)
case ReplyHeartbeat:
return r.handleReplyHeartbeat(msg)
default:
return r.HandleCustomMessage(ctx, msg)
}
}

func (r *Raft) handleHeartbeat(msg Message) error {
r.Peers = r.PrevPeers
r.PrevPeers = *set.NewSet[string]()

if msg.SentFrom == r.GetHostId() {
return nil
}
Expand Down Expand Up @@ -151,7 +160,7 @@ func (r *Raft) handleHeartbeat(msg Message) error {
r.UpdateLeader(heartbeatMessage.LeaderID)
}

return nil
return r.sendReplyHeartbeat()
}

func (r *Raft) handleRequestVote(msg Message) error {
Expand Down Expand Up @@ -216,6 +225,17 @@ func (r *Raft) handleReplyVote(ctx context.Context, msg Message) error {
return nil
}

func (r *Raft) handleReplyHeartbeat(msg Message) error {
var replyHeartbeatMessage ReplyHeartbeatMessage
err := json.Unmarshal(msg.Data, &replyHeartbeatMessage)
if err != nil {
return err
}

r.PrevPeers.Add(msg.SentFrom)
return nil
}

// publishing messages

func (r *Raft) PublishMessage(msg Message) error {
Expand All @@ -227,6 +247,7 @@ func (r *Raft) PublishMessage(msg Message) error {
}

func (r *Raft) sendHeartbeat() error {

heartbeatMessage := HeartbeatMessage{
LeaderID: r.GetHostId(),
Term: r.GetCurrentTerm(),
Expand All @@ -250,6 +271,24 @@ func (r *Raft) sendHeartbeat() error {
return nil
}

func (r *Raft) sendReplyHeartbeat() error {
replyHeartbeatMessage := ReplyHeartbeatMessage{}
marshalledReplyHeartbeatMsg, err := json.Marshal(replyHeartbeatMessage)
if err != nil {
return err
}
message := Message{
Type: ReplyHeartbeat,
SentFrom: r.GetHostId(),
Data: json.RawMessage(marshalledReplyHeartbeatMsg),
}
err = r.PublishMessage(message)
if err != nil {
return err
}
return nil
}

func (r *Raft) sendReplyVote(to string, voteGranted bool) error {
replyVoteMessage := ReplyRequestVoteMessage{
VoteGranted: voteGranted,
Expand Down
7 changes: 7 additions & 0 deletions node/pkg/raft/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"bisonai.com/orakl/node/pkg/utils/set"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
)
Expand All @@ -20,6 +21,7 @@ const (
ReplyVote MessageType = "replyVote"
AppendEntries MessageType = "appendEntries"
ReplyAppendEntries MessageType = "replyAppendEntries"
ReplyHeartbeat MessageType = "replyHeartbeat"

Leader RoleType = "leader"
Candidate RoleType = "candidate"
Expand All @@ -41,6 +43,8 @@ type HeartbeatMessage struct {
Term int `json:"term"`
}

type ReplyHeartbeatMessage struct{}

type ReplyRequestVoteMessage struct {
VoteGranted bool `json:"voteGranted"`
LeaderID string `json:"leaderID"`
Expand Down Expand Up @@ -68,4 +72,7 @@ type Raft struct {
LeaderJobTicker *time.Ticker
HandleCustomMessage func(context.Context, Message) error
LeaderJob func() error

PrevPeers set.Set[string]
Peers set.Set[string]
}
4 changes: 2 additions & 2 deletions node/pkg/reporter/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func setup(ctx context.Context) (func() error, *TestItems, error) {

testItems.admin = admin

h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch(), libp2pSetup.WithPort(10001))
h, err := libp2pSetup.NewHost(ctx, libp2pSetup.WithHolePunch())
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -171,7 +171,7 @@ func reporterCleanup(ctx context.Context, admin *fiber.App, app *App) func() err
if err != nil {
return err
}
return nil
return app.Host.Close()
}
}

Expand Down
Loading
Loading