From 411d9121addeb87866d0178d27e808941272e225 Mon Sep 17 00:00:00 2001 From: erer1243 <1377477+erer1243@users.noreply.github.com> Date: Mon, 25 Nov 2024 13:52:13 -0600 Subject: [PATCH] fix after rebase --- crates/hamgrd/src/main.rs | 4 ++-- crates/hamgrd/src/runtime.rs | 2 +- crates/hamgrd/src/runtime/resend_queue.rs | 13 +++++++------ 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/crates/hamgrd/src/main.rs b/crates/hamgrd/src/main.rs index fe52b45..c96b729 100644 --- a/crates/hamgrd/src/main.rs +++ b/crates/hamgrd/src/main.rs @@ -3,7 +3,7 @@ mod runtime; use runtime::{Actor, ActorRuntime, Outbox, ResendQueueConfig}; use std::time::Duration; use swbus_edge::simple_client::IncomingMessage; -use swbus_proto::swbus::{MessageId, ServicePath}; +use swbus_proto::swbus::ServicePath; #[tokio::main] async fn main() { @@ -36,7 +36,7 @@ impl Actor for TestActor { println!("Received message {msg:?}") } - async fn handle_message_failure(&mut self, id: MessageId, _outbox: Outbox) { + async fn handle_message_failure(&mut self, id: u64, _outbox: Outbox) { println!("Message failed to send: {id:?}"); } } diff --git a/crates/hamgrd/src/runtime.rs b/crates/hamgrd/src/runtime.rs index 3355b55..e8544c7 100644 --- a/crates/hamgrd/src/runtime.rs +++ b/crates/hamgrd/src/runtime.rs @@ -157,7 +157,7 @@ impl MessageBridge { async fn handle_incoming_message(&mut self, msg: IncomingMessage) { if let MessageBody::Response(RequestResponse { request_id, .. }) = &msg.body { - self.resend_queue.message_acknowledged(request_id.unwrap()); + self.resend_queue.message_acknowledged(*request_id); } self.inbox_tx.send(InboxMessage::Message(msg)).await.unwrap(); } diff --git a/crates/hamgrd/src/runtime/resend_queue.rs b/crates/hamgrd/src/runtime/resend_queue.rs index eb54652..419dddf 100644 --- a/crates/hamgrd/src/runtime/resend_queue.rs +++ b/crates/hamgrd/src/runtime/resend_queue.rs @@ -4,9 +4,10 @@ use std::{ time::Duration, }; -use futures::future::Either; -use swbus_proto::swbus::{MessageId, SwbusMessage}; -use tokio::{sync::Notify, time::Instant}; +use swbus_proto::swbus::SwbusMessage; +use tokio::time::Instant; + +type MessageId = u64; /// Settings that determine the behavior of [`ResendQueue`]. #[derive(Debug, Copy, Clone)] @@ -41,7 +42,7 @@ impl ResendQueue { /// Add a message to the ResendQueue. Assumes the message has already been sent once. pub fn enqueue(&mut self, message: SwbusMessage) { - let id = message.header.as_ref().unwrap().id.unwrap(); + let id = message.header.as_ref().unwrap().id; let strong_message = Arc::new(Box::new(message)); let weak_message = Arc::downgrade(&strong_message); self.unacked_messages.insert(id, strong_message); @@ -78,7 +79,7 @@ impl ResendQueue { Some(msg) if queued_msg.tries >= self.config.max_tries => { // This message has been resent too may times. // We are going to drop it, and tell the caller. - let id = msg.header.as_ref().unwrap().id.unwrap(); + let id = msg.header.as_ref().unwrap().id; self.unacked_messages.remove(&id); return Some(ResendMessage::TooManyTries(id)); } @@ -110,7 +111,7 @@ impl ResendQueue { /// Signal that a message was acknowledged and no longer needs to be resent. Removes the message /// with this id from the resend queue. - pub fn message_acknowledged(&mut self, id: MessageId) { + pub fn message_acknowledged(&mut self, id: u64) { self.unacked_messages.remove(&id); } }