
Commit

Merge pull request #81 from mycognosist/event_driven_replication
Event-driven connections with message passing
mycognosist authored Oct 17, 2023
2 parents 731e399 + 0c7a182 commit 0dea49f
Showing 10 changed files with 377 additions and 329 deletions.
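
The headline change is a move to event-driven connection handling: connection actors report what happened as messages, and other actors react to those messages rather than being called directly (the history_stream.rs hunk below imports BrokerEvent, BrokerMessage, ChBrokerSend and Destination from solar's broker module for this). The following is a minimal, hedged sketch of the general actor-and-consumer pattern over an async channel; the ConnectionEvent enum, the channel layout and all names are illustrative assumptions, not solar's actual types.

    use futures::{channel::mpsc, executor::block_on, StreamExt};

    /// Events a connection actor might report (hypothetical names).
    enum ConnectionEvent {
        Connected(String),       // peer public key
        ReplicationDone(String), // peer public key
        Disconnected(String),    // peer public key
    }

    fn main() {
        block_on(async {
            // Connection actors send events to the consumer over this channel.
            let (tx, mut rx) = mpsc::unbounded::<ConnectionEvent>();

            // A connection actor: reports each state transition as an event.
            let actor = async move {
                for event in [
                    ConnectionEvent::Connected("@alice".into()),
                    ConnectionEvent::ReplicationDone("@alice".into()),
                    ConnectionEvent::Disconnected("@alice".into()),
                ] {
                    // Fails only if the receiving side has been dropped.
                    tx.unbounded_send(event).expect("receiver dropped");
                }
            };

            // The consumer: reacts to each event as it arrives; the channel
            // closes (and the loop ends) once the actor drops its sender.
            let consumer = async move {
                while let Some(event) = rx.next().await {
                    match event {
                        ConnectionEvent::Connected(peer) => println!("connected to {peer}"),
                        ConnectionEvent::ReplicationDone(peer) => println!("replicated {peer}"),
                        ConnectionEvent::Disconnected(peer) => println!("disconnected from {peer}"),
                    }
                }
            };

            futures::join!(actor, consumer);
        });
    }

In this shape, each connection actor only describes its state transitions, while the consumer decides what to do with them (schedule replication, update peer state, drop the connection).
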
3 changes: 0 additions & 3 deletions Cargo.lock

Generated file; diff not rendered by default.

3 changes: 2 additions & 1 deletion solar/Cargo.toml
@@ -17,7 +17,8 @@ futures = "0.3"
 hex = "0.4.0"
 jsonrpsee = { version = "0.18.2", features = ["server"] }
 kuska-sodiumoxide = "0.2.5-0"
-kuska-ssb = { git = "https://github.com/Kuska-ssb/ssb", branch = "master" }
+#kuska-ssb = { git = "https://github.com/Kuska-ssb/ssb", branch = "master" }
+kuska-ssb = { path = "../../ssb" }
 log = "0.4"
 once_cell = "1.16"
 rand = "0.8"
27 changes: 19 additions & 8 deletions solar/src/actors/muxrpc/history_stream.rs
@@ -19,6 +19,7 @@ use crate::{
     },
     broker::{BrokerEvent, BrokerMessage, ChBrokerSend, Destination},
     config::{PEERS_TO_REPLICATE, RESYNC_CONFIG, SECRET_CONFIG},
+    error::Error,
     node::BLOB_STORE,
     node::KV_STORE,
     storage::kv::StoreKvEvent,
@@ -125,29 +126,35 @@ where
         debug!("initializing history stream handler");
 
         // If local database resync has been selected...
-        if *RESYNC_CONFIG.get().unwrap() {
+        if *RESYNC_CONFIG.get().ok_or(Error::OptionIsNone)? {
             info!("database resync selected; requesting local feed from peers");
             // Read the local public key from the secret config file.
-            let local_public_key = &SECRET_CONFIG.get().unwrap().public_key;
+            // The public key is @-prefixed (at-prefixed).
+            let local_public_key = &SECRET_CONFIG.get().ok_or(Error::OptionIsNone)?.public_key;
             // Create a history stream request for the local feed.
-            let args = dto::CreateHistoryStreamIn::new(local_public_key.clone()).after_seq(1);
+            let args =
+                dto::CreateHistoryStreamIn::new(local_public_key.to_owned()).after_seq(1);
             let req_id = api.create_history_stream_req_send(&args).await?;
 
             // Insert the history stream request ID and peer public key
             // into the peers hash map.
-            self.peers.insert(req_id, local_public_key.to_string());
+            self.peers.insert(req_id, local_public_key.to_owned());
         }
 
         // Loop through the public keys of all peers in the replication list.
         for peer_pk in PEERS_TO_REPLICATE.get().unwrap().keys() {
+            // Prefix `@` to public key.
+            let peer_public_key = format!("@{}", peer_pk);
+
             // Instantiate the history stream request args for the given peer.
             // The `live` arg means: keep the connection open after initial
             // replication.
-            let mut args = dto::CreateHistoryStreamIn::new(format!("@{}", peer_pk)).live(true);
+            let mut args =
+                dto::CreateHistoryStreamIn::new(peer_public_key.to_owned()).live(true);
 
             // Retrieve the sequence number of the most recent message for
             // this peer from the local key-value store.
-            if let Some(last_seq) = KV_STORE.read().await.get_latest_seq(peer_pk)? {
+            if let Some(last_seq) = KV_STORE.read().await.get_latest_seq(&peer_public_key)? {
                 // Use the latest sequence number to update the request args.
                 args = args.after_seq(last_seq);
             }
@@ -157,11 +164,11 @@
 
             // Insert the history stream request ID and peer ID
             // (public key) into the peers hash map.
-            self.peers.insert(id, peer_pk.to_string());
+            self.peers.insert(id, peer_public_key.to_owned());
 
             info!(
                 "requesting messages authored by peer {} after {:?}",
-                peer_pk, args.seq
+                peer_public_key, args.seq
             );
         }
 
@@ -254,6 +261,10 @@ where
                 msg.sequence(),
                 last_seq
             );
+
+            // Return to avoid handling multiple successive out-of-order
+            // messages.
+            return Ok(true);
         }
 
         Ok(true)
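
The final hunk adds an early return Ok(true) after the out-of-order warning, so the handler stops instead of trying to process further messages it cannot append. Below is a minimal sketch of that guard; the Message struct, the String error type and the last_seq bookkeeping are hypothetical stand-ins for solar's actual types.

    /// A hypothetical message carrying only its sequence number.
    struct Message {
        sequence: u64,
    }

    /// Returns Ok(true) once the handler is finished with the message,
    /// mirroring the Ok(true) convention visible in the diff above.
    fn handle_message(msg: &Message, last_seq: u64) -> Result<bool, String> {
        // Only the next expected sequence number can be appended.
        if msg.sequence != last_seq + 1 {
            eprintln!(
                "out-of-order message: received {}, expected {}",
                msg.sequence,
                last_seq + 1
            );
            // Return to avoid handling multiple successive out-of-order messages.
            return Ok(true);
        }

        // ...append the message to the local key-value store here...
        Ok(true)
    }

    fn main() {
        // Logs a warning and returns early, since 7 != 3 + 1.
        let _ = handle_message(&Message { sequence: 7 }, 3);
    }
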
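The other recurring change in this file replaces .unwrap() on global configuration cells with .ok_or(Error::OptionIsNone)?, so a missing value surfaces as a recoverable error rather than a panic. Here is a small sketch of that conversion; ConfigError and the resync_selected helper are hypothetical stand-ins (solar's real variant is Error::OptionIsNone in its error module), and once_cell is already a project dependency per Cargo.toml.

    use once_cell::sync::OnceCell;
    use std::fmt;

    /// Hypothetical stand-in for solar's Error::OptionIsNone variant.
    #[derive(Debug)]
    enum ConfigError {
        OptionIsNone,
    }

    impl fmt::Display for ConfigError {
        fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
            write!(f, "expected a configuration value but found None")
        }
    }

    impl std::error::Error for ConfigError {}

    /// A global configuration cell, analogous to solar's RESYNC_CONFIG.
    static RESYNC: OnceCell<bool> = OnceCell::new();

    fn resync_selected() -> Result<bool, ConfigError> {
        // Before: `*RESYNC.get().unwrap()` panics if the cell is unset.
        // After: a missing value becomes an error the caller can handle.
        Ok(*RESYNC.get().ok_or(ConfigError::OptionIsNone)?)
    }

    fn main() {
        // The cell was never set, so the lookup fails gracefully.
        match resync_selected() {
            Ok(resync) => println!("resync selected: {resync}"),
            Err(e) => eprintln!("config error: {e}"),
        }
    }
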
(Diffs for the remaining changed files were not loaded.)
