From da65c0d1f48ea6986ef560fbe3019d20d8008bc2 Mon Sep 17 00:00:00 2001 From: Michael Zaikin Date: Fri, 5 Jan 2024 18:17:05 +0200 Subject: [PATCH] Pre-blocks forwarded to DA --- Cargo.lock | 1 + Makefile | 4 ++-- crates/pre-block/Cargo.toml | 1 + crates/pre-block/src/conversion.rs | 22 ++++++++++++++---- crates/pre-block/src/digest.rs | 14 +++++++++-- crates/pre-block/src/fixture.rs | 4 ++-- crates/pre-block/src/lib.rs | 9 +++++++- crates/pre-block/src/validator.rs | 10 ++++++-- sequencer/src/consensus_client.rs | 37 ++++++++++++++++++++---------- sequencer/src/main.rs | 36 ++++++++++++++--------------- sequencer/src/rollup_client.rs | 8 ++++--- 11 files changed, 100 insertions(+), 46 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index bb43be0..02bcdd3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3779,6 +3779,7 @@ dependencies = [ "narwhal-test-utils", "narwhal-types", "narwhal-utils", + "pretty_assertions", "rand 0.8.5", "serde", "tezos_crypto_rs", diff --git a/Makefile b/Makefile index ee2bf60..f502407 100644 --- a/Makefile +++ b/Makefile @@ -67,10 +67,10 @@ run-operator: run-sequencer: $(MAKE) build-sequencer - RUST_LOG=debug ./target/debug/sequencer + RUST_LOG=info ./target/debug/sequencer run-dsn: - ./target/debug/launcher --id 1 --log-level 3 & + ./target/debug/launcher --id 1 --log-level 2 & ./target/debug/launcher --id 2 --log-level 0 & ./target/debug/launcher --id 3 --log-level 0 & ./target/debug/launcher --id 4 --log-level 0 & diff --git a/crates/pre-block/Cargo.toml b/crates/pre-block/Cargo.toml index 9b84fcc..b978dfa 100644 --- a/crates/pre-block/Cargo.toml +++ b/crates/pre-block/Cargo.toml @@ -32,6 +32,7 @@ narwhal-config.workspace = true indexmap.workspace = true fastcrypto.workspace = true rand.workspace = true +pretty_assertions.workspace = true [features] default = [] diff --git a/crates/pre-block/src/conversion.rs b/crates/pre-block/src/conversion.rs index 2a9e442..4e6e87c 100644 --- a/crates/pre-block/src/conversion.rs +++ b/crates/pre-block/src/conversion.rs @@ -2,11 +2,21 @@ // // SPDX-License-Identifier: MIT -use std::collections::BTreeSet; +use narwhal_types::{CertificateAPI, CertificateV2, HeaderV2, SystemMessage}; -use narwhal_types::{CertificateAPI, CertificateV2, HeaderV2}; +use crate::{Batch, Certificate, CertificateHeader}; -use crate::{Batch, Certificate, CertificateHeader, PreBlock}; +impl From for crate::SystemMessage { + fn from(message: SystemMessage) -> Self { + match message { + SystemMessage::DkgConfirmation(msg) => Self::DkgConfirmation(msg), + SystemMessage::DkgMessage(msg) => Self::DkgMessage(msg), + SystemMessage::RandomnessSignature(round, msg) => { + Self::RandomnessSignature(round.0, msg) + } + } + } +} impl From for CertificateHeader { fn from(narwhal_header: HeaderV2) -> Self { @@ -20,7 +30,11 @@ impl From for CertificateHeader { .into_iter() .map(|x| (x.0 .0, x.1)) .collect(), - system_messages: vec![], + system_messages: narwhal_header + .system_messages + .into_iter() + .map(|x| x.into()) + .collect(), parents: narwhal_header.parents.into_iter().map(|x| x.0).collect(), } } diff --git a/crates/pre-block/src/digest.rs b/crates/pre-block/src/digest.rs index 41fbb2d..2e1259b 100644 --- a/crates/pre-block/src/digest.rs +++ b/crates/pre-block/src/digest.rs @@ -38,7 +38,10 @@ mod tests { use indexmap::IndexMap; use narwhal_config::{AuthorityIdentifier, WorkerId}; use narwhal_test_utils::latest_protocol_version; - use narwhal_types::{BatchDigest, BatchV2, CertificateDigest, HeaderV2, TimestampMs}; + use narwhal_types::{ + BatchDigest, BatchV2, CertificateDigest, HeaderV2, RandomnessRound, SystemMessage, + TimestampMs, + }; use crate::CertificateHeader; @@ -69,14 +72,21 @@ mod tests { 2u64, 3u64, payload, - vec![], + vec![ + SystemMessage::DkgConfirmation(vec![12u8]), + SystemMessage::DkgMessage(vec![13u8]), + SystemMessage::RandomnessSignature(RandomnessRound(123), vec![14u8]), + ], parents, ); + let expected_payload = bcs::to_bytes(&header_v2).unwrap(); let expected = header_v2.digest().0; let header: CertificateHeader = header_v2.into(); + let actual_payload = bcs::to_bytes(&header).unwrap(); let actual = header.digest(); + pretty_assertions::assert_eq!(hex::encode(expected_payload), hex::encode(actual_payload)); assert_eq!(expected, actual); } } diff --git a/crates/pre-block/src/fixture.rs b/crates/pre-block/src/fixture.rs index 645c04a..ad3a4c1 100644 --- a/crates/pre-block/src/fixture.rs +++ b/crates/pre-block/src/fixture.rs @@ -2,10 +2,10 @@ use std::collections::HashMap; use std::{collections::BTreeSet, num::NonZeroUsize}; use narwhal_test_utils::{latest_protocol_version, CommitteeFixture}; -use narwhal_types::{CertificateDigest, CertificateV2, Header, VoteAPI, HeaderV2Builder}; +use narwhal_types::{CertificateDigest, CertificateV2, Header, HeaderV2Builder, VoteAPI}; use narwhal_utils::protocol_config::ProtocolConfig; -use crate::{Certificate, Digest, Batch, PreBlock, PreBlockStore, PublicKey}; +use crate::{Batch, Certificate, Digest, PreBlock, PreBlockStore, PublicKey}; pub const COMMITTEE_SIZE: usize = 4; diff --git a/crates/pre-block/src/lib.rs b/crates/pre-block/src/lib.rs index 8ef055e..a84235b 100644 --- a/crates/pre-block/src/lib.rs +++ b/crates/pre-block/src/lib.rs @@ -29,6 +29,13 @@ pub type AggregateSignature = Vec; /// Blake2B 256 bit pub type Digest = [u8; 32]; +#[derive(Debug, Clone, Deserialize, Serialize)] +pub enum SystemMessage { + DkgMessage(Vec), + DkgConfirmation(Vec), + RandomnessSignature(u64, Vec), +} + #[derive(Debug, Clone, Serialize, Deserialize, Default)] pub struct CertificateHeader { pub author: u16, @@ -36,7 +43,7 @@ pub struct CertificateHeader { pub epoch: u64, pub created_at: u64, pub payload: Vec<(Digest, (u32, u64))>, - pub system_messages: Vec<()>, // not used + pub system_messages: Vec, pub parents: BTreeSet, } diff --git a/crates/pre-block/src/validator.rs b/crates/pre-block/src/validator.rs index 6f3fd0a..f455f72 100644 --- a/crates/pre-block/src/validator.rs +++ b/crates/pre-block/src/validator.rs @@ -60,10 +60,16 @@ pub fn validate_certificate_chain( match store.get_certificate_index(parent) { Some(prev_index) if prev_index + 1 != index => { - anyhow::bail!("Parent certificate is not from a preceding sub dag {}", hex::encode(parent)) + anyhow::bail!( + "Parent certificate is not from a preceding sub dag {}", + hex::encode(parent) + ) } None => { - anyhow::bail!("Parent certificate cannot be not found {}", hex::encode(parent)); + anyhow::bail!( + "Parent certificate cannot be not found {}", + hex::encode(parent) + ); } _ => (), } diff --git a/sequencer/src/consensus_client.rs b/sequencer/src/consensus_client.rs index 517a818..bdeaa08 100644 --- a/sequencer/src/consensus_client.rs +++ b/sequencer/src/consensus_client.rs @@ -3,10 +3,10 @@ // SPDX-License-Identifier: MIT use bytes::Bytes; -use log::{info, debug}; -use std::{sync::mpsc, collections::BTreeSet}; +use log::{debug, info}; use narwhal_types::{TransactionProto, TransactionsClient}; -use pre_block::{PreBlock, Certificate, CertificateHeader}; +use pre_block::{Certificate, CertificateHeader, PreBlock, SystemMessage}; +use std::{collections::BTreeSet, sync::mpsc}; use tonic::transport::Channel; mod exporter { @@ -43,10 +43,7 @@ impl WorkerClient { let tx = TransactionProto { transaction: Bytes::from(payload), }; - let res = self.client() - .await? - .submit_transaction(tx) - .await?; + let res = self.client().await?.submit_transaction(tx).await?; debug!("[Worker client] Response {:#?}", res.metadata()); Ok(()) } @@ -80,9 +77,10 @@ impl PrimaryClient { pub async fn subscribe_pre_blocks( &mut self, from_id: u64, - pre_blocks_tx: mpsc::Sender + pre_blocks_tx: mpsc::Sender, ) -> anyhow::Result<()> { - let mut stream = self.client() + let mut stream = self + .client() .await? .export(exporter::ExportRequest { from_id }) .await? @@ -94,14 +92,25 @@ impl PrimaryClient { pre_blocks_tx.send(pre_block)?; } - + Ok(()) } } +impl From for SystemMessage { + fn from(msg: exporter::SystemMessage) -> Self { + match msg.message.unwrap() { + exporter::system_message::Message::DkgConfirmation(msg) => Self::DkgConfirmation(msg), + exporter::system_message::Message::DkgMessage(msg) => Self::DkgMessage(msg), + exporter::system_message::Message::RandomnessSignature(sig) => { + Self::RandomnessSignature(sig.randomness_round, sig.bytes) + } + } + } +} + impl From for CertificateHeader { fn from(header: exporter::Header) -> Self { - assert!(header.system_messages.is_empty()); Self { author: header.author as u16, round: header.round, @@ -117,7 +126,11 @@ impl From for CertificateHeader { ) }) .collect(), - system_messages: vec![], + system_messages: header + .system_messages + .into_iter() + .map(|msg| msg.into()) + .collect(), parents: BTreeSet::from_iter( header .parents diff --git a/sequencer/src/main.rs b/sequencer/src/main.rs index c71f894..d00689c 100644 --- a/sequencer/src/main.rs +++ b/sequencer/src/main.rs @@ -2,22 +2,22 @@ // // SPDX-License-Identifier: MIT +use axum::{ + extract::{Path, State}, + http::StatusCode, + routing::{get, post}, + Json, Router, +}; use clap::Parser; use consensus_client::WorkerClient; use log::{error, info, warn}; use rollup_client::RollupClient; +use serde::{Deserialize, Serialize}; use std::sync::mpsc; use std::sync::Arc; use std::time::Duration; use tokio::signal; use tokio::task::JoinHandle; -use serde::{Serialize, Deserialize}; -use axum::{ - extract::{Path, State}, - http::StatusCode, - routing::{get, post}, - Json, Router, -}; use crate::consensus_client::PrimaryClient; use crate::da_batcher::publish_pre_blocks; @@ -43,7 +43,7 @@ impl AppState { #[derive(Debug, Clone, Serialize, Deserialize)] struct Transaction { - pub data: String + pub data: String, } async fn broadcast_transaction( @@ -51,10 +51,10 @@ async fn broadcast_transaction( Json(tx): Json, ) -> Result, StatusCode> { info!("Broadcasting tx `{}`", tx.data); - let tx_payload = hex::decode(tx.data) - .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; + let tx_payload = hex::decode(tx.data).map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - state.worker_client + state + .worker_client .as_ref() .clone() // cloning channel is cheap and encouraged .send_transaction(tx_payload) @@ -68,7 +68,8 @@ async fn get_block_by_level( State(state): State, Path(level): Path, ) -> Result>, StatusCode> { - let block = state.rollup_client + let block = state + .rollup_client .get_block_by_level(level) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; @@ -82,7 +83,8 @@ async fn get_block_by_level( } async fn get_head(State(state): State) -> Result, StatusCode> { - let head = state.rollup_client + let head = state + .rollup_client .get_head() .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; @@ -93,15 +95,13 @@ async fn get_authorities( State(state): State, Path(epoch): Path, ) -> Result>, StatusCode> { - let authorities = state.rollup_client + let authorities = state + .rollup_client .get_authorities(epoch) .await .map_err(|_| StatusCode::INTERNAL_SERVER_ERROR)?; - let res: Vec = authorities - .into_iter() - .map(|a| hex::encode(a)) - .collect(); + let res: Vec = authorities.into_iter().map(|a| hex::encode(a)).collect(); Ok(Json(res)) } diff --git a/sequencer/src/rollup_client.rs b/sequencer/src/rollup_client.rs index b5652ac..dda3ae2 100644 --- a/sequencer/src/rollup_client.rs +++ b/sequencer/src/rollup_client.rs @@ -149,9 +149,11 @@ impl RollupClient { pub async fn get_head(&self) -> anyhow::Result { let res = self.store_get("/head".into()).await?; if let Some(bytes) = res { - let index = u32::from_be_bytes(bytes.try_into().map_err(|b| { - anyhow::anyhow!("Failed to parse head: {}", hex::encode(b)) - })?); + let index = u32::from_be_bytes( + bytes + .try_into() + .map_err(|b| anyhow::anyhow!("Failed to parse head: {}", hex::encode(b)))?, + ); Ok(index + 1) } else { Ok(0)