Skip to content
This repository has been archived by the owner on Oct 31, 2024. It is now read-only.

fix: spam every target subnet #487

Closed
wants to merge 16 commits into from
22 changes: 15 additions & 7 deletions crates/topos-certificate-spammer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ pub async fn run(
) -> Result<(), Error> {
// Is list of nodes is specified in the command line use them otherwise use
// config file provided nodes
debug!("{:#?}", args);
let target_nodes = if args.benchmark {
if let (Some(target_hosts), Some(number)) = (args.target_hosts, args.number) {
let uri = target_hosts
Expand Down Expand Up @@ -304,7 +305,7 @@ pub async fn run(
number_of_peer_nodes
);
async {
info!("Starting batch {batch_number}");
info!("Start batch {batch_number}");

let mut batch: Vec<Certificate> = Vec::new(); // Certificates for this batch
for b in 0..args.cert_per_batch {
Expand Down Expand Up @@ -344,12 +345,19 @@ pub async fn run(
// Dispatch certs in this batch
for cert in batch {
// Randomly choose target tce node for every certificate from related source_subnet_id connection list
let target_node_connection = &target_node_connections[&cert.source_subnet_id]
[rand::random::<usize>() % target_nodes.len()];
dispatch(cert, target_node_connection)
.instrument(Span::current())
.with_current_context()
.await;
// let target_node_connection = &target_node_connections[&cert.source_subnet_id]
// [rand::random::<usize>() % target_nodes.len()];

for connection in &target_node_connections[&cert.source_subnet_id] {
info!(
"Sending certificate {cert:?} to target node {}",
connection.address
);
dispatch(cert.clone(), connection)
.instrument(Span::current())
.with_current_context()
.await;
}
}
}
.instrument(span)
Expand Down
10 changes: 5 additions & 5 deletions crates/topos-tce/src/app_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use topos_tce_storage::store::ReadStore;
use topos_tce_storage::types::CertificateDeliveredWithPositions;
use topos_tce_storage::validator::ValidatorStore;
use topos_tce_storage::StorageClient;
use topos_tce_synchronizer::SynchronizerEvent;
// use topos_tce_synchronizer::SynchronizerEvent;
use tracing::{error, info, warn};

mod api;
Expand Down Expand Up @@ -94,7 +94,7 @@ impl AppContext {
mut network_stream: impl Stream<Item = NetEvent> + Unpin,
mut tce_stream: impl Stream<Item = ProtocolEvents> + Unpin,
mut api_stream: impl Stream<Item = ApiEvent> + Unpin,
mut synchronizer_stream: impl Stream<Item = SynchronizerEvent> + Unpin,
// mut synchronizer_stream: impl Stream<Item = SynchronizerEvent> + Unpin,
mut broadcast_stream: impl Stream<Item = CertificateDeliveredWithPositions> + Unpin,
shutdown: (CancellationToken, mpsc::Sender<()>),
) {
Expand Down Expand Up @@ -126,9 +126,9 @@ impl AppContext {
self.on_api_event(event).await;
}

// Synchronizer events
Some(_event) = synchronizer_stream.next() => {
}
// // Synchronizer events
// Some(_event) = synchronizer_stream.next() => {
// }

// Shutdown signal
_ = shutdown.0.cancelled() => {
Expand Down
26 changes: 13 additions & 13 deletions crates/topos-tce/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,17 +151,17 @@ pub async fn run(
let _network_handle = network_runtime.bootstrap(&mut event_stream).await?;
debug!("P2P layer bootstrapped");

debug!("Creating the Synchronizer");

let (synchronizer_runtime, synchronizer_stream) =
topos_tce_synchronizer::Synchronizer::builder()
.with_config(config.synchronization.clone())
.with_shutdown(shutdown.0.child_token())
.with_store(validator_store.clone())
.with_network_client(network_client.clone())
.build()?;

debug!("Synchronizer created");
// debug!("Creating the Synchronizer");
//
// let (synchronizer_runtime, synchronizer_stream) =
// topos_tce_synchronizer::Synchronizer::builder()
// .with_config(config.synchronization.clone())
// .with_shutdown(shutdown.0.child_token())
// .with_store(validator_store.clone())
// .with_network_client(network_client.clone())
// .build()?;
//
// debug!("Synchronizer created");

debug!("Starting gRPC api");
let (broadcast_sender, broadcast_receiver) = broadcast::channel(BROADCAST_CHANNEL_SIZE);
Expand Down Expand Up @@ -203,7 +203,7 @@ pub async fn run(

debug!("Reliable broadcast started");

spawn(synchronizer_runtime.into_future());
// spawn(synchronizer_runtime.into_future());
// setup transport-tce-storage-api connector
let (app_context, _tce_stream) = AppContext::new(
is_validator,
Expand All @@ -220,7 +220,7 @@ pub async fn run(
event_stream,
tce_stream,
api_stream,
synchronizer_stream,
// synchronizer_stream,
BroadcastStream::new(broadcast_receiver).filter_map(|v| futures::future::ready(v.ok())),
shutdown,
))
Expand Down
4 changes: 2 additions & 2 deletions crates/topos-test-sdk/src/tce/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -273,7 +273,7 @@ pub async fn start_node(

let (gatekeeper_client, gatekeeper_join_handle) = create_gatekeeper().await.unwrap();

let (synchronizer_stream, synchronizer_join_handle) = create_synchronizer(
let (_synchronizer_stream, synchronizer_join_handle) = create_synchronizer(
gatekeeper_client.clone(),
network_client.clone(),
validator_store.clone(),
Expand Down Expand Up @@ -302,7 +302,7 @@ pub async fn start_node(
network_stream,
tce_stream,
api_stream,
synchronizer_stream,
// synchronizer_stream,
BroadcastStream::new(receiver).filter_map(|v| futures::future::ready(v.ok())),
(shutdown_token, shutdown_sender),
)
Expand Down
Loading