diff --git a/Cargo.lock b/Cargo.lock index 788ffd3..a14d518 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -40,7 +40,7 @@ dependencies = [ [[package]] name = "aleph-bft" -version = "0.31.0" +version = "0.32.0" dependencies = [ "aleph-bft-mock", "aleph-bft-rmc", diff --git a/README.md b/README.md index 233eb87..f4bc035 100644 --- a/README.md +++ b/README.md @@ -60,7 +60,7 @@ More details are available [in the book][reference-link-implementation-details]. - Import AlephBFT in your crate ```toml [dependencies] - aleph-bft = "^0.31" + aleph-bft = "^0.32" ``` - The main entry point is the `run_session` function, which returns a Future that runs the consensus algorithm. diff --git a/consensus/Cargo.toml b/consensus/Cargo.toml index f444987..46fe449 100644 --- a/consensus/Cargo.toml +++ b/consensus/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "aleph-bft" -version = "0.31.0" +version = "0.32.0" edition = "2021" authors = ["Cardinal Cryptography"] categories = ["algorithms", "data-structures", "cryptography", "database"] diff --git a/consensus/src/alerts/service.rs b/consensus/src/alerts/service.rs index d60400e..a9fdb61 100644 --- a/consensus/src/alerts/service.rs +++ b/consensus/src/alerts/service.rs @@ -1,7 +1,7 @@ use crate::{ alerts::{ - handler::{Handler, OnNetworkAlertResponse, OnOwnAlertResponse, RmcResponse}, - Alert, AlertData, AlertMessage, ForkingNotification, NetworkMessage, + handler::{Handler, RmcResponse}, + Alert, AlertMessage, ForkingNotification, NetworkMessage, }, Data, Hasher, MultiKeychain, Multisigned, NodeIndex, Receiver, Recipient, Sender, }; @@ -9,7 +9,7 @@ use aleph_bft_rmc::{DoublingDelayScheduler, Message as RmcMessage}; use aleph_bft_types::Terminator; use futures::{FutureExt, StreamExt}; use log::{debug, error, warn}; -use std::{collections::HashMap, time::Duration}; +use std::time::Duration; const LOG_TARGET: &str = "AlephBFT-alerter"; type RmcService = @@ -20,11 +20,6 @@ pub struct Service { messages_from_network: Receiver>, notifications_for_units: Sender>, alerts_from_units: Receiver>, - data_for_backup: Sender>, - responses_from_backup: Receiver>, - own_alert_responses: HashMap>, - network_alert_responses: HashMap>, - multisigned_notifications: HashMap>, node_index: NodeIndex, exiting: bool, handler: Handler, @@ -36,8 +31,6 @@ pub struct IO { pub messages_from_network: Receiver>, pub notifications_for_units: Sender>, pub alerts_from_units: Receiver>, - pub data_for_backup: Sender>, - pub responses_from_backup: Receiver>, } impl Service { @@ -47,8 +40,6 @@ impl Service { messages_from_network, notifications_for_units, alerts_from_units, - data_for_backup, - responses_from_backup, } = io; let node_index = keychain.index(); @@ -63,11 +54,6 @@ impl Service { messages_from_network, notifications_for_units, alerts_from_units, - data_for_backup, - responses_from_backup, - own_alert_responses: HashMap::new(), - network_alert_responses: HashMap::new(), - multisigned_notifications: HashMap::new(), node_index, exiting: false, handler, @@ -126,18 +112,12 @@ impl Service { ) { match message { AlertMessage::ForkAlert(alert) => match self.handler.on_network_alert(alert.clone()) { - Ok(response) => { - let alert = alert.as_signable().clone(); - self.network_alert_responses.insert(alert.hash(), response); - if self - .data_for_backup - .unbounded_send(AlertData::NetworkAlert(alert)) - .is_err() - { - error!( - target: LOG_TARGET, - "Network alert couldn't be sent to backup.", - ); + Ok((maybe_notification, hash)) => { + if let Some(multisigned) = self.rmc_service.start_rmc(hash) { + self.handle_multisigned(multisigned); + } + if let Some(notification) = maybe_notification { + self.send_notification_for_units(notification); } } Err(error) => debug!(target: LOG_TARGET, "{}", error), @@ -168,79 +148,22 @@ impl Service { } fn handle_alert_from_runway(&mut self, alert: Alert) { - let response = self.handler.on_own_alert(alert.clone()); - self.own_alert_responses.insert(alert.hash(), response); - if self - .data_for_backup - .unbounded_send(AlertData::OwnAlert(alert)) - .is_err() - { - error!(target: LOG_TARGET, "Own alert couldn't be sent to backup."); + let (message, recipient, hash) = self.handler.on_own_alert(alert.clone()); + self.send_message_for_network(message, recipient); + if let Some(multisigned) = self.rmc_service.start_rmc(hash) { + self.handle_multisigned(multisigned); } } fn handle_multisigned(&mut self, multisigned: Multisigned) { match self.handler.alert_confirmed(multisigned.clone()) { Ok(notification) => { - self.multisigned_notifications - .insert(*multisigned.as_signable(), notification); - if self - .data_for_backup - .unbounded_send(AlertData::MultisignedHash(multisigned)) - .is_err() - { - error!( - target: LOG_TARGET, - "Multisigned hash couldn't be sent to backup." - ); - } + self.send_notification_for_units(notification); } Err(error) => warn!(target: LOG_TARGET, "{}", error), } } - fn handle_data_from_backup(&mut self, data: AlertData) { - match data { - AlertData::OwnAlert(alert) => match self.own_alert_responses.remove(&alert.hash()) { - Some((message, recipient, hash)) => { - self.send_message_for_network(message, recipient); - if let Some(multisigned) = self.rmc_service.start_rmc(hash) { - self.handle_multisigned(multisigned); - } - } - None => warn!(target: LOG_TARGET, "Alert response missing from storage."), - }, - AlertData::NetworkAlert(alert) => { - match self.network_alert_responses.remove(&alert.hash()) { - Some((maybe_notification, hash)) => { - if let Some(multisigned) = self.rmc_service.start_rmc(hash) { - self.handle_multisigned(multisigned); - } - if let Some(notification) = maybe_notification { - self.send_notification_for_units(notification); - } - } - None => warn!( - target: LOG_TARGET, - "Network alert response missing from storage." - ), - } - } - AlertData::MultisignedHash(multisigned) => { - match self - .multisigned_notifications - .remove(multisigned.as_signable()) - { - Some(notification) => self.send_notification_for_units(notification), - None => warn!( - target: LOG_TARGET, - "Multisigned response missing from storage." - ), - } - } - } - } - pub async fn run(&mut self, mut terminator: Terminator) { loop { futures::select! { @@ -261,13 +184,6 @@ impl Service { message = self.rmc_service.next_message().fuse() => { self.rmc_message_to_network(message); }, - item = self.responses_from_backup.next() => match item { - Some(item) => self.handle_data_from_backup(item), - None => { - error!(target: LOG_TARGET, "Backup responses stream closed."); - break; - } - }, _ = terminator.get_exit().fuse() => { debug!(target: LOG_TARGET, "Received exit signal."); self.exiting = true; diff --git a/consensus/src/backup/loader.rs b/consensus/src/backup/loader.rs index 15c99be..c7b960b 100644 --- a/consensus/src/backup/loader.rs +++ b/consensus/src/backup/loader.rs @@ -2,14 +2,11 @@ use std::{collections::HashSet, fmt, fmt::Debug, io::Read, marker::PhantomData}; use codec::{Decode, Error as CodecError}; use futures::channel::oneshot; -use itertools::{Either, Itertools}; use log::{error, info, warn}; use crate::{ - alerts::AlertData, - backup::BackupItem, units::{UncheckedSignedUnit, UnitCoord}, - Data, Hasher, Keychain, MultiKeychain, NodeIndex, Round, SessionId, + Data, Hasher, NodeIndex, Round, SessionId, Signature, }; const LOG_TARGET: &str = "AlephBFT-backup-loader"; @@ -66,20 +63,15 @@ impl From for LoaderError { } } -pub type LoadedData = ( - Vec::Signature>>, - Vec>, -); - -pub struct BackupLoader { +pub struct BackupLoader { backup: R, index: NodeIndex, session_id: SessionId, - _phantom: PhantomData<(H, D, MK)>, + _phantom: PhantomData<(H, D, S)>, } -impl BackupLoader { - pub fn new(backup: R, index: NodeIndex, session_id: SessionId) -> BackupLoader { +impl BackupLoader { + pub fn new(backup: R, index: NodeIndex, session_id: SessionId) -> BackupLoader { BackupLoader { backup, index, @@ -88,21 +80,18 @@ impl BackupLoader { } } - fn load(&mut self) -> Result>, LoaderError> { + fn load(&mut self) -> Result>, LoaderError> { let mut buf = Vec::new(); self.backup.read_to_end(&mut buf)?; let input = &mut &buf[..]; let mut result = Vec::new(); while !input.is_empty() { - result.push(>::decode(input)?); + result.push(>::decode(input)?); } Ok(result) } - fn verify_units( - &self, - units: &Vec>, - ) -> Result<(), LoaderError> { + fn verify_units(&self, units: &Vec>) -> Result<(), LoaderError> { let mut already_loaded_coords = HashSet::new(); for unit in units { @@ -139,29 +128,6 @@ impl BackupLoader { } } - fn load_and_verify(&mut self) -> Option> { - let items = match self.load() { - Ok(items) => items, - Err(e) => { - error!(target: LOG_TARGET, "unable to load backup data: {}", e); - return None; - } - }; - - let (units, alert_data): (Vec<_>, Vec<_>) = - items.into_iter().partition_map(|item| match item { - BackupItem::Unit(unit) => Either::Left(unit), - BackupItem::AlertData(data) => Either::Right(data), - }); - - if let Err(e) = self.verify_units(&units) { - error!(target: LOG_TARGET, "incorrect backup data: {}", e); - return None; - } - - Some((units, alert_data)) - } - fn verify_backup_and_collection_rounds( &self, next_round_backup: Round, @@ -193,17 +159,23 @@ impl BackupLoader { pub async fn run( &mut self, - loaded_data: oneshot::Sender>, + loaded_data: oneshot::Sender>>, starting_round: oneshot::Sender>, next_round_collection: oneshot::Receiver, ) { - let (units, alert_data) = match self.load_and_verify() { - Some((units, alert_data)) => (units, alert_data), - None => { + let units = match self.load() { + Ok(items) => items, + Err(e) => { + error!(target: LOG_TARGET, "unable to load backup data: {}", e); self.on_shutdown(starting_round); return; } }; + if let Err(e) = self.verify_units(&units) { + error!(target: LOG_TARGET, "incorrect backup data: {}", e); + self.on_shutdown(starting_round); + return; + } let next_round_backup: Round = units .iter() @@ -220,7 +192,7 @@ impl BackupLoader { next_round_backup ); - if loaded_data.send((units, alert_data)).is_err() { + if loaded_data.send(units).is_err() { error!(target: LOG_TARGET, "Could not send loaded items"); self.on_shutdown(starting_round); return; @@ -267,7 +239,7 @@ mod tests { use aleph_bft_mock::{Data, Hasher64, Keychain, Loader, Signature}; use crate::{ - backup::{loader::LoadedData, BackupItem, BackupLoader}, + backup::BackupLoader, units::{ create_units, creator_set, preunit_to_unchecked_signed_unit, preunit_to_unit, UncheckedSignedUnit as GenericUncheckedSignedUnit, @@ -276,10 +248,9 @@ mod tests { }; type UncheckedSignedUnit = GenericUncheckedSignedUnit; - type TestBackupItem = BackupItem; struct PrepareTestResponse { task: F, - loaded_data_rx: oneshot::Receiver>, + loaded_data_rx: oneshot::Receiver>, highest_response_tx: oneshot::Sender, starting_round_rx: oneshot::Receiver>, } @@ -331,7 +302,7 @@ mod tests { .collect() } - fn encode_all(items: Vec) -> Vec> { + fn encode_all(items: Vec) -> Vec> { items.iter().map(|u| u.encode()).collect() } @@ -376,13 +347,12 @@ mod tests { handle.await.unwrap(); assert_eq!(starting_round_rx.await, Ok(Some(0))); - assert_eq!(loaded_data_rx.await, Ok((Vec::new(), Vec::new()))); + assert_eq!(loaded_data_rx.await, Ok(Vec::new())); } #[tokio::test] async fn something_loaded_nothing_collected_succeeds() { - let units: Vec<_> = produce_units(5, SESSION_ID).into_iter().flatten().collect(); - let items: Vec<_> = units.clone().into_iter().map(BackupItem::Unit).collect(); + let items: Vec<_> = produce_units(5, SESSION_ID).into_iter().flatten().collect(); let encoded_items = encode_all(items.clone()).into_iter().flatten().collect(); let PrepareTestResponse { @@ -400,13 +370,12 @@ mod tests { handle.await.unwrap(); assert_eq!(starting_round_rx.await, Ok(Some(5))); - assert_eq!(loaded_data_rx.await, Ok((units, Vec::new()))); + assert_eq!(loaded_data_rx.await, Ok(items)); } #[tokio::test] async fn something_loaded_something_collected_succeeds() { - let units: Vec<_> = produce_units(5, SESSION_ID).into_iter().flatten().collect(); - let items: Vec<_> = units.clone().into_iter().map(BackupItem::Unit).collect(); + let items: Vec<_> = produce_units(5, SESSION_ID).into_iter().flatten().collect(); let encoded_items = encode_all(items.clone()).into_iter().flatten().collect(); let PrepareTestResponse { @@ -424,7 +393,7 @@ mod tests { handle.await.unwrap(); assert_eq!(starting_round_rx.await, Ok(Some(5))); - assert_eq!(loaded_data_rx.await, Ok((units, Vec::new()))); + assert_eq!(loaded_data_rx.await, Ok(items)); } #[tokio::test] @@ -444,13 +413,12 @@ mod tests { handle.await.unwrap(); assert_eq!(starting_round_rx.await, Ok(None)); - assert_eq!(loaded_data_rx.await, Ok((Vec::new(), Vec::new()))); + assert_eq!(loaded_data_rx.await, Ok(Vec::new())); } #[tokio::test] async fn loaded_smaller_then_collected_fails() { - let units: Vec<_> = produce_units(3, SESSION_ID).into_iter().flatten().collect(); - let items: Vec<_> = units.clone().into_iter().map(BackupItem::Unit).collect(); + let items: Vec<_> = produce_units(3, SESSION_ID).into_iter().flatten().collect(); let encoded_items = encode_all(items.clone()).into_iter().flatten().collect(); let PrepareTestResponse { @@ -468,13 +436,12 @@ mod tests { handle.await.unwrap(); assert_eq!(starting_round_rx.await, Ok(None)); - assert_eq!(loaded_data_rx.await, Ok((units, Vec::new()))); + assert_eq!(loaded_data_rx.await, Ok(items)); } #[tokio::test] async fn dropped_collection_fails() { - let units: Vec<_> = produce_units(3, SESSION_ID).into_iter().flatten().collect(); - let items: Vec<_> = units.clone().into_iter().map(BackupItem::Unit).collect(); + let items: Vec<_> = produce_units(3, SESSION_ID).into_iter().flatten().collect(); let encoded_items = encode_all(items.clone()).into_iter().flatten().collect(); let PrepareTestResponse { @@ -492,13 +459,12 @@ mod tests { handle.await.unwrap(); assert_eq!(starting_round_rx.await, Ok(None)); - assert_eq!(loaded_data_rx.await, Ok((units, Vec::new()))); + assert_eq!(loaded_data_rx.await, Ok(items)); } #[tokio::test] async fn backup_with_corrupted_encoding_fails() { - let units: Vec<_> = produce_units(5, SESSION_ID).into_iter().flatten().collect(); - let items: Vec<_> = units.into_iter().map(BackupItem::Unit).collect(); + let items: Vec<_> = produce_units(5, SESSION_ID).into_iter().flatten().collect(); let mut item_encodings = encode_all(items); let unit2_encoding_len = item_encodings[2].len(); item_encodings[2].resize(unit2_encoding_len - 1, 0); // remove the last byte @@ -523,8 +489,7 @@ mod tests { #[tokio::test] async fn backup_with_missing_parent_fails() { - let units: Vec<_> = produce_units(5, SESSION_ID).into_iter().flatten().collect(); - let mut items: Vec<_> = units.into_iter().map(BackupItem::Unit).collect(); + let mut items: Vec<_> = produce_units(5, SESSION_ID).into_iter().flatten().collect(); items.remove(2); // it is a parent of all units of round 3 let encoded_items = encode_all(items).into_iter().flatten().collect(); @@ -547,18 +512,17 @@ mod tests { #[tokio::test] async fn backup_with_duplicate_unit_succeeds() { - let mut units: Vec<_> = produce_units(5, SESSION_ID).into_iter().flatten().collect(); - let unit2_duplicate = units[2].clone(); - units.insert(3, unit2_duplicate); - let items: Vec<_> = units.clone().into_iter().map(BackupItem::Unit).collect(); - let encoded_units = encode_all(items.clone()).into_iter().flatten().collect(); + let mut items: Vec<_> = produce_units(5, SESSION_ID).into_iter().flatten().collect(); + let unit2_duplicate = items[2].clone(); + items.insert(3, unit2_duplicate); + let encoded_items = encode_all(items.clone()).into_iter().flatten().collect(); let PrepareTestResponse { task, loaded_data_rx, highest_response_tx, starting_round_rx, - } = prepare_test(encoded_units); + } = prepare_test(encoded_items); let handle = tokio::spawn(async { task.await; @@ -568,13 +532,12 @@ mod tests { handle.await.unwrap(); assert_eq!(starting_round_rx.await, Ok(Some(5))); - assert_eq!(loaded_data_rx.await, Ok((units, Vec::new()))); + assert_eq!(loaded_data_rx.await, Ok(items)); } #[tokio::test] async fn backup_with_units_of_one_creator_fails() { - let units = units_of_creator(produce_units(5, SESSION_ID), NodeIndex(NODE_ID.0 + 1)); - let items: Vec<_> = units.into_iter().map(BackupItem::Unit).collect(); + let items = units_of_creator(produce_units(5, SESSION_ID), NodeIndex(NODE_ID.0 + 1)); let encoded_items = encode_all(items).into_iter().flatten().collect(); let PrepareTestResponse { @@ -597,11 +560,10 @@ mod tests { #[tokio::test] async fn backup_with_wrong_session_fails() { - let units: Vec<_> = produce_units(5, SESSION_ID + 1) + let items: Vec<_> = produce_units(5, SESSION_ID + 1) .into_iter() .flatten() .collect(); - let items: Vec<_> = units.into_iter().map(BackupItem::Unit).collect(); let encoded_items = encode_all(items).into_iter().flatten().collect(); let PrepareTestResponse { diff --git a/consensus/src/backup/mod.rs b/consensus/src/backup/mod.rs index 3c9cb28..d180733 100644 --- a/consensus/src/backup/mod.rs +++ b/consensus/src/backup/mod.rs @@ -1,16 +1,5 @@ -use codec::{Decode, Encode}; -use std::fmt::Debug; - -pub use loader::{BackupLoader, LoadedData}; +pub use loader::BackupLoader; pub use saver::BackupSaver; -use crate::{alerts::AlertData, units::UncheckedSignedUnit, Data, Hasher, MultiKeychain}; - mod loader; mod saver; - -#[derive(Clone, Debug, Decode, Encode, PartialEq)] -pub enum BackupItem { - Unit(UncheckedSignedUnit), - AlertData(AlertData), -} diff --git a/consensus/src/backup/saver.rs b/consensus/src/backup/saver.rs index d83417c..bb3d409 100644 --- a/consensus/src/backup/saver.rs +++ b/consensus/src/backup/saver.rs @@ -5,42 +5,33 @@ use codec::Encode; use futures::{FutureExt, StreamExt}; use log::{debug, error}; -use crate::{ - alerts::AlertData, backup::BackupItem, units::UncheckedSignedUnit, Data, Hasher, MultiKeychain, - Receiver, Sender, -}; +use crate::{units::UncheckedSignedUnit, Data, Hasher, Receiver, Sender, Signature}; const LOG_TARGET: &str = "AlephBFT-backup-saver"; -/// Component responsible for saving units and alert data into backup. +/// Component responsible for saving units into backup. /// It waits for items to appear on its receivers, and writes them to backup. /// It announces a successful write through an appropriate response sender. -pub struct BackupSaver { - units_from_runway: Receiver>, - data_from_alerter: Receiver>, - responses_for_runway: Sender>, - responses_for_alerter: Sender>, +pub struct BackupSaver { + units_from_runway: Receiver>, + responses_for_runway: Sender>, backup: W, } -impl BackupSaver { +impl BackupSaver { pub fn new( - units_from_runway: Receiver>, - data_from_alerter: Receiver>, - responses_for_runway: Sender>, - responses_for_alerter: Sender>, + units_from_runway: Receiver>, + responses_for_runway: Sender>, backup: W, - ) -> BackupSaver { + ) -> BackupSaver { BackupSaver { units_from_runway, - data_from_alerter, responses_for_runway, - responses_for_alerter, backup, } } - pub fn save_item(&mut self, item: BackupItem) -> Result<(), std::io::Error> { + pub fn save_item(&mut self, item: &UncheckedSignedUnit) -> Result<(), std::io::Error> { self.backup.write_all(&item.encode())?; self.backup.flush()?; Ok(()) @@ -51,41 +42,22 @@ impl BackupSaver { loop { futures::select! { unit = self.units_from_runway.next() => { - let unit = match unit { + let item = match unit { Some(unit) => unit, None => { error!(target: LOG_TARGET, "receiver of units to save closed early"); break; }, }; - let item = BackupItem::Unit(unit.clone()); - if let Err(e) = self.save_item(item) { + if let Err(e) = self.save_item(&item) { error!(target: LOG_TARGET, "couldn't save item to backup: {:?}", e); break; } - if self.responses_for_runway.unbounded_send(unit).is_err() { + if self.responses_for_runway.unbounded_send(item).is_err() { error!(target: LOG_TARGET, "couldn't respond with saved unit to runway"); break; } }, - data = self.data_from_alerter.next() => { - let data = match data { - Some(data) => data, - None => { - error!(target: LOG_TARGET, "receiver of alert data to save closed early"); - break; - }, - }; - let item = BackupItem::AlertData(data.clone()); - if let Err(e) = self.save_item(item) { - error!(target: LOG_TARGET, "couldn't save item to backup: {:?}", e); - break; - } - if self.responses_for_alerter.unbounded_send(data).is_err() { - error!(target: LOG_TARGET, "couldn't respond with saved alert data to runway"); - break; - } - } _ = terminator.get_exit().fuse() => { debug!(target: LOG_TARGET, "backup saver received exit signal."); terminator_exit = true; @@ -112,45 +84,29 @@ mod tests { use aleph_bft_types::Terminator; use crate::{ - alerts::{Alert, AlertData}, backup::BackupSaver, units::{creator_set, preunit_to_unchecked_signed_unit, UncheckedSignedUnit}, NodeCount, NodeIndex, }; - type TestBackupSaver = BackupSaver; + type TestBackupSaver = BackupSaver; type TestUnit = UncheckedSignedUnit; - type TestAlertData = AlertData; struct PrepareSaverResponse { task: F, units_for_saver: mpsc::UnboundedSender, units_from_saver: mpsc::UnboundedReceiver, - alerts_for_saver: mpsc::UnboundedSender, - alerts_from_saver: mpsc::UnboundedReceiver, exit_tx: oneshot::Sender<()>, } - enum Item { - Alert, - Unit, - } - fn prepare_saver() -> PrepareSaverResponse { let (units_for_saver, units_from_runway) = mpsc::unbounded(); let (units_for_runway, units_from_saver) = mpsc::unbounded(); - let (alerts_for_saver, alerts_from_alerter) = mpsc::unbounded(); - let (alerts_for_alerter, alerts_from_saver) = mpsc::unbounded(); let (exit_tx, exit_rx) = oneshot::channel(); let backup = Saver::new(); let task = { - let mut saver: TestBackupSaver = BackupSaver::new( - units_from_runway, - alerts_from_alerter, - units_for_runway, - alerts_for_alerter, - backup, - ); + let mut saver: TestBackupSaver = + BackupSaver::new(units_from_runway, units_for_runway, backup); async move { saver.run(Terminator::create_root(exit_rx, "saver")).await; @@ -161,8 +117,6 @@ mod tests { task, units_for_saver, units_from_saver, - alerts_for_saver, - alerts_from_saver, exit_tx, } } @@ -173,8 +127,6 @@ mod tests { task, units_for_saver, mut units_from_saver, - alerts_for_saver, - mut alerts_from_saver, exit_tx, } = prepare_saver(); @@ -195,40 +147,9 @@ mod tests { ) }) .collect(); - let alerts: Vec = (0..5) - .map(|k| { - TestAlertData::OwnAlert(Alert::new( - NodeIndex(0), - (units[k].clone(), units[k].clone()), - vec![], - )) - }) - .collect(); - let backup_save_ordering = vec![ - Item::Unit, - Item::Alert, - Item::Alert, - Item::Alert, - Item::Unit, - Item::Unit, - Item::Alert, - Item::Unit, - Item::Alert, - Item::Unit, - ]; - let mut units_iter = units.iter(); - let mut alerts_iter = alerts.iter(); - - for i in backup_save_ordering { - match i { - Item::Unit => units_for_saver - .unbounded_send(units_iter.next().unwrap().clone()) - .unwrap(), - Item::Alert => alerts_for_saver - .unbounded_send(alerts_iter.next().unwrap().clone()) - .unwrap(), - } + for u in units.iter() { + units_for_saver.unbounded_send(u.clone()).unwrap(); } for u in units { @@ -236,11 +157,6 @@ mod tests { assert_eq!(u, u_backup); } - for a in alerts { - let a_backup = alerts_from_saver.next().await.unwrap(); - assert_eq!(a, a_backup); - } - exit_tx.send(()).unwrap(); handle.await.unwrap(); } diff --git a/consensus/src/runway/mod.rs b/consensus/src/runway/mod.rs index 16f405e..0ebe831 100644 --- a/consensus/src/runway/mod.rs +++ b/consensus/src/runway/mod.rs @@ -30,7 +30,7 @@ use std::{ mod collection; mod packer; -use crate::backup::{BackupLoader, BackupSaver, LoadedData}; +use crate::backup::{BackupLoader, BackupSaver}; #[cfg(feature = "initial_unit_collection")] use collection::{Collection, IO as CollectionIO}; pub use collection::{NewestUnitResponse, Salt}; @@ -724,7 +724,7 @@ where async fn run( mut self, - data_from_backup: oneshot::Receiver>, + data_from_backup: oneshot::Receiver>>, mut terminator: Terminator, ) { let index = self.index(); @@ -735,7 +735,7 @@ where let mut status_ticker = Delay::new(status_ticker_delay).fuse(); match data_from_backup.await { - Ok((units, _alert_data)) => { + Ok(units) => { for unit in units { self.on_unit_received(unit, false); } @@ -951,16 +951,12 @@ pub(crate) async fn run( let (backup_units_for_saver, backup_units_from_runway) = mpsc::unbounded(); let (backup_units_for_runway, backup_units_from_saver) = mpsc::unbounded(); - let (alert_data_for_saver, alert_data_from_alerter) = mpsc::unbounded(); - let (alert_data_for_alerter, alert_data_from_saver) = mpsc::unbounded(); let backup_saver_terminator = terminator.add_offspring_connection("AlephBFT-backup-saver"); let backup_saver_handle = spawn_handle.spawn_essential("runway/backup_saver", { - let mut backup_saver: BackupSaver<_, _, MK, _> = BackupSaver::new( + let mut backup_saver = BackupSaver::new( backup_units_from_runway, - alert_data_from_alerter, backup_units_for_runway, - alert_data_for_alerter, runway_io.backup_write, ); async move { @@ -986,8 +982,6 @@ pub(crate) async fn run( messages_from_network: alert_messages_from_network, notifications_for_units: alert_notifications_for_units, alerts_from_units, - data_for_backup: alert_data_for_saver, - responses_from_backup: alert_data_from_saver, }, alerter_handler, ); diff --git a/consensus/src/testing/alerts.rs b/consensus/src/testing/alerts.rs index 8e8eeee..c475c43 100644 --- a/consensus/src/testing/alerts.rs +++ b/consensus/src/testing/alerts.rs @@ -215,8 +215,6 @@ impl TestCase { let (notifications_for_units, mut notifications_from_alerter) = mpsc::unbounded(); let (alerts_for_alerter, alerts_from_units) = mpsc::unbounded(); let (exit_alerter_tx, exit_alerter_rx) = oneshot::channel(); - // mock communication with backup - data sent to backup immediately returns to alerter - let (data_for_backup, responses_from_backup) = mpsc::unbounded(); let alerter_handler = Handler::new(keychain, 0); let mut alerter_service = Service::new( @@ -226,8 +224,6 @@ impl TestCase { messages_from_network, notifications_for_units, alerts_from_units, - data_for_backup, - responses_from_backup, }, alerter_handler, ); diff --git a/consensus/src/testing/crash_recovery.rs b/consensus/src/testing/crash_recovery.rs index 731d1ca..9bf7050 100644 --- a/consensus/src/testing/crash_recovery.rs +++ b/consensus/src/testing/crash_recovery.rs @@ -1,10 +1,9 @@ use crate::{ - backup::BackupItem, testing::{init_log, spawn_honest_member, HonestMember, Network, ReconnectSender}, - units::UnitCoord, + units::{UncheckedSignedUnit, UnitCoord}, NodeCount, NodeIndex, SpawnHandle, TaskHandle, }; -use aleph_bft_mock::{Data, Hasher64, Keychain, Router, Spawner}; +use aleph_bft_mock::{Data, Hasher64, Router, Signature, Spawner}; use codec::Decode; use futures::{ channel::{mpsc, oneshot}, @@ -130,11 +129,7 @@ fn verify_backup(buf: &mut &[u8]) -> HashSet { let mut already_saved = HashSet::new(); while !buf.is_empty() { - let item = BackupItem::::decode(buf).unwrap(); - let unit = match item { - BackupItem::Unit(unit) => unit, - _ => continue, - }; + let unit = UncheckedSignedUnit::::decode(buf).unwrap(); let full_unit = unit.as_signable(); let coord = full_unit.coord(); let parent_ids = &full_unit.as_pre_unit().control_hash().parents_mask;