diff --git a/.gitignore b/.gitignore index 3860073..799ee63 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,4 @@ go_sbot_benchmarks /releases /cmds +/logs diff --git a/Cargo.lock b/Cargo.lock index 40da0e5..932052b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1065,6 +1065,7 @@ dependencies = [ [[package]] name = "kuska-handshake" version = "0.2.0" +source = "git+https://github.com/Kuska-ssb/handshake.git#750d214b58a11fdc1a7c0c5d8073f0d99237082f" dependencies = [ "futures", "hex", @@ -1086,7 +1087,8 @@ dependencies = [ [[package]] name = "kuska-ssb" -version = "0.4.2" +version = "0.4.3" +source = "git+https://github.com/Kuska-ssb/ssb?branch=master#512669e4ff5fb698c44fcf04b2cc4a81b3482eb4" dependencies = [ "async-std", "async-stream", diff --git a/solar/Cargo.toml b/solar/Cargo.toml index dbe9ced..0c6e173 100644 --- a/solar/Cargo.toml +++ b/solar/Cargo.toml @@ -17,8 +17,7 @@ futures = "0.3" hex = "0.4.0" jsonrpsee = { version = "0.18.2", features = ["server"] } kuska-sodiumoxide = "0.2.5-0" -#kuska-ssb = { git = "https://github.com/Kuska-ssb/ssb", branch = "master" } -kuska-ssb = { path = "../../ssb" } +kuska-ssb = { git = "https://github.com/Kuska-ssb/ssb", branch = "master" } log = "0.4" once_cell = "1.16" rand = "0.8" diff --git a/solar/src/actors/muxrpc/ebt.rs b/solar/src/actors/muxrpc/ebt.rs new file mode 100644 index 0000000..a7bbfd2 --- /dev/null +++ b/solar/src/actors/muxrpc/ebt.rs @@ -0,0 +1,392 @@ +//! Epidemic Broadcast Tree (EBT) Replication Handler. + +use std::{collections::HashMap, marker::PhantomData}; + +use async_std::io::Write; +use futures::SinkExt; +use kuska_ssb::{ + api::{ + dto::{self, content::SsbId}, + ApiCaller, ApiMethod, + }, + feed::{Feed as MessageKvt, Message}, + rpc, +}; +use log::{trace, warn}; + +use crate::{ + actors::{ + muxrpc::{ReqNo, RpcInput}, + replication::ebt::{EbtEvent, SessionRole}, + }, + broker::{BrokerEvent, BrokerMessage, ChBrokerSend, Destination, BROKER}, + error::Error, + Result, +}; + +/// EBT replicate handler. Tracks active requests and peer connections. +pub struct EbtReplicateHandler +where + W: Write + Unpin + Send + Sync, +{ + /// EBT-related requests which are known and allowed. + active_requests: HashMap, + phantom: PhantomData, +} + +impl EbtReplicateHandler +where + W: Write + Unpin + Send + Sync, +{ + /// Handle an RPC event. + pub async fn handle( + &mut self, + api: &mut ApiCaller, + op: &RpcInput, + ch_broker: &mut ChBrokerSend, + peer_ssb_id: String, + active_request: Option, + ) -> Result { + trace!(target: "ebt-handler", "Received MUXRPC input: {:?}", op); + + // An outbound EBT replicate request was made before the handler was + // called; add it to the map of active requests. + if let Some(session_req_no) = active_request { + let _ = self + .active_requests + .insert(session_req_no, peer_ssb_id.to_owned()); + } + + match op { + // Handle an incoming MUXRPC request. + RpcInput::Network(req_no, rpc::RecvMsg::RpcRequest(req)) => { + match ApiMethod::from_rpc_body(req) { + Some(ApiMethod::EbtReplicate) => { + self.recv_ebtreplicate(api, *req_no, req, peer_ssb_id).await + } + _ => Ok(false), + } + } + RpcInput::Network(req_no, rpc::RecvMsg::OtherRequest(_type, req)) => { + // Attempt to deserialize bytes into vector clock hashmap. + // If the deserialization is successful, emit a 'received clock' + // event. + if let Ok(clock) = serde_json::from_slice(req) { + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::ReceivedClock( + *req_no, + peer_ssb_id, + clock, + )), + )) + .await?; + } + + Ok(false) + } + // Handle an incoming MUXRPC response. + RpcInput::Network(req_no, rpc::RecvMsg::RpcResponse(_type, res)) => { + self.recv_rpc_response(api, ch_broker, *req_no, res, peer_ssb_id) + .await + } + // Handle an incoming MUXRPC 'cancel stream' response. + RpcInput::Network(req_no, rpc::RecvMsg::CancelStreamResponse()) => { + self.recv_cancelstream(api, *req_no).await + } + // Handle an incoming MUXRPC error response. + RpcInput::Network(req_no, rpc::RecvMsg::ErrorResponse(err)) => { + self.recv_error_response(*req_no, err).await + } + // Handle a broker message. + RpcInput::Message(msg) => match msg { + BrokerMessage::Ebt(EbtEvent::SendClock(req_no, clock)) => { + // Serialize the vector clock as a JSON string. + let json_clock = serde_json::to_string(&clock)?; + api.ebt_clock_res_send(*req_no, &json_clock).await?; + + Ok(false) + } + BrokerMessage::Ebt(EbtEvent::SendMessage(req_no, ssb_id, msg)) => { + // Ensure the message is sent to the correct peer. + if peer_ssb_id == *ssb_id { + let json_msg = msg.to_string(); + api.ebt_feed_res_send(*req_no, &json_msg).await?; + } + + Ok(false) + } + _ => Ok(false), + }, + /* + RpcInput::Message(msg) => { + if let Some(kv_event) = msg.downcast_ref::() { + match kv_event { + // Notification from the key-value store indicating that + // a new message has just been appended to the feed + // identified by `id`. + StoreKvEvent::IdChanged(id) => { + return self.recv_storageevent_idchanged(api, id).await + } + } + } + Ok(false) + } + */ + _ => Ok(false), + } + } +} + +impl EbtReplicateHandler +where + W: Write + Unpin + Send + Sync, +{ + /// Instantiate a new instance of `EbtReplicateHandler`. + pub fn new() -> Self { + Self { + active_requests: HashMap::new(), + phantom: PhantomData, + } + } + + /// Process and respond to an incoming EBT replicate request. + async fn recv_ebtreplicate( + &mut self, + api: &mut ApiCaller, + req_no: ReqNo, + req: &rpc::Body, + peer_ssb_id: String, + ) -> Result { + // Deserialize the args from an incoming EBT replicate request. + let mut args: Vec = serde_json::from_value(req.args.clone())?; + trace!(target: "ebt-handler", "Received replicate request: {:?}", args); + + // Retrieve the `EbtReplicate` args from the array. + let args = args.pop().unwrap(); + + let mut ch_broker = BROKER.lock().await.create_sender(); + + // Validate the EBT request args (`version` and `format`). + // Terminate the stream with an error response if expectations are + // not met. + if !args.version == 3 { + let err_msg = String::from("ebt version != 3"); + api.rpc().send_error(req_no, req.rpc_type, &err_msg).await?; + + return Err(Error::EbtReplicate((req_no, err_msg))); + } else if args.format.as_str() != "classic" { + let err_msg = String::from("ebt format != classic"); + api.rpc().send_error(req_no, req.rpc_type, &err_msg).await?; + + return Err(Error::EbtReplicate((req_no, err_msg))); + } + + trace!(target: "ebt-handler", "Successfully validated replicate request arguments"); + + // Insert the request number and peer public key into the active + // requests map. + self.active_requests.insert(req_no, peer_ssb_id.to_owned()); + + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::SessionInitiated( + req_no, + peer_ssb_id, + SessionRole::Responder, + )), + )) + .await?; + + Ok(false) + } + + /// Process an incoming MUXRPC response. The response is expected to + /// contain a vector clock or an SSB message. + async fn recv_rpc_response( + &mut self, + _api: &mut ApiCaller, + ch_broker: &mut ChBrokerSend, + req_no: ReqNo, + res: &[u8], + peer_ssb_id: String, + ) -> Result { + // Only handle the response if the associated request number is known + // to us, either because we sent or received the initiating replicate + // request. + if self.active_requests.contains_key(&req_no) { + // The response may be a vector clock (aka. notes) or an SSB message. + // + // Since there is no explicit way to determine which was received, + // we first attempt deserialization of a vector clock and move on + // to attempting message deserialization if that fails. + // + // TODO: Is matching on clock here redundant? + // We are already matching on `OtherRequest` in the handler. + if let Ok(clock) = serde_json::from_slice(res) { + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::ReceivedClock(req_no, peer_ssb_id, clock)), + )) + .await?; + } else { + // First try to deserialize the response into a message value. + // If that fails, try to deserialize into a message KVT and then + // convert that into a message value. Return an error if that fails. + // This approach allows us to handle the unlikely event that + // messages are sent as KVTs and not simply values. + // + // Validation of the message signature and fields is also performed + // as part of the call to `from_slice`. + let msg = match Message::from_slice(res) { + Ok(msg) => msg, + Err(_) => MessageKvt::from_slice(res)?.into_message()?, + }; + + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::ReceivedMessage(msg)), + )) + .await?; + } + } + + Ok(false) + } + + /// Close the stream and remove the associated request from the map of + /// active requests. + async fn recv_cancelstream(&mut self, api: &mut ApiCaller, req_no: ReqNo) -> Result { + api.rpc().send_stream_eof(-req_no).await?; + self.active_requests.remove(&req_no); + Ok(true) + } + + /// Report a MUXRPC error and remove the associated request from the map of + /// active requests. + async fn recv_error_response(&mut self, req_no: ReqNo, err_msg: &str) -> Result { + warn!("Received MUXRPC error response: {}", err_msg); + + self.active_requests.remove(&req_no); + + Err(Error::EbtReplicate((req_no, err_msg.to_string()))) + } + + /* + /// Extract blob references from post-type messages. + fn extract_blob_refs(&mut self, msg: &Message) -> Vec { + let mut refs = Vec::new(); + + let msg = serde_json::from_value(msg.content().clone()); + + if let Ok(dto::content::TypedMessage::Post { text, .. }) = msg { + for cap in BLOB_REGEX.captures_iter(&text) { + let key = cap.get(0).unwrap().as_str().to_owned(); + refs.push(key); + } + } + + refs + } + + /// Process an incoming MUXRPC response. The response is expected to + /// contain an SSB message. + async fn recv_rpc_response( + &mut self, + _api: &mut ApiCaller, + ch_broker: &mut ChBrokerSend, + req_no: i32, + res: &[u8], + ) -> Result { + // Only handle the response if we made the request. + if self.peers.contains_key(&req_no) { + // First try to deserialize the response into a message value. + // If that fails, try to deserialize into a message KVT and then + // convert that into a message value. Return an error if that fails. + // This approach allows us to handle the unlikely event that + // messages are sent as KVTs and not simply values. + // + // Validation of the message signature and fields is also performed + // as part of the call to `from_slice`. + let msg = match Message::from_slice(res) { + Ok(msg) => msg, + Err(_) => MessageKvt::from_slice(res)?.into_message()?, + }; + + // Retrieve the sequence number of the most recent message for + // the peer that authored the received message. + let last_seq = KV_STORE + .read() + .await + .get_latest_seq(&msg.author().to_string())? + .unwrap_or(0); + + // Validate the sequence number. + if msg.sequence() == last_seq + 1 { + // Append the message to the feed. + KV_STORE.write().await.append_feed(msg.clone()).await?; + + info!( + "received msg number {} from {}", + msg.sequence(), + msg.author() + ); + + // Extract blob references from the received message and + // request those blobs if they are not already in the local + // blobstore. + for key in self.extract_blob_refs(&msg) { + if !BLOB_STORE.read().await.exists(&key) { + let event = super::blobs_get::RpcBlobsGetEvent::Get(dto::BlobsGetIn { + key, + size: None, + max: None, + }); + let broker_msg = BrokerEvent::new(Destination::Broadcast, event); + ch_broker.send(broker_msg).await.unwrap(); + } + } + } else { + warn!( + "received out-of-order msg from {}; recv: {} db: {}", + &msg.author().to_string(), + msg.sequence(), + last_seq + ); + } + + Ok(true) + } else { + Ok(false) + } + } + */ + + /* + /// Respond to a key-value store state change for the given peer. + /// This is triggered when a new message is appended to the local feed. + /// Remove the peer from the list of active streams, send the requested + /// messages from the local feed to the peer and then reinsert the public + /// key of the peer to the list of active streams. + async fn recv_storageevent_idchanged( + &mut self, + api: &mut ApiCaller, + id: &str, + ) -> Result { + // Attempt to remove the peer from the list of active streams. + if let Some(mut req) = self.reqs.remove(id) { + // Send local messages to the peer. + self.send_history(api, &mut req).await?; + // Reinsert the peer into the list of active streams. + self.reqs.insert(id.to_string(), req); + Ok(true) + } else { + Ok(false) + } + } + */ +} diff --git a/solar/src/actors/muxrpc/history_stream.rs b/solar/src/actors/muxrpc/history_stream.rs index bccef7d..466cd4f 100644 --- a/solar/src/actors/muxrpc/history_stream.rs +++ b/solar/src/actors/muxrpc/history_stream.rs @@ -143,18 +143,14 @@ where // Loop through the public keys of all peers in the replication list. for peer_pk in PEERS_TO_REPLICATE.get().unwrap().keys() { - // Prefix `@` to public key. - let peer_public_key = format!("@{}", peer_pk); - // Instantiate the history stream request args for the given peer. // The `live` arg means: keep the connection open after initial // replication. - let mut args = - dto::CreateHistoryStreamIn::new(peer_public_key.to_owned()).live(true); + let mut args = dto::CreateHistoryStreamIn::new(peer_pk.to_owned()).live(true); // Retrieve the sequence number of the most recent message for // this peer from the local key-value store. - if let Some(last_seq) = KV_STORE.read().await.get_latest_seq(&peer_public_key)? { + if let Some(last_seq) = KV_STORE.read().await.get_latest_seq(peer_pk)? { // Use the latest sequence number to update the request args. args = args.after_seq(last_seq); } @@ -164,11 +160,11 @@ where // Insert the history stream request ID and peer ID // (public key) into the peers hash map. - self.peers.insert(id, peer_public_key.to_owned()); + self.peers.insert(id, peer_pk.to_owned()); info!( "requesting messages authored by peer {} after {:?}", - peer_public_key, args.seq + peer_pk, args.seq ); } diff --git a/solar/src/actors/muxrpc/mod.rs b/solar/src/actors/muxrpc/mod.rs index 37a9438..cd6f5e3 100644 --- a/solar/src/actors/muxrpc/mod.rs +++ b/solar/src/actors/muxrpc/mod.rs @@ -1,12 +1,17 @@ mod blobs_get; mod blobs_wants; +mod ebt; mod get; mod handler; mod history_stream; mod whoami; +/// The unique identifier of a MUXRPC request. +pub type ReqNo = i32; + pub use blobs_get::{BlobsGetHandler, RpcBlobsGetEvent}; pub use blobs_wants::{BlobsWantsHandler, RpcBlobsWantsEvent}; +pub use ebt::EbtReplicateHandler; pub use get::GetHandler; pub use handler::{RpcHandler, RpcInput}; pub use history_stream::HistoryStreamHandler; diff --git a/solar/src/actors/network/connection.rs b/solar/src/actors/network/connection.rs index 3c176e4..17911c6 100644 --- a/solar/src/actors/network/connection.rs +++ b/solar/src/actors/network/connection.rs @@ -22,6 +22,7 @@ use crate::{ }; /// Encapsulate inbound and outbound TCP connections. +#[derive(Debug, Clone)] pub enum TcpConnection { /// An outbound TCP connection. Dial { @@ -34,6 +35,31 @@ pub enum TcpConnection { Listen { stream: TcpStream }, } +impl Display for TcpConnection { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match &self { + TcpConnection::Dial { addr, public_key } => { + let public_key_as_id = public_key.to_ssb_id(); + let peer_public_key = if public_key_as_id.starts_with('@') { + public_key_as_id + } else { + format!("@{}", public_key_as_id) + }; + + write!(f, "", peer_public_key, addr) + } + TcpConnection::Listen { stream } => { + let peer_addr = match stream.peer_addr() { + Ok(addr) => addr.to_string(), + _ => "_".to_string(), + }; + + write!(f, "", peer_addr) + } + } + } +} + /// Connection data. #[derive(Debug, Default, Clone)] pub struct ConnectionData { @@ -88,8 +114,8 @@ impl ConnectionData { } pub async fn actor( - identity: OwnedIdentity, connection: TcpConnection, + identity: OwnedIdentity, selective_replication: bool, ) -> Result<()> { // Register a new connection with the connection manager. @@ -110,11 +136,11 @@ pub async fn actor( connection_data.peer_addr = Some(addr.to_owned()); connection_data.peer_public_key = Some(public_key); - // Send 'connecting' connection event message via the broker. + // Send 'staging' connection event message via the broker. ch_broker .send(BrokerEvent::new( Destination::Broadcast, - BrokerMessage::Connection(ConnectionEvent::Connecting( + BrokerMessage::Connection(ConnectionEvent::Staging( connection_data, identity, selective_replication, diff --git a/solar/src/actors/network/connection_manager.rs b/solar/src/actors/network/connection_manager.rs index 8d38e2b..b898453 100644 --- a/solar/src/actors/network/connection_manager.rs +++ b/solar/src/actors/network/connection_manager.rs @@ -9,7 +9,7 @@ //! Connection data, including the underlying TCP stream, is passed around with //! each event variant - allowing the handlers to take ownership of the data. -use std::{collections::HashSet, net::Shutdown}; +use std::net::Shutdown; use async_std::{ net::TcpStream, @@ -27,7 +27,13 @@ use log::{debug, error, info, trace}; use once_cell::sync::Lazy; use crate::{ - actors::network::connection::ConnectionData, + actors::{ + network::{ + connection, + connection::{ConnectionData, TcpConnection}, + }, + replication::ebt::EbtEvent, + }, broker::{ ActorEndpoint, Broker, BrokerEvent, BrokerMessage, ChBrokerSend, Destination, BROKER, }, @@ -40,13 +46,25 @@ use crate::{ pub static CONNECTION_MANAGER: Lazy>> = Lazy::new(|| Arc::new(RwLock::new(ConnectionManager::new()))); +type EnableSelectiveReplication = bool; +type IsListener = bool; + /// Connection events with associated connection data. #[derive(Debug, Clone)] pub enum ConnectionEvent { - Connecting(ConnectionData, OwnedIdentity, bool), - Handshaking(ConnectionData, OwnedIdentity, bool, bool), - Connected(ConnectionData, bool), - Replicating(ConnectionData, bool), + LanDiscovery(TcpConnection, OwnedIdentity, EnableSelectiveReplication), + Staging(ConnectionData, OwnedIdentity, EnableSelectiveReplication), + Connecting(ConnectionData, OwnedIdentity, EnableSelectiveReplication), + Handshaking( + ConnectionData, + OwnedIdentity, + EnableSelectiveReplication, + IsListener, + ), + Connected(ConnectionData, EnableSelectiveReplication, IsListener), + Replicate(ConnectionData, EnableSelectiveReplication, IsListener), + ReplicatingEbt(ConnectionData, IsListener), + ReplicatingClassic(ConnectionData), Disconnecting(ConnectionData), Disconnected(ConnectionData), Error(ConnectionData, String), @@ -56,16 +74,16 @@ pub enum ConnectionEvent { #[derive(Debug)] pub struct ConnectionManager { /// The public keys of all peers to whom we are currently connected. - pub connected_peers: HashSet, + pub connected_peers: Vec<(ed25519::PublicKey, usize)>, + /// The public keys of all peers to whom we are currently attempting a + /// connection + pub connecting_peers: Vec<(ed25519::PublicKey, usize)>, /// Idle connection timeout limit. pub idle_timeout_limit: u8, /// ID number of the most recently registered connection. last_connection_id: usize, /// Message loop handle. msgloop: Option>, - // TODO: keep a list of active connections. - // Then we can query total active connections using `.len()`. - //active_connections: HashSet, } impl ConnectionManager { @@ -75,10 +93,11 @@ impl ConnectionManager { let msgloop = task::spawn(Self::msg_loop()); Self { + connected_peers: Vec::new(), + connecting_peers: Vec::new(), + idle_timeout_limit: 30, last_connection_id: 0, msgloop: Some(msgloop), - idle_timeout_limit: 30, - connected_peers: HashSet::new(), } } @@ -91,21 +110,60 @@ impl ConnectionManager { /// Returns `true` if the peer is in the list, otherwise a `false` value is /// returned. pub fn contains_connected_peer(&self, peer_id: &ed25519::PublicKey) -> bool { - self.connected_peers.contains(peer_id) + self.connected_peers + .iter() + .any(|(connected_peer_id, _)| connected_peer_id == peer_id) } /// Add a peer to the list of connected peers. /// Returns `true` if the peer was not already in the list, otherwise a /// `false` value is returned. - fn insert_connected_peer(&mut self, peer_id: ed25519::PublicKey) -> bool { - self.connected_peers.insert(peer_id) + fn insert_connected_peer(&mut self, peer_id: ed25519::PublicKey, connection_id: usize) { + self.connected_peers.push((peer_id, connection_id)); } /// Remove a peer from the list of connected peers. /// Returns `true` if the peer was in the list, otherwise a `false` value /// is returned. - fn remove_connected_peer(&mut self, peer_id: ed25519::PublicKey) -> bool { - self.connected_peers.remove(&peer_id) + fn remove_connected_peer(&mut self, peer_id: ed25519::PublicKey, connection_id: usize) { + if let Some(index) = self + .connected_peers + .iter() + .position(|&entry| entry == (peer_id, connection_id)) + { + // Ignore the return value. + let _ = self.connected_peers.remove(index); + } + } + + /// Query whether the list of connecting peers contains the given peer. + /// Returns `true` if the peer is in the list, otherwise a `false` value is + /// returned. + pub fn contains_connecting_peer(&self, peer_id: &ed25519::PublicKey) -> bool { + self.connecting_peers + .iter() + .any(|(connecting_peer_id, _)| connecting_peer_id == peer_id) + } + + /// Add a peer to the list of connecting peers. + /// Returns `true` if the peer was not already in the list, otherwise a + /// `false` value is returned. + fn insert_connecting_peer(&mut self, peer_id: ed25519::PublicKey, connection_id: usize) { + self.connecting_peers.push((peer_id, connection_id)) + } + + /// Remove a peer from the list of connecting peers. + /// Returns `true` if the peer was in the list, otherwise a `false` value + /// is returned. + fn remove_connecting_peer(&mut self, peer_id: ed25519::PublicKey, connection_id: usize) { + if let Some(index) = self + .connecting_peers + .iter() + .position(|&entry| entry == (peer_id, connection_id)) + { + // Ignore the return value. + let _ = self.connecting_peers.remove(index); + } } /// Return a handle for the connection event message loop. @@ -123,35 +181,94 @@ impl ConnectionManager { self.last_connection_id } - async fn handle_connecting( - mut connection_data: ConnectionData, + /// Handle a LAN discovery event. + async fn handle_lan_discovery( + tcp_connection: TcpConnection, identity: OwnedIdentity, - selective_replication: bool, + selective_replication: EnableSelectiveReplication, + ) -> Result<()> { + // First ensure there is no active or in-progress connection + // with the given peer. + if let TcpConnection::Dial { public_key, .. } = &tcp_connection { + if !CONNECTION_MANAGER + .read() + .await + .contains_connected_peer(public_key) + && !CONNECTION_MANAGER + .read() + .await + .contains_connecting_peer(public_key) + { + // Spawn a connection actor with the given connection parameters. + // + // The connection actor is responsible for initiating the + // outbound TCP connection. + Broker::spawn(connection::actor( + tcp_connection, + identity, + selective_replication, + )); + } + } + + Ok(()) + } + + /// Handle a staging event. + async fn handle_staging( + connection_data: ConnectionData, + identity: OwnedIdentity, + selective_replication: EnableSelectiveReplication, mut ch_broker: ChBrokerSend, ) -> Result<()> { - // Check if we are already connected to the selected peer. - // If yes, remove this connection. if let Some(peer_public_key) = &connection_data.peer_public_key { - if CONNECTION_MANAGER + // Only proceed with a connection attempt if there is not an + // active connection or connection attempt in-progress. + if !CONNECTION_MANAGER .read() .await .contains_connected_peer(peer_public_key) + && !CONNECTION_MANAGER + .read() + .await + .contains_connecting_peer(peer_public_key) { - info!("peer {} is already connected", peer_public_key.to_ssb_id()); + // If no connection or connection attempt exists, initiate + // the connection process. - // Since we already have an active connection to this peer, - // we can disconnect the redundant connection. - - // Send 'disconnecting' connection event message via the broker. + // Send 'connecting' connection event message via the broker. ch_broker .send(BrokerEvent::new( Destination::Broadcast, - BrokerMessage::Connection(ConnectionEvent::Disconnecting(connection_data)), + BrokerMessage::Connection(ConnectionEvent::Connecting( + connection_data, + identity, + selective_replication, + )), )) .await?; - } else if let Some(addr) = &connection_data.peer_addr { + } + } + + Ok(()) + } + + /// Handle a connecting event. + async fn handle_connecting( + mut connection_data: ConnectionData, + identity: OwnedIdentity, + selective_replication: EnableSelectiveReplication, + mut ch_broker: ChBrokerSend, + ) -> Result<()> { + if let Some(peer_public_key) = &connection_data.peer_public_key { + if let Some(peer_addr) = &connection_data.peer_addr { + CONNECTION_MANAGER + .write() + .await + .insert_connecting_peer(*peer_public_key, connection_data.id); + // Attempt connection. - if let Ok(stream) = TcpStream::connect(&addr).await { + if let Ok(stream) = TcpStream::connect(&peer_addr).await { connection_data.stream = Some(stream); // Send 'handshaking' connection event message via the broker. @@ -166,6 +283,21 @@ impl ConnectionManager { )), )) .await?; + } else { + // If the connection attempt fails, send 'disconnecting' + // connection event message via the broker. + // + // This removes the connection from the list of in-progress + // attempts, ensuring that future connection attempts to + // this peer are not blocked. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Connection(ConnectionEvent::Disconnecting( + connection_data, + )), + )) + .await?; } } } @@ -173,11 +305,12 @@ impl ConnectionManager { Ok(()) } + /// Handle a handshaking event. async fn handle_handshaking( mut connection_data: ConnectionData, identity: OwnedIdentity, - selective_replication: bool, - listener: bool, + selective_replication: EnableSelectiveReplication, + listener: IsListener, mut ch_broker: ChBrokerSend, ) -> Result<()> { // Parse the public key and secret key from the SSB identity. @@ -210,6 +343,7 @@ impl ConnectionManager { BrokerMessage::Connection(ConnectionEvent::Connected( connection_data, selective_replication, + listener, )), )) .await?; @@ -217,9 +351,11 @@ impl ConnectionManager { Ok(()) } + /// Handle a connected event. async fn handle_connected( connection_data: ConnectionData, - selective_replication: bool, + selective_replication: EnableSelectiveReplication, + listener: IsListener, mut ch_broker: ChBrokerSend, ) -> Result<()> { // Add the peer to the list of connected peers. @@ -229,16 +365,22 @@ impl ConnectionManager { CONNECTION_MANAGER .write() .await - .insert_connected_peer(public_key); + .remove_connecting_peer(public_key, connection_data.id); + + CONNECTION_MANAGER + .write() + .await + .insert_connected_peer(public_key, connection_data.id); } - // Send 'replicating' connection event message via the broker. + // Send 'replicate' connection event message via the broker. ch_broker .send(BrokerEvent::new( Destination::Broadcast, - BrokerMessage::Connection(ConnectionEvent::Replicating( + BrokerMessage::Connection(ConnectionEvent::Replicate( connection_data, selective_replication, + listener, )), )) .await?; @@ -246,9 +388,12 @@ impl ConnectionManager { Ok(()) } - async fn handle_replicating( + /// Handle a replicate event. + async fn handle_replicate( connection_data: ConnectionData, - selective_replication: bool, + selective_replication: EnableSelectiveReplication, + listener: IsListener, + mut ch_broker: ChBrokerSend, ) -> Result<()> { let peer_public_key = connection_data .peer_public_key @@ -268,18 +413,73 @@ impl ConnectionManager { "peer {} is not in replication list and selective replication is enabled; dropping connection", peer_public_key ); + + // Send 'disconnecting' connection event message via the broker. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Connection(ConnectionEvent::Disconnecting(connection_data)), + )) + .await?; + } else { + // Send 'replicating ebt' connection event message via the broker. + // + // If the EBT replication attempt is unsuccessful, the EBT replication + // actor will emit a `ConnectionEvent::ReplicatingClassic` message. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Connection(ConnectionEvent::ReplicatingEbt( + connection_data, + listener, + )), + )) + .await?; } + Ok(()) + } + + /// Handle a classic replication (`create_history_stream`) event. + async fn handle_replicating_classic(connection_data: ConnectionData) -> Result<()> { debug!("Attempting classic replication with peer..."); + // Spawn the classic replication actor and await the result. Broker::spawn(crate::actors::replication::classic::actor(connection_data)).await; - // TODO: Attempt EBT replication, using classic replication - // as fallback. + Ok(()) + } + + /// Handle an EBT replication event. + async fn handle_replicating_ebt( + connection_data: ConnectionData, + listener: IsListener, + mut ch_broker: ChBrokerSend, + ) -> Result<()> { + debug!("Attempting EBT replication with peer..."); + + // The listener (aka. responder or server) waits for an EBT session to + // be requested by the client (aka. requestor or dialer). + if listener { + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::WaitForSessionRequest(connection_data)), + )) + .await?; + } else { + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::RequestSession(connection_data)), + )) + .await?; + } Ok(()) } + /// Handle a disconnecting event. async fn handle_disconnecting( connection_data: ConnectionData, mut ch_broker: ChBrokerSend, @@ -299,11 +499,28 @@ impl ConnectionManager { Ok(()) } + + /// Handle a disconnected event. + async fn handle_disconnected(connection_data: ConnectionData) -> Result<()> { + if let Some(public_key) = connection_data.peer_public_key { + CONNECTION_MANAGER + .write() + .await + .remove_connected_peer(public_key, connection_data.id); + + CONNECTION_MANAGER + .write() + .await + .remove_connecting_peer(public_key, connection_data.id); + } + + Ok(()) + } + /// Start the connection manager event loop. /// /// Listen for connection event messages via the broker and update /// connection state accordingly. - // TODO: Can we return a Result here? async fn msg_loop() { // Register the connection manager actor with the broker. let ActorEndpoint { @@ -335,6 +552,27 @@ impl ConnectionManager { msg = broker_msg_ch.next().fuse() => { if let Some(BrokerMessage::Connection(event)) = msg { match event { + ConnectionEvent::LanDiscovery(tcp_connection, identity, selective_replication) => { + if let Err(err) = ConnectionManager::handle_lan_discovery( + tcp_connection, + identity, + selective_replication, + ).await { + error!("Error while handling 'lan discovery' event: {}", err) + } + } + ConnectionEvent::Staging(connection_data, identity, selective_replication,) => { + trace!(target: "connection-manager", "Staging: {connection_data}"); + + if let Err(err) = ConnectionManager::handle_staging( + connection_data, + identity, + selective_replication, + ch_broker.clone() + ).await { + error!("Error while handling 'staging' event: {}", err) + } + } ConnectionEvent::Connecting(connection_data, identity, selective_replication,) => { trace!(target: "connection-manager", "Connecting: {connection_data}"); @@ -344,7 +582,7 @@ impl ConnectionManager { selective_replication, ch_broker.clone() ).await { - error!("Error while handling connecting event: {}", err) + error!("Error while handling 'connecting' event: {}", err) } } ConnectionEvent::Handshaking(connection_data, identity, selective_replication, listener) => { @@ -357,28 +595,51 @@ impl ConnectionManager { listener, ch_broker.clone() ).await { - error!("Error while handling handshaking event: {}", err) + error!("Error while handling 'handshaking' event: {}", err) } } - ConnectionEvent::Connected(connection_data, selective_replication) => { + ConnectionEvent::Connected(connection_data, selective_replication, listener) => { trace!(target: "connection-manager", "Connected: {connection_data}"); if let Err(err) = ConnectionManager::handle_connected( connection_data, selective_replication, + listener, ch_broker.clone() ).await { - error!("Error while handling connected event: {}", err) + error!("Error while handling 'connected' event: {}", err) } } - ConnectionEvent::Replicating(connection_data, selective_replication) => { - trace!(target: "connection-manager", "Replicating: {connection_data}"); + ConnectionEvent::Replicate(connection_data, selective_replication, listener) => { + trace!(target: "connection-manager", "Replicate: {connection_data}"); - if let Err(err) = ConnectionManager::handle_replicating( + if let Err(err) = ConnectionManager::handle_replicate( connection_data, selective_replication, + listener, + ch_broker.clone() + ).await { + error!("Error while handling 'replicate' event: {}", err) + } + } + ConnectionEvent::ReplicatingClassic(connection_data) => { + trace!(target: "connection-manager", "Replicating classic: {connection_data}"); + + if let Err(err) = ConnectionManager::handle_replicating_classic( + connection_data, + ).await { + error!("Error while handling 'replicating classic' event: {}", err) + } + } + ConnectionEvent::ReplicatingEbt(connection_data, listener) => { + trace!(target: "connection-manager", "Replicating EBT: {connection_data}"); + + if let Err(err) = ConnectionManager::handle_replicating_ebt( + connection_data, + listener, + ch_broker.clone() ).await { - error!("Error while handling replicating event: {}", err) + error!("Error while handling 'replicating EBT' event: {}", err) } } ConnectionEvent::Disconnecting(connection_data) => { @@ -388,29 +649,26 @@ impl ConnectionManager { connection_data, ch_broker.clone() ).await { - error!("Error while handling disconnecting event: {}", err) + error!("Error while handling 'disconnecting' event: {}", err) } } - ConnectionEvent::Disconnected(data) => { - trace!(target: "connection-manager", "Disconnected: {data}"); - - // Remove the peer from the list of connected peers. - if let Some(public_key) = data.peer_public_key { - CONNECTION_MANAGER - .write() - .await - .remove_connected_peer(public_key); + ConnectionEvent::Disconnected(connection_data) => { + trace!(target: "connection-manager", "Disconnected: {connection_data}"); + + if let Err(err) = ConnectionManager::handle_disconnected( + connection_data, + ).await { + error!("Error while handling 'disconnected' event: {}", err) } } - ConnectionEvent::Error(data, err) => { - trace!(target: "connection-manager", "Error: {data}: {err}"); - - // Remove the peer from the list of connected peers. - if let Some(public_key) = data.peer_public_key { - CONNECTION_MANAGER - .write() - .await - .remove_connected_peer(public_key); + ConnectionEvent::Error(connection_data, err) => { + trace!(target: "connection-manager", "Error: {connection_data}: {err}"); + error!("Connection error: {connection_data}: {err}"); + + if let Err(err) = ConnectionManager::handle_disconnected( + connection_data, + ).await { + error!("Error while handling 'disconnected' event: {}", err) } } } @@ -484,44 +742,52 @@ mod test { async fn test_connected_peers() -> Result<()> { let connection_manager = instantiate_new_connection_manager(); - // Create a unique keypair to sign messages. - let keypair = SecretConfig::create().to_owned_identity().unwrap(); + // Create two unique keypairs to sign messages. + let keypair_1 = SecretConfig::create().to_owned_identity().unwrap(); + let keypair_2 = SecretConfig::create().to_owned_identity().unwrap(); // Insert a new connected peer. - let insert_result = connection_manager + connection_manager .write() .await - .insert_connected_peer(keypair.pk); - assert_eq!(insert_result, true); + .insert_connected_peer(keypair_1.pk, 1); // Query the list of connected peers. let query_result = connection_manager .read() .await - .contains_connected_peer(&keypair.pk); + .contains_connected_peer(&keypair_1.pk); assert_eq!(query_result, true); - // Attempt to insert the same peer ID for a second time. - let reinsert_result = connection_manager + // Insert the a second connected peer. + connection_manager + .write() + .await + .insert_connected_peer(keypair_2.pk, 2); + + // Count the active connections. + let connections = connection_manager.read().await._count_connections(); + assert_eq!(connections, 2); + + // Remove the first peer from the list of connected peers. + connection_manager .write() .await - .insert_connected_peer(keypair.pk); - assert_eq!(reinsert_result, false); + .remove_connected_peer(keypair_1.pk, 1); // Count the active connections. let connections = connection_manager.read().await._count_connections(); assert_eq!(connections, 1); - // Remove a peer from the list of connected peers. - let remove_result = connection_manager + // Remove the second peer from the list of connected peers. + connection_manager .write() .await - .remove_connected_peer(keypair.pk); - assert_eq!(remove_result, true); + .remove_connected_peer(keypair_2.pk, 2); // Count the active connections. - let conns = connection_manager.read().await._count_connections(); - assert_eq!(conns, 0); + let connections = connection_manager.read().await._count_connections(); + assert_eq!(connections, 0); Ok(()) } diff --git a/solar/src/actors/network/connection_scheduler.rs b/solar/src/actors/network/connection_scheduler.rs index 11bb6d4..3b5fc02 100644 --- a/solar/src/actors/network/connection_scheduler.rs +++ b/solar/src/actors/network/connection_scheduler.rs @@ -207,7 +207,7 @@ pub async fn actor(peers: Vec<(PublicKey, String)>) -> Result<()> { msg = broker_msg_ch.next().fuse() => { if let Some(BrokerMessage::Connection(event)) = msg { match event { - ConnectionEvent::Replicating(data, _selective_replication) => { + ConnectionEvent::Replicate(data, _selective_replication, _listener) => { // This connection was "successful". // Push the peer to the back of the eager queue. if let Some(public_key) = data.peer_public_key { diff --git a/solar/src/actors/network/dialer.rs b/solar/src/actors/network/dialer.rs index b1c9cd3..b063115 100644 --- a/solar/src/actors/network/dialer.rs +++ b/solar/src/actors/network/dialer.rs @@ -4,11 +4,11 @@ //! message bus. Each request includes the public key and address of the peer //! to be dialed. Upon receiving a request, the dialer spawns the connection actor. use futures::{select_biased, FutureExt, StreamExt}; +use kuska_ssb::keystore::OwnedIdentity; use crate::{ actors::network::{connection, connection::TcpConnection, connection_scheduler::DialRequest}, broker::{ActorEndpoint, Broker, BrokerMessage, BROKER}, - config::SECRET_CONFIG, Result, }; @@ -18,7 +18,7 @@ use crate::{ /// for dial requests from the scheduler. Once received, use the attached /// public key and outbound address to dial the peer by spawning the connection /// actor. -pub async fn actor(selective_replication: bool) -> Result<()> { +pub async fn actor(owned_identity: OwnedIdentity, selective_replication: bool) -> Result<()> { // Register the connection dialer actor with the broker. let ActorEndpoint { ch_terminate, @@ -46,11 +46,11 @@ pub async fn actor(selective_replication: bool) -> Result<()> { msg = broker_msg_ch.next().fuse() => { if let Some(BrokerMessage::Dial(DialRequest((public_key, addr)))) = msg { Broker::spawn(connection::actor( - SECRET_CONFIG.get().unwrap().to_owned_identity()?, TcpConnection::Dial { addr: addr.to_string(), public_key, }, + owned_identity.clone(), selective_replication, )); } diff --git a/solar/src/actors/network/lan_discovery.rs b/solar/src/actors/network/lan_discovery.rs index f62999c..2b3c67b 100644 --- a/solar/src/actors/network/lan_discovery.rs +++ b/solar/src/actors/network/lan_discovery.rs @@ -3,12 +3,12 @@ use std::time::Duration; use async_std::{net::UdpSocket, task}; -use futures::{select_biased, FutureExt}; +use futures::{select_biased, FutureExt, SinkExt}; use kuska_ssb::{discovery::LanBroadcast, keystore::OwnedIdentity}; -use log::warn; +use log::{trace, warn}; use crate::{ - actors::network::{connection, connection::TcpConnection}, + actors::network::{connection::TcpConnection, connection_manager::ConnectionEvent}, broker::*, Result, }; @@ -22,9 +22,10 @@ pub async fn actor( ) -> Result<()> { // Instantiate a new LAN broadcaster with the given public key and port. let broadcaster = LanBroadcast::new(&server_id.pk, rpc_port).await?; + trace!(target: "lan-discovery", "Initiated LAN broadcaster: {:?}", broadcaster); // Register the "lan_discovery" actor endpoint with the broker. - let broker = BROKER.lock().await.register("lan_discovery", false).await?; + let broker = BROKER.lock().await.register("lan-discovery", false).await?; // Fuse internal termination channel with external channel. // This allows termination of the peer loop to be initiated from outside // this function. @@ -58,8 +59,8 @@ pub async fn actor( } } } - // Sleep for 5 seconds. - _ = task::sleep(Duration::from_secs(5)).fuse() => {} + // Sleep for 15 seconds. + _ = task::sleep(Duration::from_secs(15)).fuse() => {} } // Drop the socket connection. @@ -91,12 +92,20 @@ async fn process_broadcast( if let Some((server, port, public_key)) = LanBroadcast::parse(&msg) { let addr = format!("{server}:{port}"); - // Spawn a connection actor with the given connection parameters. - Broker::spawn(connection::actor( - server_id.clone(), - TcpConnection::Dial { addr, public_key }, - selective_replication, - )); + // Create a sender channel to the broker. + let mut ch_broker = BROKER.lock().await.create_sender(); + + // Send 'lan discovery' connection event message via the broker. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Connection(ConnectionEvent::LanDiscovery( + TcpConnection::Dial { addr, public_key }, + server_id.to_owned(), + selective_replication, + )), + )) + .await?; } else { warn!("failed to parse broadcast {}", msg); } diff --git a/solar/src/actors/network/tcp_server.rs b/solar/src/actors/network/tcp_server.rs index ceba8e9..0cb6a23 100644 --- a/solar/src/actors/network/tcp_server.rs +++ b/solar/src/actors/network/tcp_server.rs @@ -34,8 +34,8 @@ pub async fn actor( debug!("Received inbound TCP connection"); Broker::spawn( connection::actor( - server_id.clone(), TcpConnection::Listen { stream }, + server_id.clone(), selective_replication ) ); diff --git a/solar/src/actors/replication/classic.rs b/solar/src/actors/replication/classic.rs index b0d96cd..605d946 100644 --- a/solar/src/actors/replication/classic.rs +++ b/solar/src/actors/replication/classic.rs @@ -31,7 +31,6 @@ use crate::{ Result, }; -//pub async fn actor( pub async fn actor(connection_data: ConnectionData) -> Result<()> { let mut ch_broker = BROKER.lock().await.create_sender(); diff --git a/solar/src/actors/replication/ebt/clock.rs b/solar/src/actors/replication/ebt/clock.rs new file mode 100644 index 0000000..54ec7ad --- /dev/null +++ b/solar/src/actors/replication/ebt/clock.rs @@ -0,0 +1,111 @@ +use std::{collections::HashMap, convert::TryInto}; + +use kuska_ssb::api::dto::content::SsbId; + +use crate::Result; + +/// The encoded vector clock value. +pub type EncodedClockValue = i64; + +/// A vector clock which maps an SSB ID to an encoded vector clock value. +pub type VectorClock = HashMap; + +/// Decode a value from a control message (aka. note), returning the values +/// of the replicate flag, receive flag and sequence. +/// +/// If the replicate flag is `false`, the peer does not wish to replicate +/// messages for the referenced feed. +/// +/// If the replicate flag is `true`, values will be returned for the receive +/// flag and sequence. +/// +/// The sequence refers to a sequence number of the referenced feed. +pub fn decode(value: EncodedClockValue) -> Result<(bool, Option, Option)> { + let (replicate_flag, receive_flag, sequence) = if value < 0 { + // Replicate flag is `false`. + // Peer does not wish to receive messages for this feed. + (false, None, None) + } else { + // Get the least-significant bit (aka. rightmost bit). + let lsb = value & 1; + // Set the receive flag value. + let receive_flag = lsb == 0; + // Perform a single bit arithmetic right shift to obtain the sequence + // number. + let sequence: u64 = (value >> 1).try_into()?; + + (true, Some(receive_flag), Some(sequence)) + }; + + Ok((replicate_flag, receive_flag, sequence)) +} + +/// Encode a replicate flag, receive flag and sequence number as a control +/// message (aka. note) value. +/// +/// If the replicate flag is `false`, a value of `-1` is returned. +/// +/// If the replicate flag is `true` and the receive flag is `true`, a single +/// bit arithmetic left shift is performed on the sequence number and the +/// least-significant bit is set to `0`. +/// +/// If the replicate flag is `true` and the receive flag is `false`, a single +/// bit arithmetic left shift is performed on the sequence number and the +/// least-significant bit is set to `1`. +pub fn encode( + replicate_flag: bool, + receive_flag: Option, + sequence: Option, +) -> Result { + let value = if replicate_flag { + // Perform a single bit arithmetic left shift. + let mut signed: i64 = (sequence.unwrap() << 1).try_into()?; + // Get the least-significant bit (aka. rightmost bit). + let lsb = signed & 1; + // Set the least-significant bit based on the value of the receive flag. + if let Some(_flag @ true) = receive_flag { + // Set the LSB to 0. + signed |= 0 << lsb; + } else { + // Set the LSB to 1. + signed |= 1 << lsb; + } + signed + } else { + -1 + }; + + Ok(value) +} + +#[cfg(test)] +mod test { + use super::*; + + const VALUES: [i64; 7] = [-1, 0, 1, 2, 3, 12, 450]; + const NOTES: [(bool, std::option::Option, std::option::Option); 7] = [ + (false, None, None), + (true, Some(true), Some(0)), + (true, Some(false), Some(0)), + (true, Some(true), Some(1)), + (true, Some(false), Some(1)), + (true, Some(true), Some(6)), + (true, Some(true), Some(225)), + ]; + + #[test] + fn test_decode() { + VALUES + .iter() + .zip(NOTES) + .for_each(|(value, note)| assert_eq!(decode(*value).unwrap(), note)); + } + + #[test] + fn test_encode() { + VALUES + .iter() + .zip(NOTES) + .for_each(|(value, note)| assert_eq!(encode(note.0, note.1, note.2).unwrap(), *value)); + } +} diff --git a/solar/src/actors/replication/ebt/manager.rs b/solar/src/actors/replication/ebt/manager.rs new file mode 100644 index 0000000..62982c8 --- /dev/null +++ b/solar/src/actors/replication/ebt/manager.rs @@ -0,0 +1,496 @@ +//! Epidemic Broadcast Tree (EBT) Replication. +//! +//! Two kinds of messages are sent by both peers during an EBT session: +//! +//! - Vector clocks (also known as control messages or notes) +//! - Feed messages +//! +//! Each vector clock is a JSON object containing one or more name/value pairs. + +use std::{ + collections::{HashMap, HashSet}, + fmt::Display, +}; + +use async_std::task; +use futures::{select_biased, FutureExt, SinkExt, StreamExt}; +use kuska_ssb::{api::dto::content::SsbId, crypto::ToSsbId, feed::Message}; +use log::{debug, error, trace, warn}; +use serde_json::Value; + +use crate::{ + actors::{ + muxrpc::ReqNo, + network::{connection::ConnectionData, connection_manager::ConnectionEvent}, + replication::ebt::{clock, replicator, EncodedClockValue, VectorClock}, + }, + broker::{ActorEndpoint, BrokerEvent, BrokerMessage, Destination, BROKER}, + config::PEERS_TO_REPLICATE, + node::KV_STORE, + Result, +}; + +/// EBT replication events. +#[derive(Debug, Clone)] +pub enum EbtEvent { + WaitForSessionRequest(ConnectionData), + RequestSession(ConnectionData), + SessionInitiated(ReqNo, SsbId, SessionRole), + SendClock(ReqNo, VectorClock), + SendMessage(ReqNo, SsbId, Value), + ReceivedClock(ReqNo, SsbId, VectorClock), + ReceivedMessage(Message), + SessionConcluded(SsbId), + SessionTimeout(ConnectionData), + Error(ConnectionData, ReqNo, SsbId, String), +} + +/// Role of a peer in an EBT session. +#[derive(Debug, Clone, PartialEq)] +pub enum SessionRole { + Requester, + Responder, +} + +impl Display for SessionRole { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match &self { + SessionRole::Requester => write!(f, "requester"), + SessionRole::Responder => write!(f, "responder"), + } + } +} + +#[derive(Debug)] +pub struct EbtManager { + /// Active EBT peer sessions. + active_sessions: HashSet, + /// Duration to wait before switching feed request to a different peer. + _feed_wait_timeout: u64, + /// The state of the replication loop. + _is_replication_loop_active: bool, + /// The local vector clock. + local_clock: VectorClock, + /// The SSB ID of the local node. + local_id: SsbId, + /// The vector clock for each known peer. + peer_clocks: HashMap, + /// A set of all the feeds for which active requests are open. + /// + /// This allows us to avoid requesting a feed from multiple peers + /// simultaneously. + _requested_feeds: HashSet, + /// Duration to wait for a connected peer to initiate an EBT session. + session_wait_timeout: u64, +} + +impl Default for EbtManager { + fn default() -> Self { + EbtManager { + active_sessions: HashSet::new(), + _feed_wait_timeout: 3, + _is_replication_loop_active: false, + local_clock: HashMap::new(), + local_id: String::new(), + peer_clocks: HashMap::new(), + _requested_feeds: HashSet::new(), + session_wait_timeout: 5, + } + } +} + +impl EbtManager { + // Read peer clock state from file. + // fn load_peer_clocks() + // Write peer clock state to file. + // fn persist_peer_clocks() + + /// Initialise the local clock based on peers to be replicated. + /// + /// This defines the public keys of all feeds we wish to replicate, + /// along with the latest sequence number for each. + async fn init_local_clock(&mut self) -> Result<()> { + debug!("Initialising local EBT clock"); + + let local_id = self.local_id.to_owned(); + + // Set the local feed to be replicated. + self.replicate(&local_id).await?; + + // Get list of peers to replicate. + if let Some(peers) = PEERS_TO_REPLICATE.get() { + // Request replication of each peer. + for peer in peers.keys() { + self.replicate(peer).await?; + } + } + + // TODO: Load peer clocks from file and update `peer_clocks`. + + Ok(()) + } + + /// Retrieve the vector clock for the given SSB ID. + fn _get_clock(self, peer_id: &SsbId) -> Option { + if peer_id == &self.local_id { + Some(self.local_clock) + } else { + self.peer_clocks.get(peer_id).cloned() + } + } + + /// Set or update the vector clock for the given SSB ID. + fn set_clock(&mut self, peer_id: &SsbId, clock: VectorClock) { + if peer_id == &self.local_id { + self.local_clock = clock + } else { + self.peer_clocks.insert(peer_id.to_owned(), clock); + } + } + + /// Request that the feed represented by the given SSB ID be replicated. + async fn replicate(&mut self, peer_id: &SsbId) -> Result<()> { + // Look up the latest sequence for the given ID. + if let Some(seq) = KV_STORE.read().await.get_latest_seq(peer_id)? { + // Encode the replicate flag, receive flag and sequence. + let encoded_value: EncodedClockValue = clock::encode(true, Some(true), Some(seq))?; + // Insert the ID and encoded sequence into the local clock. + self.local_clock.insert(peer_id.to_owned(), encoded_value); + } else { + // No messages are stored in the local database for this feed. + // Set replicate flag to `true`, receive to `false` and `seq` to 0. + let encoded_value: EncodedClockValue = clock::encode(true, Some(false), Some(0))?; + self.local_clock.insert(peer_id.to_owned(), encoded_value); + } + + Ok(()) + } + + /// Register a new EBT session for the given peer. + fn register_session(&mut self, peer_ssb_id: &SsbId) { + self.active_sessions.insert(peer_ssb_id.to_owned()); + + trace!(target: "ebt-session", "Registered new EBT session for {}", peer_ssb_id); + } + + /// Remove the given peer from the list of active session. + fn remove_session(&mut self, peer_ssb_id: &SsbId) { + let _ = self.active_sessions.remove(peer_ssb_id); + } + + /// Revoke a replication request for the feed represented by the given SSB + /// ID. + fn _revoke(&mut self, peer_id: &SsbId) { + self.local_clock.remove(peer_id); + } + + /// Request the feed represented by the given SSB ID from a peer. + fn _request(&mut self, peer_id: &SsbId) { + self._requested_feeds.insert(peer_id.to_owned()); + } + + /// Decode a peer's vector clock and retrieve all requested messages. + async fn retrieve_requested_messages( + // TODO: Do we need these two parameters? + _req_no: &ReqNo, + _peer_ssb_id: &SsbId, + clock: VectorClock, + ) -> Result> { + let mut messages_to_be_sent = Vec::new(); + + // Iterate over all key-value pairs in the vector clock. + for (feed_id, encoded_seq_no) in clock.iter() { + if *encoded_seq_no != -1 { + // Decode the encoded vector clock sequence number. + // TODO: Match properly on the values of replicate_flag and receive_flag. + let (_replicate_flag, _receive_flag, sequence) = clock::decode(*encoded_seq_no)?; + if let Some(last_seq) = KV_STORE.read().await.get_latest_seq(feed_id)? { + if let Some(seq) = sequence { + for n in seq..(last_seq + 1) { + if let Some(msg_kvt) = KV_STORE.read().await.get_msg_kvt(feed_id, n)? { + messages_to_be_sent.push(msg_kvt.value) + } + } + } + } + } + } + + Ok(messages_to_be_sent) + } + + /* ------------------ */ + /* EbtEvent handlers. */ + /* ------------------ */ + + async fn handle_wait_for_session_request(&self, connection_data: ConnectionData) { + trace!(target: "ebt", "Waiting for EBT session request"); + + let session_role = SessionRole::Responder; + task::spawn(replicator::run( + connection_data, + session_role, + self.session_wait_timeout, + )); + } + + async fn handle_request_session(&self, connection_data: ConnectionData) { + if let Some(peer_public_key) = &connection_data.peer_public_key { + let peer_ssb_id = peer_public_key.to_ssb_id(); + + // Only proceed with session initiation if there + // is no currently active session with the given peer. + if !self.active_sessions.contains(&peer_ssb_id) { + trace!( + target: "ebt", + "Requesting an EBT session with {:?}", + connection_data.peer_public_key.unwrap() + ); + + let session_role = SessionRole::Requester; + task::spawn(replicator::run( + connection_data, + session_role, + self.session_wait_timeout, + )); + } + } + } + + async fn handle_session_initiated( + &mut self, + req_no: ReqNo, + peer_ssb_id: SsbId, + session_role: SessionRole, + ) -> Result<()> { + trace!(target: "ebt-replication", "Initiated EBT session with {} as {}", peer_ssb_id, session_role); + + self.register_session(&peer_ssb_id); + let local_clock = self.local_clock.to_owned(); + + match session_role { + SessionRole::Responder => { + // Create channel to send messages to broker. + let mut ch_broker = BROKER.lock().await.create_sender(); + + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::SendClock(req_no, local_clock)), + )) + .await?; + } + SessionRole::Requester => { + trace!(target: "ebt-replication", "EBT session requester: {}", req_no); + // The requester waits for a clock to be sent by the responder. + } + } + + Ok(()) + } + + async fn handle_received_clock( + &mut self, + req_no: ReqNo, + peer_ssb_id: SsbId, + clock: VectorClock, + ) -> Result<()> { + trace!(target: "ebt-replication", "Received vector clock: {:?}", clock); + + // Create channel to send messages to broker. + let mut ch_broker = BROKER.lock().await.create_sender(); + + // If a clock is received without a prior EBT replicate + // request having been received from the associated peer, it is + // assumed that the clock was sent in response to a locally-sent + // EBT replicate request. Ie. The session was requested by the + // local peer. + if !self.active_sessions.contains(&peer_ssb_id) { + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::SessionInitiated( + req_no, + peer_ssb_id.to_owned(), + SessionRole::Requester, + )), + )) + .await?; + } + + self.set_clock(&peer_ssb_id, clock.to_owned()); + + let msgs = EbtManager::retrieve_requested_messages(&req_no, &peer_ssb_id, clock).await?; + for msg in msgs { + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::SendMessage(req_no, peer_ssb_id.to_owned(), msg)), + )) + .await?; + } + + Ok(()) + } + + async fn handle_received_message(&mut self, msg: Message) -> Result<()> { + trace!(target: "ebt-replication", "Received message: {:?}", msg); + + // Retrieve the sequence number of the most recent message for + // the peer that authored the received message. + let last_seq = KV_STORE + .read() + .await + .get_latest_seq(&msg.author().to_string())? + .unwrap_or(0); + + // Validate the sequence number. + if msg.sequence() == last_seq + 1 { + // Append the message to the feed. + KV_STORE.write().await.append_feed(msg.clone()).await?; + + debug!( + "Received message number {} from {}", + msg.sequence(), + msg.author() + ); + } else { + warn!( + "Received out-of-order message from {}; received: {}, expected: {} + 1", + &msg.author().to_string(), + msg.sequence(), + last_seq + ); + } + Ok(()) + } + + async fn handle_session_concluded(&mut self, peer_ssb_id: SsbId) { + trace!(target: "ebt-replication", "Session concluded with: {}", peer_ssb_id); + self.remove_session(&peer_ssb_id); + } + + async fn handle_session_timeout(&mut self, connection_data: ConnectionData) -> Result<()> { + trace!(target: "ebt-replication", "Session timeout while waiting for request"); + + // Create channel to send messages to broker. + let mut ch_broker = BROKER.lock().await.create_sender(); + + // Fallback to classic replication. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Connection(ConnectionEvent::ReplicatingClassic(connection_data)), + )) + .await?; + + Ok(()) + } + + async fn handle_error( + &mut self, + connection_data: ConnectionData, + req_no: ReqNo, + peer_ssb_id: SsbId, + err_msg: String, + ) -> Result<()> { + trace!(target: "ebt-replication", "Session error with {} for request number {}: {}", peer_ssb_id, req_no, err_msg); + + // Create channel to send messages to broker. + let mut ch_broker = BROKER.lock().await.create_sender(); + + // Fallback to classic replication. + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Connection(ConnectionEvent::ReplicatingClassic(connection_data)), + )) + .await?; + + Ok(()) + } + + /// Start the EBT event loop. + /// + /// Listen for EBT event messages via the broker and update EBT session + /// state accordingly. + pub async fn event_loop(mut self, local_id: SsbId) -> Result<()> { + debug!("Started EBT event loop"); + + // Set the ID (@-prefixed public key) of the local node. + self.local_id = local_id; + + // Initialise the local clock based on peers to be replicated. + self.init_local_clock().await?; + + // Register the EBT event loop actor with the broker. + let ActorEndpoint { + ch_terminate, + ch_msg, + .. + } = BROKER.lock().await.register("ebt-event-loop", true).await?; + + let mut ch_terminate_fuse = ch_terminate.fuse(); + let mut broker_msg_ch = ch_msg.unwrap(); + + // Listen for EBT events via the broker message bus. + loop { + select_biased! { + _value = ch_terminate_fuse => { + break; + }, + msg = broker_msg_ch.next().fuse() => { + if let Some(BrokerMessage::Ebt(event)) = msg { + debug!("Received EBT event message from broker"); + match event { + EbtEvent::WaitForSessionRequest(connection_data) => { + self.handle_wait_for_session_request(connection_data).await; + } + EbtEvent::RequestSession(connection_data) => { + self.handle_request_session(connection_data).await; + } + EbtEvent::SessionInitiated(req_no, peer_ssb_id, session_role) => { + if let Err(err) = self.handle_session_initiated(req_no, peer_ssb_id, session_role).await { + error!("Error while handling 'session initiated' event: {}", err) + } + } + EbtEvent::SendClock(_, clock) => { + trace!(target: "ebt-replication", "Sending vector clock: {:?}", clock); + // TODO: Update sent clocks. + } + EbtEvent::ReceivedClock(req_no, peer_ssb_id, clock) => { + if let Err(err) = self.handle_received_clock(req_no, peer_ssb_id, clock).await { + error!("Error while handling 'received clock' event: {}", err) + } + } + EbtEvent::ReceivedMessage(msg) => { + if let Err(err) = self.handle_received_message(msg).await { + error!("Error while handling 'received message' event: {}", err) + } + } + EbtEvent::SendMessage(_req_no, _peer_ssb_id, _msg) => { + trace!(target: "ebt-replication", "Sending message..."); + // TODO: Update sent messages. + } + EbtEvent::SessionConcluded(connection_data) => { + self.handle_session_concluded(connection_data).await; + } + EbtEvent::SessionTimeout(connection_data) => { + if let Err(err) = self.handle_session_timeout(connection_data).await { + error!("Error while handling 'session timeout' event: {}", err) + } + } + EbtEvent::Error(connection_data, req_no, peer_ssb_id, err_msg) => { + if let Err(err) = self.handle_error(connection_data, req_no, peer_ssb_id, err_msg).await { + error!("Error while handling 'error' event: {}", err) + } + } + } + } + } + } + } + + Ok(()) + } +} diff --git a/solar/src/actors/replication/ebt/mod.rs b/solar/src/actors/replication/ebt/mod.rs new file mode 100644 index 0000000..992e477 --- /dev/null +++ b/solar/src/actors/replication/ebt/mod.rs @@ -0,0 +1,6 @@ +mod clock; +mod manager; +mod replicator; + +pub use clock::{EncodedClockValue, VectorClock}; +pub use manager::{EbtEvent, EbtManager, SessionRole}; diff --git a/solar/src/actors/replication/ebt/replicator.rs b/solar/src/actors/replication/ebt/replicator.rs new file mode 100644 index 0000000..6eec9e6 --- /dev/null +++ b/solar/src/actors/replication/ebt/replicator.rs @@ -0,0 +1,186 @@ +use std::time::{Duration, Instant}; + +use futures::{pin_mut, select_biased, FutureExt, SinkExt, StreamExt}; +use kuska_ssb::{ + api::{dto::EbtReplicate, ApiCaller}, + crypto::ToSsbId, + handshake::async_std::BoxStream, + rpc::{RpcReader, RpcWriter}, +}; +use log::{error, trace}; + +use crate::{ + actors::{ + muxrpc::{EbtReplicateHandler, RpcInput}, + network::connection::ConnectionData, + replication::ebt::{EbtEvent, SessionRole}, + }, + broker::{ActorEndpoint, BrokerEvent, BrokerMessage, Destination, BROKER}, + Error, Result, +}; + +pub async fn run( + connection_data: ConnectionData, + session_role: SessionRole, + session_wait_timeout: u64, +) -> Result<()> { + // Register the EBT replication loop actor with the broker. + let ActorEndpoint { + ch_terminate, + ch_msg, + .. + } = BROKER + .lock() + .await + .register("ebt-replication-loop", true) + .await?; + + let mut ch_msg = ch_msg.ok_or(Error::OptionIsNone)?; + + let stream_reader = connection_data.stream.clone().ok_or(Error::OptionIsNone)?; + let stream_writer = connection_data.stream.clone().ok_or(Error::OptionIsNone)?; + let handshake = connection_data + .handshake + .clone() + .ok_or(Error::OptionIsNone)?; + let peer_ssb_id = handshake.peer_pk.to_ssb_id(); + + // Instantiate a box stream and split it into reader and writer streams. + let (box_stream_read, box_stream_write) = + BoxStream::from_handshake(stream_reader, stream_writer, handshake, 0x8000) + .split_read_write(); + + // Instantiate RPC reader and writer using the box streams. + let rpc_reader = RpcReader::new(box_stream_read); + let rpc_writer = RpcWriter::new(box_stream_write); + let mut api = ApiCaller::new(rpc_writer); + + // Instantiate the MUXRPC handler. + let mut ebt_replicate_handler = EbtReplicateHandler::new(); + + // Fuse internal termination channel with external channel. + // This allows termination of the peer loop to be initiated from outside + // this function. + let mut ch_terminate_fuse = ch_terminate.fuse(); + + // Convert the box stream reader into a stream. + let rpc_recv_stream = rpc_reader.into_stream().fuse(); + pin_mut!(rpc_recv_stream); + + // Create channel to send messages to broker. + let mut ch_broker = BROKER.lock().await.create_sender(); + + trace!(target: "ebt-session", "Initiating EBT replication session with: {}", peer_ssb_id); + + let mut session_initiated = false; + let mut replicate_req_no = None; + + // Record the time at which we begin the EBT session. + // + // This is later used to implement a timeout if no request or response is + // received. + let ebt_session_start = Instant::now(); + + if let SessionRole::Requester = session_role { + // Send EBT request. + let ebt_args = EbtReplicate::default(); + let req_no = api.ebt_replicate_req_send(&ebt_args).await?; + // Set the request number to be passed into the MUXRPC EBT handler. + // This allows tracking of the request (ensuring we respond to + // MUXRPC responses with this request number). + replicate_req_no = Some(req_no); + } + + loop { + // Poll multiple futures and streams simultaneously, executing the + // branch for the future that finishes first. If multiple futures are + // ready, one will be selected in order of declaration. + let input = select_biased! { + _value = ch_terminate_fuse => { + break; + }, + packet = rpc_recv_stream.select_next_some() => { + let (req_no, packet) = packet; + RpcInput::Network(req_no, packet) + }, + msg = ch_msg.next().fuse() => { + // Listen for a 'session initiated' event. + if let Some(BrokerMessage::Ebt(EbtEvent::SessionInitiated(_, ref ssb_id, ref session_role))) = msg { + if peer_ssb_id == *ssb_id && *session_role == SessionRole::Responder { + session_initiated = true; + } + } + if let Some(msg) = msg { + RpcInput::Message(msg) + } else { + RpcInput::None + } + }, + }; + + match ebt_replicate_handler + .handle( + &mut api, + &input, + &mut ch_broker, + peer_ssb_id.to_owned(), + replicate_req_no, + ) + .await + { + Ok(true) => break, + Err(err) => { + error!("EBT replicate handler failed: {:?}", err); + + if let Error::EbtReplicate((req_no, err_msg)) = err { + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::Error( + connection_data, + req_no, + peer_ssb_id.to_owned(), + err_msg, + )), + )) + .await?; + } + + // Break out of the input processing loop to conclude + // the replication session. + break; + } + _ => (), + } + + // If no active session has been initiated within 5 seconds of + // waiting to receive a replicate request, broadcast a session timeout + // event (leading to initiation of classic replication). + if !session_initiated + && session_role == SessionRole::Responder + && ebt_session_start.elapsed() >= Duration::from_secs(session_wait_timeout) + { + trace!(target: "ebt-session", "Timeout while waiting for {} to initiate EBT replication session", peer_ssb_id); + + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::SessionTimeout(connection_data)), + )) + .await?; + + // Break out of the input processing loop to conclude + // the replication session. + break; + } + } + + ch_broker + .send(BrokerEvent::new( + Destination::Broadcast, + BrokerMessage::Ebt(EbtEvent::SessionConcluded(peer_ssb_id)), + )) + .await?; + + Ok(()) +} diff --git a/solar/src/actors/replication/mod.rs b/solar/src/actors/replication/mod.rs index 4b55f4e..f80eeb9 100644 --- a/solar/src/actors/replication/mod.rs +++ b/solar/src/actors/replication/mod.rs @@ -1,2 +1,3 @@ pub mod classic; pub mod config; +pub mod ebt; diff --git a/solar/src/broker.rs b/solar/src/broker.rs index 3ed5a43..d8933ca 100644 --- a/solar/src/broker.rs +++ b/solar/src/broker.rs @@ -12,6 +12,7 @@ use crate::{ actors::{ muxrpc::{RpcBlobsGetEvent, RpcBlobsWantsEvent}, network::{connection_manager::ConnectionEvent, connection_scheduler::DialRequest}, + replication::ebt::EbtEvent, }, storage::{blob::StoreBlobEvent, kv::StoreKvEvent}, Result, @@ -24,6 +25,7 @@ pub struct Void {} pub enum BrokerMessage { Connection(ConnectionEvent), Dial(DialRequest), + Ebt(EbtEvent), RpcBlobsGet(RpcBlobsGetEvent), RpcBlobsWants(RpcBlobsWantsEvent), StoreBlob(StoreBlobEvent), diff --git a/solar/src/config.rs b/solar/src/config.rs index 2595dfc..a11c54a 100644 --- a/solar/src/config.rs +++ b/solar/src/config.rs @@ -83,13 +83,20 @@ impl ApplicationConfig { config.secret = SecretConfig::return_or_create_file(&base_path)?; config.base_path = Some(base_path); + // Add @-prefix to all peer IDs. This is required for successful + // replication when using either classic or EBT replication methods. + let mut replication_peers = HashMap::new(); + for (id, addr) in &config.replication.peers { + replication_peers.insert(format!("@{}", id), addr.to_owned()); + } + // Log the list of public keys identifying peers whose data will be replicated. - debug!("Peers to be replicated are {:?}", &config.replication.peers); + debug!("Peers to be replicated are {:?}", &replication_peers); // Set the value of the network key (aka. secret handshake key or caps key). let _err = NETWORK_KEY.set(config.network.key.to_owned()); // Set the value of the peers to replicate cell. - let _err = PEERS_TO_REPLICATE.set(config.replication.peers.to_owned()); + let _err = PEERS_TO_REPLICATE.set(replication_peers); // Set the value of the resync configuration cell. let _err = RESYNC_CONFIG.set(config.replication.resync); // Set the value of the secret configuration cell. diff --git a/solar/src/error.rs b/solar/src/error.rs index dd22ff3..eab31bc 100644 --- a/solar/src/error.rs +++ b/solar/src/error.rs @@ -1,4 +1,4 @@ -use std::{fmt, io, net}; +use std::{fmt, io, net, num}; use futures::channel::mpsc; use jsonrpsee::types::error::ErrorObjectOwned as JsonRpcErrorOwned; @@ -6,6 +6,8 @@ use jsonrpsee::types::error::SERVER_ERROR_MSG; use kuska_ssb::{api, crypto, discovery, feed, handshake, rpc}; use toml::{de, ser}; +use crate::actors::muxrpc::ReqNo; + /// Possible solar errors. #[derive(Debug)] pub enum Error { @@ -21,6 +23,8 @@ pub enum Error { Database(sled::Error), /// Failed to deserialization TOML. DeserializeToml(de::Error), + /// EBT replicate request received an error response. + EbtReplicate((ReqNo, String)), /// Failed to send message on futures channel. FuturesChannel(mpsc::SendError), /// Database indexes. @@ -50,6 +54,8 @@ pub enum Error { SerializeToml(ser::Error), /// SSB API error. SsbApi(api::Error), + /// TryFromInt error. + TryFromInt(num::TryFromIntError), /// URL parsing error. UrlParse(url::ParseError), /// SSB message validation error. @@ -69,6 +75,10 @@ impl fmt::Display for Error { Error::Crypto(err) => write!(f, "SSB cryptographic error: {err}"), Error::Database(err) => write!(f, "Key-value database error: {err}"), Error::DeserializeToml(err) => write!(f, "Failed to deserialize TOML: {err}"), + Error::EbtReplicate((req_no, err)) => write!( + f, + "EBT replication error: request number {req_no} returned {err}" + ), Error::FuturesChannel(err) => { write!(f, "Failed to send message on futures channel: {err}") } @@ -89,6 +99,7 @@ impl fmt::Display for Error { Error::SerdeJson(err) => write!(f, "Serde JSON error: {err}"), Error::SerializeToml(err) => write!(f, "Failed to serialize TOML: {err}"), Error::SsbApi(err) => write!(f, "SSB API error: {err}"), + Error::TryFromInt(err) => write!(f, "Integer conversion error: {err}"), Error::UrlParse(err) => write!(f, "Failed to parse URL: {err}"), Error::Validation(err) => write!(f, "Message validation error: {err}"), Error::Other(err) => write!(f, "Uncategorized error: {err}"), @@ -186,6 +197,12 @@ impl From for Error { } } +impl From for Error { + fn from(err: num::TryFromIntError) -> Error { + Error::TryFromInt(err) + } +} + impl From for Error { fn from(err: url::ParseError) -> Error { Error::UrlParse(err) diff --git a/solar/src/node.rs b/solar/src/node.rs index 8ef0463..128fb08 100644 --- a/solar/src/node.rs +++ b/solar/src/node.rs @@ -12,6 +12,7 @@ use crate::{ connection_manager::CONNECTION_MANAGER, connection_scheduler, dialer, lan_discovery, tcp_server, }, + replication::ebt::EbtManager, }, broker::*, config::ApplicationConfig, @@ -61,13 +62,15 @@ impl Node { &config.network.ip, &config.network.port, &config.secret.public_key, ); + let owned_identity = config.secret.to_owned_identity()?; + // Construct the TCP server listening address. let tcp_server_addr: SocketAddr = format!("{}:{}", config.network.ip, config.network.port).parse()?; // Spawn the TCP server. Facilitates peer connections. Broker::spawn(tcp_server::actor( - config.secret.to_owned_identity()?, + owned_identity.to_owned(), tcp_server_addr, config.replication.selective, )); @@ -86,7 +89,7 @@ impl Node { // CLI arguments. Facilitates operator queries during runtime. if config.jsonrpc.server { Broker::spawn(jsonrpc::server::actor( - config.secret.to_owned_identity()?, + owned_identity.to_owned(), jsonrpc_server_addr, )); } @@ -95,7 +98,7 @@ impl Node { // to allow LAN-local peer connections. if config.network.lan_discovery { Broker::spawn(lan_discovery::actor( - config.secret.to_owned_identity()?, + owned_identity.to_owned(), config.network.port, config.replication.selective, )); @@ -123,13 +126,23 @@ impl Node { // Spawn the connection dialer actor. Dials remote peers as dial // requests are received from the connection scheduler. - Broker::spawn(dialer::actor(config.replication.selective)); + Broker::spawn(dialer::actor( + owned_identity.to_owned(), + config.replication.selective, + )); // Spawn the connection scheduler actor. Sends dial requests to the // dialer for remote peers on an ongoing basis (at `eager` or `lazy` // intervals). Broker::spawn(connection_scheduler::actor(peers_to_dial)); + // Spawn the EBT replication manager actor. + let ebt_replication_manager = EbtManager::default(); + Broker::spawn(EbtManager::event_loop( + ebt_replication_manager, + owned_identity.id, + )); + // Spawn the connection manager message loop. let connection_manager_msgloop = CONNECTION_MANAGER.write().await.take_msgloop(); connection_manager_msgloop.await; diff --git a/solar_cli/Cargo.toml b/solar_cli/Cargo.toml index fe3de2a..373fd57 100644 --- a/solar_cli/Cargo.toml +++ b/solar_cli/Cargo.toml @@ -19,8 +19,7 @@ clap = { version = "4.1.8", features = ["derive"] } env_logger = "0.10" hex = "0.4.0" kuska-sodiumoxide = "0.2.5-0" -#kuska-ssb = { git = "https://github.com/Kuska-ssb/ssb", branch = "master" } -kuska-ssb = { path = "../../ssb" } +kuska-ssb = { git = "https://github.com/Kuska-ssb/ssb", branch = "master" } log = "0.4" url = "2.3"