Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(OraklNode) Track peers manually #1572

Merged
merged 4 commits into from
Jun 10, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions node/pkg/raft/accessors.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package raft

import "github.com/libp2p/go-libp2p/core/peer"

func (r *Raft) IncreaseTerm() {
r.Mutex.Lock()
defer r.Mutex.Unlock()
Expand Down Expand Up @@ -75,11 +73,7 @@ func (r *Raft) UpdateVotedFor(votedFor string) {
}

func (r *Raft) SubscribersCount() int {
return len(r.Subscribers())
}

func (r *Raft) Subscribers() []peer.ID {
return r.Ps.ListPeers(r.Topic.String())
return r.Peers.Size()
nick-bisonai marked this conversation as resolved.
Show resolved Hide resolved
}

func (r *Raft) GetHostId() string {
Expand Down
41 changes: 40 additions & 1 deletion node/pkg/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/rs/zerolog/log"

errorSentinel "bisonai.com/orakl/node/pkg/error"
"bisonai.com/orakl/node/pkg/utils/set"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
)
Expand Down Expand Up @@ -40,6 +41,9 @@ func NewRaftNode(
HeartbeatTimeout: HEARTBEAT_TIMEOUT,

LeaderJobTimeout: leaderJobTimeout,

PrevPeers: *set.NewSet[string](),
Peers: *set.NewSet[string](),
}
return r
}
Expand Down Expand Up @@ -104,12 +108,17 @@ func (r *Raft) handleMessage(ctx context.Context, msg Message) error {
return r.handleRequestVote(msg)
case ReplyVote:
return r.handleReplyVote(ctx, msg)
case ReplyHeartbeat:
return r.handleReplyHeartbeat(msg)
default:
return r.HandleCustomMessage(ctx, msg)
}
}

func (r *Raft) handleHeartbeat(msg Message) error {
r.Peers = r.PrevPeers
r.PrevPeers = *set.NewSet[string]()

if msg.SentFrom == r.GetHostId() {
return nil
}
Expand Down Expand Up @@ -151,7 +160,7 @@ func (r *Raft) handleHeartbeat(msg Message) error {
r.UpdateLeader(heartbeatMessage.LeaderID)
}

return nil
return r.sendReplyHeartbeat()
}

func (r *Raft) handleRequestVote(msg Message) error {
Expand Down Expand Up @@ -216,6 +225,17 @@ func (r *Raft) handleReplyVote(ctx context.Context, msg Message) error {
return nil
}

func (r *Raft) handleReplyHeartbeat(msg Message) error {
var replyHeartbeatMessage ReplyHeartbeatMessage
err := json.Unmarshal(msg.Data, &replyHeartbeatMessage)
if err != nil {
return err
}

r.PrevPeers.Add(msg.SentFrom)
return nil
}

// publishing messages

func (r *Raft) PublishMessage(msg Message) error {
Expand All @@ -227,6 +247,7 @@ func (r *Raft) PublishMessage(msg Message) error {
}

func (r *Raft) sendHeartbeat() error {

heartbeatMessage := HeartbeatMessage{
LeaderID: r.GetHostId(),
Term: r.GetCurrentTerm(),
Expand All @@ -250,6 +271,24 @@ func (r *Raft) sendHeartbeat() error {
return nil
}

func (r *Raft) sendReplyHeartbeat() error {
replyHeartbeatMessage := ReplyHeartbeatMessage{}
marshalledReplyHeartbeatMsg, err := json.Marshal(replyHeartbeatMessage)
if err != nil {
return err
}
message := Message{
Type: ReplyHeartbeat,
SentFrom: r.GetHostId(),
Data: json.RawMessage(marshalledReplyHeartbeatMsg),
}
err = r.PublishMessage(message)
if err != nil {
return err
}
return nil
}

func (r *Raft) sendReplyVote(to string, voteGranted bool) error {
replyVoteMessage := ReplyRequestVoteMessage{
VoteGranted: voteGranted,
Expand Down
7 changes: 7 additions & 0 deletions node/pkg/raft/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"bisonai.com/orakl/node/pkg/utils/set"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
)
Expand All @@ -20,6 +21,7 @@ const (
ReplyVote MessageType = "replyVote"
AppendEntries MessageType = "appendEntries"
ReplyAppendEntries MessageType = "replyAppendEntries"
ReplyHeartbeat MessageType = "replyHeartbeat"

Leader RoleType = "leader"
Candidate RoleType = "candidate"
Expand All @@ -41,6 +43,8 @@ type HeartbeatMessage struct {
Term int `json:"term"`
}

type ReplyHeartbeatMessage struct{}

type ReplyRequestVoteMessage struct {
VoteGranted bool `json:"voteGranted"`
LeaderID string `json:"leaderID"`
Expand Down Expand Up @@ -68,4 +72,7 @@ type Raft struct {
LeaderJobTicker *time.Ticker
HandleCustomMessage func(context.Context, Message) error
LeaderJob func() error

PrevPeers set.Set[string]
Peers set.Set[string]
}
26 changes: 26 additions & 0 deletions node/pkg/utils/set/set.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package set

type Set[T comparable] struct {
data map[T]struct{}
}

func NewSet[T comparable]() *Set[T] {
return &Set[T]{data: make(map[T]struct{})}
}

func (s *Set[T]) Add(element T) {
s.data[element] = struct{}{}
}

func (s *Set[T]) Remove(element T) {
delete(s.data, element)
}

func (s *Set[T]) Contains(element T) bool {
_, exists := s.data[element]
return exists
}

func (s *Set[T]) Size() int {
return len(s.data)
}
37 changes: 37 additions & 0 deletions node/pkg/utils/tests/set_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package tests

import (
"testing"

"bisonai.com/orakl/node/pkg/utils/set"
"github.com/stretchr/testify/assert"
)

func TestSetBasicOperations(t *testing.T) {
s := set.NewSet[int]()
assert.Equal(t, 0, s.Size())

s.Add(1)
assert.Equal(t, 1, s.Size())
assert.True(t, s.Contains(1))
assert.False(t, s.Contains(2))

s.Remove(1)
assert.Equal(t, 0, s.Size())
assert.False(t, s.Contains(1))
}
nick-bisonai marked this conversation as resolved.
Show resolved Hide resolved

func TestSetAddDuplicates(t *testing.T) {
s := set.NewSet[int]()
s.Add(1)
s.Add(1) // Attempt to add duplicate
assert.Equal(t, 1, s.Size(), "Set should not allow duplicates")
assert.True(t, s.Contains(1))
}

func TestSetOperationsOnEmpty(t *testing.T) {
s := set.NewSet[int]()
assert.False(t, s.Contains(1), "Empty set should not contain any element")
s.Remove(1) // Attempt to remove from empty set
assert.Equal(t, 0, s.Size(), "Size should remain 0 after remove operation on empty set")
}
Loading