Skip to content

Commit

Permalink
temp - raft protcol
Browse files Browse the repository at this point in the history
  • Loading branch information
Tanguyvans committed Dec 15, 2023
1 parent a2aa96b commit 5f038c8
Show file tree
Hide file tree
Showing 2 changed files with 275 additions and 1 deletion.
49 changes: 48 additions & 1 deletion node.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import threading
import json
import os
import time

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
Expand All @@ -13,6 +14,7 @@
import base64

from protocols.pbft_protocol import PBFTProtocol
from protocols.raft_protocol import RaftProtocol
from protocols.consensus_protocol import ConsensusProtocol
from blockchain import Blockchain
from block import Block
Expand All @@ -28,6 +30,9 @@ def __init__(self, node_id, host, port, consensus_protocol):

if consensus_protocol == "pbft":
self.consensus_protocol = PBFTProtocol(node=self, blockchain=self.blockchain)
elif consensus_protocol == "raft":
self.consensus_protocol = RaftProtocol(node=self, blockchain=self.blockchain)
threading.Thread(target=self.consensus_protocol.run).start()

private_key_path = f"keys/{node_id}_private_key.pem"
public_key_path = f"keys/{node_id}_public_key.pem"
Expand Down Expand Up @@ -145,4 +150,46 @@ def getKeys(self, private_key_path, public_key_path):
f.write(self.public_key.public_bytes(
encoding=serialization.Encoding.PEM,
format=serialization.PublicFormat.SubjectPublicKeyInfo
))
))

if __name__ == "__main__":
port_node_1 = 5010
port_node_2 = 5011
port_node_3 = 5012

node1 = Node(node_id="N0", host="localhost", port=port_node_1, consensus_protocol="raft")
node2 = Node(node_id="N1", host="localhost", port=port_node_2, consensus_protocol="raft")
node3 = Node(node_id="N2", host="localhost", port=port_node_3, consensus_protocol="raft")

node1.add_peer(peer_id="N1", peer_address=("localhost", port_node_2))
node1.add_peer(peer_id="N2", peer_address=("localhost", port_node_3))

node2.add_peer(peer_id="N0", peer_address=("localhost", port_node_1))
node2.add_peer(peer_id="N2", peer_address=("localhost", port_node_3))

node3.add_peer(peer_id="N0", peer_address=("localhost", port_node_1))
node3.add_peer(peer_id="N1", peer_address=("localhost", port_node_2))

threading.Thread(target=node1.start_server).start()
threading.Thread(target=node2.start_server).start()
threading.Thread(target=node3.start_server).start()


time.sleep(20)

print("client sending")
message_from_node1 = {"type": "client_request", "content": "hello"}
node2.send_message(peer_id="N0", message=message_from_node1)

time.sleep(5)
message_from_node1 = {"type": "client_request", "content": "testing"}
node1.send_message(peer_id="N1", message=message_from_node1)

time.sleep(5)
message_from_node1 = {"type": "client_request", "content": "tanguy"}
node2.send_message(peer_id="N0", message=message_from_node1)

time.sleep(5)
print(node1.consensus_protocol.log)
print(node2.consensus_protocol.log)
print(node3.consensus_protocol.log)
227 changes: 227 additions & 0 deletions protocols/raft_protocol.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
import threading
import time
import logging
import random

from block import Block

class RaftProtocol:
def __init__(self, node, blockchain):
self.node = node
self.node_id = self.node.node_id
self.total_nodes = len(self.node.peers)
self.current_term = 0
self.voted_for = None
self.commit_index = 0
self.last_applied = 0
self.state = "follower"
self.votes_received = 0
self.log = []

self.leader_id = None

self.prepare_counts = {}
self.commit_counts = {}

self.blockchain = blockchain

self.reset_election_timeout()

def handle_message(self, message):
message_type = message.get("type")

if message_type == "request_vote":
print("inside the request form")
self.handle_request_vote(message)
elif message_type == "vote_response":
print("Inside the vote response")
self.handle_vote_response(message)
elif message_type == "append_entries":
self.handle_append_entries(message)
elif message_type == "client_request":
self.handle_client_request(message)

def handle_request_vote(self, message):
candidate_id = message.get("candidate_id")
candidate_term = message.get("candidate_term")

if candidate_term < self.current_term:
# Reject the vote if the candidate's term is outdated
return

if candidate_term > self.current_term or (candidate_term == self.current_term and not self.voted_for):
# Vote for the candidate if its term is equal or greater, and the node has not voted yet
self.voted_for = candidate_id
self.current_term = candidate_term
response = {"type": "vote_response", "vote_granted": True, "term": self.current_term}

