diff --git a/crates/topos-certificate-spammer/src/lib.rs b/crates/topos-certificate-spammer/src/lib.rs index 4e62dbaab..00a237560 100644 --- a/crates/topos-certificate-spammer/src/lib.rs +++ b/crates/topos-certificate-spammer/src/lib.rs @@ -191,6 +191,7 @@ pub async fn run( ) -> Result<(), Error> { // Is list of nodes is specified in the command line use them otherwise use // config file provided nodes + debug!("{:#?}", args); let target_nodes = if args.benchmark { if let (Some(target_hosts), Some(number)) = (args.target_hosts, args.number) { let uri = target_hosts @@ -304,7 +305,7 @@ pub async fn run( number_of_peer_nodes ); async { - info!("Starting batch {batch_number}"); + info!("Start batch {batch_number}"); let mut batch: Vec = Vec::new(); // Certificates for this batch for b in 0..args.cert_per_batch { @@ -344,12 +345,19 @@ pub async fn run( // Dispatch certs in this batch for cert in batch { // Randomly choose target tce node for every certificate from related source_subnet_id connection list - let target_node_connection = &target_node_connections[&cert.source_subnet_id] - [rand::random::() % target_nodes.len()]; - dispatch(cert, target_node_connection) - .instrument(Span::current()) - .with_current_context() - .await; + // let target_node_connection = &target_node_connections[&cert.source_subnet_id] + // [rand::random::() % target_nodes.len()]; + + for connection in &target_node_connections[&cert.source_subnet_id] { + info!( + "Sending certificate {cert:?} to target node {}", + connection.address + ); + dispatch(cert.clone(), connection) + .instrument(Span::current()) + .with_current_context() + .await; + } } } .instrument(span) diff --git a/crates/topos-tce/src/app_context.rs b/crates/topos-tce/src/app_context.rs index fedc50b66..95c1baa6d 100644 --- a/crates/topos-tce/src/app_context.rs +++ b/crates/topos-tce/src/app_context.rs @@ -21,7 +21,7 @@ use topos_tce_storage::store::ReadStore; use topos_tce_storage::types::CertificateDeliveredWithPositions; use topos_tce_storage::validator::ValidatorStore; use topos_tce_storage::StorageClient; -use topos_tce_synchronizer::SynchronizerEvent; +// use topos_tce_synchronizer::SynchronizerEvent; use tracing::{error, info, warn}; mod api; @@ -94,7 +94,7 @@ impl AppContext { mut network_stream: impl Stream + Unpin, mut tce_stream: impl Stream + Unpin, mut api_stream: impl Stream + Unpin, - mut synchronizer_stream: impl Stream + Unpin, + // mut synchronizer_stream: impl Stream + Unpin, mut broadcast_stream: impl Stream + Unpin, shutdown: (CancellationToken, mpsc::Sender<()>), ) { @@ -126,9 +126,9 @@ impl AppContext { self.on_api_event(event).await; } - // Synchronizer events - Some(_event) = synchronizer_stream.next() => { - } + // // Synchronizer events + // Some(_event) = synchronizer_stream.next() => { + // } // Shutdown signal _ = shutdown.0.cancelled() => { diff --git a/crates/topos-tce/src/lib.rs b/crates/topos-tce/src/lib.rs index 42f654dc7..fd3183d3e 100644 --- a/crates/topos-tce/src/lib.rs +++ b/crates/topos-tce/src/lib.rs @@ -151,17 +151,17 @@ pub async fn run( let _network_handle = network_runtime.bootstrap(&mut event_stream).await?; debug!("P2P layer bootstrapped"); - debug!("Creating the Synchronizer"); - - let (synchronizer_runtime, synchronizer_stream) = - topos_tce_synchronizer::Synchronizer::builder() - .with_config(config.synchronization.clone()) - .with_shutdown(shutdown.0.child_token()) - .with_store(validator_store.clone()) - .with_network_client(network_client.clone()) - .build()?; - - debug!("Synchronizer created"); + // debug!("Creating the Synchronizer"); + // + // let (synchronizer_runtime, synchronizer_stream) = + // topos_tce_synchronizer::Synchronizer::builder() + // .with_config(config.synchronization.clone()) + // .with_shutdown(shutdown.0.child_token()) + // .with_store(validator_store.clone()) + // .with_network_client(network_client.clone()) + // .build()?; + // + // debug!("Synchronizer created"); debug!("Starting gRPC api"); let (broadcast_sender, broadcast_receiver) = broadcast::channel(BROADCAST_CHANNEL_SIZE); @@ -203,7 +203,7 @@ pub async fn run( debug!("Reliable broadcast started"); - spawn(synchronizer_runtime.into_future()); + // spawn(synchronizer_runtime.into_future()); // setup transport-tce-storage-api connector let (app_context, _tce_stream) = AppContext::new( is_validator, @@ -220,7 +220,7 @@ pub async fn run( event_stream, tce_stream, api_stream, - synchronizer_stream, + // synchronizer_stream, BroadcastStream::new(broadcast_receiver).filter_map(|v| futures::future::ready(v.ok())), shutdown, )) diff --git a/crates/topos-test-sdk/src/tce/mod.rs b/crates/topos-test-sdk/src/tce/mod.rs index 2faaed193..3e9728b5f 100644 --- a/crates/topos-test-sdk/src/tce/mod.rs +++ b/crates/topos-test-sdk/src/tce/mod.rs @@ -273,7 +273,7 @@ pub async fn start_node( let (gatekeeper_client, gatekeeper_join_handle) = create_gatekeeper().await.unwrap(); - let (synchronizer_stream, synchronizer_join_handle) = create_synchronizer( + let (_synchronizer_stream, synchronizer_join_handle) = create_synchronizer( gatekeeper_client.clone(), network_client.clone(), validator_store.clone(), @@ -302,7 +302,7 @@ pub async fn start_node( network_stream, tce_stream, api_stream, - synchronizer_stream, + // synchronizer_stream, BroadcastStream::new(receiver).filter_map(|v| futures::future::ready(v.ok())), (shutdown_token, shutdown_sender), )