Skip to content

Commit

Permalink
cancel request fixes/tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
strasdat committed Oct 2, 2023
1 parent a432df3 commit 9ba3868
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 18 deletions.
35 changes: 35 additions & 0 deletions examples/print_ticks.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
use hollywood::actors::printer::PrinterProp;
use hollywood::actors::{Periodic, Printer};
use hollywood::compute::Context;
use hollywood::core::ActorFacade;

///
pub async fn run_tick_print_example() {
let pipeline = Context::configure(&mut |context| {
let mut timer = Periodic::new_with_period(context, 1.0);
let mut time_printer = Printer::<f64>::new_default_init_state(
context,
PrinterProp {
topic: "time".to_string(),
},
);
timer
.outbound
.time_stamp
.connect(context, &mut time_printer.inbound.printable);

});

pipeline.print_flow_graph();
pipeline.run().await;
}

fn main() {
tokio::runtime::Builder::new_multi_thread()
.enable_all()
.build()
.unwrap()
.block_on(async {
run_tick_print_example().await;
})
}
27 changes: 15 additions & 12 deletions src/compute/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,11 @@ use crate::core::{
pub struct Context {
pub(crate) actors: Vec<Box<dyn DormantActorNode + Send>>,
pub(crate) topology: Topology,
pub(crate) cancel_request_request_inbound: InboundChannel<bool, CancelRequest>,
pub(crate) cancel_request_sender_template: tokio::sync::mpsc::Sender<CancelRequest>,
pub(crate) cancel_request_receiver: tokio::sync::mpsc::Receiver<CancelRequest>,
}

impl Context {
const CONTEXT_NAME: &str = "CONTEXT";

/// Create a new context.
///
/// This is the main entry point to configure the compute graph. The network topology is defined
Expand All @@ -33,27 +31,32 @@ impl Context {
///
/// Upon receiving a cancel request the registered outbound channel, the execution of the
/// pipeline will be stopped.
pub fn get_cancel_request_sender(&mut self) -> tokio::sync::mpsc::Sender<CancelRequest> {
self.cancel_request_sender_template.clone()
}

/// Registers an outbound channel for cancel request.
///
/// Upon receiving a cancel request the registered outbound channel, the execution of the
/// pipeline will be stopped.
pub fn register_cancel_requester(&mut self, outbound: &mut OutboundChannel<()>) {
outbound
.connection_register
.push(Arc::new(OutboundConnection {
sender: self.cancel_request_request_inbound.sender.clone(),
inbound_channel: self.cancel_request_request_inbound.name.clone(),
sender: self.cancel_request_sender_template.clone(),
inbound_channel: "CANCEL".to_string(),
phantom: PhantomData {},
}));
}


fn new() -> Self {
let (exit_request_sender, cancel_request_receiver) = tokio::sync::mpsc::channel(1);
let (cancel_request_sender_template, cancel_request_receiver) =
tokio::sync::mpsc::channel(1);
Self {
actors: vec![],
topology: Topology::new(),
cancel_request_request_inbound: InboundChannel::<bool, CancelRequest> {
name: CancelRequest::CANCEL_REQUEST_INBOUND_CHANNEL .to_owned(),
actor_name: Self::CONTEXT_NAME.to_owned(),
sender: exit_request_sender,
phantom: std::marker::PhantomData {},
},
cancel_request_sender_template,
cancel_request_receiver,
}
}
Expand Down
25 changes: 19 additions & 6 deletions src/compute/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ impl InboundMessageNew<()> for CancelRequest {
pub struct Pipeline {
actors: Vec<Box<dyn ActorNode + Send>>,
topology: Topology,
/// We have this here to keep receiver alive
pub cancel_request_sender_template: Option<tokio::sync::mpsc::Sender<CancelRequest>>,
cancel_request_receiver: Option<tokio::sync::mpsc::Receiver<CancelRequest>>,
}

Expand All @@ -53,6 +55,7 @@ impl Pipeline {
let compute_graph = Pipeline {
actors: active,
topology: context.topology,
cancel_request_sender_template: Some(context.cancel_request_sender_template),
cancel_request_receiver: Some(context.cancel_request_receiver),
};
compute_graph.topology.analyze_graph_topology();
Expand Down Expand Up @@ -88,12 +91,17 @@ impl Pipeline {
let (exit_tx, exit_rx) = tokio::sync::oneshot::channel();

let h_exit = tokio::spawn(async move {
let msg = cancel_request_receiver.recv().await.unwrap();
match msg {
CancelRequest::Cancel(_) => {
match cancel_request_receiver.recv().await {
Some(msg) => {
println!("Cancel requested");

let _ = exit_tx.send(cancel_request_receiver);
match msg {
CancelRequest::Cancel(_) => {
let _ = exit_tx.send(cancel_request_receiver);
}
}
}
None => {
println!("Cancel request channel closed");
}
}
});
Expand All @@ -110,7 +118,12 @@ impl Pipeline {

handles.push(h);
}
h_exit.await.unwrap();
match h_exit.await {
Ok(_) => {}
Err(err) => {
println!("Error in cancel request handler: {}", err);
}
}
kill_sender.send(()).unwrap();
for h in handles {
h.await.unwrap();
Expand Down

0 comments on commit 9ba3868

Please sign in to comment.