self.reset_election_timeout()
else:
# Deny the vote
response = {"type": "vote_response", "vote_granted": False, "term": self.current_term}

self.node.send_message(candidate_id, response)

def handle_append_entries(self, message):
self.leader_id = message.get("leader_id")
leader_term = message.get("leader_term")
entries = message.get("entries")

# Update the current term if the leader has a higher term
if leader_term > self.current_term:
self.current_term = leader_term
self.voted_for = None
self.state = "follower"
print(f"Node {self.node_id} updated to follower due to higher term {leader_term}")

# Reset election timeout to avoid becoming a candidate
self.reset_election_timeout()

# Process the log entries from the leader
if entries:
self.log.extend(entries)

# Send a response to the leader
response = {
"type": "append_entries_response",
"success": True, # You can modify this based on your implementation
"term": self.current_term,
}

self.node.send_message(self.leader_id, response)

def handle_client_request(self, message):
print("in", self.state, self.leader_id)
if self.state == "leader":
# If this node is the leader, process the client request
self.process_client_request(message)
else:
if self.leader_id is not None:
print("sent message")
self.node.send_message(self.leader_id, {"type": "client_request", "message": message})

def handle_vote_response(self, message):
vote_granted = message.get("vote_granted")
term = message.get("term")

if term > self.current_term:
# If the responding node has a higher term, update the current term and become a follower
self.current_term = term
self.voted_for = None
self.state = "follower"
self.leader_id = None
print(f"Node {self.node_id} updated to follower due to higher term {term}")
elif vote_granted:
# Increment the votes received if the vote is granted
self.votes_received += 1
self.leader_id = self.node_id
print(f"Node {self.node_id} received a vote. Total votes: {self.votes_received}")

# Check if the node has received a majority of votes
if self.votes_received > self.total_nodes // 2:
print(f"Node {self.node_id} has received a majority of votes. Becoming leader.")
self.state = "leader"

def request_vote(self, candidate_id, candidate_term):
# Implement RequestVote RPC
pass

def append_entries(self, leader_id, leader_term, entries):
# Implement AppendEntries RPC
pass

def start_election(self):
logging.info("Node %s is starting an election for term %s", self.node_id, self.current_term + 1)

self.current_term += 1
self.voted_for = self.node_id
self.votes_received = 1 # Vote for itself

print("Requesting votes from other nodes")

# Prepare the request_vote message
request_vote_message = {
"type": "request_vote",
"candidate_id": self.node_id,
"candidate_term": self.current_term,
}

# Ask for votes from other nodes
self.node.broadcast_message(request_vote_message)

def process_client_request(self, client_request):
# Assuming client_request is a dictionary representing the client's request
entry = {
"term": self.current_term,
"command": client_request.get("command"),
# Add any other relevant information from the client request
}

block = Block(
index=message["index"],
timestamp=message["timestamp"],
data=message["data"],
previous_hash=message["previous_hash"]
)
self.blockchain.add_block(block)

# Replicate the log entry to other nodes
append_entries_message = {
"type": "append_entries",
"leader_id": self.node_id,
"leader_term": self.current_term,
"entries": [entry], # Include the new log entry
}

# Broadcast the append_entries_message to other nodes
self.node.broadcast_message(append_entries_message)

def request_vote_from_node(self, node_id):
logging.info("Node %s is requesting a vote from Node %s for term %s", self.node_id, node_id, self.current_term)

request_vote_message = {
"type": "request_vote",
"candidate_id": self.node_id,
"candidate_term": self.current_term,
}

# Simulate network delay by sleeping for a short time
time.sleep(random.uniform(0, 0.1))

# Send the request for vote to the target node
threading.Thread(target=self.send_message, args=(node_id, request_vote_message)).start()

def send_append_entries(self, node_id, entries):
# Implement sending AppendEntries RPC to another node
pass

def send_heartbeats(self):
heartbeat_message = {
"type": "append_entries",
"leader_id": self.node_id,
"leader_term": self.current_term,
"entries": [], # Heartbeats typically have empty entries
}

self.node.broadcast_message(heartbeat_message)

def run(self):
while True:
current_time = time.time()

if self.state == "follower" and current_time - self.last_heartbeat_time > self.election_timeout:
# Le délai d'élection a expiré, commence une nouvelle élection
self.state = "candidate"
print(f"starting election with node: {self.node_id}")
self.start_election()
elif self.state == "candidate":
# Run election
self.start_election()
time.sleep(random.uniform(1, 5)) # Simulate election timeout
elif self.state == "leader":
self.send_heartbeats()
time.sleep(0.1)

def reset_election_timeout(self):
self.election_timeout = random.uniform(5, 15)
self.last_heartbeat_time = time.time()

0 comments on commit 5f038c8

Please sign in to comment.