From b6e25a50d46c4faa7988e2010991603468ec94e0 Mon Sep 17 00:00:00 2001 From: Romain Ruetschi Date: Tue, 14 Jan 2025 11:16:11 +0100 Subject: [PATCH] Implement `RestreamProposal` in example and test app --- code/crates/app-channel/src/connector.rs | 2 +- code/crates/core-consensus/src/effect.rs | 4 +-- .../core-consensus/src/handle/driver.rs | 2 +- code/crates/engine/src/consensus.rs | 4 +-- code/crates/engine/src/host.rs | 6 ++--- code/crates/starknet/host/src/actor.rs | 6 ++--- code/crates/test/app/src/app.rs | 27 +++++++++++++++++-- code/crates/test/app/src/state.rs | 17 ++++++++++++ code/examples/channel/src/app.rs | 27 +++++++++++++++++-- code/examples/channel/src/state.rs | 17 ++++++++++++ 10 files changed, 96 insertions(+), 16 deletions(-) diff --git a/code/crates/app-channel/src/connector.rs b/code/crates/app-channel/src/connector.rs index 9f2425674..4fab4e732 100644 --- a/code/crates/app-channel/src/connector.rs +++ b/code/crates/app-channel/src/connector.rs @@ -98,7 +98,7 @@ where reply_to.send(rx.await?)?; } - HostMsg::RestreamValue { + HostMsg::RestreamProposal { height, round, valid_round, diff --git a/code/crates/core-consensus/src/effect.rs b/code/crates/core-consensus/src/effect.rs index 8884ed0cd..5816f9196 100644 --- a/code/crates/core-consensus/src/effect.rs +++ b/code/crates/core-consensus/src/effect.rs @@ -92,11 +92,11 @@ where /// Requests the application to re-stream a proposal that it has already seen. /// - /// The application MUST re-publish again to its pwers all + /// The application MUST re-publish again to its peers all /// the proposal parts pertaining to that value. /// /// Resume with: [`resume::Continue`] - RestreamValue( + RestreamProposal( /// Height of the value Ctx::Height, /// Round of the value diff --git a/code/crates/core-consensus/src/handle/driver.rs b/code/crates/core-consensus/src/handle/driver.rs index 94e5ba7f4..993e759e5 100644 --- a/code/crates/core-consensus/src/handle/driver.rs +++ b/code/crates/core-consensus/src/handle/driver.rs @@ -199,7 +199,7 @@ where if signed_proposal.pol_round().is_defined() { perform!( co, - Effect::RestreamValue( + Effect::RestreamProposal( signed_proposal.height(), signed_proposal.round(), signed_proposal.pol_round(), diff --git a/code/crates/engine/src/consensus.rs b/code/crates/engine/src/consensus.rs index 8be84484a..bc84c78b1 100644 --- a/code/crates/engine/src/consensus.rs +++ b/code/crates/engine/src/consensus.rs @@ -904,9 +904,9 @@ where Ok(r.resume_with(validator_set)) } - Effect::RestreamValue(height, round, valid_round, address, value_id, r) => { + Effect::RestreamProposal(height, round, valid_round, address, value_id, r) => { self.host - .cast(HostMsg::RestreamValue { + .cast(HostMsg::RestreamProposal { height, round, valid_round, diff --git a/code/crates/engine/src/host.rs b/code/crates/engine/src/host.rs index 120a8349b..80d04bd52 100644 --- a/code/crates/engine/src/host.rs +++ b/code/crates/engine/src/host.rs @@ -58,7 +58,7 @@ pub enum HostMsg { proposer: Ctx::Address, }, - /// Request to build a local block/value from Driver + /// Request to build a local value to propose GetValue { height: Ctx::Height, round: Round, @@ -66,8 +66,8 @@ pub enum HostMsg { reply_to: RpcReplyPort>, }, - /// Request to restream an existing block/value from Driver - RestreamValue { + /// Request to restream an existing proposal + RestreamProposal { height: Ctx::Height, round: Round, valid_round: Round, diff --git a/code/crates/starknet/host/src/actor.rs b/code/crates/starknet/host/src/actor.rs index 1853042bd..4ee85f366 100644 --- a/code/crates/starknet/host/src/actor.rs +++ b/code/crates/starknet/host/src/actor.rs @@ -135,14 +135,14 @@ impl Host { reply_to, } => on_get_value(state, &self.network, height, round, timeout, reply_to).await, - HostMsg::RestreamValue { + HostMsg::RestreamProposal { height, round, valid_round, address, value_id, } => { - on_restream_value( + on_restream_proposal( state, &self.network, height, @@ -373,7 +373,7 @@ async fn find_previously_built_value( Ok(proposed_value) } -async fn on_restream_value( +async fn on_restream_proposal( state: &mut HostState, network: &NetworkRef, height: Height, diff --git a/code/crates/test/app/src/app.rs b/code/crates/test/app/src/app.rs index 8656b669e..0e7f33605 100644 --- a/code/crates/test/app/src/app.rs +++ b/code/crates/test/app/src/app.rs @@ -224,8 +224,31 @@ pub async fn run( } } - AppMsg::RestreamProposal { .. } => { - error!("RestreamProposal not implemented"); + AppMsg::RestreamProposal { + height, + round, + valid_round, + address, + value_id, + } => { + info!(%height, %round, %value_id, "Restreaming existing proposal..."); + + let Some(proposal) = state + .get_proposal(height, round, valid_round, address, value_id) + .await + else { + error!(%height, %round, %value_id, "Failed to find proposal to restream"); + return Ok(()); + }; + + for stream_message in state.stream_proposal(proposal) { + info!(%height, %round, %value_id, "Publishing proposal part: {stream_message:?}"); + + channels + .network + .send(NetworkMsg::PublishProposalPart(stream_message)) + .await?; + } } AppMsg::PeerJoined { peer_id } => { diff --git a/code/crates/test/app/src/state.rs b/code/crates/test/app/src/state.rs index e4dc83356..9aded7e63 100644 --- a/code/crates/test/app/src/state.rs +++ b/code/crates/test/app/src/state.rs @@ -20,6 +20,7 @@ use malachitebft_app_channel::app::types::PeerId; use malachitebft_test::codec::proto::ProtobufCodec; use malachitebft_test::{ Address, Height, ProposalData, ProposalFin, ProposalInit, ProposalPart, TestContext, Value, + ValueId, }; use crate::store::{DecidedValue, Store}; @@ -218,6 +219,22 @@ impl State { Value::new(value) } + pub async fn get_proposal( + &self, + height: Height, + round: Round, + _valid_round: Round, + _proposer: Address, + value_id: ValueId, + ) -> Option> { + Some(LocallyProposedValue::new( + height, + round, + Value::new(value_id.as_u64()), + None, + )) + } + /// Creates a new proposal value for the given height /// Returns either a previously built proposal or creates a new one pub async fn propose_value( diff --git a/code/examples/channel/src/app.rs b/code/examples/channel/src/app.rs index affc72d21..b22032c2b 100644 --- a/code/examples/channel/src/app.rs +++ b/code/examples/channel/src/app.rs @@ -226,8 +226,31 @@ pub async fn run( } } - AppMsg::RestreamProposal { .. } => { - error!("RestreamProposal not implemented"); + AppMsg::RestreamProposal { + height, + round, + valid_round, + address, + value_id, + } => { + info!(%height, %round, "Restreaming existing proposal..."); + + let Some(proposal) = state + .get_proposal(height, round, valid_round, address, value_id) + .await + else { + error!(%height, %round, "Failed to find proposal to restream"); + return Ok(()); + }; + + for stream_message in state.stream_proposal(proposal) { + info!(%height, %round, "Publishing proposal part: {stream_message:?}"); + + channels + .network + .send(NetworkMsg::PublishProposalPart(stream_message)) + .await?; + } } AppMsg::PeerJoined { peer_id } => { diff --git a/code/examples/channel/src/state.rs b/code/examples/channel/src/state.rs index 2d67ded4a..f5e757a68 100644 --- a/code/examples/channel/src/state.rs +++ b/code/examples/channel/src/state.rs @@ -19,6 +19,7 @@ use malachitebft_app_channel::app::types::PeerId; use malachitebft_test::codec::proto::ProtobufCodec; use malachitebft_test::{ Address, Height, ProposalData, ProposalFin, ProposalInit, ProposalPart, TestContext, Value, + ValueId, }; use crate::store::{DecidedValue, Store}; @@ -300,6 +301,22 @@ impl State { parts } + + pub async fn get_proposal( + &self, + height: Height, + round: Round, + _valid_round: Round, + _proposer: Address, + value_id: ValueId, + ) -> Option> { + Some(LocallyProposedValue::new( + height, + round, + Value::new(value_id.as_u64()), + None, + )) + } } /// Re-assemble a [`ProposedValue`] from its [`ProposalParts`].