From d660666fe3e32e4de409e53cadf461d14f9434cc Mon Sep 17 00:00:00 2001 From: hauke strasdat Date: Sat, 13 Jan 2024 18:14:42 -0800 Subject: [PATCH] feat: request reply & refactor --- examples/one_dim_robot.rs | 3 + hollywood_macros/src/lib.rs | 24 ++- src/{actors/mod.rs => actors.rs} | 0 src/actors/periodic.rs | 108 ++++++------- src/actors/printer.rs | 32 ++-- src/{compute/mod.rs => compute.rs} | 0 src/compute/context.rs | 8 +- src/compute/pipeline.rs | 3 +- src/compute/topology.rs | 2 +- src/{core/mod.rs => core.rs} | 13 +- src/core/actor.rs | 165 ++++++++++---------- src/core/actor_builder.rs | 44 +++--- src/core/connection.rs | 34 +++++ src/core/connection/outbound_connection.rs | 100 ++++++++++++ src/core/connection/request_connection.rs | 135 ++++++++++++++++ src/core/inbound.rs | 118 +++++++++----- src/core/outbound.rs | 111 +------------- src/core/request.rs | 170 +++++++++++++++++++++ src/core/runner.rs | 95 +++++++----- src/core/value.rs | 6 - src/{examples/mod.rs => examples.rs} | 1 - src/examples/moving_average/mod.rs | 32 ++-- src/examples/one_dim_robot/draw.rs | 18 ++- src/examples/one_dim_robot/filter.rs | 41 +++-- src/examples/one_dim_robot/sim.rs | 69 +++++++-- src/{introspect/mod.rs => introspect.rs} | 0 src/lib.rs | 1 - 27 files changed, 918 insertions(+), 415 deletions(-) rename src/{actors/mod.rs => actors.rs} (100%) rename src/{compute/mod.rs => compute.rs} (100%) rename src/{core/mod.rs => core.rs} (78%) create mode 100644 src/core/connection.rs create mode 100644 src/core/connection/outbound_connection.rs create mode 100644 src/core/connection/request_connection.rs create mode 100644 src/core/request.rs rename src/{examples/mod.rs => examples.rs} (98%) rename src/{introspect/mod.rs => introspect.rs} (100%) diff --git a/examples/one_dim_robot.rs b/examples/one_dim_robot.rs index cc86f77..1be0529 100644 --- a/examples/one_dim_robot.rs +++ b/examples/one_dim_robot.rs @@ -63,6 +63,9 @@ async fn run_robot_example() { sim.outbound .true_robot .connect(context, &mut truth_printer.inbound.printable); + + + sim.request.ask_for_time.connect(context, &mut filter.inbound.request); context.register_cancel_requester(&mut sim.outbound.cancel_request); filter diff --git a/hollywood_macros/src/lib.rs b/hollywood_macros/src/lib.rs index 059f091..b32245a 100644 --- a/hollywood_macros/src/lib.rs +++ b/hollywood_macros/src/lib.rs @@ -116,6 +116,7 @@ pub fn actor_inputs(args: TokenStream, inbound: TokenStream) -> TokenStream { let prop_type = &args.prop_type; let state_type = &args.state_type; let output_type = &args.output_type; + let request_type = &args.request_type; let inbound = fields.iter().map(|variant| { let variant_name = variant.ident.clone(); @@ -197,6 +198,7 @@ pub fn actor_inputs(args: TokenStream, inbound: TokenStream) -> TokenStream { type Prop = #prop_type; type State = #state_type; type OutboundHub = #output_type; + type RequestHub = #request_type; fn inbound_channel(&self) -> String { match self { @@ -205,9 +207,9 @@ pub fn actor_inputs(args: TokenStream, inbound: TokenStream) -> TokenStream { } } - impl InboundHub<#prop_type, #state_type, #output_type, #name> for #struct_name { + impl InboundHub<#prop_type, #state_type, #output_type, #request_type,#name> for #struct_name { - fn from_builder(builder: &mut ActorBuilder<#prop_type, #state_type, #output_type, #name>, + fn from_builder(builder: &mut ActorBuilder<#prop_type, #state_type, #output_type,#request_type, #name>, actor_name: &str) -> Self { #(#from_builder_inbounds)* @@ -258,6 +260,8 @@ pub fn actor(attr: TokenStream, item: TokenStream) -> TokenStream { let mut maybe_inbounds = None; let mut maybe_state = None; let mut maybe_outputs = None; + let mut maybe_requests = None; + if let Item::Type(item_type) = inbound_clone { if let Type::Path(type_path) = *item_type.ty { if type_path.path.segments.last().unwrap().ident != "Actor" { @@ -267,10 +271,10 @@ pub fn actor(attr: TokenStream, item: TokenStream) -> TokenStream { } for segment in type_path.path.segments { if let PathArguments::AngleBracketed(angle_bracketed_args) = segment.arguments { - if angle_bracketed_args.args.len() != 4 { + if angle_bracketed_args.args.len() != 5 { return Error::new_spanned( &angle_bracketed_args, - "Expected three type arguments: Actor", + "Expected 5 type arguments: Actor", ) .to_compile_error() .into(); @@ -279,6 +283,7 @@ pub fn actor(attr: TokenStream, item: TokenStream) -> TokenStream { maybe_inbounds = Some(angle_bracketed_args.args[1].clone()); maybe_state = Some(angle_bracketed_args.args[2].clone()); maybe_outputs = Some(angle_bracketed_args.args[3].clone()); + maybe_requests = Some(angle_bracketed_args.args[4].clone()); } } } else { @@ -294,16 +299,17 @@ pub fn actor(attr: TokenStream, item: TokenStream) -> TokenStream { let inbound = maybe_inbounds.unwrap(); let state_type = maybe_state.unwrap(); let out = maybe_outputs.unwrap(); + let requests = maybe_requests.unwrap(); - let runner_type = quote! { DefaultRunner<#prop, #inbound, #state_type, #out> }; + let runner_type = quote! { DefaultRunner<#prop, #inbound, #state_type, #out, #requests> }; let gen = quote! { /// #( #attrs )* - pub type #actor_name = Actor<#prop, #inbound, #state_type, #out>; + pub type #actor_name = Actor<#prop, #inbound, #state_type, #out, #requests>; - impl FromPropState<#prop, #inbound, #state_type, #out, #message_type, #runner_type> + impl FromPropState<#prop, #inbound, #state_type, #out, #message_type, #requests, #runner_type> for #actor_name { fn name_hint(prop: &#prop) -> String { @@ -320,6 +326,7 @@ struct ActorInbound { prop_type: Ident, state_type: Ident, output_type: Ident, + request_type: Ident, } impl Parse for ActorInbound { @@ -333,11 +340,14 @@ impl Parse for ActorInbound { let state_type: Ident = content.parse()?; let _: Token![,] = content.parse()?; let output_type: Ident = content.parse()?; + let _: Token![,] = content.parse()?; + let request_type: Ident = content.parse()?; Ok(ActorInbound { struct_name, prop_type, state_type, output_type, + request_type }) } } diff --git a/src/actors/mod.rs b/src/actors.rs similarity index 100% rename from src/actors/mod.rs rename to src/actors.rs diff --git a/src/actors/periodic.rs b/src/actors/periodic.rs index aa85cdd..4d9f5fc 100644 --- a/src/actors/periodic.rs +++ b/src/actors/periodic.rs @@ -3,30 +3,27 @@ use std::sync::Arc; use async_trait::async_trait; use crate::compute::context::Context; +use crate::core::connection::ConnectionEnum; + +use crate::core::request::NullRequest; use crate::core::{ - actor::{FromPropState, ActorNode, DormantActorNode, GenericActor}, + actor::{ActorNode, FromPropState, GenericActor}, inbound::{ForwardMessage, NullInbound, NullMessage}, - outbound::{ConnectionEnum, Morph, OutboundChannel, OutboundHub}, + outbound::{Morph, OutboundChannel, OutboundHub}, runner::Runner, - value::Value, }; -use crate::macros::*; - - -/// Outbound hub of periodic actor, which consists of a single outbound channel. -#[actor_outputs] -pub struct PeriodicOutbound { - /// Time stamp outbound channel, which sends a messages every `period` - /// seconds with the current time stamp. - pub time_stamp: OutboundChannel, -} - /// A periodic actor. /// /// This is an actor that periodically sends a message to its outbound. -pub type Periodic = - GenericActor; +pub type Periodic = GenericActor< + PeriodicProp, + NullInbound, + PeriodicState, + PeriodicOutbound, + NullRequest, + PeriodicRunner, +>; impl Periodic { /// Create a new periodic actor, with a period of `period` seconds. @@ -51,7 +48,8 @@ impl NullInbound, PeriodicState, PeriodicOutbound, - NullMessage, + NullMessage, + NullRequest, PeriodicRunner, > for Periodic { @@ -76,8 +74,6 @@ impl Default for PeriodicProp { } } -impl Value for PeriodicProp {} - /// State of the periodic actor. #[derive(Clone, Debug)] pub struct PeriodicState { @@ -94,9 +90,32 @@ impl Default for PeriodicState { } } -impl Value for PeriodicState {} +/// Outbound hub of periodic actor, which consists of a single outbound channel. +pub struct PeriodicOutbound { + /// Time stamp outbound channel, which sends a messages every `period` + /// seconds with the current time stamp. + pub time_stamp: OutboundChannel, +} + +impl Morph for PeriodicOutbound { + fn extract(&mut self) -> Self { + Self { + time_stamp: self.time_stamp.extract(), + } + } + fn activate(&mut self) { + self.time_stamp.activate(); + } +} +impl OutboundHub for PeriodicOutbound { + fn from_context_and_parent(context: &mut Context, actor_name: &str) -> Self { + Self { + time_stamp: OutboundChannel::::new(context, "time_stamp".to_owned(), actor_name), + } + } +} /// The custom runner for the periodic actor. pub struct PeriodicRunner {} @@ -107,16 +126,17 @@ impl NullInbound, PeriodicState, PeriodicOutbound, - NullMessage, + NullRequest, + NullMessage, > for PeriodicRunner { - /// Create a new dormant actor. - fn new_dormant_actor( + /// Create a new actor node. + fn new_actor_node( name: String, prop: PeriodicProp, state: PeriodicState, _receiver: tokio::sync::mpsc::Receiver< - NullMessage, + NullMessage, >, _forward: std::collections::HashMap< String, @@ -125,54 +145,36 @@ impl PeriodicProp, PeriodicState, PeriodicOutbound, - NullMessage, + NullRequest, + NullMessage, > + Send + Sync, >, >, outbound: PeriodicOutbound, - ) -> Box { - Box::new(DormantPeriodic { + _request: NullRequest, + ) -> Box { + Box::new(PeriodicActor { name: name.clone(), prop, init_state: state.clone(), - outbound, - }) - } -} - -/// The dormant periodic actor. -pub struct DormantPeriodic { - name: String, - prop: PeriodicProp, - init_state: PeriodicState, - outbound: PeriodicOutbound, -} - -impl DormantActorNode for DormantPeriodic { - fn activate(mut self: Box) -> Box { - self.outbound.activate(); - Box::new(ActivePeriodic { - name: self.name.clone(), - prop: self.prop.clone(), - init_state: self.init_state.clone(), state: None, - outbound: Arc::new(self.outbound), + outbound: Some(outbound), }) } } /// The active periodic actor. -pub struct ActivePeriodic { +pub struct PeriodicActor { name: String, prop: PeriodicProp, init_state: PeriodicState, state: Option, - outbound: Arc, + outbound: Option, } #[async_trait] -impl ActorNode for ActivePeriodic { +impl ActorNode for PeriodicActor { fn name(&self) -> &String { &self.name } @@ -182,6 +184,8 @@ impl ActorNode for ActivePeriodic { } async fn run(&mut self, mut kill: tokio::sync::broadcast::Receiver<()>) { + let mut outbound = self.outbound.take().unwrap(); + outbound.activate(); self.reset(); let state = self.state.as_mut().unwrap(); @@ -190,7 +194,7 @@ impl ActorNode for ActivePeriodic { (1000.0 * self.prop.period) as u64, )); - let conns = Arc::new(self.outbound.clone()); + let conns = Arc::new(outbound); loop { interval.tick().await; diff --git a/src/actors/printer.rs b/src/actors/printer.rs index dfeb220..0badfe3 100644 --- a/src/actors/printer.rs +++ b/src/actors/printer.rs @@ -1,8 +1,8 @@ use std::fmt::{Debug, Display}; use crate::core::{ - Actor, ActorBuilder, DefaultRunner, FromPropState, InboundChannel, InboundHub, InboundMessage, - InboundMessageNew, NullOutbound, NullState, OnMessage, Value, + request::NullRequest, Actor, ActorBuilder, DefaultRunner, FromPropState, InboundChannel, + InboundHub, InboundMessage, InboundMessageNew, NullOutbound, NullState, OnMessage, }; /// Configuration properties for the printer actor. @@ -20,8 +20,6 @@ impl Default for PrinterProp { } } -impl Value for PrinterProp {} - /// Inbound message for the printer actor. #[derive(Clone, Debug)] pub enum PrinterInboundMessage { @@ -30,7 +28,13 @@ pub enum PrinterInboundMessage { } impl OnMessage for PrinterInboundMessage { - fn on_message(&self, prop: &PrinterProp, _state: &mut Self::State, _outputs: &Self::OutboundHub) { + fn on_message( + self, + prop: &PrinterProp, + _state: &mut Self::State, + _outputs: &Self::OutboundHub, + _request: &Self::RequestHub, + ) { match self { PrinterInboundMessage::Printable(printable) => { println!("{}: {}", prop.topic, printable); @@ -48,16 +52,17 @@ impl InboundMessageNew } /// Generic printer actor. -pub type Printer = Actor, NullState, NullOutbound>; +pub type Printer = Actor, NullState, NullOutbound, NullRequest>; -impl +impl FromPropState< PrinterProp, PrinterInbound, NullState, NullOutbound, PrinterInboundMessage, - DefaultRunner, NullState, NullOutbound>, + NullRequest, + DefaultRunner, NullState, NullOutbound, NullRequest>, > for Printer { fn name_hint(prop: &PrinterProp) -> String { @@ -77,6 +82,7 @@ impl InboundMessage type Prop = PrinterProp; type State = NullState; type OutboundHub = NullOutbound; + type RequestHub = NullRequest; fn inbound_channel(&self) -> String { match self { @@ -86,11 +92,17 @@ impl InboundMessage } impl - InboundHub> + InboundHub> for PrinterInbound { fn from_builder( - builder: &mut ActorBuilder>, + builder: &mut ActorBuilder< + PrinterProp, + NullState, + NullOutbound, + NullRequest, + PrinterInboundMessage, + >, actor_name: &str, ) -> Self { let m = InboundChannel::new( diff --git a/src/compute/mod.rs b/src/compute.rs similarity index 100% rename from src/compute/mod.rs rename to src/compute.rs diff --git a/src/compute/context.rs b/src/compute/context.rs index 7b28acf..d648967 100644 --- a/src/compute/context.rs +++ b/src/compute/context.rs @@ -3,14 +3,14 @@ use std::sync::Arc; use crate::compute::{CancelRequest, Pipeline, Topology}; use crate::core::{ - DormantActorNode, InboundChannel, InboundMessage, OutboundChannel, OutboundConnection, + InboundChannel, InboundMessage, OutboundChannel, OutboundConnection, ActorNode, }; /// The context of the compute graph which is used to configure the network topology. /// /// It is an opaque type created by the Context::configure() method. pub struct Context { - pub(crate) actors: Vec>, + pub(crate) actors: Vec>, pub(crate) topology: Topology, pub(crate) cancel_request_sender_template: tokio::sync::mpsc::Sender, pub(crate) cancel_request_receiver: tokio::sync::mpsc::Receiver, @@ -84,7 +84,7 @@ impl Context { } pub(crate) fn connect_impl< - T: Default + Clone + std::fmt::Debug + Sync + Send + 'static, + T: Clone + std::fmt::Debug + Sync + Send + 'static, M: InboundMessage, >( &mut self, @@ -92,5 +92,5 @@ impl Context { inbound: &mut InboundChannel, ) { self.topology.connect(outbound, inbound); - } + } } diff --git a/src/compute/pipeline.rs b/src/compute/pipeline.rs index a0c27b7..20256f9 100644 --- a/src/compute/pipeline.rs +++ b/src/compute/pipeline.rs @@ -22,6 +22,7 @@ impl InboundMessage for CancelRequest { type Prop = NullProp; type State = NullState; type OutboundHub = NullOutbound; + type RequestHub = NullOutbound; /// This messages is only meant to use for the cancel request inbound channel of the pipeline. /// Hence, the inbound name is the constant [CancelRequest::CANCEL_REQUEST_INBOUND_CHANNEL]. @@ -50,7 +51,7 @@ impl Pipeline { pub(crate) fn from_context(context: Context) -> Self { let mut active = vec![]; for actor in context.actors.into_iter() { - active.push(actor.activate()); + active.push(actor); } let compute_graph = Pipeline { actors: active, diff --git a/src/compute/topology.rs b/src/compute/topology.rs index b4c7da4..2278629 100644 --- a/src/compute/topology.rs +++ b/src/compute/topology.rs @@ -149,7 +149,7 @@ impl Topology { } pub(crate) fn connect< - T: Default + Clone + std::fmt::Debug + Sync + Send + 'static, + T: Clone + std::fmt::Debug + Sync + Send + 'static, M: InboundMessage, >( &mut self, diff --git a/src/core/mod.rs b/src/core.rs similarity index 78% rename from src/core/mod.rs rename to src/core.rs index dd4c299..2047f92 100644 --- a/src/core/mod.rs +++ b/src/core.rs @@ -1,10 +1,9 @@ -#![deny(missing_docs)] //! Core of hollywood actor framework. /// Actor pub mod actor; pub use actor::{Actor, FromPropState}; -pub(crate) use actor::{ActorNode, DormantActorNode}; +pub(crate) use actor::ActorNode; /// Actor builder pub mod actor_builder; @@ -18,15 +17,21 @@ pub use inbound::{ OnMessage, }; -/// OutboundChannel +/// Outbound pub mod outbound; pub(crate) use outbound::OutboundConnection; pub use outbound::{Morph, NullOutbound, OutboundChannel, OutboundHub}; +/// Request +pub mod request; + +/// Connection +pub mod connection; + /// Run pub mod runner; pub use runner::DefaultRunner; /// State pub mod value; -pub use value::{NullProp, NullState, Value}; +pub use value::{NullProp, NullState}; diff --git a/src/core/actor.rs b/src/core/actor.rs index 98660e6..f67f8ba 100644 --- a/src/core/actor.rs +++ b/src/core/actor.rs @@ -1,5 +1,5 @@ use async_trait::async_trait; -use std::{collections::HashMap, sync::Arc}; +use std::collections::HashMap; use tokio::select; use crate::compute::context::Context; @@ -8,51 +8,65 @@ use crate::core::{ inbound::{ForwardMessage, InboundHub, InboundMessage}, outbound::OutboundHub, runner::{DefaultRunner, Runner}, - value::Value, }; +use super::request::RequestHub; + /// A generic actor in the hollywood compute graph framework. /// /// An actor consists of its unique name, a set of inbound channels, a set of /// outbound channels as well as its properties, state and runner types. /// /// The generic actor struct is merely a user-facing facade to configure network connections. Actual -/// properties, state and inbound routing is stored in the [DormantActorNode] and [ActorNode] -/// structs. -pub struct GenericActor { +/// properties, state and inbound routing is stored in the [ActorNode] structs. +pub struct GenericActor { /// unique identifier of the actor pub actor_name: String, /// a collection of inbound channels pub inbound: Inbound, /// a collection of outbound channels pub outbound: Outbound, + /// a collection of request channels + pub request: Request, pub(crate) phantom: std::marker::PhantomData<(Prop, State, Run)>, } /// An actor of the default runner type, but otherwise generic over its, prop, state, inbound /// and outbound channel types. -pub type Actor = GenericActor< +pub type Actor = GenericActor< Prop, Inbound, State, OutboundHub, - DefaultRunner, + Request, + DefaultRunner, >; /// New actor from properties and state. pub trait FromPropState< Prop, - Inbound: InboundHub, - State: Value, + Inbound: InboundHub, + State: Default, Outbound: OutboundHub, M: InboundMessage, - Run: Runner, + Request: RequestHub, + Run: Runner, > { /// Produces a hint for the actor. The name_hint is used as a base to /// generate a unique name. fn name_hint(prop: &Prop) -> String; + /// Produces a new actor with default state. + /// + /// Also, a dormant actor node is created added to the context. + fn new_default_init_state( + context: &mut Context, + prop: Prop, + ) -> GenericActor { + Self::from_prop_and_state(context, prop, State::default()) + } + /// Produces a new actor with the given state. /// /// Also, a dormant actor node is created added to the context. @@ -60,29 +74,20 @@ pub trait FromPropState< context: &mut Context, prop: Prop, initial_state: State, - ) -> GenericActor { + ) -> GenericActor { let actor_name = context.add_new_unique_name(Self::name_hint(&prop).to_string()); let out = Outbound::from_context_and_parent(context, &actor_name); let mut builder = ActorBuilder::new(context, &actor_name, prop, initial_state); + let request = Request::from_context_and_parent(&actor_name, &builder.sender); + let inbound = Inbound::from_builder(&mut builder, &actor_name); - builder.build::(inbound, out) + builder.build::(inbound, out, request) } } -/// A dormant actor of the pipeline. -#[async_trait] -pub trait DormantActorNode { - /// An active actor is returned leaving a shell behind. - /// - /// Repeated calls to this method may and will often lead to a panic. This method is not - /// intended to be called directly. It is called by the Pipeline::from_context() construction - /// method. - fn activate(self: Box) -> Box; -} - -/// Active actor node of the pipeline. It is created by the [DormantActorNode::activate()] method. +/// Actor node of the pipeline. It is created by the [Runner::new_actor_node()] method. #[async_trait] pub trait ActorNode { /// Return the actor's name. @@ -93,15 +98,15 @@ pub trait ActorNode { /// Run the actor as a node within the compute pipeline: /// - /// * For each inbound channel there are zero, one or more incoming connections. Messages on + /// * For each inbound channel there are zero, one or more incoming connections. Messages on /// these incoming streams are merged into a single stream. - /// * Messages for all inbound channels are processed sequentially using the - /// [OnMessage::on_message()](crate::core::OnMessage::on_message()) method. Sequential - /// processing is crucial to ensure that the actor's state is updated in a consistent - /// manner. Sequential mutable access to the state is enforced by the borrow checker at + /// * Messages for all inbound channels are processed sequentially using the + /// [OnMessage::on_message()](crate::core::OnMessage::on_message()) method. Sequential + /// processing is crucial to ensure that the actor's state is updated in a consistent + /// manner. Sequential mutable access to the state is enforced by the borrow checker at /// compile time. - /// * Outbound messages are produced by - /// [OnMessage::on_message()](crate::core::OnMessage::on_message()) the method and sent to + /// * Outbound messages are produced by + /// [OnMessage::on_message()](crate::core::OnMessage::on_message()) the method and sent to /// the through the corresponding outbound channel to downstream actors. /// /// Note: It is an async function which returns a future a completion handler. This method is @@ -109,25 +114,34 @@ pub trait ActorNode { async fn run(&mut self, kill: tokio::sync::broadcast::Receiver<()>); } -pub(crate) struct ActorNodeImpl { +/// A table to forward outbound messages to message handlers of downstream actors. +pub type ForwardTable = + HashMap + Send + Sync>>; + +pub(crate) struct ActorNodeImpl { pub(crate) name: String, pub(crate) prop: Prop, pub(crate) init_state: State, pub(crate) state: Option, pub(crate) receiver: Option>, - pub(crate) outbound: Arc, - pub(crate) forward: - HashMap + Send + Sync>>, + pub(crate) outbound: OutboundHub, + pub(crate) request: Request, + pub(crate) forward: ForwardTable, } -impl - ActorNodeImpl +impl + ActorNodeImpl { } #[async_trait] -impl ActorNode - for ActorNodeImpl +impl< + Prop: std::marker::Send + std::marker::Sync + 'static, + State: Clone + std::marker::Send + std::marker::Sync + 'static, + Outbound: OutboundHub, + Request: RequestHub, + M: InboundMessage, + > ActorNode for ActorNodeImpl { fn name(&self) -> &String { &self.name @@ -138,15 +152,21 @@ impl ActorN } async fn run(&mut self, kill: tokio::sync::broadcast::Receiver<()>) { + self.outbound.activate(); + self.request.activate(); + let new_state = self.init_state.clone(); let (state, recv) = on_message( self.name.clone(), - self.receiver.take().unwrap(), &self.prop, - new_state, + OnMessageMutValues { + state: new_state, + receiver: self.receiver.take().unwrap(), + kill, + }, &self.forward, - self.outbound.clone(), - kill, + &self.outbound, + &self.request, ) .await; @@ -155,68 +175,45 @@ impl ActorN } } -pub(crate) struct DormantActorImpl { - pub(crate) name: String, - pub(crate) prop: Prop, - pub(crate) receiver: tokio::sync::mpsc::Receiver, - pub(crate) outbound: OutboundHub, - pub(crate) forward: - HashMap + Send + Sync>>, - pub(crate) init_state: State, -} - -impl DormantActorNode - for DormantActorImpl -{ - fn activate(mut self: Box) -> Box { - self.outbound.activate(); - - Box::new(ActorNodeImpl:: { - name: self.name.clone(), - prop: self.prop.clone(), - state: None, - init_state: self.init_state.clone(), - receiver: Some(self.receiver), - outbound: Arc::new(self.outbound), - forward: self.forward, - }) - } +pub(crate) struct OnMessageMutValues { + state: State, + receiver: tokio::sync::mpsc::Receiver, + kill: tokio::sync::broadcast::Receiver<()>, } pub(crate) async fn on_message< - Prop: Value, - State: Value, + Prop, + State, Outbound: Sync + Send, + Request: Sync + Send, M: InboundMessage, >( _actor_name: String, - mut receiver: tokio::sync::mpsc::Receiver, prop: &Prop, - mut state: State, - forward: &HashMap + Send + Sync>>, - outbound: Arc, - mut kill: tokio::sync::broadcast::Receiver<()>, + mut values: OnMessageMutValues, + forward: &ForwardTable, + outbound: &Outbound, + request: &Request, ) -> (State, tokio::sync::mpsc::Receiver) { - let outbound = outbound.clone(); loop { select! { - _ = kill.recv() => { + _ = values.kill.recv() => { - while receiver.try_recv().is_ok(){} + while values.receiver.try_recv().is_ok(){} - return (state, receiver); + return (values.state, values.receiver); }, - m = receiver.recv() => { + m = values.receiver.recv() => { if m.is_none() { - let _ = kill.try_recv(); - return (state, receiver); + let _ = values.kill.try_recv(); + return (values.state, values.receiver); } let m = m.unwrap(); let t = forward.get(&m.inbound_channel()); if t.is_none() { continue; } - t.unwrap().forward_message(prop, &mut state, &outbound, m); + t.unwrap().forward_message(prop, &mut values.state, outbound, request, m); } } } diff --git a/src/core/actor_builder.rs b/src/core/actor_builder.rs index 06eccb0..7c6c995 100644 --- a/src/core/actor_builder.rs +++ b/src/core/actor_builder.rs @@ -1,34 +1,42 @@ -use std::collections::HashMap; + use crate::compute::context::Context; use crate::core::{ actor::GenericActor, - inbound::{ForwardMessage, InboundHub, InboundMessage}, + inbound::{InboundHub, InboundMessage}, outbound::OutboundHub, runner::Runner, - value::Value, }; +use super::actor::ForwardTable; +use super::request::RequestHub; + /// Creates actor from its components. -/// +/// /// Used in [`InboundHub::from_builder`] public interface. -pub struct ActorBuilder<'a, Prop, State: Value, OutboundHub, M: InboundMessage> { +pub struct ActorBuilder< + 'a, + Prop, + State, + OutboundHub, + Request: RequestHub, + M: InboundMessage, +> { /// unique identifier of the actor pub actor_name: String, prop: Prop, state: State, /// execution context - pub context: &'a mut Context, + pub context: &'a mut Context, /// a channel for sending messages to the actor pub sender: tokio::sync::mpsc::Sender, pub(crate) receiver: tokio::sync::mpsc::Receiver, /// a collection of inbound channels - pub forward: - HashMap + Send + Sync>>, + pub forward: ForwardTable, } -impl<'a, Prop, State: Value, Outbound: OutboundHub, M: InboundMessage> - ActorBuilder<'a, Prop, State, Outbound, M> +impl<'a, Prop, State, Outbound: OutboundHub, Request: RequestHub, M: InboundMessage> + ActorBuilder<'a, Prop, State, Outbound, Request, M> { pub(crate) fn new( context: &'a mut Context, @@ -45,33 +53,35 @@ impl<'a, Prop, State: Value, Outbound: OutboundHub, M: InboundMessage> context, sender: sender.clone(), receiver, - forward: HashMap::new(), + forward: ForwardTable::new(), } } pub(crate) fn build< - Inbound: InboundHub, - Run: Runner, + Inbound: InboundHub, + Run: Runner, >( self, inbound: Inbound, outbound: Outbound, - ) -> GenericActor { + request: Request, + ) -> GenericActor { let mut actor = GenericActor { actor_name: self.actor_name.clone(), inbound, outbound, + request, phantom: std::marker::PhantomData {}, }; - let sleeping = Run::new_dormant_actor( + self.context.actors.push(Run::new_actor_node( self.actor_name, self.prop, self.state, self.receiver, self.forward, actor.outbound.extract(), - ); - self.context.actors.push(sleeping); + actor.request.extract(), + )); actor } } diff --git a/src/core/connection.rs b/src/core/connection.rs new file mode 100644 index 0000000..b468fae --- /dev/null +++ b/src/core/connection.rs @@ -0,0 +1,34 @@ +use std::sync::Arc; + +use self::{ + outbound_connection::{ActiveConnection, ConnectionConfig}, + request_connection::{ + ActiveRequestConnection, GenericRequestConnection, RequestConnectionConfig, + }, +}; + +use super::outbound::GenericConnection; + +/// Infrastructure to connect an outbound channel of one actor to an inbound channel of another actor. +/// +/// Note that the implementation is a bit over-engineered and can likely be simplified. +pub mod outbound_connection; + +/// Infrastructure to connect an request channel of one actor to an inbound channel of another actor. +/// +/// Note that the implementation is a bit over-engineered and can likely be simplified. +pub mod request_connection; + +type ConnectionRegister = Vec + Send + Sync>>; + +pub(crate) enum ConnectionEnum { + Config(ConnectionConfig), + Active(ActiveConnection), +} + +type RequestConnectionRegister = Option + Send + Sync>>; + +pub(crate) enum RequestConnectionEnum { + Config(RequestConnectionConfig), + Active(ActiveRequestConnection), +} diff --git a/src/core/connection/outbound_connection.rs b/src/core/connection/outbound_connection.rs new file mode 100644 index 0000000..533a800 --- /dev/null +++ b/src/core/connection/outbound_connection.rs @@ -0,0 +1,100 @@ +use std::sync::Arc; + +use crate::core::{outbound::GenericConnection, Morph}; + +use super::{ConnectionRegister, ConnectionEnum}; + +pub(crate) struct ConnectionConfig { + pub connection_register: ConnectionRegister, + pub maybe_register_launch_pad: Option>>, + pub maybe_register_landing_pad: Option>>, +} + +impl Drop for ConnectionConfig { + fn drop(&mut self) { + if let Some(connection_launch_pad) = self.maybe_register_launch_pad.take() { + let connection_register = std::mem::take(&mut self.connection_register); + let _ = connection_launch_pad.send(connection_register); + } else { + panic!("ConnectionConfig dropped when launch pad is is empty"); + } + } +} + +impl ConnectionConfig { + pub fn new() -> Self { + let (connection_launch_pad, connection_landing_pad) = tokio::sync::oneshot::channel(); + Self { + connection_register: vec![], + maybe_register_launch_pad: Some(connection_launch_pad), + maybe_register_landing_pad: Some(connection_landing_pad), + } + } +} + +pub(crate) struct ActiveConnection { + pub maybe_registers: Option>, + pub maybe_register_landing_pad: Option>>, +} + + +impl ConnectionEnum { + pub fn new() -> Self { + Self::Config(ConnectionConfig::new()) + } + + pub fn push(&mut self, connection: Arc + Send + Sync>) { + match self { + Self::Config(config) => { + config.connection_register.push(connection); + } + Self::Active(_) => { + panic!("Cannot push to active connection"); + } + } + } + + pub(crate) fn send(&self, msg: T) { + match self { + Self::Config(_) => { + panic!("Cannot send to config connection"); + } + Self::Active(active) => { + for i in active.maybe_registers.as_ref().unwrap().iter() { + i.send_impl(msg.clone()); + } + } + } + } +} + +impl Morph for ConnectionEnum { + fn extract(&mut self) -> Self { + match self { + Self::Config(config) => Self::Active(ActiveConnection { + maybe_registers: None, + maybe_register_landing_pad: Some(config.maybe_register_landing_pad.take().unwrap()), + }), + Self::Active(_) => { + panic!("Cannot extract active connection"); + } + } + } + + fn activate(&mut self) { + match self { + Self::Config(_) => { + panic!("Cannot activate config connection"); + } + Self::Active(active) => { + let connection_register = active + .maybe_register_landing_pad + .take() + .unwrap() + .try_recv() + .unwrap(); + active.maybe_registers = Some(connection_register); + } + } + } +} diff --git a/src/core/connection/request_connection.rs b/src/core/connection/request_connection.rs new file mode 100644 index 0000000..5b4d8da --- /dev/null +++ b/src/core/connection/request_connection.rs @@ -0,0 +1,135 @@ +use std::{marker::PhantomData, sync::Arc}; + +use tokio::sync::mpsc::error::SendError; + +use crate::core::{InboundMessage, InboundMessageNew, Morph}; + +use super::{RequestConnectionEnum, RequestConnectionRegister}; + +pub(crate) trait GenericRequestConnection: Send + Sync { + fn send_impl(&self, msg: T); +} + +#[derive(Debug, Clone)] +pub(crate) struct RequestConnection { + pub(crate) sender: tokio::sync::mpsc::Sender, + pub(crate) inbound_channel: String, + pub(crate) phantom: PhantomData, +} + +impl> GenericRequestConnection + for RequestConnection +{ + fn send_impl(&self, msg: T) { + let msg = M::new(self.inbound_channel.clone(), msg); + let c = self.sender.clone(); + let handler = tokio::spawn(async move { + match c.send(msg).await { + Ok(_) => {} + Err(SendError(_)) => { + println!("SendError"); + } + } + }); + std::mem::drop(handler); + } +} + +pub(crate) struct RequestConnectionConfig { + pub connection_register: RequestConnectionRegister, + pub maybe_register_launch_pad: + Option>>, + pub maybe_register_landing_pad: + Option>>, +} + +impl Drop for RequestConnectionConfig { + fn drop(&mut self) { + if let Some(connection_launch_pad) = self.maybe_register_launch_pad.take() { + let connection_register = std::mem::take(&mut self.connection_register); + let _ = connection_launch_pad.send(connection_register); + } else { + panic!("ConnectionConfig dropped when launch pad is is empty"); + } + } +} + +impl RequestConnectionConfig { + pub fn new() -> Self { + let (connection_launch_pad, connection_landing_pad) = tokio::sync::oneshot::channel(); + Self { + connection_register: None, + maybe_register_launch_pad: Some(connection_launch_pad), + maybe_register_landing_pad: Some(connection_landing_pad), + } + } +} + +pub(crate) struct ActiveRequestConnection { + pub maybe_registers: Option>, + pub maybe_register_landing_pad: + Option>>, +} + +impl RequestConnectionEnum { + pub fn new() -> Self { + Self::Config(RequestConnectionConfig::new()) + } + + pub fn push(&mut self, connection: Arc + Send + Sync>) { + match self { + Self::Config(config) => { + assert!(config.connection_register.is_none()); + config.connection_register = Some(connection); + } + Self::Active(_) => { + panic!("Cannot push to active connection"); + } + } + } + + pub(crate) fn send(&self, msg: T) { + match self { + Self::Config(_) => { + panic!("Cannot send to config connection"); + } + Self::Active(active) => { + let maybe_connection = active.maybe_registers.as_ref().unwrap(); + if maybe_connection.is_some() { + maybe_connection.as_ref().unwrap().send_impl(msg); + } + } + } + } +} + +impl Morph for RequestConnectionEnum { + fn extract(&mut self) -> Self { + match self { + Self::Config(config) => Self::Active(ActiveRequestConnection { + maybe_registers: None, + maybe_register_landing_pad: Some(config.maybe_register_landing_pad.take().unwrap()), + }), + Self::Active(_) => { + panic!("Cannot extract active connection"); + } + } + } + + fn activate(&mut self) { + match self { + Self::Config(_) => { + panic!("Cannot activate config connection"); + } + Self::Active(active) => { + let connection_register = active + .maybe_register_landing_pad + .take() + .unwrap() + .try_recv() + .unwrap(); + active.maybe_registers = Some(connection_register); + } + } + } +} diff --git a/src/core/inbound.rs b/src/core/inbound.rs index 4921603..23d43cb 100644 --- a/src/core/inbound.rs +++ b/src/core/inbound.rs @@ -1,11 +1,15 @@ use crate::compute::context::Context; -use crate::core::{actor_builder::ActorBuilder, outbound::OutboundHub, value::Value}; +use crate::core::{actor_builder::ActorBuilder, outbound::OutboundHub}; + +use super::request::{NullRequest, RequestHub}; /// The inbound hub is a collection of inbound channels. -pub trait InboundHub: Send + Sync { +pub trait InboundHub, M: InboundMessage>: + Send + Sync +{ /// Create a new inbound hub for an actor. fn from_builder( - builder: &mut ActorBuilder, + builder: &mut ActorBuilder, actor_name: &str, ) -> Self; } @@ -14,11 +18,11 @@ pub trait InboundHub: Send + #[derive(Debug, Clone)] pub struct NullInbound {} -impl - InboundHub for NullInbound +impl> + InboundHub for NullInbound { fn from_builder( - _builder: &mut ActorBuilder, + _builder: &mut ActorBuilder, _actor_name: &str, ) -> Self { Self {} @@ -26,8 +30,8 @@ impl } /// Inbound channel to receive messages of a specific type `T`. -/// -/// Inbound channels can be connected to one or more outbound channels of upstream actors. +/// +/// Inbound channels can be connected to one or more outbound channels of upstream actors. #[derive(Debug, Clone)] pub struct InboundChannel { /// Unique identifier of the inbound channel. @@ -38,9 +42,7 @@ pub struct InboundChannel { pub(crate) phantom: std::marker::PhantomData, } -impl - InboundChannel -{ +impl InboundChannel { /// Creates a new inbound channel. pub fn new( context: &mut Context, @@ -61,14 +63,17 @@ impl String; } @@ -76,7 +81,13 @@ pub trait InboundMessage: Send + Sync + Clone + 'static { /// Customization point for processing inbound messages. pub trait OnMessage: InboundMessage { /// Process the inbound message - user code with main business logic goes here. - fn on_message(&self, prop: &Self::Prop, state: &mut Self::State, outbound: &Self::OutboundHub); + fn on_message( + self, + prop: &Self::Prop, + state: &mut Self::State, + outbound: &Self::OutboundHub, + request: &Self::RequestHub, + ); } /// Trait for creating inbound messages of compatible types `T`. @@ -88,62 +99,93 @@ pub trait InboundMessageNew: } /// Message forwarder. -pub trait ForwardMessage< -Prop: Value, -State: Value, OutboundHub, M: InboundMessage> { +pub trait ForwardMessage { /// Forward the message to the OnMessage customization point. - fn forward_message(&self, prop: &Prop, state: &mut State, outbound: &OutboundHub, msg: M); + fn forward_message( + &self, + prop: &Prop, + state: &mut State, + outbound: &OutboundHub, + request: &RequestHub, + msg: M, + ); } impl< - T: Default + Clone + Send + Sync + std::fmt::Debug + 'static, - Prop: Value, - State: Value, + T: Clone + Send + Sync + std::fmt::Debug + 'static, + Prop, + State, OutboundHub, - M: OnMessage, - > ForwardMessage for InboundChannel + RequestHub, + M: OnMessage, + > ForwardMessage for InboundChannel { - fn forward_message(&self, prop: &Prop, state: &mut State, outbound: &OutboundHub, msg: M) { - msg.on_message(prop, state, outbound); + fn forward_message( + &self, + prop: &Prop, + state: &mut State, + outbound: &OutboundHub, + request: &RequestHub, + msg: M, + ) { + msg.on_message(prop, state, outbound, request); } } /// Null message is a marker type for actors with no inbound channels. #[derive(Debug)] -pub enum NullMessage { +pub enum NullMessage { /// Null message. - NullMessage(std::marker::PhantomData<(P, S, O)>), + NullMessage(std::marker::PhantomData<(P, S, O, NullRequest)>), } -impl NullMessage { - /// Creates a new null message. - pub fn new() -> Self { - NullMessage::NullMessage(std::marker::PhantomData {}) +impl Default for NullMessage { + fn default() -> Self { + Self::new() } } -impl Default for NullMessage { - fn default() -> Self { - Self::new() +impl NullMessage { + /// Creates a new null message. + pub fn new() -> Self { + NullMessage::NullMessage(std::marker::PhantomData {}) } } -impl Clone for NullMessage { +impl Clone for NullMessage { fn clone(&self) -> Self { Self::new() } } -impl InboundMessage for NullMessage { +impl< + P: std::marker::Send + std::marker::Sync + 'static, + S: std::marker::Send + std::marker::Sync + 'static, + O: OutboundHub, + > InboundMessage for NullMessage +{ type Prop = P; type State = S; type OutboundHub = O; + type RequestHub = NullRequest; fn inbound_channel(&self) -> String { "".to_owned() } } -impl OnMessage for NullMessage { - fn on_message(&self, _prop: &P, _state: &mut Self::State, _outputs: &Self::OutboundHub) {} +impl< + P: std::marker::Send + std::marker::Sync + 'static, + S: std::marker::Send + std::marker::Sync + 'static, + O: OutboundHub, + > OnMessage for NullMessage +{ + fn on_message( + self, + _prop: &P, + _state: &mut Self::State, + _outputs: &Self::OutboundHub, + _request: &Self::RequestHub, + ) { + } } diff --git a/src/core/outbound.rs b/src/core/outbound.rs index 237da39..8f26d3f 100644 --- a/src/core/outbound.rs +++ b/src/core/outbound.rs @@ -1,9 +1,11 @@ -use std::{marker::PhantomData, sync::Arc, vec}; +use std::{marker::PhantomData, sync::Arc}; use tokio::sync::mpsc::error::SendError; use crate::compute::context::Context; use crate::core::inbound::{InboundChannel, InboundMessage, InboundMessageNew}; +use super::connection::ConnectionEnum; + /// OutboundHub is a collection of outbound channels for the actor. pub trait OutboundHub: Send + Sync + 'static + Morph { /// Creates the OutboundHub from context and the actor name. @@ -12,8 +14,7 @@ pub trait OutboundHub: Send + Sync + 'static + Morph { /// An empty outbound hub - used for actors that do not have any outbound channels. #[derive(Debug, Clone)] -pub struct NullOutbound { -} +pub struct NullOutbound {} impl Morph for NullOutbound { fn extract(&mut self) -> Self { @@ -119,107 +120,3 @@ impl> GenericConnection for OutboundC std::mem::drop(handler); } } - -type ConnectionRegister = Vec + Send + Sync>>; - -pub(crate) struct ConnectionConfig { - pub connection_register: ConnectionRegister, - pub maybe_register_launch_pad: Option>>, - pub maybe_register_landing_pad: Option>>, -} - -impl Drop for ConnectionConfig { - fn drop(&mut self) { - if let Some(connection_launch_pad) = self.maybe_register_launch_pad.take() { - let connection_register = std::mem::take(&mut self.connection_register); - let _ = connection_launch_pad.send(connection_register); - } else { - panic!("ConnectionConfig dropped when launch pad is is empty"); - } - } -} - -impl ConnectionConfig { - pub fn new() -> Self { - let (connection_launch_pad, connection_landing_pad) = tokio::sync::oneshot::channel(); - Self { - connection_register: vec![], - maybe_register_launch_pad: Some(connection_launch_pad), - maybe_register_landing_pad: Some(connection_landing_pad), - } - } -} - -pub(crate) struct ActiveConnection { - pub maybe_registers: Option>, - pub maybe_register_landing_pad: Option>>, -} - -pub(crate) enum ConnectionEnum { - Config(ConnectionConfig), - Active(ActiveConnection), -} - -impl ConnectionEnum { - pub fn new() -> Self { - Self::Config(ConnectionConfig::new()) - } - - pub fn push(&mut self, connection: Arc + Send + Sync>) { - match self { - Self::Config(config) => { - config.connection_register.push(connection); - } - Self::Active(_) => { - panic!("Cannot push to active connection"); - } - } - } - - fn send(&self, msg: T) { - match self { - Self::Config(_) => { - panic!("Cannot send to config connection"); - } - Self::Active(active) => { - for i in active.maybe_registers.as_ref().unwrap().iter() { - i.send_impl(msg.clone()); - } - } - } - } -} - -impl Morph for ConnectionEnum { - fn extract(&mut self) -> Self { - - println!("ConnectionEnum::extract"); - match self { - Self::Config(config) => Self::Active(ActiveConnection { - maybe_registers: None, - maybe_register_landing_pad: Some(config.maybe_register_landing_pad.take().unwrap()), - }), - Self::Active(_) => { - panic!("Cannot extract active connection"); - } - } - } - - fn activate(&mut self) { - println!("ConnectionEnum::activate"); - match self { - Self::Config(_) => { - panic!("Cannot activate config connection"); - } - Self::Active(active) => { - let connection_register = active - .maybe_register_landing_pad - .take() - .unwrap() - .try_recv() - .unwrap(); - active.maybe_registers = Some(connection_register); - } - } - } -} diff --git a/src/core/request.rs b/src/core/request.rs new file mode 100644 index 0000000..dd2f641 --- /dev/null +++ b/src/core/request.rs @@ -0,0 +1,170 @@ +use std::fmt::Debug; +use std::{marker::PhantomData, sync::Arc}; + +use crate::compute::Context; + +use super::connection::request_connection::RequestConnection; +use super::connection::RequestConnectionEnum; +use super::{InboundChannel, InboundMessage, InboundMessageNew, Morph}; + +/// A request hub is used to send requests to other actors. +pub trait RequestHub: Send + Sync + 'static + Morph { + /// Create a new request hub for an actor. + fn from_context_and_parent(actor_name: &str, sender: &tokio::sync::mpsc::Sender) -> Self; +} + +/// A request message with a reply channel. +#[derive(Debug, Clone, Default)] +pub struct RequestMessage { + /// The request. + request: Request, + /// The reply channel. + reply_channel: Option>>>, +} + +impl RequestMessage { + /// Reply to the request immediately. + pub fn reply(self, func: F) + where + F: FnOnce(Request) -> Reply, + { + let reply_struct = self.reply_later(); + let reply = func(reply_struct.request); + reply_struct + .reply_channel + .send(ReplyMessage { reply }) + .unwrap(); + } + + /// Reply to the request later using the provided reply channel. + pub fn reply_later(self) -> ReplyLater { + let reply_channel = Arc::into_inner( + self.reply_channel + .expect("self.reply must not be None. This is a bug in the hollywood crate."), + ) + .expect("self.reply must have a ref count of 1. This is a bug in the hollywood crate."); + ReplyLater:: { + request: self.request, + reply_channel, + } + } +} + +/// A request with a reply channel. +pub struct ReplyLater { + /// The request. + pub request: Request, + /// The reply channel. + pub reply_channel: tokio::sync::oneshot::Sender>, +} + +impl ReplyLater { + /// Send the reply to the request. + pub fn send_reply(self, reply: Reply) { + self.reply_channel.send(ReplyMessage { reply }).unwrap(); + } +} + +/// A reply to a request. +#[derive(Debug, Clone, Default)] +pub struct ReplyMessage { + /// The reply value. + pub reply: Reply, +} + +/// OutboundChannel is a connections for messages which are sent to a downstream actor. +pub struct RequestChannel { + /// Unique name of the outbound. + pub name: String, + /// Name of the actor that sends the outbound messages. + pub actor_name: String, + pub(crate) connection_register: RequestConnectionEnum>, + + pub(crate) sender: tokio::sync::mpsc::Sender, + // phantom: std::marker::PhantomData<(Request, Reply)>, +} + +impl Morph for RequestChannel { + fn extract(&mut self) -> Self { + Self { + name: self.name.clone(), + actor_name: self.actor_name.clone(), + connection_register: self.connection_register.extract(), + sender: self.sender.clone(), + } + } + + fn activate(&mut self) { + self.connection_register.activate(); + } +} + +impl< + Request: Clone + Send + Sync + std::fmt::Debug + 'static, + Reply: Clone + Send + Sync + std::fmt::Debug + 'static, + M: InboundMessageNew>, + > RequestChannel +{ + /// Create a new outbound for actor in provided context. + pub fn new(name: String, actor_name: &str, sender: &tokio::sync::mpsc::Sender) -> Self { + Self { + name: name.clone(), + actor_name: actor_name.to_owned(), + connection_register: RequestConnectionEnum::new(), + sender: sender.clone(), + } + } + + /// Connect the outbound channel from this actor to the inbound channel of another actor. + pub fn connect>>( + &mut self, + _ctx: &mut Context, + inbound: &mut InboundChannel, Me>, + ) { + //ctx.connect_impl(self, inbound); + self.connection_register.push(Arc::new(RequestConnection { + sender: inbound.sender.clone(), + inbound_channel: inbound.name.clone(), + phantom: PhantomData {}, + })); + } + + /// Send a message to the connected inbound channels to other actors. + pub fn send_request(&self, msg: Request) { + let (sender, receiver) = tokio::sync::oneshot::channel(); + let msg = RequestMessage { + request: msg, + reply_channel: Some(Arc::new(sender)), + }; + self.connection_register.send(msg); + //self.sender.send(receiver.into_future()) + //todo!() + + let sender = self.sender.clone(); + let name = self.name.clone(); + + //receiver.into_future()) + tokio::spawn(async move { + let r = receiver.await.unwrap(); + sender.send(M::new(name, r)).await + }); + } +} + +/// An empty request hub - used for actors that do not have any request channels. +#[derive(Debug, Clone, Default)] +pub struct NullRequest {} + +impl RequestHub for NullRequest { + fn from_context_and_parent(_actor_name: &str, _sender: &tokio::sync::mpsc::Sender) -> Self { + Self {} + } +} + +impl Morph for NullRequest { + fn extract(&mut self) -> Self { + Self {} + } + + fn activate(&mut self) {} +} diff --git a/src/core/runner.rs b/src/core/runner.rs index 3230972..f753491 100644 --- a/src/core/runner.rs +++ b/src/core/runner.rs @@ -1,47 +1,69 @@ -use std::collections::HashMap; + use crate::core::{ - actor::{DormantActorImpl, DormantActorNode}, - inbound::{ForwardMessage, InboundHub, InboundMessage}, + inbound::{InboundHub, InboundMessage}, outbound::OutboundHub, - value::Value, +}; + +use super::{ + actor::{ActorNodeImpl, ForwardTable}, + request::RequestHub, + ActorNode, }; /// Runner executes the pipeline. pub trait Runner< Prop, - Inbound: InboundHub, - State: Value, + Inbound: InboundHub, + State, Outbound: OutboundHub, + Request: RequestHub, M: InboundMessage, > { - /// Create a new dormant actor to be stored by the context. - fn new_dormant_actor( + /// Create a new actor to be stored by the context. + fn new_actor_node( name: String, prop: Prop, state: State, receiver: tokio::sync::mpsc::Receiver, - forward: HashMap< - String, - Box + Send + Sync>, - >, + forward: ForwardTable, outbound: Outbound, - ) -> Box; + request: Request, + ) -> Box; } /// The default runner. pub struct DefaultRunner< Prop, Inbound: Send + Sync, - State: Value, + State, Outbound: Send + Sync + 'static, + Request: Send + Sync + 'static, > { - phantom: std::marker::PhantomData<(Prop, Inbound, State, Outbound)>, + phantom: std::marker::PhantomData<(Prop, Inbound, State, Outbound, Request)>, } -impl - DefaultRunner +impl< + Prop, + State, + Inbound: Send + Sync, + Outbound: Send + Sync + 'static, + Request: Send + Sync + 'static, + > Default for DefaultRunner +{ + fn default() -> Self { + Self::new() + } +} + +impl< + Prop, + State, + Inbound: Send + Sync, + Outbound: Send + Sync + 'static, + Request: Send + Sync + 'static, + > DefaultRunner { /// Create a new default runner. pub fn new() -> Self { @@ -51,41 +73,34 @@ impl } } -impl Default - for DefaultRunner -{ - fn default() -> Self { - Self::new() - } -} - impl< - Prop: Value, - Inbound: InboundHub, - State: Value, + Prop: std::marker::Send + std::marker::Sync + 'static, + Inbound: InboundHub, + State: Clone + std::marker::Send + std::marker::Sync + 'static, Outbound: OutboundHub, M: InboundMessage, - > Runner - for DefaultRunner + Request: RequestHub, + > Runner + for DefaultRunner { - fn new_dormant_actor( + fn new_actor_node( name: String, prop: Prop, init_state: State, receiver: tokio::sync::mpsc::Receiver, - forward: HashMap< - String, - Box + Send + Sync>, - >, + forward: ForwardTable, outbound: Outbound, - ) -> Box { - Box::new(DormantActorImpl:: { - name: name.clone(), + request: Request, + ) -> Box { + Box::new(ActorNodeImpl:: { + name, prop, - receiver, + init_state, + state: None, + receiver: Some(receiver), outbound, forward, - init_state, + request, }) } } diff --git a/src/core/value.rs b/src/core/value.rs index b31988c..2ef32ff 100644 --- a/src/core/value.rs +++ b/src/core/value.rs @@ -1,15 +1,9 @@ -/// Trait for actor state and props. -pub trait Value: std::fmt::Debug + Send + Sync + Clone + 'static {} - - /// Empty state - for stateless actors. #[derive(Clone, Debug, Default)] pub struct NullState {} -impl Value for NullState {} /// Empty prop - for actors without props. #[derive(Clone, Debug, Default)] pub struct NullProp {} -impl Value for NullProp {} diff --git a/src/examples/mod.rs b/src/examples.rs similarity index 98% rename from src/examples/mod.rs rename to src/examples.rs index cbf4200..de0975d 100644 --- a/src/examples/mod.rs +++ b/src/examples.rs @@ -1,4 +1,3 @@ -#![deny(missing_docs)] //! Examples of hollywood actor framework. /// Moving average example diff --git a/src/examples/moving_average/mod.rs b/src/examples/moving_average/mod.rs index a2ee7b8..e175c56 100644 --- a/src/examples/moving_average/mod.rs +++ b/src/examples/moving_average/mod.rs @@ -2,16 +2,16 @@ use crate::macros::*; // needed for actor_outputs macro pub use crate::compute::Context; +use crate::core::request::NullRequest; pub use crate::core::{Morph, OutboundChannel, OutboundHub}; // needed for actor_inputs macro pub use crate::core::{ ActorBuilder, InboundChannel, InboundHub, InboundMessage, InboundMessageNew, OnMessage, - Value, }; // needed for actor macro -pub use crate::core::{Actor, DefaultRunner, FromPropState}; +pub use crate::core::{Actor, FromPropState, DefaultRunner}; /// Outbound hub for the MovingAverage. #[actor_outputs] @@ -41,10 +41,6 @@ impl Default for MovingAverageProp { } } -impl Value for MovingAverageProp {} - - - /// State of the MovingAverage actor. #[derive(Clone, Debug, Default)] pub struct MovingAverageState { @@ -52,14 +48,10 @@ pub struct MovingAverageState { pub moving_average: f64, } -impl Value for MovingAverageState {} - - - /// Inbound message for the MovingAverage actor. /// #[derive(Clone, Debug)] -#[actor_inputs(MovingAverageInbound, {MovingAverageProp, MovingAverageState, MovingAverageOutbound})] +#[actor_inputs(MovingAverageInbound, {MovingAverageProp, MovingAverageState, MovingAverageOutbound, NullRequest})] pub enum MovingAverageMessage { /// a float value Value(f64), @@ -67,7 +59,13 @@ pub enum MovingAverageMessage { impl OnMessage for MovingAverageMessage { /// Process the inbound time_stamp message. - fn on_message(&self, prop: &Self::Prop, state: &mut Self::State, outbound: &Self::OutboundHub) { + fn on_message( + self, + prop: &Self::Prop, + state: &mut Self::State, + outbound: &Self::OutboundHub, + _request: &Self::RequestHub, + ) { match &self { MovingAverageMessage::Value(new_value) => { state.moving_average = @@ -92,9 +90,13 @@ impl InboundMessageNew for MovingAverageMessage { /// The MovingAverage actor. /// #[actor(MovingAverageMessage)] -type MovingAverage = - Actor; - +type MovingAverage = Actor< + MovingAverageProp, + MovingAverageInbound, + MovingAverageState, + MovingAverageOutbound, + NullRequest, +>; /// Manual implementation of the moving average actor pub mod manual; diff --git a/src/examples/one_dim_robot/draw.rs b/src/examples/one_dim_robot/draw.rs index 9a967c8..32f169f 100644 --- a/src/examples/one_dim_robot/draw.rs +++ b/src/examples/one_dim_robot/draw.rs @@ -1,7 +1,7 @@ +use crate::core::request::NullRequest; use crate::core::InboundMessageNew; use crate::core::NullOutbound; use crate::core::OnMessage; -use crate::core::Value; use crate::core::*; use crate::examples::one_dim_robot::{NamedFilterState, Robot, Stamped}; use crate::macros::*; @@ -9,7 +9,7 @@ use drawille::Canvas; /// Inbound channels for the draw actor #[derive(Clone, Debug)] -#[actor_inputs(DrawInbound, {NullProp, DrawState, NullOutbound})] +#[actor_inputs(DrawInbound, {NullProp, DrawState, NullOutbound,NullRequest})] pub enum DrawInboundMessage { /// True position of the robot. TruePos(Stamped), @@ -21,11 +21,17 @@ pub enum DrawInboundMessage { /// Draw actor for one-dim-robot example. #[actor(DrawInboundMessage)] -pub type DrawActor = Actor; +pub type DrawActor = Actor; impl OnMessage for DrawInboundMessage { /// Forward the message to the correct handler method of [DrawState]. - fn on_message(&self, _prop: &NullProp, state: &mut Self::State, outbound: &Self::OutboundHub) { + fn on_message( + self, + _prop: &NullProp, + state: &mut Self::State, + outbound: &Self::OutboundHub, + _request: &Self::RequestHub, + ) { match self { DrawInboundMessage::TruePos(msg) => { state.true_pos(msg.clone(), outbound); @@ -99,7 +105,7 @@ impl DrawState { self.draw(); } - /// Draw the current state to the console if all information of the most recent timestamp is + /// Draw the current state to the console if all information of the most recent timestamp is /// available. pub fn draw(&mut self) { let factor = 6.0; @@ -186,5 +192,3 @@ impl DrawState { } } } - -impl Value for DrawState {} diff --git a/src/examples/one_dim_robot/filter.rs b/src/examples/one_dim_robot/filter.rs index f92f148..018603e 100644 --- a/src/examples/one_dim_robot/filter.rs +++ b/src/examples/one_dim_robot/filter.rs @@ -1,38 +1,55 @@ use std::fmt::{Debug, Display}; use crate::compute::Context; +use crate::core::request::{NullRequest, RequestMessage}; use crate::core::{ - Actor, ActorBuilder, DefaultRunner, FromPropState, InboundChannel, InboundHub, InboundMessage, - InboundMessageNew, Morph, NullProp, OnMessage, OutboundChannel, OutboundHub, Value, + Actor, ActorBuilder, FromPropState, DefaultRunner, InboundChannel, InboundHub, InboundMessage, + InboundMessageNew, Morph, NullProp, OnMessage, OutboundChannel, OutboundHub, }; use crate::examples::one_dim_robot::{RangeMeasurementModel, Stamped}; use hollywood_macros::{actor, actor_inputs, actor_outputs}; /// Inbound channels for the filter actor. #[derive(Clone, Debug)] -#[actor_inputs(FilterInbound,{NullProp, FilterState,FilterOutbound})] +#[actor_inputs(FilterInbound,{NullProp, FilterState,FilterOutbound,NullRequest})] pub enum FilterInboundMessage { /// noisy velocity measurements NoisyVelocity(Stamped), /// noisy range measurements NoisyRange(Stamped), + /// Request + Request(RequestMessage), } #[actor(FilterInboundMessage)] -type Filter = Actor; +type Filter = Actor; impl OnMessage for FilterInboundMessage { /// Process the inbound message NoisyVelocity or NoisyRange. /// /// On NoisyVelocity, FilterState::prediction is called. /// On NoisyRange, FilterState::update is called. - fn on_message(&self, _prop: &Self::Prop, state: &mut Self::State, outbound: &Self::OutboundHub) { - match &self { + fn on_message( + self, + _prop: &Self::Prop, + state: &mut Self::State, + outbound: &Self::OutboundHub, + _request: &Self::RequestHub, + ) { + match self { FilterInboundMessage::NoisyVelocity(v) => { - state.prediction(v, outbound); + state.prediction(&v, outbound); } FilterInboundMessage::NoisyRange(r) => { - state.update(r, outbound); + state.update(&r, outbound); + } + FilterInboundMessage::Request(msg) => { + let msg: RequestMessage = msg; + + msg.reply(|input| { + println!("Got request: {}", input); + 42.0 + }); } } } @@ -48,6 +65,12 @@ impl InboundMessageNew> for FilterInboundMessage { } } +impl InboundMessageNew> for FilterInboundMessage { + fn new(_inbound_channel: String, msg: RequestMessage) -> Self { + FilterInboundMessage::Request(msg) + } +} + /// Filter state #[derive(Clone, Debug, Default)] pub struct FilterState { @@ -57,8 +80,6 @@ pub struct FilterState { pub robot_position: PositionBelieve, } -impl Value for FilterState {} - impl Display for FilterState { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { write!( diff --git a/src/examples/one_dim_robot/sim.rs b/src/examples/one_dim_robot/sim.rs index 9c35586..4ddde96 100644 --- a/src/examples/one_dim_robot/sim.rs +++ b/src/examples/one_dim_robot/sim.rs @@ -3,35 +3,47 @@ use std::fmt::Debug; use rand_distr::{Distribution, Normal}; use crate::compute::Context; +use crate::core::request::{ReplyMessage, RequestChannel, RequestHub}; use crate::core::{ - Actor, ActorBuilder, DefaultRunner, FromPropState, InboundChannel, InboundHub, InboundMessage, - InboundMessageNew, Morph, NullProp, OnMessage, OutboundChannel, OutboundHub, Value, + Actor, ActorBuilder, FromPropState, DefaultRunner, InboundChannel, InboundHub, InboundMessage, + InboundMessageNew, Morph, NullProp, OnMessage, OutboundChannel, OutboundHub, }; use crate::examples::one_dim_robot::{RangeMeasurementModel, Robot, Stamped}; use crate::macros::*; /// Inbound channels for the simulation actor. #[derive(Clone, Debug)] -#[actor_inputs(SimInbound, {NullProp, SimState, SimOutbound})] +#[actor_inputs(SimInbound, {NullProp, SimState, SimOutbound, SimRequest})] pub enum SimInboundMessage { /// Time-stamp message to drive the simulation. TimeStamp(f64), + /// Reply message from the compute pipeline. + SimReply(ReplyMessage), } /// Simulation for the one-dimensional Robot. #[actor(SimInboundMessage)] -pub type Sim = Actor; +pub type Sim = Actor; impl OnMessage for SimInboundMessage { /// Invokes [SimState::process_time_stamp()] on TimeStamp. - fn on_message(&self, _prop: &Self::Prop, state: &mut Self::State, outbound: &Self::OutboundHub) { + fn on_message( + self, + _prop: &Self::Prop, + state: &mut Self::State, + outbound: &Self::OutboundHub, + request: &Self::RequestHub, + ) { match self { SimInboundMessage::TimeStamp(time) => { - state.process_time_stamp(*time, outbound); - if time >= &state.shutdown_time { + state.process_time_stamp(time, outbound, request); + if time >= state.shutdown_time { outbound.cancel_request.send(()); } } + SimInboundMessage::SimReply(msg) => { + println!("SimReply: {}", msg.reply); + } } } } @@ -42,6 +54,12 @@ impl InboundMessageNew for SimInboundMessage { } } +impl InboundMessageNew> for SimInboundMessage { + fn new(_inbound_name: String, msg: ReplyMessage) -> Self { + SimInboundMessage::SimReply(msg) + } +} + /// Simulation state #[derive(Clone, Debug, Default)] pub struct SimState { @@ -57,7 +75,7 @@ impl SimState { const RANGE_MODEL: RangeMeasurementModel = RangeMeasurementModel {}; /// One step of the simulation. - pub fn process_time_stamp(&mut self, time: f64, outbound: &SimOutbound) { + pub fn process_time_stamp(&mut self, time: f64, outbound: &SimOutbound, request: &SimRequest) { self.time = time; self.true_robot.position += self.true_robot.velocity * time; self.true_robot.velocity = 0.25 * (0.25 * time).cos(); @@ -89,11 +107,13 @@ impl SimState { outbound .noisy_velocity .send(Stamped::from_stamp_and_value(time, &noisy_velocity)); + + request.ask_for_time.send_request("ask_for_time".to_owned()); + + // println!("time: {}", time); } } -impl Value for SimState {} - /// OutboundChannel channels for the simulation actor. #[actor_outputs] pub struct SimOutbound { @@ -110,3 +130,32 @@ pub struct SimOutbound { /// Compute pipeline cancel request. pub cancel_request: OutboundChannel<()>, } + +/// RequestChannel channels for the simulation actor. +pub struct SimRequest { + /// RequestChannel for time-stamp messages. + pub ask_for_time: RequestChannel, +} + +impl RequestHub for SimRequest { + fn from_context_and_parent( + actor_name: &str, + sender: &tokio::sync::mpsc::Sender, + ) -> Self { + Self { + ask_for_time: RequestChannel::new(actor_name.to_owned(), "ask_for_time", sender), + } + } +} + +impl Morph for SimRequest { + fn extract(&mut self) -> Self { + Self { + ask_for_time: self.ask_for_time.extract(), + } + } + + fn activate(&mut self) { + self.ask_for_time.activate(); + } +} diff --git a/src/introspect/mod.rs b/src/introspect.rs similarity index 100% rename from src/introspect/mod.rs rename to src/introspect.rs diff --git a/src/lib.rs b/src/lib.rs index 2a2a64b..97ec585 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,3 @@ -#![deny(missing_docs)] //! # Hollywood //! //! Hollywood is an actor framework for Rust.