From 9ba386852e520261b4b6e3f4daa4b0c5ba380886 Mon Sep 17 00:00:00 2001
From: Hauke Strasdat
Date: Sun, 1 Oct 2023 18:22:32 -0700
Subject: [PATCH] cancel request fixes/tweaks

---
 examples/print_ticks.rs | 35 +++++++++++++++++++++++++++++++++++
 src/compute/context.rs  | 27 +++++++++++++++------------
 src/compute/pipeline.rs | 25 +++++++++++++++++++------
 3 files changed, 69 insertions(+), 18 deletions(-)
 create mode 100644 examples/print_ticks.rs

diff --git a/examples/print_ticks.rs b/examples/print_ticks.rs
new file mode 100644
index 0000000..b4d39ce
--- /dev/null
+++ b/examples/print_ticks.rs
@@ -0,0 +1,35 @@
+use hollywood::actors::printer::PrinterProp;
+use hollywood::actors::{Periodic, Printer};
+use hollywood::compute::Context;
+use hollywood::core::ActorFacade;
+
+/// Connects a periodic timer to a printer actor which prints the received time stamps.
+pub async fn run_tick_print_example() {
+    let pipeline = Context::configure(&mut |context| {
+        let mut timer = Periodic::new_with_period(context, 1.0);
+        let mut time_printer = Printer::<f64>::new_default_init_state(
+            context,
+            PrinterProp {
+                topic: "time".to_string(),
+            },
+        );
+        timer
+            .outbound
+            .time_stamp
+            .connect(context, &mut time_printer.inbound.printable);
+
+    });
+
+    pipeline.print_flow_graph();
+    pipeline.run().await;
+}
+
+fn main() {
+    tokio::runtime::Builder::new_multi_thread()
+        .enable_all()
+        .build()
+        .unwrap()
+        .block_on(async {
+            run_tick_print_example().await;
+        })
+}
diff --git a/src/compute/context.rs b/src/compute/context.rs
index ef9d5d5..7b28acf 100644
--- a/src/compute/context.rs
+++ b/src/compute/context.rs
@@ -12,13 +12,11 @@ use crate::core::{
 pub struct Context {
     pub(crate) actors: Vec>,
     pub(crate) topology: Topology,
-    pub(crate) cancel_request_request_inbound: InboundChannel<CancelRequest>,
+    pub(crate) cancel_request_sender_template: tokio::sync::mpsc::Sender<CancelRequest>,
     pub(crate) cancel_request_receiver: tokio::sync::mpsc::Receiver<CancelRequest>,
 }
 
 impl Context {
-    const CONTEXT_NAME: &str = "CONTEXT";
-
     /// Create a new context.
     ///
     /// This is the main entry point to configure the compute graph. The network topology is defined
@@ -33,27 +31,32 @@ impl Context {
     ///
     /// Upon receiving a cancel request the registered outbound channel, the execution of the
     /// pipeline will be stopped.
+    pub fn get_cancel_request_sender(&mut self) -> tokio::sync::mpsc::Sender<CancelRequest> {
+        self.cancel_request_sender_template.clone()
+    }
+
+    /// Registers an outbound channel for cancel requests.
+    ///
+    /// Upon receiving a cancel request on the registered outbound channel, the execution
+    /// of the pipeline will be stopped.
     pub fn register_cancel_requester(&mut self, outbound: &mut OutboundChannel<()>) {
         outbound
             .connection_register
             .push(Arc::new(OutboundConnection {
-                sender: self.cancel_request_request_inbound.sender.clone(),
-                inbound_channel: self.cancel_request_request_inbound.name.clone(),
+                sender: self.cancel_request_sender_template.clone(),
+                inbound_channel: "CANCEL".to_string(),
                 phantom: PhantomData {},
             }));
     }
+
     fn new() -> Self {
-        let (exit_request_sender, cancel_request_receiver) = tokio::sync::mpsc::channel(1);
+        let (cancel_request_sender_template, cancel_request_receiver) =
+            tokio::sync::mpsc::channel(1);
         Self {
             actors: vec![],
             topology: Topology::new(),
-            cancel_request_request_inbound: InboundChannel::<CancelRequest> {
-                name: CancelRequest::CANCEL_REQUEST_INBOUND_CHANNEL.to_owned(),
-                actor_name: Self::CONTEXT_NAME.to_owned(),
-                sender: exit_request_sender,
-                phantom: std::marker::PhantomData {},
-            },
+            cancel_request_sender_template,
             cancel_request_receiver,
         }
     }
 
diff --git a/src/compute/pipeline.rs b/src/compute/pipeline.rs
index 084c8f7..4a7c321 100644
--- a/src/compute/pipeline.rs
+++ b/src/compute/pipeline.rs
@@ -41,6 +41,8 @@ impl InboundMessageNew<()> for CancelRequest {
 pub struct Pipeline {
     actors: Vec>,
     topology: Topology,
+    /// Kept so the cancel request channel stays open while the pipeline is running.
+    pub cancel_request_sender_template: Option<tokio::sync::mpsc::Sender<CancelRequest>>,
     cancel_request_receiver: Option<tokio::sync::mpsc::Receiver<CancelRequest>>,
 }
 
@@ -53,6 +55,7 @@ impl Pipeline {
         let compute_graph = Pipeline {
             actors: active,
             topology: context.topology,
+            cancel_request_sender_template: Some(context.cancel_request_sender_template),
             cancel_request_receiver: Some(context.cancel_request_receiver),
         };
         compute_graph.topology.analyze_graph_topology();
@@ -88,12 +91,17 @@ impl Pipeline {
         let (exit_tx, exit_rx) = tokio::sync::oneshot::channel();
 
         let h_exit = tokio::spawn(async move {
-            let msg = cancel_request_receiver.recv().await.unwrap();
-            match msg {
-                CancelRequest::Cancel(_) => {
+            match cancel_request_receiver.recv().await {
+                Some(msg) => {
                     println!("Cancel requested");
-
-                    let _ = exit_tx.send(cancel_request_receiver);
+                    match msg {
+                        CancelRequest::Cancel(_) => {
+                            let _ = exit_tx.send(cancel_request_receiver);
+                        }
+                    }
+                }
+                None => {
+                    println!("Cancel request channel closed");
                 }
             }
         });
@@ -110,7 +118,12 @@ impl Pipeline {
             handles.push(h);
         }
 
-        h_exit.await.unwrap();
+        match h_exit.await {
+            Ok(_) => {}
+            Err(err) => {
+                println!("Error in cancel request handler: {}", err);
+            }
+        }
         kill_sender.send(()).unwrap();
         for h in handles {
             h.await.unwrap();
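
A minimal usage sketch of the new Context::get_cancel_request_sender accessor, not part of the patch itself: the sender handed out during configuration can stop a running pipeline from outside the actor graph. Context::configure, get_cancel_request_sender, and Pipeline::run are taken from this patch; the import path of CancelRequest, the Cancel(()) payload, the five-second delay, and the run_with_external_cancel helper are illustrative assumptions.

use hollywood::compute::pipeline::CancelRequest; // assumed path of the type defined in src/compute/pipeline.rs
use hollywood::compute::Context;

pub async fn run_with_external_cancel() {
    let mut cancel_tx = None;
    let pipeline = Context::configure(&mut |context| {
        // ... register actors and connections, as in examples/print_ticks.rs ...
        cancel_tx = Some(context.get_cancel_request_sender());
    });

    // Request cancellation from outside the compute graph after ~5 seconds.
    let cancel_tx = cancel_tx.expect("sender is set during configuration");
    tokio::spawn(async move {
        tokio::time::sleep(std::time::Duration::from_secs(5)).await;
        // Cancel(()) mirrors the InboundMessageNew<()> impl shown above.
        let _ = cancel_tx.send(CancelRequest::Cancel(())).await;
    });

    pipeline.run().await;
}

The async function can be driven from main() with the same multi-threaded runtime builder used in examples/print_ticks.rs.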