diff --git a/node.py b/node.py index 383811f..28f35ce 100644 --- a/node.py +++ b/node.py @@ -2,6 +2,7 @@ import threading import json import os +import time from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import hashes @@ -13,6 +14,7 @@ import base64 from protocols.pbft_protocol import PBFTProtocol +from protocols.raft_protocol import RaftProtocol from protocols.consensus_protocol import ConsensusProtocol from blockchain import Blockchain from block import Block @@ -28,6 +30,9 @@ def __init__(self, node_id, host, port, consensus_protocol): if consensus_protocol == "pbft": self.consensus_protocol = PBFTProtocol(node=self, blockchain=self.blockchain) + elif consensus_protocol == "raft": + self.consensus_protocol = RaftProtocol(node=self, blockchain=self.blockchain) + threading.Thread(target=self.consensus_protocol.run).start() private_key_path = f"keys/{node_id}_private_key.pem" public_key_path = f"keys/{node_id}_public_key.pem" @@ -145,4 +150,46 @@ def getKeys(self, private_key_path, public_key_path): f.write(self.public_key.public_bytes( encoding=serialization.Encoding.PEM, format=serialization.PublicFormat.SubjectPublicKeyInfo - )) \ No newline at end of file + )) + +if __name__ == "__main__": + port_node_1 = 5010 + port_node_2 = 5011 + port_node_3 = 5012 + + node1 = Node(node_id="N0", host="localhost", port=port_node_1, consensus_protocol="raft") + node2 = Node(node_id="N1", host="localhost", port=port_node_2, consensus_protocol="raft") + node3 = Node(node_id="N2", host="localhost", port=port_node_3, consensus_protocol="raft") + + node1.add_peer(peer_id="N1", peer_address=("localhost", port_node_2)) + node1.add_peer(peer_id="N2", peer_address=("localhost", port_node_3)) + + node2.add_peer(peer_id="N0", peer_address=("localhost", port_node_1)) + node2.add_peer(peer_id="N2", peer_address=("localhost", port_node_3)) + + node3.add_peer(peer_id="N0", peer_address=("localhost", port_node_1)) + node3.add_peer(peer_id="N1", peer_address=("localhost", port_node_2)) + + threading.Thread(target=node1.start_server).start() + threading.Thread(target=node2.start_server).start() + threading.Thread(target=node3.start_server).start() + + + time.sleep(20) + + print("client sending") + message_from_node1 = {"type": "client_request", "content": "hello"} + node2.send_message(peer_id="N0", message=message_from_node1) + + time.sleep(5) + message_from_node1 = {"type": "client_request", "content": "testing"} + node1.send_message(peer_id="N1", message=message_from_node1) + + time.sleep(5) + message_from_node1 = {"type": "client_request", "content": "tanguy"} + node2.send_message(peer_id="N0", message=message_from_node1) + + time.sleep(5) + print(node1.consensus_protocol.log) + print(node2.consensus_protocol.log) + print(node3.consensus_protocol.log) \ No newline at end of file diff --git a/protocols/raft_protocol.py b/protocols/raft_protocol.py new file mode 100644 index 0000000..1de870c --- /dev/null +++ b/protocols/raft_protocol.py @@ -0,0 +1,227 @@ +import threading +import time +import logging +import random + +from block import Block + +class RaftProtocol: + def __init__(self, node, blockchain): + self.node = node + self.node_id = self.node.node_id + self.total_nodes = len(self.node.peers) + self.current_term = 0 + self.voted_for = None + self.commit_index = 0 + self.last_applied = 0 + self.state = "follower" + self.votes_received = 0 + self.log = [] + + self.leader_id = None + + self.prepare_counts = {} + self.commit_counts = {} + + self.blockchain = blockchain + + self.reset_election_timeout() + + def handle_message(self, message): + message_type = message.get("type") + + if message_type == "request_vote": + print("inside the request form") + self.handle_request_vote(message) + elif message_type == "vote_response": + print("Inside the vote response") + self.handle_vote_response(message) + elif message_type == "append_entries": + self.handle_append_entries(message) + elif message_type == "client_request": + self.handle_client_request(message) + + def handle_request_vote(self, message): + candidate_id = message.get("candidate_id") + candidate_term = message.get("candidate_term") + + if candidate_term < self.current_term: + # Reject the vote if the candidate's term is outdated + return + + if candidate_term > self.current_term or (candidate_term == self.current_term and not self.voted_for): + # Vote for the candidate if its term is equal or greater, and the node has not voted yet + self.voted_for = candidate_id + self.current_term = candidate_term + response = {"type": "vote_response", "vote_granted": True, "term": self.current_term} + + self.reset_election_timeout() + else: + # Deny the vote + response = {"type": "vote_response", "vote_granted": False, "term": self.current_term} + + self.node.send_message(candidate_id, response) + + def handle_append_entries(self, message): + self.leader_id = message.get("leader_id") + leader_term = message.get("leader_term") + entries = message.get("entries") + + # Update the current term if the leader has a higher term + if leader_term > self.current_term: + self.current_term = leader_term + self.voted_for = None + self.state = "follower" + print(f"Node {self.node_id} updated to follower due to higher term {leader_term}") + + # Reset election timeout to avoid becoming a candidate + self.reset_election_timeout() + + # Process the log entries from the leader + if entries: + self.log.extend(entries) + + # Send a response to the leader + response = { + "type": "append_entries_response", + "success": True, # You can modify this based on your implementation + "term": self.current_term, + } + + self.node.send_message(self.leader_id, response) + + def handle_client_request(self, message): + print("in", self.state, self.leader_id) + if self.state == "leader": + # If this node is the leader, process the client request + self.process_client_request(message) + else: + if self.leader_id is not None: + print("sent message") + self.node.send_message(self.leader_id, {"type": "client_request", "message": message}) + + def handle_vote_response(self, message): + vote_granted = message.get("vote_granted") + term = message.get("term") + + if term > self.current_term: + # If the responding node has a higher term, update the current term and become a follower + self.current_term = term + self.voted_for = None + self.state = "follower" + self.leader_id = None + print(f"Node {self.node_id} updated to follower due to higher term {term}") + elif vote_granted: + # Increment the votes received if the vote is granted + self.votes_received += 1 + self.leader_id = self.node_id + print(f"Node {self.node_id} received a vote. Total votes: {self.votes_received}") + + # Check if the node has received a majority of votes + if self.votes_received > self.total_nodes // 2: + print(f"Node {self.node_id} has received a majority of votes. Becoming leader.") + self.state = "leader" + + def request_vote(self, candidate_id, candidate_term): + # Implement RequestVote RPC + pass + + def append_entries(self, leader_id, leader_term, entries): + # Implement AppendEntries RPC + pass + + def start_election(self): + logging.info("Node %s is starting an election for term %s", self.node_id, self.current_term + 1) + + self.current_term += 1 + self.voted_for = self.node_id + self.votes_received = 1 # Vote for itself + + print("Requesting votes from other nodes") + + # Prepare the request_vote message + request_vote_message = { + "type": "request_vote", + "candidate_id": self.node_id, + "candidate_term": self.current_term, + } + + # Ask for votes from other nodes + self.node.broadcast_message(request_vote_message) + + def process_client_request(self, client_request): + # Assuming client_request is a dictionary representing the client's request + entry = { + "term": self.current_term, + "command": client_request.get("command"), + # Add any other relevant information from the client request + } + + block = Block( + index=message["index"], + timestamp=message["timestamp"], + data=message["data"], + previous_hash=message["previous_hash"] + ) + self.blockchain.add_block(block) + + # Replicate the log entry to other nodes + append_entries_message = { + "type": "append_entries", + "leader_id": self.node_id, + "leader_term": self.current_term, + "entries": [entry], # Include the new log entry + } + + # Broadcast the append_entries_message to other nodes + self.node.broadcast_message(append_entries_message) + + def request_vote_from_node(self, node_id): + logging.info("Node %s is requesting a vote from Node %s for term %s", self.node_id, node_id, self.current_term) + + request_vote_message = { + "type": "request_vote", + "candidate_id": self.node_id, + "candidate_term": self.current_term, + } + + # Simulate network delay by sleeping for a short time + time.sleep(random.uniform(0, 0.1)) + + # Send the request for vote to the target node + threading.Thread(target=self.send_message, args=(node_id, request_vote_message)).start() + + def send_append_entries(self, node_id, entries): + # Implement sending AppendEntries RPC to another node + pass + + def send_heartbeats(self): + heartbeat_message = { + "type": "append_entries", + "leader_id": self.node_id, + "leader_term": self.current_term, + "entries": [], # Heartbeats typically have empty entries + } + + self.node.broadcast_message(heartbeat_message) + + def run(self): + while True: + current_time = time.time() + + if self.state == "follower" and current_time - self.last_heartbeat_time > self.election_timeout: + # Le délai d'élection a expiré, commence une nouvelle élection + self.state = "candidate" + print(f"starting election with node: {self.node_id}") + self.start_election() + elif self.state == "candidate": + # Run election + self.start_election() + time.sleep(random.uniform(1, 5)) # Simulate election timeout + elif self.state == "leader": + self.send_heartbeats() + time.sleep(0.1) + + def reset_election_timeout(self): + self.election_timeout = random.uniform(5, 15) + self.last_heartbeat_time = time.time() \ No newline at end of file