Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(consensus): add config to Context #3268

Open
wants to merge 1 commit into
base: guyn/config/remove_consensus_network_topic
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions config/papyrus/default_config.json
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,21 @@
"privacy": "Public",
"value": "consensus_votes"
},
"context.batcher_build_buffer": {
"description": "The buffer size for the batcher when building proposals.",
"privacy": "Public",
"value": 100
},
"context.chain_id": {
"description": "The chain id of the Starknet chain.",
"pointer_target": "chain_id",
"privacy": "Public"
},
"context.num_validators": {
"description": "The number of validators.",
"privacy": "Public",
"value": 1
},
"monitoring_gateway.collect_metrics": {
"description": "If true, collect and return metrics in the monitoring gateway.",
"pointer_target": "collect_metrics",
Expand Down
5 changes: 5 additions & 0 deletions crates/papyrus_node/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use std::io::{BufWriter, Write};
use std::mem::discriminant;
use std::ops::IndexMut;
use std::path::{Path, PathBuf};
use std::task::Context;
use std::time::Duration;
use std::{env, fs, io};

Expand All @@ -26,6 +27,7 @@ use papyrus_config::dumping::{
use papyrus_config::loading::load_and_process_config;
use papyrus_config::{ConfigError, ParamPath, ParamPrivacyInput, SerializedParam};
use papyrus_consensus::config::ConsensusConfig;
use papyrus_consensus::types::ContextConfig;
use papyrus_monitoring_gateway::MonitoringGatewayConfig;
use papyrus_network::NetworkConfig;
use papyrus_p2p_sync::client::{P2PSyncClient, P2PSyncClientConfig};
Expand Down Expand Up @@ -64,6 +66,7 @@ pub struct NodeConfig {
// TODO(yair): Change NodeConfig to have an option of enum of SyncConfig or P2PSyncConfig.
pub p2p_sync: Option<P2PSyncClientConfig>,
pub consensus: Option<ConsensusConfig>,
pub context: ContextConfig,
// TODO(shahak): Make network non-optional once it's developed enough.
pub network: Option<NetworkConfig>,
pub collect_profiling_metrics: bool,
Expand All @@ -82,6 +85,7 @@ impl Default for NodeConfig {
sync: Some(SyncConfig::default()),
p2p_sync: None,
consensus: None,
context: ContextConfig::default(),
network: None,
collect_profiling_metrics: false,
}
Expand All @@ -99,6 +103,7 @@ impl SerializeConfig for NodeConfig {
ser_optional_sub_config(&self.sync, "sync"),
ser_optional_sub_config(&self.p2p_sync, "p2p_sync"),
ser_optional_sub_config(&self.consensus, "consensus"),
append_sub_config_name(self.context.dump(), "context"),
ser_optional_sub_config(&self.network, "network"),
BTreeMap::from_iter([ser_param(
"collect_profiling_metrics",
Expand Down
1 change: 1 addition & 0 deletions crates/papyrus_node/src/config/pointers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ pub static CONFIG_POINTERS: LazyLock<ConfigPointers> = LazyLock::new(|| {
),
set_pointing_param_paths(&[
"consensus.chain_id",
"context.chain_id",
"consensus.network_config.chain_id",
"network.chain_id",
"rpc.chain_id",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,25 @@ expression: dumped_default_config
"value": "consensus_votes",
"privacy": "Public"
},
"context.batcher_build_buffer": {
"description": "The buffer size for the batcher when building proposals.",
"value": {
"$serde_json::private::Number": "100"
},
"privacy": "Public"
},
"context.chain_id": {
"description": "The chain id of the Starknet chain.",
"value": "SN_MAIN",
"privacy": "Public"
},
"context.num_validators": {
"description": "The number of validators.",
"value": {
"$serde_json::private::Number": "1"
},
"privacy": "Public"
},
"monitoring_gateway.collect_metrics": {
"description": "If true, collect and return metrics in the monitoring gateway.",
"value": false,
Expand Down
30 changes: 17 additions & 13 deletions crates/papyrus_node/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use papyrus_config::presentation::get_config_presentation;
use papyrus_config::validators::config_validate;
use papyrus_consensus::config::ConsensusConfig;
use papyrus_consensus::stream_handler::StreamHandler;
use papyrus_consensus::types::ContextConfig;
use papyrus_consensus_orchestrator::papyrus_consensus_context::PapyrusConsensusContext;
use papyrus_monitoring_gateway::MonitoringServer;
use papyrus_network::gossipsub_impl::Topic;
Expand Down Expand Up @@ -176,23 +177,25 @@ fn spawn_monitoring_server(
}

fn spawn_consensus(
config: Option<&ConsensusConfig>,
consensus_config: Option<&ConsensusConfig>,
context_config: ContextConfig,
storage_reader: StorageReader,
network_manager: Option<&mut NetworkManager>,
) -> anyhow::Result<JoinHandle<anyhow::Result<()>>> {
let (Some(config), Some(network_manager)) = (config, network_manager) else {
let (Some(consensus_config), Some(network_manager)) = (consensus_config, network_manager)
else {
info!("Consensus is disabled.");
return Ok(tokio::spawn(future::pending()));
};
let config = config.clone();
debug!("Consensus configuration: {config:?}");
let consensus_config = consensus_config.clone();
debug!("Consensus configuration: {consensus_config:?}");

let network_channels = network_manager
.register_broadcast_topic(Topic::new(config.votes_topic.clone()), BUFFER_SIZE)?;
.register_broadcast_topic(Topic::new(consensus_config.votes_topic.clone()), BUFFER_SIZE)?;
let proposal_network_channels: BroadcastTopicChannels<
StreamMessage<ProposalPart, HeightAndRound>,
> = network_manager
.register_broadcast_topic(Topic::new(config.proposals_topic), BUFFER_SIZE)?;
.register_broadcast_topic(Topic::new(consensus_config.proposals_topic), BUFFER_SIZE)?;
let BroadcastTopicChannels {
broadcasted_messages_receiver: inbound_network_receiver,
broadcast_topic_client: outbound_network_sender,
Expand All @@ -203,23 +206,23 @@ fn spawn_consensus(
StreamHandler::get_channels(inbound_network_receiver, outbound_network_sender);

let context = PapyrusConsensusContext::new(
context_config,
storage_reader.clone(),
network_channels.broadcast_topic_client.clone(),
outbound_internal_sender,
config.num_validators,
None,
);

Ok(tokio::spawn(async move {
Ok(papyrus_consensus::run_consensus(
context,
config.start_height,
consensus_config.start_height,
// TODO(Asmaa): replace with the correct value.
config.start_height,
config.validator_id,
config.consensus_delay,
config.timeouts.clone(),
config.sync_retry_interval,
consensus_config.start_height,
consensus_config.validator_id,
consensus_config.consensus_delay,
consensus_config.timeouts.clone(),
consensus_config.sync_retry_interval,
network_channels.into(),
inbound_internal_receiver,
)
Expand Down Expand Up @@ -355,6 +358,7 @@ async fn run_threads(
} else {
spawn_consensus(
config.consensus.as_ref(),
config.context.clone(),
resources.storage_reader.clone(),
resources.maybe_network_manager.as_mut(),
)?
Expand Down
49 changes: 48 additions & 1 deletion crates/sequencing/papyrus_consensus/src/types.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
//! Types for interfacing between consensus and the node.

use std::collections::BTreeMap;
use std::fmt::Debug;
use std::time::Duration;

use async_trait::async_trait;
use futures::channel::{mpsc, oneshot};
use papyrus_config::dumping::{ser_param, SerializeConfig};
use papyrus_config::{ParamPath, ParamPrivacyInput, SerializedParam};
use papyrus_network::network_manager::{
BroadcastTopicChannels,
BroadcastTopicClient,
Expand All @@ -13,8 +16,10 @@ use papyrus_network::network_manager::{
use papyrus_network_types::network_types::BroadcastedMessageMetadata;
use papyrus_protobuf::consensus::{ProposalFin, ProposalInit, Vote};
use papyrus_protobuf::converters::ProtobufConversionError;
use serde::{Deserialize, Serialize};
use starknet_api::block::{BlockHash, BlockNumber};
use starknet_api::core::ContractAddress;
use starknet_api::core::{ChainId, ContractAddress};
use validator::Validate;

/// Used to identify the node by consensus.
/// 1. This ID is derived from the id registered with Starknet's L2 staking contract.
Expand All @@ -25,6 +30,48 @@ pub type ValidatorId = ContractAddress;
pub type Round = u32;
pub type ProposalContentId = BlockHash;

/// Configuration for the Context struct.
#[derive(Debug, Deserialize, Serialize, Clone, PartialEq, Validate)]
pub struct ContextConfig {
/// The buffer size for the batcher when building proposals.
pub batcher_build_buffer: usize,
/// The number of validators.
pub num_validators: u64,
/// The chain id of the Starknet chain.
pub chain_id: ChainId,
}

impl SerializeConfig for ContextConfig {
fn dump(&self) -> BTreeMap<ParamPath, SerializedParam> {
BTreeMap::from_iter([
ser_param(
"batcher_build_buffer",
&self.batcher_build_buffer,
"The buffer size for the batcher when building proposals.",
ParamPrivacyInput::Public,
),
ser_param(
"num_validators",
&self.num_validators,
"The number of validators.",
ParamPrivacyInput::Public,
),
ser_param(
"chain_id",
&self.chain_id,
"The chain id of the Starknet chain.",
ParamPrivacyInput::Public,
),
])
}
}

impl Default for ContextConfig {
fn default() -> Self {
Self { batcher_build_buffer: 100, num_validators: 1, chain_id: ChainId::Mainnet }
}
}

/// Interface for consensus to call out to the node.
///
/// Function calls should be assumed to not be cancel safe.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use futures::{SinkExt, StreamExt};
use papyrus_consensus::types::{
ConsensusContext,
ConsensusError,
ContextConfig,
ProposalContentId,
Round,
ValidatorId,
Expand Down Expand Up @@ -46,6 +47,7 @@ type HeightToIdToContent = BTreeMap<BlockNumber, HashMap<ProposalContentId, Vec<
const CHANNEL_SIZE: usize = 100;

pub struct PapyrusConsensusContext {
_config: ContextConfig,
storage_reader: StorageReader,
network_broadcast_client: BroadcastTopicClient<Vote>,
network_proposal_sender: mpsc::Sender<(HeightAndRound, mpsc::Receiver<ProposalPart>)>,
Expand All @@ -59,13 +61,15 @@ pub struct PapyrusConsensusContext {

impl PapyrusConsensusContext {
pub fn new(
config: ContextConfig,
storage_reader: StorageReader,
network_broadcast_client: BroadcastTopicClient<Vote>,
network_proposal_sender: mpsc::Sender<(HeightAndRound, mpsc::Receiver<ProposalPart>)>,
num_validators: u64,
sync_broadcast_sender: Option<BroadcastTopicClient<Vote>>,
) -> Self {
let num_validators = config.num_validators;
Self {
_config: config,
storage_reader,
network_broadcast_client,
network_proposal_sender,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use std::time::Duration;
use futures::channel::{mpsc, oneshot};
use futures::StreamExt;
use papyrus_consensus::stream_handler::StreamHandler;
use papyrus_consensus::types::ConsensusContext;
use papyrus_consensus::types::{ConsensusContext, ContextConfig};
use papyrus_network::network_manager::test_utils::{
mock_register_broadcast_topic,
BroadcastNetworkMock,
Expand Down Expand Up @@ -142,10 +142,10 @@ fn test_setup()
let sync_channels = mock_register_broadcast_topic().unwrap();

let papyrus_context = PapyrusConsensusContext::new(
ContextConfig::default(),
storage_reader.clone(),
network_channels.subscriber_channels.broadcast_topic_client,
outbound_internal_sender,
4,
Some(sync_channels.subscriber_channels.broadcast_topic_client),
);
(block, papyrus_context, network_channels.mock_network, sync_channels.mock_network)
Expand Down
Loading