diff --git a/crates/topos-tce-broadcast/src/task_manager/mod.rs b/crates/topos-tce-broadcast/src/task_manager/mod.rs index 07d0038b7..33e5b98cd 100644 --- a/crates/topos-tce-broadcast/src/task_manager/mod.rs +++ b/crates/topos-tce-broadcast/src/task_manager/mod.rs @@ -116,7 +116,6 @@ impl TaskManager { pub async fn run(mut self, shutdown_receiver: CancellationToken) { let mut pending_certificate_interval = tokio::time::interval(Duration::from_millis(10)); - let mut double_echo_messages_interval = tokio::time::interval(Duration::from_millis(50)); loop { tokio::select! { @@ -126,34 +125,32 @@ impl TaskManager { self.next_pending_certificate(); } - _ = double_echo_messages_interval.tick() => { - if let Some(msg) = self.message_receiver.recv().await { - match msg { - DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready { certificate_id, .. } => { - if self.delivered_certificates.contains(&certificate_id) { - trace!("Received message for certificate {} that has already been delivered", certificate_id); - continue; - } + Some(msg) = self.message_receiver.recv() => { + match msg { + DoubleEchoCommand::Echo { certificate_id, .. } | DoubleEchoCommand::Ready { certificate_id, .. } => { + if self.delivered_certificates.contains(&certificate_id) { + trace!("Received message for certificate {} that has already been delivered", certificate_id); + continue; + } - if let Some(task_context) = self.tasks.get(&certificate_id) { - _ = task_context.sink.send(msg).await; - } else { - self.buffered_messages - .entry(certificate_id) - .or_default() - .push(msg); - }; + if let Some(task_context) = self.tasks.get(&certificate_id) { + _ = task_context.sink.send(msg).await; + } else { + self.buffered_messages + .entry(certificate_id) + .or_default() + .push(msg); + }; + } + DoubleEchoCommand::Broadcast { ref cert, need_gossip, pending_id } => { + if self.delivered_certificates.contains(&cert.id) { + trace!("Received message for certificate {} that has already been delivered", cert.id); + continue; } - DoubleEchoCommand::Broadcast { ref cert, need_gossip, pending_id } => { - if self.delivered_certificates.contains(&cert.id) { - trace!("Received message for certificate {} that has already been delivered", cert.id); - continue; - } - trace!("Received broadcast message for certificate {} ", cert.id); + trace!("Received broadcast message for certificate {} ", cert.id); - self.create_task(cert, need_gossip, pending_id) - } + self.create_task(cert, need_gossip, pending_id) } } }