Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): implement Context::send in the builder-like manner #142

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions elfo-core/src/actor_status.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
use std::sync::atomic::{self, AtomicU8};
use std::{fmt, mem};
use std::{
fmt, mem,
sync::atomic::{self, AtomicU8},
};

use serde::{Deserialize, Serialize};

Expand Down Expand Up @@ -142,8 +144,9 @@ impl AtomicActorStatusKind {
let result = self.0.load(ordering);

// SAFETY: `ActorStatusKind` has `#[repr(u8)]` annotation. The only
// place where value may be changed is `Self::store`, which consumes `ActorStatusKind`, thus,
// guarantees that possibly invalid value cannot be stored
// place where value may be changed is `Self::store`, which consumes
// `ActorStatusKind`, thus, guarantees that possibly invalid value
// cannot be stored
unsafe { mem::transmute::<u8, ActorStatusKind>(result) }
}
}
133 changes: 9 additions & 124 deletions elfo-core/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::{

use self::stats::Stats;

mod send;
mod stats;

static DUMPER: Lazy<Dumper> = Lazy::new(|| Dumper::new(INTERNAL_CLASS));
Expand Down Expand Up @@ -210,9 +211,8 @@ impl<C, K> Context<C, K> {
/// ```
///
/// [inter-group routing]: https://actoromicon.rs/ch04-01-routing.html
pub async fn send<M: Message>(&self, message: M) -> Result<(), SendError<M>> {
let kind = MessageKind::regular(self.actor_addr);
self.do_send_async(message, kind).await
pub fn send<M: Message>(&self, message: M) -> send::Send<'_, M, C, K> {
send::Send::new(message, self)
}

/// Tries to send a message using the [inter-group routing] system.
Expand Down Expand Up @@ -244,59 +244,7 @@ impl<C, K> Context<C, K> {
///
/// [inter-group routing]: https://actoromicon.rs/ch04-01-routing.html
pub fn try_send<M: Message>(&self, message: M) -> Result<(), TrySendError<M>> {
// XXX: avoid duplication with `unbounded_send()` and `send()`.

let kind = MessageKind::regular(self.actor_addr);

self.stats.on_sent_message(&message); // TODO: only if successful?

trace!("> {:?}", message);
if let Some(permit) = DUMPER.acquire_m(&message) {
permit.record(Dump::message(&message, &kind, Direction::Out));
}

let envelope = Envelope::new(message, kind);
let addrs = self.demux.filter(&envelope);

if addrs.is_empty() {
return Err(TrySendError::Closed(e2m(envelope)));
}

let guard = EbrGuard::new();

if addrs.len() == 1 {
return match self.book.get(addrs[0], &guard) {
Some(object) => object
.try_send(Addr::NULL, envelope)
.map_err(|err| err.map(e2m)),
None => Err(TrySendError::Closed(e2m(envelope))),
};
}

let mut unused = None;
let mut has_full = false;
let mut success = false;

for (addr, envelope) in addrs_with_envelope(envelope, &addrs) {
match self.book.get(addr, &guard) {
Some(object) => match object.try_send(Addr::NULL, envelope) {
Ok(()) => success = true,
Err(err) => {
has_full |= err.is_full();
unused = Some(err.into_inner());
}
},
None => unused = Some(envelope),
};
}

if success {
Ok(())
} else if has_full {
Err(TrySendError::Full(e2m(unused.unwrap())))
} else {
Err(TrySendError::Closed(e2m(unused.unwrap())))
}
self.send(message).try_()
}

/// Sends a message using the [inter-group routing] system.
Expand Down Expand Up @@ -331,51 +279,7 @@ impl<C, K> Context<C, K> {
///
/// [inter-group routing]: https://actoromicon.rs/ch04-01-routing.html
pub fn unbounded_send<M: Message>(&self, message: M) -> Result<(), SendError<M>> {
let kind = MessageKind::regular(self.actor_addr);

self.stats.on_sent_message(&message); // TODO: only if successful?

trace!("> {:?}", message);
if let Some(permit) = DUMPER.acquire_m(&message) {
permit.record(Dump::message(&message, &kind, Direction::Out));
}

let envelope = Envelope::new(message, kind);
let addrs = self.demux.filter(&envelope);

if addrs.is_empty() {
return Err(SendError(e2m(envelope)));
}

let guard = EbrGuard::new();

if addrs.len() == 1 {
return match self.book.get(addrs[0], &guard) {
Some(object) => object
.unbounded_send(Addr::NULL, envelope)
.map_err(|err| err.map(e2m)),
None => Err(SendError(e2m(envelope))),
};
}

let mut unused = None;
let mut success = false;

for (addr, envelope) in addrs_with_envelope(envelope, &addrs) {
match self.book.get(addr, &guard) {
Some(object) => match object.unbounded_send(Addr::NULL, envelope) {
Ok(()) => success = true,
Err(err) => unused = Some(err.into_inner()),
},
None => unused = Some(envelope),
};
}

if success {
Ok(())
} else {
Err(SendError(e2m(unused.unwrap())))
}
self.send(message).unbounded()
}

/// Returns a request builder to send a request (on `resolve()`) using
Expand Down Expand Up @@ -508,17 +412,8 @@ impl<C, K> Context<C, K> {
/// }
/// # }
/// ```
pub async fn send_to<M: Message>(
&self,
recipient: Addr,
message: M,
) -> Result<(), SendError<M>> {
let kind = MessageKind::regular(self.actor_addr);
self.do_send_to(recipient, message, kind, |object, envelope| {
Object::send(object, recipient, envelope)
})?
.await
.map_err(|err| err.map(e2m))
pub fn send_to<M: Message>(&self, recipient: Addr, message: M) -> send::Send<'_, M, C, K> {
self.send(message).to(recipient)
}

/// Tries to send a message to the specified recipient.
Expand Down Expand Up @@ -553,12 +448,7 @@ impl<C, K> Context<C, K> {
recipient: Addr,
message: M,
) -> Result<(), TrySendError<M>> {
let kind = MessageKind::regular(self.actor_addr);
self.do_send_to(recipient, message, kind, |object, envelope| {
object
.try_send(recipient, envelope)
.map_err(|err| err.map(e2m))
})?
self.send(message).to(recipient).try_()
}

/// Sends a message to the specified recipient.
Expand Down Expand Up @@ -595,12 +485,7 @@ impl<C, K> Context<C, K> {
recipient: Addr,
message: M,
) -> Result<(), SendError<M>> {
let kind = MessageKind::regular(self.actor_addr);
self.do_send_to(recipient, message, kind, |object, envelope| {
object
.unbounded_send(recipient, envelope)
.map_err(|err| err.map(e2m))
})?
self.send(message).to(recipient).unbounded()
}

#[inline(always)]
Expand Down
121 changes: 121 additions & 0 deletions elfo-core/src/context/send.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
#![allow(unreachable_pub)]

use std::{
future::{Future, IntoFuture},
marker,
pin::Pin,
};

use idr_ebr::EbrGuard;
use tracing::trace;

use crate::{
context::{addrs_with_envelope, DUMPER},
dumping::{Direction, Dump},
envelope::MessageKind,
errors::{SendError, TrySendError},
Addr, Context, Envelope, Message,
_priv::Object,
};

use super::e2m;

pub struct Send<'a, M, C, K> {
ctx: &'a Context<C, K>,
dest: Option<Addr>,
msg: M,
}

impl<'a, M, C, K> IntoFuture for Send<'a, M, C, K>
where
C: marker::Send + marker::Sync,
K: marker::Sync,
M: Message,
{
type IntoFuture = Pin<Box<dyn Future<Output = Self::Output> + marker::Send + 'a>>;
type Output = Result<(), SendError<M>>;

fn into_future(self) -> Self::IntoFuture {
let Self { ctx, dest, msg } = self;
let kind = MessageKind::regular(ctx.actor_addr);
if let Some(addr) = dest {
let fut = async move {
ctx.do_send_to(addr, msg, kind, |object, envelope| {
Object::send(object, addr, envelope)
})?
.await
.map_err(|err| err.map(e2m))
};
Box::pin(fut)
} else {
Box::pin(ctx.do_send_async(msg, kind))
}
}
}

impl<'a, M, C, K> Send<'a, M, C, K>
where
M: Message,
{
pub fn unbounded(self) -> Result<(), SendError<M>> {
let Self { ctx, dest, msg } = self;
let kind = MessageKind::regular(ctx.actor_addr);

if let Some(dest) = dest {
ctx.do_send_to(dest, msg, kind, |obj, env| {
obj.unbounded_send(dest, env).map_err(|err| err.map(e2m))
})?
} else {
ctx.stats.on_sent_message(&msg);
trace!("> {msg:?}");
if let Some(permit) = DUMPER.acquire_m(&msg) {
permit.record(Dump::message(&msg, &kind, Direction::Out));
}

let env = Envelope::new(msg, kind);
let guard = EbrGuard::new();
let addrs = ctx.demux.filter(&env);

let mut success = false;
let mut unused = None;

for (addr, env) in addrs_with_envelope(env, &addrs) {
if let Some(obj) = ctx.book.get(addr, &guard) {
match obj.unbounded_send(Addr::NULL, env) {
Err(e) => unused = Some(e.into_inner()),
Ok(()) => success = true,
}
}
}

if success {
Ok(())
} else {
Err(SendError(e2m(unused.unwrap())))
}
}
}

pub fn try_(self) -> Result<(), TrySendError<M>> {
if let Some(addr) = self.dest {
self.ctx.try_send_to(addr, self.msg)
} else {
self.ctx.try_send(self.msg)
}
}
}

impl<'a, M, C, K> Send<'a, M, C, K> {
pub fn new(msg: M, ctx: &'a Context<C, K>) -> Self {
Self {
msg,
ctx,
dest: None,
}
}

pub fn to(mut self, dest: Addr) -> Self {
self.dest = Some(dest);
self
}
}
6 changes: 3 additions & 3 deletions elfo-core/src/init.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
future::Future,
future::{Future, IntoFuture},
sync::Arc,
time::{Duration, SystemTime},
};
Expand Down Expand Up @@ -336,7 +336,7 @@ async fn terminate_group(ctx: &Context, addr: Addr, name: String, started_at: In
// Terminate::default

info!(group = %name, "sending polite Terminate");
let fut = ctx.send_to(addr, Terminate::default());
let fut = ctx.send_to(addr, Terminate::default()).into_future();

if timeout(SEND_CLOSING_TERMINATE_AFTER, fut).await.is_ok() {
let elapsed = started_at.elapsed();
Expand All @@ -354,7 +354,7 @@ async fn terminate_group(ctx: &Context, addr: Addr, name: String, started_at: In
group = %name,
elapsed = ?started_at.elapsed(),
);
let fut = ctx.send_to(addr, Terminate::closing());
let fut = ctx.send_to(addr, Terminate::closing()).into_future();

if timeout(STOP_GROUP_TERMINATION_AFTER, fut).await.is_ok() {
let elapsed = started_at.elapsed();
Expand Down
Loading