Skip to content

Commit

Permalink
fix after rebase
Browse files Browse the repository at this point in the history
  • Loading branch information
erer1243 committed Nov 26, 2024
1 parent 289296e commit 411d912
Show file tree
Hide file tree
Showing 3 changed files with 10 additions and 9 deletions.
4 changes: 2 additions & 2 deletions crates/hamgrd/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod runtime;
use runtime::{Actor, ActorRuntime, Outbox, ResendQueueConfig};
use std::time::Duration;
use swbus_edge::simple_client::IncomingMessage;
use swbus_proto::swbus::{MessageId, ServicePath};
use swbus_proto::swbus::ServicePath;

#[tokio::main]
async fn main() {
Expand Down Expand Up @@ -36,7 +36,7 @@ impl Actor for TestActor {
println!("Received message {msg:?}")
}

async fn handle_message_failure(&mut self, id: MessageId, _outbox: Outbox) {
async fn handle_message_failure(&mut self, id: u64, _outbox: Outbox) {
println!("Message failed to send: {id:?}");
}
}
2 changes: 1 addition & 1 deletion crates/hamgrd/src/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ impl MessageBridge {

async fn handle_incoming_message(&mut self, msg: IncomingMessage) {
if let MessageBody::Response(RequestResponse { request_id, .. }) = &msg.body {
self.resend_queue.message_acknowledged(request_id.unwrap());
self.resend_queue.message_acknowledged(*request_id);
}
self.inbox_tx.send(InboxMessage::Message(msg)).await.unwrap();
}
Expand Down
13 changes: 7 additions & 6 deletions crates/hamgrd/src/runtime/resend_queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,10 @@ use std::{
time::Duration,
};

use futures::future::Either;
use swbus_proto::swbus::{MessageId, SwbusMessage};
use tokio::{sync::Notify, time::Instant};
use swbus_proto::swbus::SwbusMessage;
use tokio::time::Instant;

type MessageId = u64;

/// Settings that determine the behavior of [`ResendQueue`].
#[derive(Debug, Copy, Clone)]
Expand Down Expand Up @@ -41,7 +42,7 @@ impl ResendQueue {

/// Add a message to the ResendQueue. Assumes the message has already been sent once.
pub fn enqueue(&mut self, message: SwbusMessage) {
let id = message.header.as_ref().unwrap().id.unwrap();
let id = message.header.as_ref().unwrap().id;
let strong_message = Arc::new(Box::new(message));
let weak_message = Arc::downgrade(&strong_message);
self.unacked_messages.insert(id, strong_message);
Expand Down Expand Up @@ -78,7 +79,7 @@ impl ResendQueue {
Some(msg) if queued_msg.tries >= self.config.max_tries => {
// This message has been resent too may times.
// We are going to drop it, and tell the caller.
let id = msg.header.as_ref().unwrap().id.unwrap();
let id = msg.header.as_ref().unwrap().id;
self.unacked_messages.remove(&id);
return Some(ResendMessage::TooManyTries(id));
}
Expand Down Expand Up @@ -110,7 +111,7 @@ impl ResendQueue {

/// Signal that a message was acknowledged and no longer needs to be resent. Removes the message
/// with this id from the resend queue.
pub fn message_acknowledged(&mut self, id: MessageId) {
pub fn message_acknowledged(&mut self, id: u64) {
self.unacked_messages.remove(&id);
}
}
Expand Down

0 comments on commit 411d912

Please sign in to comment.