Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(starknet_sequencer_node): get the shared client based on the execution mode #3209

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,8 @@ async fn test_mempool_receives_tx_from_other_peer(
let (config, mut broadcast_channels, _temp_dir_handles) =
setup(&tx_generator, TestIdentifier::MempoolReceivesTxFromOtherPeerTest).await;
let (clients, servers) = create_node_modules(&config);
let mempool_client = clients.get_mempool_shared_client().unwrap();
let mempool_client =
clients.get_mempool_shared_client(&config.components.mempool_p2p.execution_mode).unwrap();
// Build and run the sequencer node.
let sequencer_node_future = run_component_servers(servers);
let _sequencer_node_handle = tokio::spawn(sequencer_node_future);
Expand Down
79 changes: 55 additions & 24 deletions crates/starknet_sequencer_node/src/clients.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,50 +54,67 @@ pub struct SequencerNodeClients {
}

/// A macro to retrieve a shared client wrapped in an `Arc`. The returned client is either the local
/// or the remote, as at most one of them exists. If neither, it returns `None`.
/// or the remote, based on the provided execution mode. If the execution mode is `Disabled` or
/// neither client exists, it returns `None`.
///
/// # Arguments
///
/// * `$self` - The `self` reference to the struct that contains the client field.
/// * `$client_field` - The field name (within `self`) representing the client, which has both
/// `local_client` and `remote_client` as options.
/// * `$execution_mode` - A reference to the `ReactiveComponentExecutionMode` that determines which
/// client to return (`local_client` or `remote_client`).
///
/// # Returns
///
/// An `Option<Arc<dyn ClientTrait>>` containing the available client (local_client or
/// remote_client), wrapped in Arc. If neither exists, returns None.
/// remote_client), wrapped in `Arc`. If the execution mode is `Disabled` or neither client exists,
/// returns `None`.
///
/// # Example
///
/// ```rust,ignore
/// // Assuming `SequencerNodeClients` struct has fields `batcher_client` and `mempool_client.
/// // Assuming `SequencerNodeClients` struct has fields `batcher_client` and `mempool_client`.
/// impl SequencerNodeClients {
/// pub fn get_batcher_shared_client(&self) -> Option<Arc<dyn BatcherClient>> {
/// get_shared_client!(self, batcher_client)
/// pub fn get_batcher_shared_client(
/// &self,
/// execution_mode: &ReactiveComponentExecutionMode,
/// ) -> Option<Arc<dyn BatcherClient>> {
/// get_shared_client!(self, batcher_client, execution_mode)
/// }
///
/// pub fn get_mempool_shared_client(&self) -> Option<Arc<dyn MempoolClient>> {
/// get_shared_client!(self, mempool_client)
/// pub fn get_mempool_shared_client(
/// &self,
/// execution_mode: &ReactiveComponentExecutionMode,
/// ) -> Option<Arc<dyn MempoolClient>> {
/// get_shared_client!(self, mempool_client, execution_mode)
/// }
/// }
/// ```
#[macro_export]
macro_rules! get_shared_client {
($self:ident, $client_field:ident) => {{
($self:ident, $client_field:ident, $execution_mode:expr) => {{
let client = &$self.$client_field;
if let Some(local_client) = client.get_local_client() {
return Some(Arc::new(local_client));
} else if let Some(remote_client) = client.get_remote_client() {
return Some(Arc::new(remote_client));
match &$execution_mode {
ReactiveComponentExecutionMode::Disabled => None,
ReactiveComponentExecutionMode::Remote => {
Some(Arc::new(client.get_remote_client().expect("Failed to get remote client")))
}
ReactiveComponentExecutionMode::LocalExecutionWithRemoteEnabled
| ReactiveComponentExecutionMode::LocalExecutionWithRemoteDisabled => {
Some(Arc::new(client.get_local_client().expect("Failed to get local client")))
}
}
None
}};
}

// TODO(Nadin): Refactor getters to remove code duplication.
impl SequencerNodeClients {
pub fn get_batcher_shared_client(&self) -> Option<SharedBatcherClient> {
get_shared_client!(self, batcher_client)
pub fn get_batcher_shared_client(
&self,
execution_mode: &ReactiveComponentExecutionMode,
) -> Option<SharedBatcherClient> {
get_shared_client!(self, batcher_client, execution_mode)
}

pub fn get_batcher_local_client(
Expand All @@ -106,8 +123,11 @@ impl SequencerNodeClients {
self.batcher_client.get_local_client()
}

pub fn get_mempool_shared_client(&self) -> Option<SharedMempoolClient> {
get_shared_client!(self, mempool_client)
pub fn get_mempool_shared_client(
&self,
execution_mode: &ReactiveComponentExecutionMode,
) -> Option<SharedMempoolClient> {
get_shared_client!(self, mempool_client, execution_mode)
}

pub fn get_mempool_local_client(
Expand All @@ -116,8 +136,11 @@ impl SequencerNodeClients {
self.mempool_client.get_local_client()
}

pub fn get_gateway_shared_client(&self) -> Option<SharedGatewayClient> {
get_shared_client!(self, gateway_client)
pub fn get_gateway_shared_client(
&self,
execution_mode: &ReactiveComponentExecutionMode,
) -> Option<SharedGatewayClient> {
get_shared_client!(self, gateway_client, execution_mode)
}

pub fn get_gateway_local_client(
Expand All @@ -132,14 +155,19 @@ impl SequencerNodeClients {
self.l1_provider_client.get_local_client()
}

pub fn get_l1_provider_shared_client(&self) -> Option<SharedL1ProviderClient> {
get_shared_client!(self, l1_provider_client)
pub fn get_l1_provider_shared_client(
&self,
execution_mode: &ReactiveComponentExecutionMode,
) -> Option<SharedL1ProviderClient> {
get_shared_client!(self, l1_provider_client, execution_mode)
}

pub fn get_mempool_p2p_propagator_shared_client(
&self,

execution_mode: &ReactiveComponentExecutionMode,
) -> Option<SharedMempoolP2pPropagatorClient> {
get_shared_client!(self, mempool_p2p_propagator_client)
get_shared_client!(self, mempool_p2p_propagator_client, execution_mode)
}

pub fn get_mempool_p2p_propagator_local_client(
Expand All @@ -149,8 +177,11 @@ impl SequencerNodeClients {
self.mempool_p2p_propagator_client.get_local_client()
}

pub fn get_state_sync_shared_client(&self) -> Option<SharedStateSyncClient> {
get_shared_client!(self, state_sync_client)
pub fn get_state_sync_shared_client(
&self,
execution_mode: &ReactiveComponentExecutionMode,
) -> Option<SharedStateSyncClient> {
get_shared_client!(self, state_sync_client, execution_mode)
}

pub fn get_state_sync_local_client(
Expand Down
68 changes: 38 additions & 30 deletions crates/starknet_sequencer_node/src/components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,23 @@ pub fn create_node_components(
let batcher = match config.components.batcher.execution_mode {
ReactiveComponentExecutionMode::LocalExecutionWithRemoteDisabled
| ReactiveComponentExecutionMode::LocalExecutionWithRemoteEnabled => {
let mempool_client =
clients.get_mempool_shared_client().expect("Mempool Client should be available");
let mempool_client = clients
.get_mempool_shared_client(&config.components.mempool.execution_mode)
.expect("Mempool Client should be available");
let l1_provider_client = clients
.get_l1_provider_shared_client()
.get_l1_provider_shared_client(&config.components.l1_provider.execution_mode)
.expect("L1 Provider Client should be available");
Some(create_batcher(config.batcher_config.clone(), mempool_client, l1_provider_client))
}
ReactiveComponentExecutionMode::Disabled | ReactiveComponentExecutionMode::Remote => None,
};
let consensus_manager = match config.components.consensus_manager.execution_mode {
ActiveComponentExecutionMode::Enabled => {
let batcher_client =
clients.get_batcher_shared_client().expect("Batcher Client should be available");
let batcher_client = clients
.get_batcher_shared_client(&config.components.batcher.execution_mode)
.expect("Batcher Client should be available");
let state_sync_client = clients
.get_state_sync_shared_client()
.get_state_sync_shared_client(&config.components.state_sync.execution_mode)
.expect("State Sync Client should be available");
Some(ConsensusManager::new(
config.consensus_manager_config.clone(),
Expand All @@ -74,10 +76,11 @@ pub fn create_node_components(
let gateway = match config.components.gateway.execution_mode {
ReactiveComponentExecutionMode::LocalExecutionWithRemoteDisabled
| ReactiveComponentExecutionMode::LocalExecutionWithRemoteEnabled => {
let mempool_client =
clients.get_mempool_shared_client().expect("Mempool Client should be available");
let mempool_client = clients
.get_mempool_shared_client(&config.components.mempool.execution_mode)
.expect("Mempool Client should be available");
let state_sync_client = clients
.get_state_sync_shared_client()
.get_state_sync_shared_client(&config.components.state_sync.execution_mode)
.expect("State Sync Client should be available");

Some(create_gateway(
Expand All @@ -91,37 +94,40 @@ pub fn create_node_components(
};
let http_server = match config.components.http_server.execution_mode {
ActiveComponentExecutionMode::Enabled => {
let gateway_client =
clients.get_gateway_shared_client().expect("Gateway Client should be available");
let gateway_client = clients
.get_gateway_shared_client(&config.components.gateway.execution_mode)
.expect("Gateway Client should be available");

Some(create_http_server(config.http_server_config.clone(), gateway_client))
}
ActiveComponentExecutionMode::Disabled => None,
};

let (mempool_p2p_propagator, mempool_p2p_runner) = match config
.components
.mempool_p2p
.execution_mode
{
ReactiveComponentExecutionMode::LocalExecutionWithRemoteDisabled
| ReactiveComponentExecutionMode::LocalExecutionWithRemoteEnabled => {
let gateway_client =
clients.get_gateway_shared_client().expect("Gateway Client should be available");
let (mempool_p2p_propagator, mempool_p2p_runner) =
create_p2p_propagator_and_runner(config.mempool_p2p_config.clone(), gateway_client);
(Some(mempool_p2p_propagator), Some(mempool_p2p_runner))
}
ReactiveComponentExecutionMode::Disabled | ReactiveComponentExecutionMode::Remote => {
(None, None)
}
};
let (mempool_p2p_propagator, mempool_p2p_runner) =
match config.components.mempool_p2p.execution_mode {
ReactiveComponentExecutionMode::LocalExecutionWithRemoteDisabled
| ReactiveComponentExecutionMode::LocalExecutionWithRemoteEnabled => {
let gateway_client = clients
.get_gateway_shared_client(&config.components.gateway.execution_mode)
.expect("Gateway Client should be available");
let (mempool_p2p_propagator, mempool_p2p_runner) = create_p2p_propagator_and_runner(
config.mempool_p2p_config.clone(),
gateway_client,
);
(Some(mempool_p2p_propagator), Some(mempool_p2p_runner))
}
ReactiveComponentExecutionMode::Disabled | ReactiveComponentExecutionMode::Remote => {
(None, None)
}
};

let mempool = match config.components.mempool.execution_mode {
ReactiveComponentExecutionMode::LocalExecutionWithRemoteDisabled
| ReactiveComponentExecutionMode::LocalExecutionWithRemoteEnabled => {
let mempool_p2p_propagator_client = clients
.get_mempool_p2p_propagator_shared_client()
.get_mempool_p2p_propagator_shared_client(
&config.components.mempool_p2p.execution_mode,
)
.expect("Propagator Client should be available");
let mempool = create_mempool(mempool_p2p_propagator_client);
Some(mempool)
Expand Down Expand Up @@ -159,7 +165,9 @@ pub fn create_node_components(

let l1_scraper = match config.components.l1_scraper.execution_mode {
ActiveComponentExecutionMode::Enabled => {
let l1_provider_client = clients.get_l1_provider_shared_client().unwrap();
let l1_provider_client = clients
.get_l1_provider_shared_client(&config.components.l1_provider.execution_mode)
.unwrap();
let l1_scraper_config = config.l1_scraper_config.clone();
let base_layer = EthereumBaseLayerContract::new(config.base_layer_config.clone());

Expand Down