From bf5aa8e80c8ed92840afbfe9c0e79e6bed3c359d Mon Sep 17 00:00:00 2001 From: Hauke Strasdat Date: Wed, 10 Apr 2024 22:24:54 -0500 Subject: [PATCH] feat: preludes --- Cargo.toml | 26 ++-- examples/egui.rs | 50 ++++---- examples/moving_average.rs | 9 +- examples/nudge.rs | 5 +- examples/one_dim_robot.rs | 6 +- examples/print_ticks.rs | 7 +- examples/zip.rs | 5 +- hollywood_macros/Cargo.toml | 9 +- hollywood_macros/src/actors/zip.rs | 26 ++-- hollywood_macros/src/core.rs | 16 +-- src/actors/egui.rs | 25 ++-- src/actors/nudge.rs | 34 ++--- src/actors/periodic.rs | 38 ++---- src/actors/printer.rs | 22 +--- src/actors/zip.rs | 18 +-- src/compute.rs | 5 - src/compute/context.rs | 28 ++--- src/compute/pipeline.rs | 23 ++-- src/compute/topology.rs | 14 +-- src/core.rs | 22 ---- src/core/actor.rs | 75 +++++------ src/core/actor_builder.rs | 32 ++--- src/core/connection.rs | 16 ++- src/core/connection/outbound_connection.rs | 27 ++-- src/core/connection/request_connection.rs | 17 +-- src/core/inbound.rs | 73 ++++++----- src/core/outbound.rs | 52 ++++---- src/core/request.rs | 28 ++--- src/core/runner.rs | 33 +++-- src/example_actors/moving_average.rs | 26 +--- src/example_actors/one_dim_robot/draw.rs | 13 +- src/example_actors/one_dim_robot/filter.rs | 29 ++--- src/example_actors/one_dim_robot/sim.rs | 35 ++---- src/introspect/flow_graph.rs | 10 +- src/lib.rs | 138 +++++++++++++++------ 35 files changed, 433 insertions(+), 559 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index fc65163..786d0a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,26 +11,28 @@ keywords = ["actor", "compute", "graph", "pipeline"] license = "Apache-2.0" readme = "README.md" repository = "https://github.com/farm-ng/hollywood/" -version = "0.5.0" +version = "0.5.1" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -async-trait = "0.1.51" -drawille = "0.3.0" -eframe = {version = "0.26", features = ["wgpu"], optional = true} -env_logger = {version = "0.11.1", optional = true} -grid = "0.13.0" -hollywood_macros = {version = "0.5.0", path = "hollywood_macros"} +async-trait = "0.1" +drawille = "0.3" +# hollywood intends to use only basic features of egui, hence +# future versions of egui will likely/hopefully work +eframe = {version = ">= 0.27, <1.0", features = ["wgpu"], optional = true} +env_logger = {version = "0.11", optional = true} +grid = "0.13" +hollywood_macros = {version = "0.5", path = "hollywood_macros"} # hollywood intends to use only very basic features of nalgebra, hence # future versions of nalgebra before the major < 1.0 release are likely to work nalgebra = ">= 0.32, <1.0" -petgraph = "0.6.3" -rand = "0.8.4" -rand_distr = "0.4.3" +petgraph = "0.6" +rand = "0.8" +rand_distr = "0.4" # executor feature needed -tokio = {version = "1.28.0", features = ["full"]} -tokio-stream = "0.1.14" +tokio = {version = "1.28", features = ["full"]} +tokio-stream = "0.1" [features] default = ["egui"] diff --git a/examples/egui.rs b/examples/egui.rs index 85687f7..c6604cc 100644 --- a/examples/egui.rs +++ b/examples/egui.rs @@ -4,18 +4,17 @@ use hollywood::actors::egui::EguiActor; use hollywood::actors::egui::EguiAppFromBuilder; use hollywood::actors::egui::GenericEguiBuilder; use hollywood::actors::egui::Stream; -use hollywood::compute::Context; -use hollywood::core::request::RequestMessage; -use hollywood::core::*; -use hollywood_macros::*; +use hollywood::prelude::*; #[derive(Clone, Debug)] -#[actor_inputs(ContentGeneratorInbound,{NullProp, ContentGeneratorState, ContentGeneratorOutbound, NullRequest})] +#[actor_inputs( + ContentGeneratorInbound, + {NullProp, ContentGeneratorState, ContentGeneratorOutbound, NullRequest})] pub enum ContentGeneratorInboundMessage { Tick(f64), } -impl OnMessage for ContentGeneratorInboundMessage { +impl HasOnMessage for ContentGeneratorInboundMessage { /// Process the inbound tick message. fn on_message( self, @@ -26,11 +25,8 @@ impl OnMessage for ContentGeneratorInboundMessage { ) { match &self { ContentGeneratorInboundMessage::Tick(new_value) => { - match state.reset_request.try_recv() { - Ok(_) => { - state.offset = -*new_value; - } - Err(_) => {} + if state.reset_request.try_recv().is_ok() { + state.offset = -*new_value; } let x = *new_value + state.offset; @@ -48,7 +44,7 @@ impl OnMessage for ContentGeneratorInboundMessage { } } -impl InboundMessageNew for ContentGeneratorInboundMessage { +impl IsInboundMessageNew for ContentGeneratorInboundMessage { fn new(_inbound_name: String, msg: f64) -> Self { ContentGeneratorInboundMessage::Tick(msg) } @@ -121,22 +117,18 @@ impl EguiAppFromBuilder for EguiAppExample { type State = String; } use eframe::egui; +use hollywood::RequestMessage; impl eframe::App for EguiAppExample { fn update(&mut self, ctx: &egui::Context, _frame: &mut eframe::Frame) { - loop { - match self.message_recv.try_recv() { - Ok(value) => match value.msg { - PlotMessage::SinPlot((x, y)) => { - self.x = x; - self.sin_value = y; - } - PlotMessage::CosPlot((x, y)) => { - self.x = x; - self.cos_value = y; - } - }, - Err(_) => { - break; + while let Ok(value) = self.message_recv.try_recv() { + match value.msg { + PlotMessage::SinPlot((x, y)) => { + self.x = x; + self.sin_value = y; + } + PlotMessage::CosPlot((x, y)) => { + self.x = x; + self.cos_value = y; } } } @@ -169,7 +161,7 @@ pub async fn run_viewer_example() { }); // Pipeline configuration - let pipeline = hollywood::compute::Context::configure(&mut |context| { + let pipeline = Hollywood::configure(&mut |context| { // Actor creation: // 1. Periodic timer to drive the simulation let mut timer = hollywood::actors::Periodic::new_with_period(context, 0.1); @@ -198,7 +190,9 @@ pub async fn run_viewer_example() { }); // The cancel_requester is used to cancel the pipeline. - builder.cancel_request_sender = pipeline.cancel_request_sender_template.clone(); + builder + .cancel_request_sender + .clone_from(&pipeline.cancel_request_sender_template); // Plot the pipeline graph to the console. pipeline.print_flow_graph(); diff --git a/examples/moving_average.rs b/examples/moving_average.rs index 04569e7..3f6be88 100644 --- a/examples/moving_average.rs +++ b/examples/moving_average.rs @@ -1,17 +1,14 @@ use hollywood::actors::printer::PrinterProp; use hollywood::actors::Periodic; use hollywood::actors::Printer; -use hollywood::compute::Context; -use hollywood::core::FromPropState; -use hollywood::core::NullState; - use hollywood::example_actors::moving_average::MovingAverage; use hollywood::example_actors::moving_average::MovingAverageProp; use hollywood::example_actors::moving_average::MovingAverageState; +use hollywood::prelude::*; -/// +/// Run the moving average example pub async fn run_moving_average_example() { - let pipeline = Context::configure(&mut |context| { + let pipeline = Hollywood::configure(&mut |context| { let mut timer = Periodic::new_with_period(context, 1.0); let mut moving_average = MovingAverage::from_prop_and_state( context, diff --git a/examples/nudge.rs b/examples/nudge.rs index 2a7c611..646f9a7 100644 --- a/examples/nudge.rs +++ b/examples/nudge.rs @@ -1,11 +1,10 @@ use hollywood::actors::printer::PrinterProp; use hollywood::actors::Nudge; use hollywood::actors::Printer; -use hollywood::compute::Context; -use hollywood::core::*; +use hollywood::prelude::*; pub async fn run_tick_print_example() { - let pipeline = Context::configure(&mut |context| { + let pipeline = Hollywood::configure(&mut |context| { let mut nudge = Nudge::::new(context, "nudge".to_owned()); let mut nudge_printer = Printer::::from_prop_and_state( context, diff --git a/examples/one_dim_robot.rs b/examples/one_dim_robot.rs index 7c82798..e1eb66d 100644 --- a/examples/one_dim_robot.rs +++ b/examples/one_dim_robot.rs @@ -4,9 +4,6 @@ use hollywood::actors::zip::ZipPair; use hollywood::actors::Periodic; use hollywood::actors::Printer; use hollywood::actors::Zip3; -use hollywood::compute::Context; -use hollywood::core::*; - use hollywood::example_actors::one_dim_robot::draw::DrawState; use hollywood::example_actors::one_dim_robot::filter::FilterState; use hollywood::example_actors::one_dim_robot::DrawActor; @@ -16,9 +13,10 @@ use hollywood::example_actors::one_dim_robot::Robot; use hollywood::example_actors::one_dim_robot::Sim; use hollywood::example_actors::one_dim_robot::SimState; use hollywood::example_actors::one_dim_robot::Stamped; +use hollywood::prelude::*; async fn run_robot_example() { - let pipeline = Context::configure(&mut |context| { + let pipeline = Hollywood::configure(&mut |context| { let mut timer = Periodic::new_with_period(context, 0.1); let mut sim = Sim::from_prop_and_state( context, diff --git a/examples/print_ticks.rs b/examples/print_ticks.rs index 9fa644b..ca069ee 100644 --- a/examples/print_ticks.rs +++ b/examples/print_ticks.rs @@ -1,12 +1,11 @@ use hollywood::actors::printer::PrinterProp; use hollywood::actors::Periodic; use hollywood::actors::Printer; -use hollywood::compute::Context; -use hollywood::core::*; +use hollywood::prelude::*; -/// +/// Run the tick print example pub async fn run_tick_print_example() { - let pipeline = Context::configure(&mut |context| { + let pipeline = Hollywood::configure(&mut |context| { let mut timer = Periodic::new_with_period(context, 1.0); let mut time_printer = Printer::::from_prop_and_state( context, diff --git a/examples/zip.rs b/examples/zip.rs index 8d45e1a..e056d1d 100644 --- a/examples/zip.rs +++ b/examples/zip.rs @@ -5,11 +5,10 @@ use hollywood::actors::zip::Zip2State; use hollywood::actors::zip::ZipPair; use hollywood::actors::Printer; use hollywood::actors::Zip2; -use hollywood::compute::Context; -use hollywood::core::*; +use hollywood::prelude::*; pub async fn run_tick_print_example() { - let pipeline = Context::configure(&mut |context| { + let pipeline = Hollywood::configure(&mut |context| { let mut periodic = periodic::Periodic::new_with_period(context, 1.0); let mut zip = Zip2::::from_prop_and_state( diff --git a/hollywood_macros/Cargo.toml b/hollywood_macros/Cargo.toml index f683de4..865b5b4 100644 --- a/hollywood_macros/Cargo.toml +++ b/hollywood_macros/Cargo.toml @@ -11,14 +11,13 @@ keywords = ["actor", "compute", "graph", "pipeline"] license = "Apache-2.0" readme = "../README.md" repository = "https://github.com/farm-ng/hollywood/tree/main/hollywood_macros" -version = "0.5.0" +version = "0.5.1" -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [lib] proc-macro = true [dependencies] -convert_case = "0.6.0" +convert_case = "0.6" proc-macro2 = "1.0" -quote = "1.0.9" -syn = {version = "2.0.18", features = ["full"]} +quote = "1.0" +syn = {version = "2.0", features = ["full"]} diff --git a/hollywood_macros/src/actors/zip.rs b/hollywood_macros/src/actors/zip.rs index 0ca4318..322df76 100644 --- a/hollywood_macros/src/actors/zip.rs +++ b/hollywood_macros/src/actors/zip.rs @@ -118,7 +118,7 @@ pub(crate) fn zip_outbound_n_impl(input: TokenStream) -> TokenStream { } impl - Activate for #outbound_struct + HasActivate for #outbound_struct { fn extract(&mut self) -> Self { Self { @@ -132,9 +132,9 @@ pub(crate) fn zip_outbound_n_impl(input: TokenStream) -> TokenStream { } impl - OutboundHub for #outbound_struct + IsOutboundHub for #outbound_struct { - fn from_context_and_parent(context: &mut Context, actor_name: &str) -> Self { + fn from_context_and_parent(context: &mut Hollywood, actor_name: &str) -> Self { Self { zipped: OutboundChannel::<#tuple_struct>::new( context, @@ -189,7 +189,7 @@ pub(crate) fn zip_inbound_message_n_impl(input: TokenStream) -> TokenStream { Err(err) => return TokenStream::from(err.to_compile_error()), }; - let inbound_message_enum = format_ident!("Zip{}InboundMessage", num_fields); + let inbound_message_enum = format_ident!("Zip{}IsInboundMessage", num_fields); let state_struct = format_ident!("Zip{}State", num_fields); let outbound_struct = format_ident!("Zip{}Outbound", num_fields); @@ -232,7 +232,7 @@ pub(crate) fn zip_inbound_message_n_impl(input: TokenStream) -> TokenStream { + Send + 'static, #( #type_with_bounds_seq ),* - > InboundMessageNew> + > IsInboundMessageNew> for #inbound_message_enum { fn new(_inbound_name: String, msg: ZipPair<#i, Key, #item>) -> Self { @@ -262,7 +262,7 @@ pub(crate) fn zip_inbound_message_n_impl(input: TokenStream) -> TokenStream { Key: Default + Clone + Debug + PartialEq + Eq + PartialOrd + Ord + Sync + Send + 'static, #( #type_with_bounds_seq ),* - > InboundMessage for #inbound_message_enum + > IsInboundMessage for #inbound_message_enum { type Prop = NullProp; type State = #state_struct; @@ -290,7 +290,7 @@ pub(crate) fn zip_n_impl(input: TokenStream) -> TokenStream { let state_struct = format_ident!("Zip{}State", num_fields); let inbound_struct = format_ident!("Zip{}Inbound", num_fields); let outbound_struct = format_ident!("Zip{}Outbound", num_fields); - let inbound_message_enum = format_ident!("Zip{}InboundMessage", num_fields); + let inbound_message_enum = format_ident!("Zip{}IsInboundMessage", num_fields); let type_seq = (0..num_fields).map(|i| format_ident!("Item{}", i)); let type_seq2 = type_seq.clone(); @@ -329,7 +329,7 @@ pub(crate) fn zip_n_impl(input: TokenStream) -> TokenStream { + Sync + Send + 'static, #( #type_with_bounds_seq ),* > - FromPropState< + HasFromPropState< NullProp, #inbound_struct, #state_struct, @@ -364,7 +364,7 @@ pub(crate) fn zip_inbound_n_impl(input: TokenStream) -> TokenStream { let inbound_struct = format_ident!("Zip{}Inbound", num_fields); let state_struct = format_ident!("Zip{}State", num_fields); let outbound_struct = format_ident!("Zip{}Outbound", num_fields); - let inbound_message_enum = format_ident!("Zip{}InboundMessage", num_fields); + let inbound_message_enum = format_ident!("Zip{}IsInboundMessage", num_fields); let type_seq = (0..num_fields).map(|i| format_ident!("Item{}", i)); let type_seq4 = type_seq.clone(); @@ -422,7 +422,7 @@ pub(crate) fn zip_inbound_n_impl(input: TokenStream) -> TokenStream { + Sync + Send + 'static, #( #type_with_bounds_seq ),* > - InboundHub< + IsInboundHub< NullProp, #state_struct, #outbound_struct, @@ -431,7 +431,7 @@ pub(crate) fn zip_inbound_n_impl(input: TokenStream) -> TokenStream { > for #inbound_struct { fn from_builder( - builder: &mut crate::core::ActorBuilder< + builder: &mut ActorBuilder< NullProp, #state_struct, #outbound_struct, @@ -467,7 +467,7 @@ pub(crate) fn zip_onmessage_n_impl(input: TokenStream) -> TokenStream { }; let tuple_struct = format_ident!("Tuple{}", num_fields); - let inbound_message_enum = format_ident!("Zip{}InboundMessage", num_fields); + let inbound_message_enum = format_ident!("Zip{}IsInboundMessage", num_fields); let type_seq = (0..num_fields).map(|i| format_ident!("Item{}", i)); @@ -521,7 +521,7 @@ pub(crate) fn zip_onmessage_n_impl(input: TokenStream) -> TokenStream { let expanded = quote! { impl OnMessage for #inbound_message_enum + #( #type_with_bounds_seq ),*> HasOnMessage for #inbound_message_enum { fn on_message( self, diff --git a/hollywood_macros/src/core.rs b/hollywood_macros/src/core.rs index ca049c0..e7cf4f5 100644 --- a/hollywood_macros/src/core.rs +++ b/hollywood_macros/src/core.rs @@ -67,15 +67,15 @@ pub(crate) fn actor_outputs_impl(_attr: TokenStream, item: TokenStream) -> Token }); let gen = quote! { - impl #impl_generics OutboundHub for #struct_name #ty_generics #where_clause { - fn from_context_and_parent(context: &mut Context, actor_name: &str) -> Self { + impl #impl_generics IsOutboundHub for #struct_name #ty_generics #where_clause { + fn from_context_and_parent(context: &mut Hollywood, actor_name: &str) -> Self { Self { #(#output_assignments),* } } } - impl #impl_generics Activate for #struct_name #ty_generics #where_clause { + impl #impl_generics HasActivate for #struct_name #ty_generics #where_clause { fn extract(&mut self) -> Self { Self { #(#output_extract),* @@ -161,7 +161,7 @@ pub(crate) fn actor_requests_impl(_attr: TokenStream, item: TokenStream) -> Toke let m_type = is_request_type(&field0.ty).unwrap()[2]; let gen = quote! { - impl #impl_generics RequestHub<#m_type> for #struct_name #ty_generics #where_clause { + impl #impl_generics IsRequestHub<#m_type> for #struct_name #ty_generics #where_clause { fn from_parent_and_sender( actor_name: &str, sender: &tokio::sync::mpsc::Sender<#m_type> ) -> Self { @@ -171,7 +171,7 @@ pub(crate) fn actor_requests_impl(_attr: TokenStream, item: TokenStream) -> Toke } } - impl #impl_generics Activate for #struct_name #ty_generics #where_clause { + impl #impl_generics HasActivate for #struct_name #ty_generics #where_clause { fn extract(&mut self) -> Self { Self { #(#request_extract),* @@ -324,7 +324,7 @@ pub fn actor_inputs_impl(args: TokenStream, inbound: TokenStream) -> TokenStream #(#inbound),* } - impl #impl_generics InboundMessage for #name #ty_generics #where_clause { + impl #impl_generics IsInboundMessage for #name #ty_generics #where_clause { type Prop = #prop_type; type State = #state_type; type OutboundHub = #output_type; @@ -337,7 +337,7 @@ pub fn actor_inputs_impl(args: TokenStream, inbound: TokenStream) -> TokenStream } } - impl #impl_generics InboundHub< + impl #impl_generics IsInboundHub< #prop_type, #state_type, #output_type, @@ -493,7 +493,7 @@ pub fn actor_impl(attr: TokenStream, item: TokenStream) -> TokenStream { #( #attrs )* pub type #actor_name = Actor<#prop, #inbound, #state_type, #out, #requests>; - impl FromPropState< + impl HasFromPropState< #prop, #inbound, #state_type, #out, #message_type, #requests, #runner_type > for #actor_name { diff --git a/src/actors/egui.rs b/src/actors/egui.rs index 2b00fbc..ad802b3 100644 --- a/src/actors/egui.rs +++ b/src/actors/egui.rs @@ -1,11 +1,9 @@ -use crate::compute::CancelRequest; -use crate::compute::Context; -use crate::core::*; +use crate::core::request::IsRequestMessage; +use crate::prelude::*; +use crate::CancelRequest; +use crate::RequestMessage; use std::fmt::Debug; -use self::request::IsRequestMessage; -use self::request::RequestMessage; - /// The inbound message for the egui actor. #[derive(Clone, Debug, Default)] pub struct EguiState @@ -40,7 +38,7 @@ impl< T: Default + Debug + Clone + Send + Sync + 'static, Request: Default + Debug + Clone + Send + Sync + 'static, Reply: Default + Debug + Clone + Send + Sync + 'static, - > InboundMessageNew> for EguiInboundMessage + > IsInboundMessageNew> for EguiInboundMessage { fn new(_inbound_name: String, p: Stream) -> Self { EguiInboundMessage::::Stream(p) @@ -51,7 +49,8 @@ impl< T: Default + Debug + Clone + Send + Sync + 'static, Request: Default + Debug + Clone + Send + Sync + 'static, Reply: Default + Debug + Clone + Send + Sync + 'static, - > InboundMessageNew> for EguiInboundMessage + > IsInboundMessageNew> + for EguiInboundMessage { fn new(_inbound_name: String, p: RequestMessage) -> Self { EguiInboundMessage::::Request(p) @@ -62,7 +61,7 @@ impl< T: Default + Debug + Clone + Send + Sync + 'static, Request: Default + Debug + Clone + Send + Sync + 'static, Reply: Default + Debug + Clone + Send + Sync + 'static, - > InboundMessage for EguiInboundMessage + > IsInboundMessage for EguiInboundMessage { type Prop = NullProp; @@ -95,7 +94,7 @@ impl< Request: Default + Debug + Clone + Send + Sync + 'static, Reply: Default + Debug + Clone + Send + Sync + 'static, > - InboundHub< + IsInboundHub< NullProp, EguiState>, NullOutbound, @@ -141,7 +140,7 @@ impl< T: Default + Debug + Clone + Send + Sync + 'static, Request: Default + Debug + Clone + Send + Sync + 'static, Reply: Default + Debug + Clone + Send + Sync + 'static, - > OnMessage for EguiInboundMessage + > HasOnMessage for EguiInboundMessage { /// Forward the message to the egui app. fn on_message( @@ -182,7 +181,7 @@ impl< Request: Default + Debug + Clone + Send + Sync + 'static, Reply: Default + Debug + Clone + Send + Sync + 'static, > - FromPropState< + HasFromPropState< NullProp, ViewerInbound, EguiState>, @@ -211,7 +210,7 @@ impl< { /// Create a new egui actor from the builder. pub fn from_builder>>( - context: &mut Context, + context: &mut Hollywood, builder: &Builder, ) -> Self { Self::from_prop_and_state( diff --git a/src/actors/nudge.rs b/src/actors/nudge.rs index d3e499d..5a4087a 100644 --- a/src/actors/nudge.rs +++ b/src/actors/nudge.rs @@ -1,23 +1,9 @@ -use std::fmt::Debug; - -use async_trait::async_trait; -use hollywood_macros::actor_outputs; - -use crate::compute::context::Context; -use crate::core::connection::ConnectionEnum; -use crate::core::NullState; - -use crate::core::actor::ActorNode; -use crate::core::actor::FromPropState; -use crate::core::actor::GenericActor; -use crate::core::inbound::ForwardMessage; -use crate::core::inbound::NullInbound; -use crate::core::inbound::NullMessage; -use crate::core::outbound::Activate; -use crate::core::outbound::OutboundChannel; -use crate::core::outbound::OutboundHub; -use crate::core::request::NullRequest; use crate::core::runner::Runner; +use crate::prelude::*; +use crate::ConnectionEnum; +use crate::GenericActor; +use async_trait::async_trait; +use std::fmt::Debug; /// Prop for the nudge actor. #[derive(Clone, Debug, Default)] @@ -40,13 +26,13 @@ pub type Nudge = GenericActor< impl Nudge { /// Create a new nudge actor - pub fn new(context: &mut Context, item: Item) -> Nudge { + pub fn new(context: &mut Hollywood, item: Item) -> Nudge { Nudge::from_prop_and_state(context, NudgeProp:: { item }, NullState::default()) } } impl - FromPropState< + HasFromPropState< NudgeProp, NullInbound, NullState, @@ -92,7 +78,7 @@ impl _forward: std::collections::HashMap< String, Box< - dyn ForwardMessage< + dyn HasForwardMessage< NudgeProp, NullState, NudgeOutbound, @@ -104,7 +90,7 @@ impl >, outbound: NudgeOutbound, _request: NullRequest, - ) -> Box { + ) -> Box { Box::new(NudgeActor:: { name: name.clone(), prop, @@ -125,7 +111,7 @@ pub struct NudgeActor ActorNode +impl IsActorNode for NudgeActor { fn name(&self) -> &String { diff --git a/src/actors/periodic.rs b/src/actors/periodic.rs index 8a76c40..e896299 100644 --- a/src/actors/periodic.rs +++ b/src/actors/periodic.rs @@ -1,21 +1,9 @@ -use std::sync::Arc; - -use async_trait::async_trait; - -use crate::compute::context::Context; use crate::core::connection::ConnectionEnum; - -use crate::core::actor::ActorNode; -use crate::core::actor::FromPropState; -use crate::core::actor::GenericActor; -use crate::core::inbound::ForwardMessage; -use crate::core::inbound::NullInbound; -use crate::core::inbound::NullMessage; -use crate::core::outbound::Activate; -use crate::core::outbound::OutboundChannel; -use crate::core::outbound::OutboundHub; -use crate::core::request::NullRequest; -use crate::core::runner::Runner; +use crate::prelude::*; +use crate::GenericActor; +use crate::Runner; +use async_trait::async_trait; +use std::sync::Arc; /// A periodic actor. /// @@ -31,7 +19,7 @@ pub type Periodic = GenericActor< impl Periodic { /// Create a new periodic actor, with a period of `period` seconds. - pub fn new_with_period(context: &mut Context, period: f64) -> Periodic { + pub fn new_with_period(context: &mut Hollywood, period: f64) -> Periodic { Periodic::from_prop_and_state( context, PeriodicProp { @@ -47,7 +35,7 @@ impl Periodic { } impl - FromPropState< + HasFromPropState< PeriodicProp, NullInbound, PeriodicState, @@ -101,7 +89,7 @@ pub struct PeriodicOutbound { pub time_stamp: OutboundChannel, } -impl Activate for PeriodicOutbound { +impl HasActivate for PeriodicOutbound { fn extract(&mut self) -> Self { Self { time_stamp: self.time_stamp.extract(), @@ -113,8 +101,8 @@ impl Activate for PeriodicOutbound { } } -impl OutboundHub for PeriodicOutbound { - fn from_context_and_parent(context: &mut Context, actor_name: &str) -> Self { +impl IsOutboundHub for PeriodicOutbound { + fn from_context_and_parent(context: &mut Hollywood, actor_name: &str) -> Self { Self { time_stamp: OutboundChannel::::new(context, "time_stamp".to_owned(), actor_name), } @@ -145,7 +133,7 @@ impl _forward: std::collections::HashMap< String, Box< - dyn ForwardMessage< + dyn HasForwardMessage< PeriodicProp, PeriodicState, PeriodicOutbound, @@ -157,7 +145,7 @@ impl >, outbound: PeriodicOutbound, _request: NullRequest, - ) -> Box { + ) -> Box { Box::new(PeriodicActor { name: name.clone(), prop, @@ -178,7 +166,7 @@ pub struct PeriodicActor { } #[async_trait] -impl ActorNode for PeriodicActor { +impl IsActorNode for PeriodicActor { fn name(&self) -> &String { &self.name } diff --git a/src/actors/printer.rs b/src/actors/printer.rs index 859bc01..d6856c7 100644 --- a/src/actors/printer.rs +++ b/src/actors/printer.rs @@ -1,21 +1,7 @@ +use crate::prelude::*; use std::fmt::Debug; use std::fmt::Display; -use hollywood_macros::actor_inputs; - -use crate::core::request::NullRequest; -use crate::core::Actor; -use crate::core::ActorBuilder; -use crate::core::DefaultRunner; -use crate::core::FromPropState; -use crate::core::InboundChannel; -use crate::core::InboundHub; -use crate::core::InboundMessage; -use crate::core::InboundMessageNew; -use crate::core::NullOutbound; -use crate::core::NullState; -use crate::core::OnMessage; - /// Configuration properties for the printer actor. #[derive(Clone, Debug)] pub struct PrinterProp { @@ -39,7 +25,7 @@ pub enum PrinterInboundMessage OnMessage +impl HasOnMessage for PrinterInboundMessage { fn on_message( @@ -57,7 +43,7 @@ impl OnMessage } } -impl InboundMessageNew +impl IsInboundMessageNew for PrinterInboundMessage { fn new(_inbound_name: String, msg: T) -> Self { @@ -69,7 +55,7 @@ impl InboundMessag pub type Printer = Actor, NullState, NullOutbound, NullRequest>; impl - FromPropState< + HasFromPropState< PrinterProp, PrinterInbound, NullState, diff --git a/src/actors/zip.rs b/src/actors/zip.rs index 4931646..d41af12 100644 --- a/src/actors/zip.rs +++ b/src/actors/zip.rs @@ -1,24 +1,8 @@ +use crate::prelude::*; use std::cmp::Reverse; use std::fmt::Debug; use std::fmt::Display; -use hollywood_macros::zip_n; - -use crate::compute::Context; -use crate::core::request::NullRequest; -use crate::core::Activate; -use crate::core::Actor; -use crate::core::DefaultRunner; -use crate::core::FromPropState; -use crate::core::InboundChannel; -use crate::core::InboundHub; -use crate::core::InboundMessage; -use crate::core::InboundMessageNew; -use crate::core::NullProp; -use crate::core::OnMessage; -use crate::core::OutboundChannel; -use crate::core::OutboundHub; - /// Type of the Xth inbound channel for the zip actor. #[derive(Clone, Debug, Default)] pub struct ZipPair { diff --git a/src/compute.rs b/src/compute.rs index 9df60cc..c4b9998 100644 --- a/src/compute.rs +++ b/src/compute.rs @@ -1,13 +1,8 @@ /// The compute context. pub mod context; -pub use context::Context; /// The compute graph of actors. pub mod pipeline; -pub(crate) use pipeline::CancelRequest; -pub use pipeline::Pipeline; /// The graph topology. pub mod topology; -pub(crate) use topology::ActorNode; -pub(crate) use topology::Topology; diff --git a/src/compute/context.rs b/src/compute/context.rs index 968ad4b..1a52553 100644 --- a/src/compute/context.rs +++ b/src/compute/context.rs @@ -1,32 +1,28 @@ +use crate::compute::topology::Topology; +use crate::core::outbound::OutboundConnection; +use crate::prelude::*; +use crate::CancelRequest; +use crate::Pipeline; use std::marker::PhantomData; use std::sync::Arc; -use crate::compute::CancelRequest; -use crate::compute::Pipeline; -use crate::compute::Topology; -use crate::core::ActorNode; -use crate::core::InboundChannel; -use crate::core::InboundMessage; -use crate::core::OutboundChannel; -use crate::core::OutboundConnection; - /// The context of the compute graph which is used to configure the network topology. /// -/// It is an opaque type created by the Context::configure() method. -pub struct Context { - pub(crate) actors: Vec>, +/// It is an opaque type created by the Hollywood::configure() method. +pub struct Hollywood { + pub(crate) actors: Vec>, pub(crate) topology: Topology, pub(crate) cancel_request_sender_template: tokio::sync::mpsc::Sender, pub(crate) cancel_request_receiver: tokio::sync::mpsc::Receiver, } -impl Context { +impl Hollywood { /// Create a new context. /// /// This is the main entry point to configure the compute graph. The network topology is defined /// by the user within the callback function. - pub fn configure(callback: &mut dyn FnMut(&mut Context)) -> Pipeline { - let mut context = Context::new(); + pub fn configure(callback: &mut dyn FnMut(&mut Hollywood)) -> Pipeline { + let mut context = Hollywood::new(); callback(&mut context); Pipeline::from_context(context) } @@ -89,7 +85,7 @@ impl Context { pub(crate) fn connect_impl< T0: Clone + std::fmt::Debug + Sync + Send + 'static, T1: Clone + std::fmt::Debug + Sync + Send + 'static, - M: InboundMessage, + M: IsInboundMessage, >( &mut self, outbound: &mut OutboundChannel, diff --git a/src/compute/pipeline.rs b/src/compute/pipeline.rs index 4ae87f3..924ed91 100644 --- a/src/compute/pipeline.rs +++ b/src/compute/pipeline.rs @@ -1,14 +1,7 @@ +use crate::compute::topology::Topology; +use crate::prelude::*; use std::mem::swap; -use crate::compute::Context; -use crate::compute::Topology; -use crate::core::ActorNode; -use crate::core::InboundMessage; -use crate::core::InboundMessageNew; -use crate::core::NullOutbound; -use crate::core::NullProp; -use crate::core::NullState; - /// Message to request stop execution of the compute pipeline. #[derive(Debug, Clone)] pub enum CancelRequest { @@ -22,7 +15,7 @@ impl CancelRequest { pub const CANCEL_REQUEST_INBOUND_CHANNEL: &'static str = "CANCEL"; } -impl InboundMessage for CancelRequest { +impl IsInboundMessage for CancelRequest { type Prop = NullProp; type State = NullState; type OutboundHub = NullOutbound; @@ -35,16 +28,16 @@ impl InboundMessage for CancelRequest { } } -impl InboundMessageNew<()> for CancelRequest { +impl IsInboundMessageNew<()> for CancelRequest { fn new(_inbound_name: String, _: ()) -> Self { CancelRequest::Cancel(()) } } /// Compute pipeline, strictly speaking a DAG (directed acyclic graph) of actors. It is created by -/// the [Context::configure()] method. +/// the [Hollywood::configure()] method. pub struct Pipeline { - actors: Vec>, + actors: Vec>, topology: Topology, /// We have this here to keep receiver alive pub cancel_request_sender_template: Option>, @@ -52,7 +45,7 @@ pub struct Pipeline { } impl Pipeline { - pub(crate) fn from_context(context: Context) -> Self { + pub(crate) fn from_context(context: Hollywood) -> Self { let mut active = vec![]; for actor in context.actors.into_iter() { active.push(actor); @@ -73,7 +66,7 @@ impl Pipeline { /// an async function) that resolves to the pipeline itself. The future is completed when all /// actors have completed their execution. /// - /// In particular, [ActorNode::run()] is called for each actor in the pipeline in a dedicated + /// In particular, [IsActorNode::run()] is called for each actor in the pipeline in a dedicated /// tokio task. Hence, the actors run concurrently. /// /// TODO: diff --git a/src/compute/topology.rs b/src/compute/topology.rs index 506f72f..7be8d93 100644 --- a/src/compute/topology.rs +++ b/src/compute/topology.rs @@ -1,11 +1,7 @@ -use std::collections::BTreeSet; - -use petgraph::stable_graph::StableDiGraph; - -use crate::core::InboundChannel; -use crate::core::InboundMessage; -use crate::core::OutboundChannel; use crate::introspect::flow_graph::FlowGraph; +use crate::prelude::*; +use petgraph::stable_graph::StableDiGraph; +use std::collections::BTreeSet; // A node in a compute graph. #[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord)] @@ -111,7 +107,7 @@ impl Topology { } count += 1; } - self.graph[node_idx].name = unique_name.clone(); + self.graph[node_idx].name.clone_from(&unique_name.clone()); unique_name } @@ -151,7 +147,7 @@ impl Topology { pub(crate) fn connect< T0: Clone + std::fmt::Debug + Sync + Send + 'static, T1: Clone + std::fmt::Debug + Sync + Send + 'static, - M: InboundMessage, + M: IsInboundMessage, >( &mut self, outbound: &mut OutboundChannel, diff --git a/src/core.rs b/src/core.rs index df37d1b..6b8a2fa 100644 --- a/src/core.rs +++ b/src/core.rs @@ -2,46 +2,24 @@ /// Actor pub mod actor; -pub use actor::Actor; -pub(crate) use actor::ActorNode; -pub use actor::FromPropState; /// Actor builder pub mod actor_builder; -pub use actor_builder::ActorBuilder; /// Inbound pub mod inbound; -pub use inbound::InboundChannel; -pub use inbound::InboundHub; -pub use inbound::InboundMessage; -pub use inbound::InboundMessageNew; -pub use inbound::NullInbound; -pub use inbound::NullMessage; -pub use inbound::OnMessage; - /// Outbound pub mod outbound; -pub use outbound::Activate; -pub use outbound::NullOutbound; -pub use outbound::OutboundChannel; -pub(crate) use outbound::OutboundConnection; -pub use outbound::OutboundHub; /// Request pub mod request; -pub use request::NullRequest; -pub use request::RequestHub; /// Connection pub mod connection; /// Run pub mod runner; -pub use runner::DefaultRunner; /// State pub mod value; -pub use value::NullProp; -pub use value::NullState; diff --git a/src/core/actor.rs b/src/core/actor.rs index e2813eb..7537fc1 100644 --- a/src/core/actor.rs +++ b/src/core/actor.rs @@ -1,26 +1,17 @@ +use crate::core::runner::Runner; +use crate::prelude::*; use async_trait::async_trait; use std::collections::HashMap; use tokio::select; -use crate::compute::context::Context; -use crate::core::actor_builder::ActorBuilder; -use crate::core::inbound::ForwardMessage; -use crate::core::inbound::InboundHub; -use crate::core::inbound::InboundMessage; -use crate::core::outbound::OutboundHub; -use crate::core::runner::DefaultRunner; -use crate::core::runner::Runner; - -use super::request::RequestHub; - /// A generic actor in the hollywood compute graph framework. /// /// An actor consists of its unique name, a set of inbound channels, a set of /// outbound channels as well as its properties, state and runner types. /// /// The generic actor struct is merely a user-facing facade to configure network connections. Actual -/// properties, state and inbound routing is stored in the [ActorNode] structs. -pub struct GenericActor { +/// properties, state and inbound routing is stored in the [IsActorNode] structs. +pub struct GenericActor { /// unique identifier of the actor pub actor_name: String, /// a collection of inbound channels @@ -34,23 +25,23 @@ pub struct GenericActor = GenericActor< +pub type Actor = GenericActor< Prop, Inbound, State, - OutboundHub, + IsOutboundHub, Request, - DefaultRunner, + DefaultRunner, >; /// New actor from properties and state. -pub trait FromPropState< +pub trait HasFromPropState< Prop, - Inbound: InboundHub, + Inbound: IsInboundHub, State, - Outbound: OutboundHub, - M: InboundMessage, - Request: RequestHub, + Outbound: IsOutboundHub, + M: IsInboundMessage, + Request: IsRequestHub, Run: Runner, > { @@ -62,7 +53,7 @@ pub trait FromPropState< /// /// Also, a dormant actor node is created added to the context. fn from_prop_and_state( - context: &mut Context, + context: &mut Hollywood, prop: Prop, initial_state: State, ) -> GenericActor { @@ -80,7 +71,7 @@ pub trait FromPropState< /// Actor node of the pipeline. It is created by the [Runner::new_actor_node()] method. #[async_trait] -pub trait ActorNode { +pub trait IsActorNode { /// Return the actor's name. fn name(&self) -> &String; @@ -89,12 +80,10 @@ pub trait ActorNode { /// * For each inbound channel there are zero, one or more incoming connections. Messages on /// these incoming streams are merged into a single stream. /// * Messages for all inbound channels are processed sequentially using the - /// [OnMessage::on_message()](crate::core::OnMessage::on_message()) method. Sequential - /// processing is crucial to ensure that the actor's state is updated in a consistent - /// manner. Sequential mutable access to the state is enforced by the borrow checker at - /// compile time. - /// * Outbound messages are produced by - /// [OnMessage::on_message()](crate::core::OnMessage::on_message()) the method and sent to + /// [HasOnMessage::on_message()] method. Sequential processing is crucial to ensure that + /// the actor's state is updated in a consistent manner. Sequential mutable access to the + /// state is enforced by the borrow checker at compile time. + /// * Outbound messages are produced by [HasOnMessage::on_message()] the method and sent to /// the through the corresponding outbound channel to downstream actors. /// /// Note: It is an async function which returns a future a completion handler. This method is @@ -103,21 +92,23 @@ pub trait ActorNode { } /// A table to forward outbound messages to message handlers of downstream actors. -pub type ForwardTable = - HashMap + Send + Sync>>; +pub type ForwardTable = HashMap< + String, + Box + Send + Sync>, +>; -pub(crate) struct ActorNodeImpl { +pub(crate) struct IsActorNodeImpl { pub(crate) name: String, pub(crate) prop: Prop, pub(crate) state: Option, pub(crate) receiver: Option>, - pub(crate) outbound: OutboundHub, + pub(crate) outbound: IsOutboundHub, pub(crate) request: Request, - pub(crate) forward: ForwardTable, + pub(crate) forward: ForwardTable, } -impl - ActorNodeImpl +impl + IsActorNodeImpl { } @@ -125,10 +116,10 @@ impl impl< Prop: std::marker::Send + std::marker::Sync + 'static, State: std::marker::Send + std::marker::Sync + 'static, - Outbound: OutboundHub, - Request: RequestHub, - M: InboundMessage, - > ActorNode for ActorNodeImpl + Outbound: IsOutboundHub, + Request: IsRequestHub, + M: IsInboundMessage, + > IsActorNode for IsActorNodeImpl { fn name(&self) -> &String { &self.name @@ -156,7 +147,7 @@ impl< } } -pub(crate) struct OnMessageMutValues { +pub(crate) struct OnMessageMutValues { state: State, receiver: tokio::sync::mpsc::Receiver, kill: tokio::sync::broadcast::Receiver<()>, @@ -167,7 +158,7 @@ pub(crate) async fn on_message< State, Outbound: Sync + Send, Request: Sync + Send, - M: InboundMessage, + M: IsInboundMessage, >( _actor_name: String, prop: &Prop, diff --git a/src/core/actor_builder.rs b/src/core/actor_builder.rs index e99a698..1d9faa5 100644 --- a/src/core/actor_builder.rs +++ b/src/core/actor_builder.rs @@ -1,35 +1,37 @@ -use crate::compute::context::Context; -use crate::core::actor::GenericActor; -use crate::core::inbound::InboundHub; -use crate::core::inbound::InboundMessage; -use crate::core::outbound::OutboundHub; use crate::core::runner::Runner; - -use super::actor::ForwardTable; -use super::request::RequestHub; +use crate::prelude::*; +use crate::ForwardTable; +use crate::GenericActor; /// Creates actor from its components. /// -/// Used in [`InboundHub::from_builder`] public interface. -pub struct ActorBuilder<'a, Prop, State, OutboundHub, Request: RequestHub, M: InboundMessage> { +/// Used in [`IsInboundHub::from_builder`] public interface. +pub struct ActorBuilder< + 'a, + Prop, + State, + IsOutboundHub, + Request: IsRequestHub, + M: IsInboundMessage, +> { /// unique identifier of the actor pub actor_name: String, prop: Prop, state: State, /// execution context - pub context: &'a mut Context, + pub context: &'a mut Hollywood, /// a channel for sending messages to the actor pub sender: tokio::sync::mpsc::Sender, pub(crate) receiver: tokio::sync::mpsc::Receiver, /// a collection of inbound channels - pub forward: ForwardTable, + pub forward: ForwardTable, } -impl<'a, Prop, State, Outbound: OutboundHub, Request: RequestHub, M: InboundMessage> +impl<'a, Prop, State, Outbound: IsOutboundHub, Request: IsRequestHub, M: IsInboundMessage> ActorBuilder<'a, Prop, State, Outbound, Request, M> { pub(crate) fn new( - context: &'a mut Context, + context: &'a mut Hollywood, actor_name: &str, prop: Prop, initial_state: State, @@ -48,7 +50,7 @@ impl<'a, Prop, State, Outbound: OutboundHub, Request: RequestHub, M: InboundM } pub(crate) fn build< - Inbound: InboundHub, + Inbound: IsInboundHub, Run: Runner, >( self, diff --git a/src/core/connection.rs b/src/core/connection.rs index 502084d..2f3cb20 100644 --- a/src/core/connection.rs +++ b/src/core/connection.rs @@ -1,13 +1,11 @@ +use crate::core::connection::outbound_connection::ActiveConnection; +use crate::core::connection::outbound_connection::ConnectionConfig; +use crate::core::connection::request_connection::ActiveRequestConnection; +use crate::core::connection::request_connection::GenericRequestConnection; +use crate::core::connection::request_connection::RequestConnectionConfig; +use crate::prelude::*; use std::sync::Arc; -use self::outbound_connection::ActiveConnection; -use self::outbound_connection::ConnectionConfig; -use self::request_connection::ActiveRequestConnection; -use self::request_connection::GenericRequestConnection; -use self::request_connection::RequestConnectionConfig; - -use super::outbound::GenericConnection; - /// Infrastructure to connect an outbound channel of one actor to an inbound channel of another actor. /// /// Note that the implementation is a bit over-engineered and can likely be simplified. @@ -18,7 +16,7 @@ pub mod outbound_connection; /// Note that the implementation is a bit over-engineered and can likely be simplified. pub mod request_connection; -type ConnectionRegister = Vec + Send + Sync>>; +type ConnectionRegister = Vec + Send + Sync>>; /// Connection pub enum ConnectionEnum { diff --git a/src/core/connection/outbound_connection.rs b/src/core/connection/outbound_connection.rs index 72f0f58..18056c3 100644 --- a/src/core/connection/outbound_connection.rs +++ b/src/core/connection/outbound_connection.rs @@ -1,11 +1,8 @@ +use crate::core::connection::ConnectionRegister; +use crate::prelude::*; +use crate::ConnectionEnum; use std::sync::Arc; -use crate::core::outbound::GenericConnection; -use crate::core::Activate; - -use super::ConnectionEnum; -use super::ConnectionRegister; - /// Connection configuration pub struct ConnectionConfig { /// List of connections @@ -28,7 +25,7 @@ impl Drop for ConnectionConfig { } impl ConnectionConfig { - /// + /// Create connection configuration pub fn new() -> Self { let (connection_launch_pad, connection_landing_pad) = tokio::sync::oneshot::channel(); Self { @@ -39,6 +36,12 @@ impl ConnectionConfig { } } +impl Default for ConnectionConfig { + fn default() -> Self { + Self::new() + } +} + /// Active connection pub struct ActiveConnection { /// List of connections @@ -47,6 +50,12 @@ pub struct ActiveConnection { pub maybe_register_landing_pad: Option>>, } +impl Default for ConnectionEnum { + fn default() -> Self { + Self::new() + } +} + impl ConnectionEnum { /// new connection pub fn new() -> Self { @@ -54,7 +63,7 @@ impl ConnectionEnum { } /// push connection - pub fn push(&mut self, connection: Arc + Send + Sync>) { + pub fn push(&mut self, connection: Arc + Send + Sync>) { match self { Self::Config(config) => { config.connection_register.push(connection); @@ -79,7 +88,7 @@ impl ConnectionEnum { } } -impl Activate for ConnectionEnum { +impl HasActivate for ConnectionEnum { fn extract(&mut self) -> Self { match self { Self::Config(config) => Self::Active(ActiveConnection { diff --git a/src/core/connection/request_connection.rs b/src/core/connection/request_connection.rs index a4a0d10..0275ec2 100644 --- a/src/core/connection/request_connection.rs +++ b/src/core/connection/request_connection.rs @@ -1,27 +1,22 @@ +use crate::core::connection::RequestConnectionEnum; +use crate::core::connection::RequestConnectionRegister; +use crate::prelude::*; use std::marker::PhantomData; use std::sync::Arc; - use tokio::sync::mpsc::error::SendError; -use crate::core::Activate; -use crate::core::InboundMessage; -use crate::core::InboundMessageNew; - -use super::RequestConnectionEnum; -use super::RequestConnectionRegister; - pub(crate) trait GenericRequestConnection: Send + Sync { fn send_impl(&self, msg: T); } #[derive(Debug, Clone)] -pub(crate) struct RequestConnection { +pub(crate) struct RequestConnection { pub(crate) sender: tokio::sync::mpsc::Sender, pub(crate) inbound_channel: String, pub(crate) phantom: PhantomData, } -impl> GenericRequestConnection +impl> GenericRequestConnection for RequestConnection { fn send_impl(&self, msg: T) { @@ -107,7 +102,7 @@ impl RequestConnectionEnum { } } -impl Activate for RequestConnectionEnum { +impl HasActivate for RequestConnectionEnum { fn extract(&mut self) -> Self { match self { Self::Config(config) => Self::Active(ActiveRequestConnection { diff --git a/src/core/inbound.rs b/src/core/inbound.rs index 1a12f75..b285984 100644 --- a/src/core/inbound.rs +++ b/src/core/inbound.rs @@ -1,17 +1,12 @@ -use crate::compute::context::Context; -use crate::core::actor_builder::ActorBuilder; -use crate::core::outbound::OutboundHub; - -use super::request::NullRequest; -use super::request::RequestHub; +use crate::prelude::*; /// The inbound hub is a collection of inbound channels. -pub trait InboundHub, M: InboundMessage>: +pub trait IsInboundHub, M: IsInboundMessage>: Send + Sync { /// Create a new inbound hub for an actor. fn from_builder( - builder: &mut ActorBuilder, + builder: &mut ActorBuilder, actor_name: &str, ) -> Self; } @@ -20,11 +15,16 @@ pub trait InboundHub, M: Inboun #[derive(Debug, Clone)] pub struct NullInbound {} -impl> - InboundHub for NullInbound +impl< + Prop, + State, + IsOutboundHub, + NullMessage: IsInboundMessage, + Request: IsRequestHub, + > IsInboundHub for NullInbound { fn from_builder( - _builder: &mut ActorBuilder, + _builder: &mut ActorBuilder, _actor_name: &str, ) -> Self { Self {} @@ -35,7 +35,7 @@ impl { +pub struct InboundChannel { /// Unique identifier of the inbound channel. pub name: String, /// Name of the actor that the inbound messages are for. @@ -44,10 +44,10 @@ pub struct InboundChannel { pub(crate) phantom: std::marker::PhantomData, } -impl InboundChannel { +impl InboundChannel { /// Creates a new inbound channel. pub fn new( - context: &mut Context, + context: &mut Hollywood, actor_name: &str, sender: &tokio::sync::mpsc::Sender, name: String, @@ -63,17 +63,17 @@ impl Inbo } /// Inbound messages to be received by the actor. -pub trait InboundMessage: Send + Sync + Clone + 'static { +pub trait IsInboundMessage: Send + Sync + Clone + 'static { /// Prop type of the receiving actor. type Prop; /// State type of the receiving actor. type State; - /// OutboundHub type of the receiving actor, to produce outbound messages downstream. + /// IsOutboundHub type of the receiving actor, to produce outbound messages downstream. type OutboundHub: Send + Sync + 'static; - /// RequestHub type of the receiving actor, to send requests upstream. + /// IsRequestHub type of the receiving actor, to send requests upstream. type RequestHub: Send + Sync + 'static; /// Name of the inbound channel that this message is for. @@ -81,7 +81,7 @@ pub trait InboundMessage: Send + Sync + Clone + 'static { } /// Customization point for processing inbound messages. -pub trait OnMessage: InboundMessage { +pub trait HasOnMessage: IsInboundMessage { /// Process the inbound message - user code with main business logic goes here. fn on_message( self, @@ -93,22 +93,22 @@ pub trait OnMessage: InboundMessage { } /// Trait for creating inbound messages of compatible types `T`. -pub trait InboundMessageNew: - std::fmt::Debug + Send + Sync + Clone + 'static + InboundMessage +pub trait IsInboundMessageNew: + std::fmt::Debug + Send + Sync + Clone + 'static + IsInboundMessage { /// Create a new inbound message from the inbound channel name and the message value of type `T`. fn new(inbound_channel: String, value: T) -> Self; } /// Message forwarder. -pub trait ForwardMessage { - /// Forward the message to the OnMessage customization point. +pub trait HasForwardMessage { + /// Forward the message to the HasOnMessage customization point. fn forward_message( &self, prop: &Prop, state: &mut State, - outbound: &OutboundHub, - request: &RequestHub, + outbound: &IsOutboundHub, + request: &IsRequestHub, msg: M, ); } @@ -119,8 +119,13 @@ impl< State, OutboundHub, RequestHub, - M: OnMessage, - > ForwardMessage for InboundChannel + M: HasOnMessage< + Prop = Prop, + State = State, + OutboundHub = OutboundHub, + RequestHub = RequestHub, + >, + > HasForwardMessage for InboundChannel { fn forward_message( &self, @@ -136,25 +141,25 @@ impl< /// Null message is a marker type for actors with no inbound channels. #[derive(Debug)] -pub enum NullMessage { +pub enum NullMessage { /// Null message. NullMessage(std::marker::PhantomData<(P, S, O, NullRequest)>), } -impl Default for NullMessage { +impl Default for NullMessage { fn default() -> Self { Self::new() } } -impl NullMessage { +impl NullMessage { /// Creates a new null message. pub fn new() -> Self { NullMessage::NullMessage(std::marker::PhantomData {}) } } -impl Clone for NullMessage { +impl Clone for NullMessage { fn clone(&self) -> Self { Self::new() } @@ -163,8 +168,8 @@ impl Clone for NullMessage InboundMessage for NullMessage + O: IsOutboundHub, + > IsInboundMessage for NullMessage { type Prop = P; type State = S; @@ -179,8 +184,8 @@ impl< impl< P: std::marker::Send + std::marker::Sync + 'static, S: std::marker::Send + std::marker::Sync + 'static, - O: OutboundHub, - > OnMessage for NullMessage + O: IsOutboundHub, + > HasOnMessage for NullMessage { fn on_message( self, diff --git a/src/core/outbound.rs b/src/core/outbound.rs index dd116ff..e8b83c0 100644 --- a/src/core/outbound.rs +++ b/src/core/outbound.rs @@ -1,26 +1,22 @@ -use std::marker::PhantomData; -use std::sync::Arc; -use tokio::sync::mpsc::error::SendError; - use super::connection::ConnectionEnum; -use crate::compute::context::Context; -use crate::core::inbound::InboundChannel; -use crate::core::inbound::InboundMessage; -use crate::core::inbound::InboundMessageNew; +use crate::prelude::*; use std::fmt::Debug; use std::fmt::Formatter; +use std::marker::PhantomData; +use std::sync::Arc; +use tokio::sync::mpsc::error::SendError; -/// OutboundHub is a collection of outbound channels for the actor. -pub trait OutboundHub: Send + Sync + 'static + Activate { - /// Creates the OutboundHub from context and the actor name. - fn from_context_and_parent(context: &mut Context, actor_name: &str) -> Self; +/// IsOutboundHub is a collection of outbound channels for the actor. +pub trait IsOutboundHub: Send + Sync + 'static + HasActivate { + /// Creates the IsOutboundHub from context and the actor name. + fn from_context_and_parent(context: &mut Hollywood, actor_name: &str) -> Self; } /// An empty outbound hub - used for actors that do not have any outbound channels. #[derive(Debug, Clone)] pub struct NullOutbound {} -impl Activate for NullOutbound { +impl HasActivate for NullOutbound { fn extract(&mut self) -> Self { Self {} } @@ -28,8 +24,8 @@ impl Activate for NullOutbound { fn activate(&mut self) {} } -impl OutboundHub for NullOutbound { - fn from_context_and_parent(_context: &mut Context, _actor_name: &str) -> Self { +impl IsOutboundHub for NullOutbound { + fn from_context_and_parent(_context: &mut Hollywood, _actor_name: &str) -> Self { Self {} } } @@ -46,7 +42,7 @@ pub struct OutboundChannel { impl OutboundChannel { /// Create a new outbound for actor in provided context. - pub fn new(context: &mut Context, name: String, actor_name: &str) -> Self { + pub fn new(context: &mut Hollywood, name: String, actor_name: &str) -> Self { context.assert_unique_outbound_name(name.clone(), actor_name); Self { @@ -57,9 +53,9 @@ impl OutboundChannel>( + pub fn connect>( &mut self, - ctx: &mut Context, + ctx: &mut Hollywood, inbound: &mut InboundChannel, ) { ctx.connect_impl(self, inbound); @@ -75,10 +71,10 @@ impl OutboundChannel, + M: IsInboundMessageNew, >( &mut self, - ctx: &mut Context, + ctx: &mut Hollywood, adapter: fn(OutT) -> InT, inbound: &mut InboundChannel, ) { @@ -98,7 +94,7 @@ impl OutboundChannel Self; @@ -106,7 +102,7 @@ pub trait Activate { fn activate(&mut self); } -impl Activate for OutboundChannel { +impl HasActivate for OutboundChannel { fn activate(&mut self) { self.connection_register.activate(); } @@ -121,20 +117,20 @@ impl Activate for OutboundChannel { } #[derive(Clone, Debug)] -pub(crate) struct OutboundConnection { +pub(crate) struct OutboundConnection { pub(crate) sender: tokio::sync::mpsc::Sender, pub(crate) inbound_channel: String, pub(crate) phantom: std::marker::PhantomData, } #[derive(Clone)] -pub(crate) struct OutboundConnectionWithAdapter { +pub(crate) struct OutboundConnectionWithAdapter { pub(crate) sender: tokio::sync::mpsc::Sender, pub(crate) inbound_channel: String, pub(crate) adapter: fn(Out) -> InT, } -impl Debug for OutboundConnectionWithAdapter { +impl Debug for OutboundConnectionWithAdapter { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { f.debug_struct("OutboundConnection") .field("inbound_channel", &self.inbound_channel) @@ -143,12 +139,12 @@ impl Debug for OutboundConnectionWithAdapter: Send + Sync { +pub trait IsGenericConnection: Send + Sync { /// Send a message to the connected inbound channels to other actors. fn send_impl(&self, msg: T); } -impl> GenericConnection +impl> IsGenericConnection for OutboundConnection { fn send_impl(&self, msg: Out) { @@ -166,7 +162,7 @@ impl> GenericConnection } } -impl> GenericConnection +impl> IsGenericConnection for OutboundConnectionWithAdapter { fn send_impl(&self, msg: Out) { diff --git a/src/core/request.rs b/src/core/request.rs index 7ad57df..2b79487 100644 --- a/src/core/request.rs +++ b/src/core/request.rs @@ -1,18 +1,12 @@ +use crate::core::connection::request_connection::RequestConnection; +use crate::core::connection::RequestConnectionEnum; +use crate::prelude::*; use std::fmt::Debug; use std::marker::PhantomData; use std::sync::Arc; -use crate::compute::Context; - -use super::connection::request_connection::RequestConnection; -use super::connection::RequestConnectionEnum; -use super::Activate; -use super::InboundChannel; -use super::InboundMessage; -use super::InboundMessageNew; - /// A request hub is used to send requests to other actors which will reply later. -pub trait RequestHub: Send + Sync + 'static + Activate { +pub trait IsRequestHub: Send + Sync + 'static + HasActivate { /// Create a new request hub for an actor. fn from_parent_and_sender(actor_name: &str, sender: &tokio::sync::mpsc::Sender) -> Self; } @@ -94,7 +88,7 @@ pub struct ReplyMessage { } /// RequestChannel is a connections for messages which are sent to a downstream actor. -pub struct RequestChannel { +pub struct RequestChannel { /// Unique name of the request channel. pub name: String, /// Name of the actor that sends the request messages. @@ -104,7 +98,7 @@ pub struct RequestChannel { pub(crate) sender: tokio::sync::mpsc::Sender, } -impl Activate for RequestChannel { +impl HasActivate for RequestChannel { fn extract(&mut self) -> Self { Self { name: self.name.clone(), @@ -122,7 +116,7 @@ impl Activate for RequestChannel>, + M: IsInboundMessageNew>, > RequestChannel { /// Create a new request channel for actor in provided context. @@ -136,9 +130,9 @@ impl< } /// Connect the request channel from this actor to the inbound channel of another actor. - pub fn connect>>( + pub fn connect>>( &mut self, - _ctx: &mut Context, + _ctx: &mut Hollywood, inbound: &mut InboundChannel, Me>, ) { self.connection_register.push(Arc::new(RequestConnection { @@ -171,13 +165,13 @@ impl< #[derive(Debug, Clone, Default)] pub struct NullRequest {} -impl RequestHub for NullRequest { +impl IsRequestHub for NullRequest { fn from_parent_and_sender(_actor_name: &str, _sender: &tokio::sync::mpsc::Sender) -> Self { Self {} } } -impl Activate for NullRequest { +impl HasActivate for NullRequest { fn extract(&mut self) -> Self { Self {} } diff --git a/src/core/runner.rs b/src/core/runner.rs index ceb2224..c12d691 100644 --- a/src/core/runner.rs +++ b/src/core/runner.rs @@ -1,20 +1,15 @@ -use crate::core::inbound::InboundHub; -use crate::core::inbound::InboundMessage; -use crate::core::outbound::OutboundHub; - -use super::actor::ActorNodeImpl; -use super::actor::ForwardTable; -use super::request::RequestHub; -use super::ActorNode; +use crate::core::actor::IsActorNodeImpl; +use crate::prelude::*; +use crate::ForwardTable; /// Runner executes the pipeline. pub trait Runner< Prop, - Inbound: InboundHub, + Inbound: IsInboundHub, State, - Outbound: OutboundHub, - Request: RequestHub, - M: InboundMessage, + Outbound: IsOutboundHub, + Request: IsRequestHub, + M: IsInboundMessage, > { /// Create a new actor to be stored by the context. @@ -26,7 +21,7 @@ pub trait Runner< forward: ForwardTable, outbound: Outbound, request: Request, - ) -> Box; + ) -> Box; } /// The default runner. @@ -71,11 +66,11 @@ impl< impl< Prop: std::marker::Send + std::marker::Sync + 'static, - Inbound: InboundHub, + Inbound: IsInboundHub, State: std::marker::Send + std::marker::Sync + 'static, - Outbound: OutboundHub, - M: InboundMessage, - Request: RequestHub, + Outbound: IsOutboundHub, + M: IsInboundMessage, + Request: IsRequestHub, > Runner for DefaultRunner { @@ -87,8 +82,8 @@ impl< forward: ForwardTable, outbound: Outbound, request: Request, - ) -> Box { - Box::new(ActorNodeImpl:: { + ) -> Box { + Box::new(IsActorNodeImpl:: { name, prop, state: Some(init_state), diff --git a/src/example_actors/moving_average.rs b/src/example_actors/moving_average.rs index b240e2b..d3408ba 100644 --- a/src/example_actors/moving_average.rs +++ b/src/example_actors/moving_average.rs @@ -1,24 +1,4 @@ -use crate::macros::*; - -// needed for actor_outputs macro -pub use crate::compute::Context; -use crate::core::request::NullRequest; -pub use crate::core::Activate; -pub use crate::core::OutboundChannel; -pub use crate::core::OutboundHub; - -// needed for actor_inputs macro -pub use crate::core::ActorBuilder; -pub use crate::core::InboundChannel; -pub use crate::core::InboundHub; -pub use crate::core::InboundMessage; -pub use crate::core::InboundMessageNew; -pub use crate::core::OnMessage; - -// needed for actor macro -pub use crate::core::Actor; -pub use crate::core::DefaultRunner; -pub use crate::core::FromPropState; +use crate::prelude::*; /// Outbound hub for the MovingAverage. #[actor_outputs] @@ -64,7 +44,7 @@ pub enum MovingAverageMessage { Value(f64), } -impl OnMessage for MovingAverageMessage { +impl HasOnMessage for MovingAverageMessage { /// Process the inbound time_stamp message. fn on_message( self, @@ -86,7 +66,7 @@ impl OnMessage for MovingAverageMessage { } } -impl InboundMessageNew for MovingAverageMessage { +impl IsInboundMessageNew for MovingAverageMessage { fn new(_inbound_name: String, msg: f64) -> Self { MovingAverageMessage::Value(msg) } diff --git a/src/example_actors/one_dim_robot/draw.rs b/src/example_actors/one_dim_robot/draw.rs index d46ccb9..3fe0643 100644 --- a/src/example_actors/one_dim_robot/draw.rs +++ b/src/example_actors/one_dim_robot/draw.rs @@ -1,18 +1,13 @@ use crate::actors::zip::Tuple3; -use crate::core::request::NullRequest; -use crate::core::InboundMessageNew; -use crate::core::NullOutbound; -use crate::core::OnMessage; -use crate::core::*; use crate::example_actors::one_dim_robot::NamedFilterState; use crate::example_actors::one_dim_robot::Robot; use crate::example_actors::one_dim_robot::Stamped; -use crate::macros::*; +use crate::prelude::*; use drawille::Canvas; /// Inbound channels for the draw actor #[derive(Clone, Debug)] -#[actor_inputs(DrawInbound, {NullProp, DrawState, NullOutbound,NullRequest})] +#[actor_inputs(DrawInbound, {NullProp, DrawState, NullOutbound, NullRequest})] pub enum DrawInboundMessage { /// Tuple of true pos, true range and filter state Zipped(Tuple3, Stamped, NamedFilterState>), @@ -22,7 +17,7 @@ pub enum DrawInboundMessage { #[actor(DrawInboundMessage)] pub type DrawActor = Actor; -impl OnMessage for DrawInboundMessage { +impl HasOnMessage for DrawInboundMessage { /// Forward the message to the correct handler method of [DrawState]. fn on_message( self, @@ -39,7 +34,7 @@ impl OnMessage for DrawInboundMessage { } } -impl InboundMessageNew, Stamped, NamedFilterState>> +impl IsInboundMessageNew, Stamped, NamedFilterState>> for DrawInboundMessage { fn new( diff --git a/src/example_actors/one_dim_robot/filter.rs b/src/example_actors/one_dim_robot/filter.rs index 7fb8865..762cbc7 100644 --- a/src/example_actors/one_dim_robot/filter.rs +++ b/src/example_actors/one_dim_robot/filter.rs @@ -1,27 +1,12 @@ -use std::fmt::Debug; -use std::fmt::Display; - -use crate::compute::Context; -use crate::core::request::NullRequest; -use crate::core::request::RequestMessage; -use crate::core::Activate; -use crate::core::Actor; -use crate::core::ActorBuilder; -use crate::core::DefaultRunner; -use crate::core::FromPropState; -use crate::core::InboundChannel; -use crate::core::InboundHub; -use crate::core::InboundMessage; -use crate::core::InboundMessageNew; -use crate::core::NullProp; -use crate::core::OnMessage; -use crate::core::OutboundChannel; -use crate::core::OutboundHub; use crate::example_actors::one_dim_robot::RangeMeasurementModel; use crate::example_actors::one_dim_robot::Stamped; +use crate::prelude::*; +use crate::RequestMessage; use hollywood_macros::actor; use hollywood_macros::actor_inputs; use hollywood_macros::actor_outputs; +use std::fmt::Debug; +use std::fmt::Display; use super::sim::PingPong; @@ -40,7 +25,7 @@ pub enum FilterInboundMessage { #[actor(FilterInboundMessage)] type Filter = Actor; -impl OnMessage for FilterInboundMessage { +impl HasOnMessage for FilterInboundMessage { /// Process the inbound message NoisyVelocity or NoisyRange. /// /// On NoisyVelocity, FilterState::prediction is called. @@ -69,7 +54,7 @@ impl OnMessage for FilterInboundMessage { } } -impl InboundMessageNew> for FilterInboundMessage { +impl IsInboundMessageNew> for FilterInboundMessage { fn new(inbound_channel: String, msg: Stamped) -> Self { if inbound_channel == "NoisyRange" { FilterInboundMessage::NoisyRange(msg) @@ -79,7 +64,7 @@ impl InboundMessageNew> for FilterInboundMessage { } } -impl InboundMessageNew> for FilterInboundMessage { +impl IsInboundMessageNew> for FilterInboundMessage { fn new(_inbound_channel: String, request: RequestMessage) -> Self { FilterInboundMessage::PingPongRequest(request) } diff --git a/src/example_actors/one_dim_robot/sim.rs b/src/example_actors/one_dim_robot/sim.rs index 0f15ab7..d928483 100644 --- a/src/example_actors/one_dim_robot/sim.rs +++ b/src/example_actors/one_dim_robot/sim.rs @@ -1,29 +1,12 @@ -use std::fmt::Debug; - -use rand_distr::Distribution; -use rand_distr::Normal; - -use crate::compute::Context; -use crate::core::request::ReplyMessage; -use crate::core::request::RequestChannel; -use crate::core::request::RequestHub; -use crate::core::Activate; -use crate::core::Actor; -use crate::core::ActorBuilder; -use crate::core::DefaultRunner; -use crate::core::FromPropState; -use crate::core::InboundChannel; -use crate::core::InboundHub; -use crate::core::InboundMessage; -use crate::core::InboundMessageNew; -use crate::core::NullProp; -use crate::core::OnMessage; -use crate::core::OutboundChannel; -use crate::core::OutboundHub; use crate::example_actors::one_dim_robot::RangeMeasurementModel; use crate::example_actors::one_dim_robot::Robot; use crate::example_actors::one_dim_robot::Stamped; -use crate::macros::*; +use crate::prelude::*; +use crate::ReplyMessage; +use crate::RequestChannel; +use rand_distr::Distribution; +use rand_distr::Normal; +use std::fmt::Debug; /// Ping-pong request message. #[derive(Clone, Debug, Default)] @@ -48,7 +31,7 @@ pub enum SimInboundMessage { #[actor(SimInboundMessage)] pub type Sim = Actor; -impl OnMessage for SimInboundMessage { +impl HasOnMessage for SimInboundMessage { /// Invokes [SimState::process_time_stamp()] on TimeStamp. fn on_message( self, @@ -71,13 +54,13 @@ impl OnMessage for SimInboundMessage { } } -impl InboundMessageNew for SimInboundMessage { +impl IsInboundMessageNew for SimInboundMessage { fn new(_inbound_name: String, msg: f64) -> Self { SimInboundMessage::TimeStamp(msg) } } -impl InboundMessageNew> for SimInboundMessage { +impl IsInboundMessageNew> for SimInboundMessage { fn new(_inbound_name: String, msg: ReplyMessage) -> Self { SimInboundMessage::PingPongReply(msg) } diff --git a/src/introspect/flow_graph.rs b/src/introspect/flow_graph.rs index 07c4de7..4dd330e 100644 --- a/src/introspect/flow_graph.rs +++ b/src/introspect/flow_graph.rs @@ -1,15 +1,13 @@ -use std::collections::HashMap; -use std::collections::VecDeque; - +use crate::compute::topology::ActorNode; +use crate::compute::topology::Topology; use drawille::Canvas; use drawille::PixelColor; use grid::Grid; use rand_distr::num_traits::FromPrimitive; use rand_distr::num_traits::ToPrimitive; use rand_distr::num_traits::{self}; - -use crate::compute::ActorNode; -use crate::compute::Topology; +use std::collections::HashMap; +use std::collections::VecDeque; /// A super node is an actor with its inbound and outbound channels. pub(crate) struct FlowSuperNode { diff --git a/src/lib.rs b/src/lib.rs index 83358ad..1b0f384 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -22,22 +22,22 @@ //! //! The library is organized in the following modules: //! -//! - The [core] module contains the core structs and traits of the library. [Actor](core::Actor) -//! is a generic struct that represents an actor. [InboundHub](core::InboundHub) is the trait +//! - The [core] module contains the core structs and traits of the library. [Actor] +//! is a generic struct that represents an actor. [IsInboundHub] is the trait //! which represents the collection of inbound channels of an actor. Similarly, -//! [OutboundHub](core::OutboundHub) is the trait which represents the collection of outbound +//! [IsOutboundHub] is the trait which represents the collection of outbound //! channels of an actor. //! -//! Most importantly, [OnMessage](core::OnMessage) is the main entry point for user code and sets -//! the behavior of a user-defines actor. [OnMessage::on_message()](core::OnMessage::on_message()) +//! Most importantly, [HasOnMessage] is the main entry point for user code and sets +//! the behavior of a user-defines actor. [HasOnMessage::on_message()] //! processes incoming messages, updates the actor's state and sends messages to downstream actors //! in the pipeline. //! //! - The [macros] module contains the three macros that are used to define an actors with //! minimal boilerplate. //! -//! - The [compute] module contains the [Context](compute::Context) and -//! [Pipeline](compute::Pipeline) which are used to configure a set of actors, connect +//! - The [compute] module contains the [Hollywood] context and +//! [Pipeline] which are used to configure a set of actors, connect //! them into a graph and to execute flow. //! //! - The [actors] module contains a set of predefined actors that can be used as part of a compute @@ -121,7 +121,7 @@ //! Value(f64), //! } //! -//! impl OnMessage for MovingAverageMessage { +//! impl HasOnMessage for MovingAverageMessage { //! /// Process the inbound time-stamp message. //! fn on_message( //! &self, @@ -143,7 +143,7 @@ //! } //! } //! -//! impl InboundMessageNew for MovingAverageMessage { +//! impl IsInboundMessageNew for MovingAverageMessage { //! fn new(_inbound_name: String, msg: f64) -> Self { //! MovingAverageMessage::Value(msg) //! } @@ -151,7 +151,7 @@ //! ``` //! //! The moving average is calculated from the stream of values received on this channel. -//! OnMessage trait implementation the actual business logic of the actor is implemented. +//! HasOnMessage trait implementation the actual business logic of the actor is implemented. //! //! ### The actor //! @@ -185,11 +185,9 @@ //! //! ```rust //! # use hollywood::actors::{Periodic, Printer, PrinterProp}; -//! # use hollywood::core::FromPropState; -//! # use hollywood::core::value::NullState; -//! # use hollywood::compute::Context; +//! # use hollywood::prelude::*; //! # use hollywood::example_actors::moving_average::{MovingAverage, MovingAverageProp, MovingAverageState}; -//! let pipeline = Context::configure(&mut |context| { +//! let pipeline = Hollywood::configure(&mut |context| { //! let mut timer = Periodic::new_with_period(context, 1.0); //! let mut moving_average = MovingAverage::from_prop_and_state( //! context, @@ -230,8 +228,8 @@ //! pipeline.run(); //! ``` //! -//! The [Pipeline::print_flow_graph()](compute::Pipeline::print_flow_graph()) method prints the -//! topology of the compute pipeline to the console. +//! The [Pipeline::print_flow_graph()]method prints the topology of the compute pipeline to the +//! console. //! //! ``` text //! * Periodic_0 * @@ -267,7 +265,7 @@ //! of one actor to a compatible inbound channel of another actor downstream. //! //! These channel connections are configured at runtime using -//! [OutboundChannel::connect()](core::OutboundChannel::connect) during the pipeline configuration +//! [OutboundChannel::connect()] during the pipeline configuration //! step. In the simple example above, we had a 1:2 and a 1:1 connection: The process_time_stamp //! outbound channel of the Periodic actor was connected to two inbound channels. The average //! outbound channel of the MovingAverage actor was connected to one inbound channel. In general, @@ -275,21 +273,53 @@ //! channels. Similarly, each inbound channel can be connected to zero, one or more outbound //! channels. If an outbound channel is connected to multiple inbound channels, the messages are //! broadcasted to all connected inbound channels. This is the main reason why -//! [InboundMessage](core::InboundMessage) must be [Clone]. +//! [IsInboundMessage] must be [Clone]. //! //! The types of connected outbound channels must match the type of the connected inbound channel. //! An inbound channel is uniquely identified by a **variant** of the -//! [InboundMessage](core::InboundMessage) enum. Messages received from connected outbound channels +//! [IsInboundMessage] enum. Messages received from connected outbound channels //! are merged into a single stream and processed in the corresponding match arm (for that -//! **variant**) within the [OnMessage::on_message()](core::OnMessage::on_message()) method in a +//! **variant**) within the [HasOnMessage::on_message()] method in a //! uniform manner, regardless of the outbound channel (and actor) the message originated //! from. //! /// The core framework concepts such as actors, state, inbound, outbound and runners. pub mod core; +pub use crate::core::actor::Actor; +pub use crate::core::actor::ForwardTable; +pub use crate::core::actor::GenericActor; +pub use crate::core::actor::HasFromPropState; +pub use crate::core::actor::IsActorNode; +pub use crate::core::connection::ConnectionEnum; +pub use crate::core::inbound::HasForwardMessage; +pub use crate::core::outbound::IsGenericConnection; +pub use crate::core::outbound::IsOutboundHub; +pub use crate::core::request::IsRequestHub; +pub use crate::core::request::ReplyMessage; +pub use crate::core::request::RequestChannel; +pub use crate::core::request::RequestMessage; +pub use crate::core::runner::Runner; +pub use core::actor_builder::ActorBuilder; +pub use core::inbound::HasOnMessage; +pub use core::inbound::InboundChannel; +pub use core::inbound::IsInboundHub; +pub use core::inbound::IsInboundMessage; +pub use core::inbound::IsInboundMessageNew; +pub use core::inbound::NullInbound; +pub use core::inbound::NullMessage; +pub use core::outbound::HasActivate; +pub use core::outbound::NullOutbound; +pub use core::outbound::OutboundChannel; +pub use core::request::NullRequest; +pub use core::runner::DefaultRunner; +pub use core::value::NullProp; +pub use core::value::NullState; /// The compute context and compute graph. pub mod compute; +pub use crate::compute::context::Hollywood; +pub use crate::compute::pipeline::CancelRequest; +pub use compute::pipeline::Pipeline; /// Introspection pub mod introspect; @@ -333,8 +363,8 @@ pub mod macros { /// struct consists of a zero, one or more outbound channels. Each outbound channel has a /// user-specified name CHANNEL* and a user specified type TYPE*. /// - /// Effect: The macro generates the [OutboundHub](crate::core::OutboundHub) and - /// [Activate](crate::core::Activate) implementations for the provided struct OUTBOUND. + /// Effect: The macro generates the [IsOutboundHub](crate::IsOutboundHub) and + /// [HasActivate](crate::HasActivate) implementations for the provided struct OUTBOUND. /// /// This is the first of three macros to define an actor. The other two are [macro@actor_inputs] /// and [macro@actor]. @@ -359,8 +389,8 @@ pub mod macros { /// Each request channel has name CHANNEL*, a request type REQ_TYPE*, a reply type REPL_TYPE*, /// and a message type M*. /// - /// Effect: The macro generates the [RequestHub](crate::core::RequestHub) and - /// [Activate](crate::core::Activate) implementations for the provided struct REQUEST. + /// Effect: The macro generates the [IsRequestHub](crate::IsRequestHub) and + /// [HasActivate](crate::HasActivate) implementations for the provided struct REQUEST. /// pub use hollywood_macros::actor_requests; @@ -383,17 +413,17 @@ pub mod macros { /// variant has a user-specified name VARIENT* and type TYPE*. /// /// Prerequisite: - /// - The OUTBOUND struct is defined and implements [OutboundHub](crate::core::OutboundHub) - /// and [Activate](crate::core::Activate), typically using the [macro@actor_outputs] macro. - /// - The REQUEST struct is defined and implements [RequestHub](crate::core::RequestHub) and - /// [Activate](crate::core::Activate), e.g. using the [actor_requests] macro. + /// - The OUTBOUND struct is defined and implements [IsOutboundHub](crate::IsOutboundHub) + /// and [HasActivate](crate::HasActivate), typically using the [macro@actor_outputs] macro. + /// - The REQUEST struct is defined and implements [IsRequestHub](crate::IsRequestHub) and + /// [HasActivate](crate::HasActivate), e.g. using the [actor_requests] macro. /// - The PROP and STATE structs are defined. /// /// Effects: /// - The macro defines the struct INBOUND that contains an inbound channel field for each /// variant of the INBOUND_MESSAGE enum, and implements the - /// [InboundHub](crate::core::InboundHub) trait for it. - /// - Implements the [InboundMessage](crate::core::InboundMessage) trait for INBOUND_MESSAGE. + /// [IsInboundHub](crate::IsInboundHub) trait for it. + /// - Implements the [IsInboundMessage](crate::IsInboundMessage) trait for INBOUND_MESSAGE. /// pub use hollywood_macros::actor_inputs; @@ -407,22 +437,52 @@ pub mod macros { /// ``` /// /// Here, ACTOR is the user-specified name of the actor type. The actor type shall be defined - /// right after the macro invocation as an alias of [Actor](crate::core::Actor). + /// right after the macro invocation as an alias of [Actor](crate::Actor). /// /// Prerequisites: - /// - The OUTBOUND struct is defined and implements (OutboundHub)[crate::core::OutboundHub] - /// and [Activate](crate::core::Activate), e.g. using the [actor_outputs] macro. - /// - The REQUEST struct is defined and implements [RequestHub](crate::core::RequestHub) and - /// [Activate](crate::core::Activate), e.g. using the [actor_requests] macro. + /// - The OUTBOUND struct is defined and implements [IsOutboundHub](crate::IsOutboundHub) and + /// [HasActivate](crate::HasActivate), e.g. using the [actor_outputs] macro. + /// - The REQUEST struct is defined and implements [IsRequestHub](crate::IsRequestHub) and + /// [HasActivate](crate::HasActivate), e.g. using the [actor_requests] macro. /// - The INBOUND_MESSAGE enum is defined and implements - /// [InboundMessage](crate::core::InboundMessage), as well as the INBOUND - /// struct is defined and implements the [InboundHub](crate::core::InboundHub) trait, e.g. - /// through the [macro@actor_inputs] macro. + /// [IsInboundMessage](crate::IsInboundMessage), as well as the INBOUND struct is defined + /// and implements the [IsInboundHub](crate::IsInboundHub) trait, e.g through the + /// [actor_inputs] macro. /// - The PROP and STATE structs are defined. /// /// Effect: - /// - This macro implements the [FromPropState](crate::core::FromPropState) trait for the ACTOR + /// - This macro implements the [HasFromPropState](crate::HasFromPropState) trait for the ACTOR /// type. /// pub use hollywood_macros::actor; + + pub use hollywood_macros::zip_n; +} + +/// The prelude module contains the most important traits and structs of the library. +pub mod prelude { + pub use crate::macros::*; + pub use crate::Actor; + pub use crate::ActorBuilder; + pub use crate::DefaultRunner; + pub use crate::HasActivate; + pub use crate::HasForwardMessage; + pub use crate::HasFromPropState; + pub use crate::HasOnMessage; + pub use crate::Hollywood; + pub use crate::InboundChannel; + pub use crate::IsActorNode; + pub use crate::IsGenericConnection; + pub use crate::IsInboundHub; + pub use crate::IsInboundMessage; + pub use crate::IsInboundMessageNew; + pub use crate::IsOutboundHub; + pub use crate::IsRequestHub; + pub use crate::NullInbound; + pub use crate::NullMessage; + pub use crate::NullOutbound; + pub use crate::NullProp; + pub use crate::NullRequest; + pub use crate::NullState; + pub use crate::OutboundChannel; }