test(starknet_integration_tests): late node #3232

Open · wants to merge 4 commits into base: main
6 changes: 3 additions & 3 deletions config/sequencer/default_config.json
@@ -562,7 +562,7 @@
"consensus_manager_config.consensus_config.network_config.peer_manager_config.unstable_timeout_millis": {
"description": "The duration in milliseconds a peer blacklisted after being reported as unstable.",
"privacy": "Public",
"value": 1000
"value": 1
},
"consensus_manager_config.consensus_config.network_config.secret_key": {
"description": "The secret key used for building the peer id. If it's an empty string a random one will be used.",
@@ -817,7 +817,7 @@
"mempool_p2p_config.network_config.peer_manager_config.unstable_timeout_millis": {
"description": "The duration in milliseconds a peer blacklisted after being reported as unstable.",
"privacy": "Public",
"value": 1000
"value": 1
},
"mempool_p2p_config.network_config.secret_key": {
"description": "The secret key used for building the peer id. If it's an empty string a random one will be used.",
@@ -912,7 +912,7 @@
"state_sync_config.network_config.peer_manager_config.unstable_timeout_millis": {
"description": "The duration in milliseconds a peer blacklisted after being reported as unstable.",
"privacy": "Public",
"value": 1000
"value": 1
},
"state_sync_config.network_config.secret_key": {
"description": "The secret key used for building the peer id. If it's an empty string a random one will be used.",
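The three `unstable_timeout_millis` values above drop from 1000 ms to 1 ms, presumably so that a peer reported as unstable while the late node is suspended is blacklisted only very briefly and can rejoin almost immediately. A minimal, self-contained sketch of that semantics (the struct and field names here are hypothetical, not taken from papyrus_network):

```rust
use std::time::{Duration, Instant};

// Hypothetical sketch: a peer reported as unstable stays blacklisted until
// `unstable_timeout` elapses. With the new config value of 1 ms, the peer is
// effectively usable again almost immediately after being reported.
struct BlacklistedPeer {
    reported_at: Instant,
    unstable_timeout: Duration, // built from `unstable_timeout_millis`
}

impl BlacklistedPeer {
    fn is_still_blacklisted(&self, now: Instant) -> bool {
        now.duration_since(self.reported_at) < self.unstable_timeout
    }
}

fn main() {
    let peer = BlacklistedPeer {
        reported_at: Instant::now(),
        unstable_timeout: Duration::from_millis(1), // the new config value
    };
    std::thread::sleep(Duration::from_millis(2));
    // After 2 ms the 1 ms blacklist window has already passed.
    assert!(!peer.is_still_blacklisted(Instant::now()));
}
```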
14 changes: 12 additions & 2 deletions crates/papyrus_network/src/network_manager/mod.rs
@@ -436,6 +436,7 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
"Received an inbound query while the buffer is full. Dropping query for session \
{inbound_session_id:?}"
),
true,
);
}

@@ -458,13 +459,15 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
self.sqmr_outbound_response_senders.get_mut(&outbound_session_id)
{
// TODO(shahak): Close the channel if the buffer is full.
// TODO(Eitan): Close the channel if query was dropped by user.
send_now(
response_sender,
response,
format!(
"Received response for an outbound query while the buffer is full. Dropping \
it. Session: {outbound_session_id:?}"
),
false,
);
}
}
@@ -620,12 +623,19 @@ impl<SwarmT: SwarmTrait> GenericNetworkManager<SwarmT> {
}
}

fn send_now<Item>(sender: &mut GenericSender<Item>, item: Item, buffer_full_message: String) {
fn send_now<Item>(
sender: &mut GenericSender<Item>,
item: Item,
buffer_full_message: String,
should_panic_upon_disconnect: bool,
) {
pin_mut!(sender);
match sender.as_mut().send(item).now_or_never() {
Some(Ok(())) => {}
Some(Err(error)) => {
error!("Received error while sending message: {:?}", error);
if should_panic_upon_disconnect || !error.is_disconnected() {
panic!("Received error while sending message: {:?}", error);
}
}
None => {
warn!(buffer_full_message);
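For context, here is a self-contained sketch of the behavior `send_now` has after this change, written against a plain `futures` mpsc channel rather than the crate's `GenericSender` (an illustration under that assumption, not the crate's actual code): a full buffer is only logged, and a disconnected receiver panics only when the caller passes `should_panic_upon_disconnect = true`.

```rust
use futures::channel::mpsc;
use futures::{FutureExt, SinkExt};

// Sketch of the new semantics: try to send without awaiting; tolerate a
// disconnected receiver unless the caller opted into panicking on it.
fn send_now<Item>(
    sender: &mut mpsc::Sender<Item>,
    item: Item,
    buffer_full_message: String,
    should_panic_upon_disconnect: bool,
) {
    match sender.send(item).now_or_never() {
        Some(Ok(())) => {}
        Some(Err(error)) => {
            if should_panic_upon_disconnect || !error.is_disconnected() {
                panic!("Received error while sending message: {:?}", error);
            }
            // A disconnect is swallowed here, matching the `false` the PR
            // passes for outbound responses whose receiver may be dropped.
        }
        None => eprintln!("{buffer_full_message}"),
    }
}

fn main() {
    let (mut sender, receiver) = mpsc::channel::<u32>(1);
    drop(receiver); // Simulate the user dropping the response receiver.
    // Does not panic, because the disconnect is explicitly tolerated.
    send_now(&mut sender, 7, "buffer full".to_string(), false);
}
```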
4 changes: 2 additions & 2 deletions crates/papyrus_p2p_sync/src/client/stream_builder.rs
@@ -129,7 +129,7 @@ where
if let Some(block) = Self::get_internal_block_at(&mut internal_blocks_received, &mut internal_block_receiver, current_block_number)
.now_or_never()
{
debug!("Added internally {:?} for block {}.", Self::TYPE_DESCRIPTION, current_block_number);
info!("Added internally {:?} for block {}.", Self::TYPE_DESCRIPTION, current_block_number);
yield Ok(Box::<dyn BlockData>::from(Box::new(block)));
current_block_number = current_block_number.unchecked_next();
continue 'send_query_and_parse_responses;
@@ -189,7 +189,7 @@ where
}
}
block = Self::get_internal_block_at(&mut internal_blocks_received, &mut internal_block_receiver, current_block_number) => {
debug!("Added internally {:?} for block {}.", Self::TYPE_DESCRIPTION, current_block_number);
info!("Added internally {:?} for block {}.", Self::TYPE_DESCRIPTION, current_block_number);
current_block_number = current_block_number.unchecked_next();
yield Ok(Box::<dyn BlockData>::from(Box::new(block)));
debug!("Network query ending at block {} for {:?} being ignored due to internal block", end_block_number, Self::TYPE_DESCRIPTION);
4 changes: 2 additions & 2 deletions crates/starknet_gateway/src/gateway.rs
@@ -59,13 +59,13 @@ impl Gateway {
}
}

#[instrument(skip(self), ret)]
// #[instrument(skip(self), ret)]
pub async fn add_tx(
&self,
tx: RpcTransaction,
p2p_message_metadata: Option<BroadcastedMessageMetadata>,
) -> GatewayResult<TransactionHash> {
info!("Processing tx");
// info!("Processing tx");
let blocking_task = ProcessTxBlockingTask::new(self, tx);
// Run the blocking task in the current span.
let curr_span = Span::current();
@@ -20,13 +20,13 @@ use crate::sequencer_manager::{
use crate::test_identifiers::TestIdentifier;

/// The number of consolidated local sequencers that participate in the test.
const N_CONSOLIDATED_SEQUENCERS: usize = 3;
const N_CONSOLIDATED_SEQUENCERS: usize = 5;
/// The number of distributed remote sequencers that participate in the test.
const N_DISTRIBUTED_SEQUENCERS: usize = 2;
const N_DISTRIBUTED_SEQUENCERS: usize = 0;

pub async fn end_to_end_integration(tx_generator: MultiAccountTransactionGenerator) {
const EXPECTED_BLOCK_NUMBER: BlockNumber = BlockNumber(15);
const N_TXS: usize = 50;
const EXPECTED_BLOCK_NUMBER: BlockNumber = BlockNumber(20);
const N_TXS: usize = 400;
const SENDER_ACCOUNT: AccountId = 0;
let sender_address = tx_generator.account_with_id(SENDER_ACCOUNT).sender_address();

11 changes: 10 additions & 1 deletion crates/starknet_integration_tests/src/flow_test_setup.rs
@@ -30,6 +30,7 @@ use crate::utils::{
create_consensus_manager_configs_and_channels,
create_mempool_p2p_configs,
create_node_config,
create_state_sync_configs,
spawn_success_recorder,
};

@@ -132,13 +133,21 @@ impl FlowSequencerSetup {

let component_config = ComponentConfig::default();

let state_sync_config = create_state_sync_configs(
1,
storage_for_test.state_sync_storage_config,
&mut available_ports,
)
.pop()
.unwrap();

// Derive the configuration for the sequencer node.
let (node_config, _required_params) = create_node_config(
&mut available_ports,
sequencer_index,
chain_info,
storage_for_test.batcher_storage_config,
storage_for_test.state_sync_storage_config,
state_sync_config,
consensus_manager_config,
mempool_p2p_config,
component_config,
@@ -14,6 +14,7 @@ use starknet_monitoring_endpoint::config::MonitoringEndpointConfig;
use starknet_monitoring_endpoint::test_utils::MonitoringClient;
use starknet_sequencer_infra::test_utils::AvailablePorts;
use starknet_sequencer_node::config::component_config::ComponentConfig;
use starknet_state_sync::config::StateSyncConfig;
use tempfile::{tempdir, TempDir};
use tracing::instrument;

@@ -46,13 +47,15 @@ pub struct SequencerSetup {
}

impl SequencerSetup {
#[allow(clippy::too_many_arguments)]
#[instrument(skip(accounts, chain_info, consensus_manager_config), level = "debug")]
pub async fn new(
accounts: Vec<AccountTransactionGenerator>,
sequencer_index: usize,
chain_info: ChainInfo,
mut consensus_manager_config: ConsensusManagerConfig,
mempool_p2p_config: MempoolP2pConfig,
mut state_sync_config: StateSyncConfig,
available_ports: &mut AvailablePorts,
component_config: ComponentConfig,
) -> Self {
@@ -62,13 +65,15 @@ impl SequencerSetup {
let recorder_url = spawn_success_recorder(available_ports.get_next_port());
consensus_manager_config.cende_config.recorder_url = recorder_url;

state_sync_config.storage_config = storage_for_test.state_sync_storage_config;

// Derive the configuration for the sequencer node.
let (config, required_params) = create_node_config(
available_ports,
sequencer_index,
chain_info,
storage_for_test.batcher_storage_config,
storage_for_test.state_sync_storage_config,
state_sync_config,
consensus_manager_config,
mempool_p2p_config,
component_config,
11 changes: 9 additions & 2 deletions crates/starknet_integration_tests/src/sequencer_manager.rs
@@ -4,7 +4,7 @@ use itertools::izip;
use mempool_test_utils::starknet_api_test_utils::{AccountId, MultiAccountTransactionGenerator};
use papyrus_execution::execution_utils::get_nonce_at;
use papyrus_storage::state::StateStorageReader;
use papyrus_storage::StorageReader;
use papyrus_storage::{StorageConfig, StorageReader};
use starknet_api::block::BlockNumber;
use starknet_api::core::{ContractAddress, Nonce};
use starknet_api::rpc_transaction::RpcTransaction;
@@ -22,6 +22,7 @@ use crate::utils::{
create_chain_info,
create_consensus_manager_configs_and_channels,
create_mempool_p2p_configs,
create_state_sync_configs,
send_account_txs,
};

@@ -175,6 +176,12 @@ pub async fn get_sequencer_setup_configs(
available_ports.get_next_ports(n_distributed_sequencers + 1),
);

let mut state_sync_configs = create_state_sync_configs(
n_distributed_sequencers,
StorageConfig::default(),
&mut available_ports,
);

let mempool_p2p_configs = create_mempool_p2p_configs(
chain_info.chain_id.clone(),
available_ports.get_next_ports(n_distributed_sequencers),
@@ -195,7 +202,6 @@

// TODO(Nadin/Tsabary): There are redundant p2p configs here, as each distributed node
// needs only one of them, but the current setup creates one per part. Need to refactor.

let mut sequencers = vec![];
for (
((sequencer_index, _sequencer_part_index), component_config),
@@ -209,6 +215,7 @@
chain_info.clone(),
consensus_manager_config,
mempool_p2p_config,
state_sync_configs.remove(0),
&mut available_ports,
component_config.clone(),
)
27 changes: 17 additions & 10 deletions crates/starknet_integration_tests/src/utils.rs
@@ -74,7 +74,7 @@ pub async fn create_node_config(
sequencer_index: usize,
chain_info: ChainInfo,
batcher_storage_config: StorageConfig,
state_sync_storage_config: StorageConfig,
state_sync_config: StateSyncConfig,
mut consensus_manager_config: ConsensusManagerConfig,
mempool_p2p_config: MempoolP2pConfig,
component_config: ComponentConfig,
@@ -88,8 +88,6 @@
create_http_server_config(available_ports.get_next_local_host_socket());
let monitoring_endpoint_config =
MonitoringEndpointConfig { port: available_ports.get_next_port(), ..Default::default() };
let state_sync_config =
create_state_sync_config(state_sync_storage_config, available_ports.get_next_port());

(
SequencerNodeConfig {
@@ -339,12 +337,21 @@ fn set_validator_id(
validator_id
}

pub fn create_state_sync_config(
pub fn create_state_sync_configs(
n_sequencers: usize,
state_sync_storage_config: StorageConfig,
port: u16,
) -> StateSyncConfig {
let mut config =
StateSyncConfig { storage_config: state_sync_storage_config, ..Default::default() };
config.network_config.tcp_port = port;
config
available_ports: &mut AvailablePorts,
) -> Vec<StateSyncConfig> {
let mut state_sync_configs = vec![];
let network_configs =
create_connected_network_configs(available_ports.get_next_ports(n_sequencers));
for network_config in network_configs {
let state_sync_config = StateSyncConfig {
storage_config: state_sync_storage_config.clone(),
network_config,
..Default::default()
};
state_sync_configs.push(state_sync_config);
}
state_sync_configs
}
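The new `create_state_sync_configs` replaces the single-config helper with one that allocates a port per sequencer and builds one connected network config per port, pairing each with a clone of the shared storage config. Below is a self-contained sketch of that pattern using stub types; these stand-ins are not the PR's real `StorageConfig`, `StateSyncConfig`, or `create_connected_network_configs`, only the shape of the code is meant to match.

```rust
// Stub types standing in for the real configs.
#[derive(Clone, Debug, Default)]
struct StorageConfig {
    db_path: String,
}

#[derive(Debug, Default)]
struct NetworkConfig {
    tcp_port: u16,
}

#[derive(Debug, Default)]
struct StateSyncConfig {
    storage_config: StorageConfig,
    network_config: NetworkConfig,
}

// The real helper also wires the configs to each other as peers; here we only
// keep the per-node port to keep the sketch short.
fn create_connected_network_configs(ports: Vec<u16>) -> Vec<NetworkConfig> {
    ports.into_iter().map(|tcp_port| NetworkConfig { tcp_port }).collect()
}

fn create_state_sync_configs(
    n_sequencers: usize,
    storage_config: StorageConfig,
    ports: &mut impl Iterator<Item = u16>,
) -> Vec<StateSyncConfig> {
    let allocated: Vec<u16> = ports.take(n_sequencers).collect();
    create_connected_network_configs(allocated)
        .into_iter()
        .map(|network_config| StateSyncConfig {
            storage_config: storage_config.clone(),
            network_config,
        })
        .collect()
}

fn main() {
    // Call sites in the PR either keep the whole vector (one config per
    // distributed sequencer) or call `.pop().unwrap()` when only one is needed.
    let mut ports = 10_000u16..;
    let configs = create_state_sync_configs(3, StorageConfig::default(), &mut ports);
    assert_eq!(configs.len(), 3);
    println!("{configs:#?}");
}
```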
11 changes: 7 additions & 4 deletions crates/starknet_integration_tests/tests/mempool_p2p_flow_test.rs
@@ -25,7 +25,7 @@ use starknet_integration_tests::utils::{
create_chain_info,
create_gateway_config,
create_integration_test_tx_generator,
create_state_sync_config,
create_state_sync_configs,
create_txs_for_integration_test,
run_integration_test_scenario,
test_tx_hashes_for_integration_test,
@@ -79,10 +79,13 @@ async fn setup(
let gateway_config = create_gateway_config(chain_info).await;
let http_server_config =
create_http_server_config(available_ports.get_next_local_host_socket());
let state_sync_config = create_state_sync_config(
let state_sync_config = create_state_sync_configs(
1,
storage_for_test.state_sync_storage_config,
available_ports.get_next_port(),
);
&mut available_ports,
)
.pop()
.unwrap();
let ports = available_ports.get_next_ports(2);
let (mut network_configs, broadcast_channels) =
create_network_configs_connected_to_broadcast_channels::<RpcTransactionWrapper>(
30 changes: 26 additions & 4 deletions crates/starknet_sequencer_node/src/test_utils/node_runner.rs
@@ -21,7 +21,7 @@ impl NodeRunner {
}

pub fn get_description(&self) -> String {
self.description.clone()
self.description.to_string()
}
}

Expand All @@ -46,10 +46,32 @@ async fn spawn_node_child_process(
info!("Getting the node executable.");
let node_executable = get_node_executable_path();

// get sleep amount as integer from env var
info!("Running the node from: {}", node_executable);
let mut node_cmd: Child = create_shell_command(node_executable.as_str())
.arg("--config_file")
.arg(node_config_path.to_str().unwrap())
let start_sleep_dur_env_var: u64 = std::env::var("START_SLEEP_DURATION")
.unwrap_or("30".to_string())
.parse()
.expect("START_SLEEP_DURATION env var not a number");
let sleep_dur_env_var: u64 = std::env::var("SLEEP_DURATION")
.unwrap_or("30".to_string())
.parse()
.expect("SLEEP_AMOUNT env var not a number");

let mut command = node_executable;
// let mut command = format!("sleep {} && ", if node_runner.index == 2 { env_var } else { 0 });
// command.push_str(&node_executable);
command.push_str(" --config_file ");
command.push_str(node_config_path.to_str().unwrap());
if node_runner.index == 2 {
command.push_str(&format!(
" & pid=$!; sleep {start_sleep_dur_env_var}; kill -STOP $pid; sleep \
{sleep_dur_env_var}; kill -CONT $pid; sleep 1000"
));
}
let mut node_cmd: Child = create_shell_command("sh").arg("-c").arg(command)
// let mut node_cmd: Child = create_shell_command(&node_executable)
// .arg("--config_file")
// .arg(node_config_path.to_str().unwrap())
.stderr(Stdio::inherit())
.stdout(Stdio::piped())
.kill_on_drop(true) // Required for stopping when the handle is dropped.
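The core of the late-node mechanism is in this last hunk: node index 2 is launched through `sh -c`, suspended with SIGSTOP after `START_SLEEP_DURATION` seconds, and resumed with SIGCONT after a further `SLEEP_DURATION` seconds, so it rejoins the network late. A small, self-contained sketch of the command string being built; the executable and config paths are hypothetical, and the 30 second values come from the `unwrap_or` defaults above.

```rust
// Sketch of the command string the runner hands to `sh -c` for node index 2.
fn main() {
    let node_executable = "/path/to/sequencer_node"; // hypothetical path
    let node_config_path = "/tmp/node_2_config.json"; // hypothetical path
    let (start_sleep_dur, sleep_dur) = (30u64, 30u64); // env-var fallback values

    let mut command = node_executable.to_string();
    command.push_str(" --config_file ");
    command.push_str(node_config_path);
    // Run the node in the background, suspend it with SIGSTOP, wait, resume it
    // with SIGCONT; the trailing `sleep` keeps the wrapping shell alive.
    command.push_str(&format!(
        " & pid=$!; sleep {start_sleep_dur}; kill -STOP $pid; sleep {sleep_dur}; kill -CONT $pid; sleep 1000"
    ));

    // Only node index 2 gets the stop/continue suffix; the other nodes run
    // plain `<executable> --config_file <path>`.
    println!("{command}");
}
```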