diff --git a/elfo-core/src/actor_status.rs b/elfo-core/src/actor_status.rs index 9264d0bc..6d62a08f 100644 --- a/elfo-core/src/actor_status.rs +++ b/elfo-core/src/actor_status.rs @@ -1,5 +1,7 @@ -use std::sync::atomic::{self, AtomicU8}; -use std::{fmt, mem}; +use std::{ + fmt, mem, + sync::atomic::{self, AtomicU8}, +}; use serde::{Deserialize, Serialize}; @@ -142,8 +144,9 @@ impl AtomicActorStatusKind { let result = self.0.load(ordering); // SAFETY: `ActorStatusKind` has `#[repr(u8)]` annotation. The only - // place where value may be changed is `Self::store`, which consumes `ActorStatusKind`, thus, - // guarantees that possibly invalid value cannot be stored + // place where value may be changed is `Self::store`, which consumes + // `ActorStatusKind`, thus, guarantees that possibly invalid value + // cannot be stored unsafe { mem::transmute::(result) } } } diff --git a/elfo-core/src/context.rs b/elfo-core/src/context.rs index 8df2a48a..735ef0f0 100644 --- a/elfo-core/src/context.rs +++ b/elfo-core/src/context.rs @@ -32,6 +32,7 @@ use crate::{ use self::stats::Stats; +mod send; mod stats; static DUMPER: Lazy = Lazy::new(|| Dumper::new(INTERNAL_CLASS)); @@ -210,9 +211,8 @@ impl Context { /// ``` /// /// [inter-group routing]: https://actoromicon.rs/ch04-01-routing.html - pub async fn send(&self, message: M) -> Result<(), SendError> { - let kind = MessageKind::regular(self.actor_addr); - self.do_send_async(message, kind).await + pub fn send(&self, message: M) -> send::Send<'_, M, C, K> { + send::Send::new(message, self) } /// Tries to send a message using the [inter-group routing] system. @@ -244,59 +244,7 @@ impl Context { /// /// [inter-group routing]: https://actoromicon.rs/ch04-01-routing.html pub fn try_send(&self, message: M) -> Result<(), TrySendError> { - // XXX: avoid duplication with `unbounded_send()` and `send()`. - - let kind = MessageKind::regular(self.actor_addr); - - self.stats.on_sent_message(&message); // TODO: only if successful? - - trace!("> {:?}", message); - if let Some(permit) = DUMPER.acquire_m(&message) { - permit.record(Dump::message(&message, &kind, Direction::Out)); - } - - let envelope = Envelope::new(message, kind); - let addrs = self.demux.filter(&envelope); - - if addrs.is_empty() { - return Err(TrySendError::Closed(e2m(envelope))); - } - - let guard = EbrGuard::new(); - - if addrs.len() == 1 { - return match self.book.get(addrs[0], &guard) { - Some(object) => object - .try_send(Addr::NULL, envelope) - .map_err(|err| err.map(e2m)), - None => Err(TrySendError::Closed(e2m(envelope))), - }; - } - - let mut unused = None; - let mut has_full = false; - let mut success = false; - - for (addr, envelope) in addrs_with_envelope(envelope, &addrs) { - match self.book.get(addr, &guard) { - Some(object) => match object.try_send(Addr::NULL, envelope) { - Ok(()) => success = true, - Err(err) => { - has_full |= err.is_full(); - unused = Some(err.into_inner()); - } - }, - None => unused = Some(envelope), - }; - } - - if success { - Ok(()) - } else if has_full { - Err(TrySendError::Full(e2m(unused.unwrap()))) - } else { - Err(TrySendError::Closed(e2m(unused.unwrap()))) - } + self.send(message).try_() } /// Sends a message using the [inter-group routing] system. @@ -331,51 +279,7 @@ impl Context { /// /// [inter-group routing]: https://actoromicon.rs/ch04-01-routing.html pub fn unbounded_send(&self, message: M) -> Result<(), SendError> { - let kind = MessageKind::regular(self.actor_addr); - - self.stats.on_sent_message(&message); // TODO: only if successful? - - trace!("> {:?}", message); - if let Some(permit) = DUMPER.acquire_m(&message) { - permit.record(Dump::message(&message, &kind, Direction::Out)); - } - - let envelope = Envelope::new(message, kind); - let addrs = self.demux.filter(&envelope); - - if addrs.is_empty() { - return Err(SendError(e2m(envelope))); - } - - let guard = EbrGuard::new(); - - if addrs.len() == 1 { - return match self.book.get(addrs[0], &guard) { - Some(object) => object - .unbounded_send(Addr::NULL, envelope) - .map_err(|err| err.map(e2m)), - None => Err(SendError(e2m(envelope))), - }; - } - - let mut unused = None; - let mut success = false; - - for (addr, envelope) in addrs_with_envelope(envelope, &addrs) { - match self.book.get(addr, &guard) { - Some(object) => match object.unbounded_send(Addr::NULL, envelope) { - Ok(()) => success = true, - Err(err) => unused = Some(err.into_inner()), - }, - None => unused = Some(envelope), - }; - } - - if success { - Ok(()) - } else { - Err(SendError(e2m(unused.unwrap()))) - } + self.send(message).unbounded() } /// Returns a request builder to send a request (on `resolve()`) using @@ -508,17 +412,8 @@ impl Context { /// } /// # } /// ``` - pub async fn send_to( - &self, - recipient: Addr, - message: M, - ) -> Result<(), SendError> { - let kind = MessageKind::regular(self.actor_addr); - self.do_send_to(recipient, message, kind, |object, envelope| { - Object::send(object, recipient, envelope) - })? - .await - .map_err(|err| err.map(e2m)) + pub fn send_to(&self, recipient: Addr, message: M) -> send::Send<'_, M, C, K> { + self.send(message).to(recipient) } /// Tries to send a message to the specified recipient. @@ -553,12 +448,7 @@ impl Context { recipient: Addr, message: M, ) -> Result<(), TrySendError> { - let kind = MessageKind::regular(self.actor_addr); - self.do_send_to(recipient, message, kind, |object, envelope| { - object - .try_send(recipient, envelope) - .map_err(|err| err.map(e2m)) - })? + self.send(message).to(recipient).try_() } /// Sends a message to the specified recipient. @@ -595,12 +485,7 @@ impl Context { recipient: Addr, message: M, ) -> Result<(), SendError> { - let kind = MessageKind::regular(self.actor_addr); - self.do_send_to(recipient, message, kind, |object, envelope| { - object - .unbounded_send(recipient, envelope) - .map_err(|err| err.map(e2m)) - })? + self.send(message).to(recipient).unbounded() } #[inline(always)] diff --git a/elfo-core/src/context/send.rs b/elfo-core/src/context/send.rs new file mode 100644 index 00000000..2f8c0c68 --- /dev/null +++ b/elfo-core/src/context/send.rs @@ -0,0 +1,121 @@ +#![allow(unreachable_pub)] + +use std::{ + future::{Future, IntoFuture}, + marker, + pin::Pin, +}; + +use idr_ebr::EbrGuard; +use tracing::trace; + +use crate::{ + context::{addrs_with_envelope, DUMPER}, + dumping::{Direction, Dump}, + envelope::MessageKind, + errors::{SendError, TrySendError}, + Addr, Context, Envelope, Message, + _priv::Object, +}; + +use super::e2m; + +pub struct Send<'a, M, C, K> { + ctx: &'a Context, + dest: Option, + msg: M, +} + +impl<'a, M, C, K> IntoFuture for Send<'a, M, C, K> +where + C: marker::Send + marker::Sync, + K: marker::Sync, + M: Message, +{ + type IntoFuture = Pin + marker::Send + 'a>>; + type Output = Result<(), SendError>; + + fn into_future(self) -> Self::IntoFuture { + let Self { ctx, dest, msg } = self; + let kind = MessageKind::regular(ctx.actor_addr); + if let Some(addr) = dest { + let fut = async move { + ctx.do_send_to(addr, msg, kind, |object, envelope| { + Object::send(object, addr, envelope) + })? + .await + .map_err(|err| err.map(e2m)) + }; + Box::pin(fut) + } else { + Box::pin(ctx.do_send_async(msg, kind)) + } + } +} + +impl<'a, M, C, K> Send<'a, M, C, K> +where + M: Message, +{ + pub fn unbounded(self) -> Result<(), SendError> { + let Self { ctx, dest, msg } = self; + let kind = MessageKind::regular(ctx.actor_addr); + + if let Some(dest) = dest { + ctx.do_send_to(dest, msg, kind, |obj, env| { + obj.unbounded_send(dest, env).map_err(|err| err.map(e2m)) + })? + } else { + ctx.stats.on_sent_message(&msg); + trace!("> {msg:?}"); + if let Some(permit) = DUMPER.acquire_m(&msg) { + permit.record(Dump::message(&msg, &kind, Direction::Out)); + } + + let env = Envelope::new(msg, kind); + let guard = EbrGuard::new(); + let addrs = ctx.demux.filter(&env); + + let mut success = false; + let mut unused = None; + + for (addr, env) in addrs_with_envelope(env, &addrs) { + if let Some(obj) = ctx.book.get(addr, &guard) { + match obj.unbounded_send(Addr::NULL, env) { + Err(e) => unused = Some(e.into_inner()), + Ok(()) => success = true, + } + } + } + + if success { + Ok(()) + } else { + Err(SendError(e2m(unused.unwrap()))) + } + } + } + + pub fn try_(self) -> Result<(), TrySendError> { + if let Some(addr) = self.dest { + self.ctx.try_send_to(addr, self.msg) + } else { + self.ctx.try_send(self.msg) + } + } +} + +impl<'a, M, C, K> Send<'a, M, C, K> { + pub fn new(msg: M, ctx: &'a Context) -> Self { + Self { + msg, + ctx, + dest: None, + } + } + + pub fn to(mut self, dest: Addr) -> Self { + self.dest = Some(dest); + self + } +} diff --git a/elfo-core/src/init.rs b/elfo-core/src/init.rs index 37c013a3..56fa1303 100644 --- a/elfo-core/src/init.rs +++ b/elfo-core/src/init.rs @@ -1,5 +1,5 @@ use std::{ - future::Future, + future::{Future, IntoFuture}, sync::Arc, time::{Duration, SystemTime}, }; @@ -336,7 +336,7 @@ async fn terminate_group(ctx: &Context, addr: Addr, name: String, started_at: In // Terminate::default info!(group = %name, "sending polite Terminate"); - let fut = ctx.send_to(addr, Terminate::default()); + let fut = ctx.send_to(addr, Terminate::default()).into_future(); if timeout(SEND_CLOSING_TERMINATE_AFTER, fut).await.is_ok() { let elapsed = started_at.elapsed(); @@ -354,7 +354,7 @@ async fn terminate_group(ctx: &Context, addr: Addr, name: String, started_at: In group = %name, elapsed = ?started_at.elapsed(), ); - let fut = ctx.send_to(addr, Terminate::closing()); + let fut = ctx.send_to(addr, Terminate::closing()).into_future(); if timeout(STOP_GROUP_TERMINATION_AFTER, fut).await.is_ok() { let elapsed = started_at.elapsed();