Skip to content

Commit

Permalink
feat: request reply & refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
strasdat committed Jan 14, 2024
1 parent 2b4e1e9 commit d660666
Show file tree
Hide file tree
Showing 27 changed files with 918 additions and 415 deletions.
3 changes: 3 additions & 0 deletions examples/one_dim_robot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ async fn run_robot_example() {
sim.outbound
.true_robot
.connect(context, &mut truth_printer.inbound.printable);


sim.request.ask_for_time.connect(context, &mut filter.inbound.request);
context.register_cancel_requester(&mut sim.outbound.cancel_request);

filter
Expand Down
24 changes: 17 additions & 7 deletions hollywood_macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ pub fn actor_inputs(args: TokenStream, inbound: TokenStream) -> TokenStream {
let prop_type = &args.prop_type;
let state_type = &args.state_type;
let output_type = &args.output_type;
let request_type = &args.request_type;

let inbound = fields.iter().map(|variant| {
let variant_name = variant.ident.clone();
Expand Down Expand Up @@ -197,6 +198,7 @@ pub fn actor_inputs(args: TokenStream, inbound: TokenStream) -> TokenStream {
type Prop = #prop_type;
type State = #state_type;
type OutboundHub = #output_type;
type RequestHub = #request_type;

fn inbound_channel(&self) -> String {
match self {
Expand All @@ -205,9 +207,9 @@ pub fn actor_inputs(args: TokenStream, inbound: TokenStream) -> TokenStream {
}
}

impl InboundHub<#prop_type, #state_type, #output_type, #name> for #struct_name {
impl InboundHub<#prop_type, #state_type, #output_type, #request_type,#name> for #struct_name {

fn from_builder(builder: &mut ActorBuilder<#prop_type, #state_type, #output_type, #name>,
fn from_builder(builder: &mut ActorBuilder<#prop_type, #state_type, #output_type,#request_type, #name>,
actor_name: &str) -> Self {
#(#from_builder_inbounds)*

Expand Down Expand Up @@ -258,6 +260,8 @@ pub fn actor(attr: TokenStream, item: TokenStream) -> TokenStream {
let mut maybe_inbounds = None;
let mut maybe_state = None;
let mut maybe_outputs = None;
let mut maybe_requests = None;

if let Item::Type(item_type) = inbound_clone {
if let Type::Path(type_path) = *item_type.ty {
if type_path.path.segments.last().unwrap().ident != "Actor" {
Expand All @@ -267,10 +271,10 @@ pub fn actor(attr: TokenStream, item: TokenStream) -> TokenStream {
}
for segment in type_path.path.segments {
if let PathArguments::AngleBracketed(angle_bracketed_args) = segment.arguments {
if angle_bracketed_args.args.len() != 4 {
if angle_bracketed_args.args.len() != 5 {
return Error::new_spanned(
&angle_bracketed_args,
"Expected three type arguments: Actor<PROP, INBOUNDS, STATE, OUTBOUNDS>",
"Expected 5 type arguments: Actor<PROP, INBOUNDS, STATE, OUTBOUNDS, REQUESTS>",
)
.to_compile_error()
.into();
Expand All @@ -279,6 +283,7 @@ pub fn actor(attr: TokenStream, item: TokenStream) -> TokenStream {
maybe_inbounds = Some(angle_bracketed_args.args[1].clone());
maybe_state = Some(angle_bracketed_args.args[2].clone());
maybe_outputs = Some(angle_bracketed_args.args[3].clone());
maybe_requests = Some(angle_bracketed_args.args[4].clone());
}
}
} else {
Expand All @@ -294,16 +299,17 @@ pub fn actor(attr: TokenStream, item: TokenStream) -> TokenStream {
let inbound = maybe_inbounds.unwrap();
let state_type = maybe_state.unwrap();
let out = maybe_outputs.unwrap();
let requests = maybe_requests.unwrap();

let runner_type = quote! { DefaultRunner<#prop, #inbound, #state_type, #out> };
let runner_type = quote! { DefaultRunner<#prop, #inbound, #state_type, #out, #requests> };

let gen = quote! {

///
#( #attrs )*
pub type #actor_name = Actor<#prop, #inbound, #state_type, #out>;
pub type #actor_name = Actor<#prop, #inbound, #state_type, #out, #requests>;

impl FromPropState<#prop, #inbound, #state_type, #out, #message_type, #runner_type>
impl FromPropState<#prop, #inbound, #state_type, #out, #message_type, #requests, #runner_type>
for #actor_name
{
fn name_hint(prop: &#prop) -> String {
Expand All @@ -320,6 +326,7 @@ struct ActorInbound {
prop_type: Ident,
state_type: Ident,
output_type: Ident,
request_type: Ident,
}

impl Parse for ActorInbound {
Expand All @@ -333,11 +340,14 @@ impl Parse for ActorInbound {
let state_type: Ident = content.parse()?;
let _: Token![,] = content.parse()?;
let output_type: Ident = content.parse()?;
let _: Token![,] = content.parse()?;
let request_type: Ident = content.parse()?;
Ok(ActorInbound {
struct_name,
prop_type,
state_type,
output_type,
request_type
})
}
}
File renamed without changes.
108 changes: 56 additions & 52 deletions src/actors/periodic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,27 @@ use std::sync::Arc;
use async_trait::async_trait;

use crate::compute::context::Context;
use crate::core::connection::ConnectionEnum;

use crate::core::request::NullRequest;
use crate::core::{
actor::{FromPropState, ActorNode, DormantActorNode, GenericActor},
actor::{ActorNode, FromPropState, GenericActor},
inbound::{ForwardMessage, NullInbound, NullMessage},
outbound::{ConnectionEnum, Morph, OutboundChannel, OutboundHub},
outbound::{Morph, OutboundChannel, OutboundHub},
runner::Runner,
value::Value,
};
use crate::macros::*;


/// Outbound hub of periodic actor, which consists of a single outbound channel.
#[actor_outputs]
pub struct PeriodicOutbound {
/// Time stamp outbound channel, which sends a messages every `period`
/// seconds with the current time stamp.
pub time_stamp: OutboundChannel<f64>,
}


/// A periodic actor.
///
/// This is an actor that periodically sends a message to its outbound.
pub type Periodic =
GenericActor<PeriodicProp, NullInbound, PeriodicState, PeriodicOutbound, PeriodicRunner>;
pub type Periodic = GenericActor<
PeriodicProp,
NullInbound,
PeriodicState,
PeriodicOutbound,
NullRequest,
PeriodicRunner,
>;

impl Periodic {
/// Create a new periodic actor, with a period of `period` seconds.
Expand All @@ -51,7 +48,8 @@ impl
NullInbound,
PeriodicState,
PeriodicOutbound,
NullMessage<PeriodicProp, PeriodicState, PeriodicOutbound>,
NullMessage<PeriodicProp, PeriodicState, PeriodicOutbound, NullRequest>,
NullRequest,
PeriodicRunner,
> for Periodic
{
Expand All @@ -76,8 +74,6 @@ impl Default for PeriodicProp {
}
}

impl Value for PeriodicProp {}

/// State of the periodic actor.
#[derive(Clone, Debug)]
pub struct PeriodicState {
Expand All @@ -94,9 +90,32 @@ impl Default for PeriodicState {
}
}

impl Value for PeriodicState {}
/// Outbound hub of periodic actor, which consists of a single outbound channel.
pub struct PeriodicOutbound {
/// Time stamp outbound channel, which sends a messages every `period`
/// seconds with the current time stamp.
pub time_stamp: OutboundChannel<f64>,
}

impl Morph for PeriodicOutbound {
fn extract(&mut self) -> Self {
Self {
time_stamp: self.time_stamp.extract(),
}
}

fn activate(&mut self) {
self.time_stamp.activate();
}
}

impl OutboundHub for PeriodicOutbound {
fn from_context_and_parent(context: &mut Context, actor_name: &str) -> Self {
Self {
time_stamp: OutboundChannel::<f64>::new(context, "time_stamp".to_owned(), actor_name),
}
}
}

/// The custom runner for the periodic actor.
pub struct PeriodicRunner {}
Expand All @@ -107,16 +126,17 @@ impl
NullInbound,
PeriodicState,
PeriodicOutbound,
NullMessage<PeriodicProp, PeriodicState, PeriodicOutbound>,
NullRequest,
NullMessage<PeriodicProp, PeriodicState, PeriodicOutbound, NullRequest>,
> for PeriodicRunner
{
/// Create a new dormant actor.
fn new_dormant_actor(
/// Create a new actor node.
fn new_actor_node(
name: String,
prop: PeriodicProp,
state: PeriodicState,
_receiver: tokio::sync::mpsc::Receiver<
NullMessage<PeriodicProp, PeriodicState, PeriodicOutbound>,
NullMessage<PeriodicProp, PeriodicState, PeriodicOutbound, NullRequest>,
>,
_forward: std::collections::HashMap<
String,
Expand All @@ -125,54 +145,36 @@ impl
PeriodicProp,
PeriodicState,
PeriodicOutbound,
NullMessage<PeriodicProp, PeriodicState, PeriodicOutbound>,
NullRequest,
NullMessage<PeriodicProp, PeriodicState, PeriodicOutbound, NullRequest>,
> + Send
+ Sync,
>,
>,
outbound: PeriodicOutbound,
) -> Box<dyn DormantActorNode + Send + Sync> {
Box::new(DormantPeriodic {
_request: NullRequest,
) -> Box<dyn ActorNode + Send + Sync> {
Box::new(PeriodicActor {
name: name.clone(),
prop,
init_state: state.clone(),
outbound,
})
}
}

/// The dormant periodic actor.
pub struct DormantPeriodic {
name: String,
prop: PeriodicProp,
init_state: PeriodicState,
outbound: PeriodicOutbound,
}

impl DormantActorNode for DormantPeriodic {
fn activate(mut self: Box<Self>) -> Box<dyn ActorNode + Send> {
self.outbound.activate();
Box::new(ActivePeriodic {
name: self.name.clone(),
prop: self.prop.clone(),
init_state: self.init_state.clone(),
state: None,
outbound: Arc::new(self.outbound),
outbound: Some(outbound),
})
}
}

/// The active periodic actor.
pub struct ActivePeriodic {
pub struct PeriodicActor {
name: String,
prop: PeriodicProp,
init_state: PeriodicState,
state: Option<PeriodicState>,
outbound: Arc<PeriodicOutbound>,
outbound: Option<PeriodicOutbound>,
}

#[async_trait]
impl ActorNode for ActivePeriodic {
impl ActorNode for PeriodicActor {
fn name(&self) -> &String {
&self.name
}
Expand All @@ -182,6 +184,8 @@ impl ActorNode for ActivePeriodic {
}

async fn run(&mut self, mut kill: tokio::sync::broadcast::Receiver<()>) {
let mut outbound = self.outbound.take().unwrap();
outbound.activate();
self.reset();

let state = self.state.as_mut().unwrap();
Expand All @@ -190,7 +194,7 @@ impl ActorNode for ActivePeriodic {
(1000.0 * self.prop.period) as u64,
));

let conns = Arc::new(self.outbound.clone());
let conns = Arc::new(outbound);

loop {
interval.tick().await;
Expand Down
32 changes: 22 additions & 10 deletions src/actors/printer.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use std::fmt::{Debug, Display};

use crate::core::{
Actor, ActorBuilder, DefaultRunner, FromPropState, InboundChannel, InboundHub, InboundMessage,
InboundMessageNew, NullOutbound, NullState, OnMessage, Value,
request::NullRequest, Actor, ActorBuilder, DefaultRunner, FromPropState, InboundChannel,
InboundHub, InboundMessage, InboundMessageNew, NullOutbound, NullState, OnMessage,
};

/// Configuration properties for the printer actor.
Expand All @@ -20,8 +20,6 @@ impl Default for PrinterProp {
}
}

impl Value for PrinterProp {}

/// Inbound message for the printer actor.
#[derive(Clone, Debug)]
pub enum PrinterInboundMessage<T: Display + Clone + Sync + Send + 'static> {
Expand All @@ -30,7 +28,13 @@ pub enum PrinterInboundMessage<T: Display + Clone + Sync + Send + 'static> {
}

impl<T: Debug + Display + Clone + Sync + Send + 'static> OnMessage for PrinterInboundMessage<T> {
fn on_message(&self, prop: &PrinterProp, _state: &mut Self::State, _outputs: &Self::OutboundHub) {
fn on_message(
self,
prop: &PrinterProp,
_state: &mut Self::State,
_outputs: &Self::OutboundHub,
_request: &Self::RequestHub,
) {
match self {
PrinterInboundMessage::Printable(printable) => {
println!("{}: {}", prop.topic, printable);
Expand All @@ -48,16 +52,17 @@ impl<T: Debug + Display + Clone + Sync + Send + 'static> InboundMessageNew<T>
}

/// Generic printer actor.
pub type Printer<T> = Actor<PrinterProp, PrinterInbound<T>, NullState, NullOutbound>;
pub type Printer<T> = Actor<PrinterProp, PrinterInbound<T>, NullState, NullOutbound, NullRequest>;

impl<T: Clone + Sync + Send + 'static + Debug + Display + Default>
impl<T: Clone + Sync + Default + Send + 'static + Debug + Display>
FromPropState<
PrinterProp,
PrinterInbound<T>,
NullState,
NullOutbound,
PrinterInboundMessage<T>,
DefaultRunner<PrinterProp, PrinterInbound<T>, NullState, NullOutbound>,
NullRequest,
DefaultRunner<PrinterProp, PrinterInbound<T>, NullState, NullOutbound, NullRequest>,
> for Printer<T>
{
fn name_hint(prop: &PrinterProp) -> String {
Expand All @@ -77,6 +82,7 @@ impl<T: Debug + Display + Clone + Sync + Send + 'static> InboundMessage
type Prop = PrinterProp;
type State = NullState;
type OutboundHub = NullOutbound;
type RequestHub = NullRequest;

fn inbound_channel(&self) -> String {
match self {
Expand All @@ -86,11 +92,17 @@ impl<T: Debug + Display + Clone + Sync + Send + 'static> InboundMessage
}

impl<T: Clone + Debug + Display + Default + Sync + Send + 'static>
InboundHub<PrinterProp, NullState, NullOutbound, PrinterInboundMessage<T>>
InboundHub<PrinterProp, NullState, NullOutbound, NullRequest, PrinterInboundMessage<T>>
for PrinterInbound<T>
{
fn from_builder(
builder: &mut ActorBuilder<PrinterProp, NullState, NullOutbound, PrinterInboundMessage<T>>,
builder: &mut ActorBuilder<
PrinterProp,
NullState,
NullOutbound,
NullRequest,
PrinterInboundMessage<T>,
>,
actor_name: &str,
) -> Self {
let m = InboundChannel::new(
Expand Down
File renamed without changes.
Loading

0 comments on commit d660666

Please sign in to comment.