From d36b049648332506b5a757a1931dbb65b8af1e51 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 6 Nov 2024 15:38:45 +0100 Subject: [PATCH] Add ctrl-c handler and kill child processes on second ctrl-c On first ctrl-c, send a stop command to all nodes. On second ctrl-c, exit immediately and kill all spawned nodes. On third ctrl-c, abort the process directly without waiting (child processes keep running). This change affects both `dora run` and `dora daemon` commands. --- binaries/daemon/src/lib.rs | 107 ++++++++++++++++++++++++++--------- binaries/daemon/src/spawn.rs | 8 ++- 2 files changed, 85 insertions(+), 30 deletions(-) diff --git a/binaries/daemon/src/lib.rs b/binaries/daemon/src/lib.rs index 166d31642..75d358a4e 100644 --- a/binaries/daemon/src/lib.rs +++ b/binaries/daemon/src/lib.rs @@ -177,6 +177,8 @@ impl Daemon { let clock = Arc::new(HLC::default()); + let ctrlc_events = set_up_ctrlc_handler(clock.clone())?; + let exit_when_done = spawn_command .nodes .iter() @@ -193,8 +195,9 @@ impl Daemon { timestamp, } }); + let events = (coordinator_events, ctrlc_events).merge(); let run_result = Self::run_general( - Box::pin(coordinator_events), + Box::pin(events), None, "".to_string(), Some(exit_when_done), @@ -324,12 +327,17 @@ impl Daemon { } } Event::CtrlC => { + tracing::info!("received ctrlc signal -> stopping all dataflows"); for dataflow in self.running.values_mut() { dataflow .stop_all(&mut self.coordinator_connection, &self.clock, None) .await?; } } + Event::SecondCtrlC => { + tracing::warn!("received second ctrlc signal -> exit immediately"); + bail!("received second ctrl-c signal"); + } } } @@ -1085,7 +1093,9 @@ impl Daemon { ) .await?; - dataflow.running_nodes.remove(node_id); + if let Some(mut pid) = dataflow.running_nodes.remove(node_id).and_then(|n| n.pid) { + pid.mark_as_stopped() + } if dataflow .running_nodes .iter() @@ -1469,12 +1479,51 @@ fn close_input( } } -#[derive(Debug, Clone)] +#[derive(Debug)] struct RunningNode { - pid: Option, + pid: Option, node_config: NodeConfig, } +#[derive(Debug)] +struct ProcessId(Option); + +impl ProcessId { + pub fn new(process_id: u32) -> Self { + Self(Some(process_id)) + } + + pub fn mark_as_stopped(&mut self) { + self.0 = None; + } + + pub fn kill(&mut self) -> bool { + if let Some(pid) = self.0 { + let mut system = sysinfo::System::new(); + system.refresh_processes(); + + if let Some(process) = system.process(Pid::from(pid as usize)) { + process.kill(); + self.mark_as_stopped(); + return true; + } + } + + false + } +} + +impl Drop for ProcessId { + fn drop(&mut self) { + // kill the process if it's still running + if let Some(pid) = self.0 { + if self.kill() { + warn!("process {pid} was killed on drop because it was still running") + } + } + } +} + pub struct RunningDataflow { id: Uuid, /// Local nodes that are not started yet @@ -1610,19 +1659,20 @@ impl RunningDataflow { let _ = send_with_timestamp(&channel, NodeEvent::Stop, clock); } - let running_nodes = self.running_nodes.clone(); + let running_processes: Vec<_> = self + .running_nodes + .iter_mut() + .map(|(id, n)| (id.clone(), n.pid.take())) + .collect(); let grace_duration_kills = self.grace_duration_kills.clone(); tokio::spawn(async move { let duration = grace_duration.unwrap_or(Duration::from_millis(15000)); tokio::time::sleep(duration).await; - let mut system = sysinfo::System::new(); - system.refresh_processes(); - for (node, node_details) in running_nodes.iter() { - if let Some(pid) = node_details.pid { - if let Some(process) = system.process(Pid::from(pid as usize)) { + for (node, pid) in running_processes { + if let Some(mut pid) = pid { + if pid.kill() { grace_duration_kills.insert(node.clone()); - process.kill(); warn!( "{node} was killed due to not stopping within the {:#?} grace period", duration @@ -1697,6 +1747,7 @@ pub enum Event { DynamicNode(DynamicNodeEventWrapper), HeartbeatInterval, CtrlC, + SecondCtrlC, } impl From for Event { @@ -1777,25 +1828,27 @@ fn set_up_ctrlc_handler( ) -> Result>, eyre::ErrReport> { let (ctrlc_tx, ctrlc_rx) = mpsc::channel(1); - let mut ctrlc_sent = false; + let mut ctrlc_sent = 0; ctrlc::set_handler(move || { - if ctrlc_sent { - tracing::warn!("received second ctrlc signal -> aborting immediately"); - std::process::abort(); - } else { - tracing::info!("received ctrlc signal"); - if ctrlc_tx - .blocking_send(Timestamped { - inner: Event::CtrlC, - timestamp: clock.new_timestamp(), - }) - .is_err() - { - tracing::error!("failed to report ctrl-c event to dora-coordinator"); + let event = match ctrlc_sent { + 0 => Event::CtrlC, + 1 => Event::SecondCtrlC, + _ => { + tracing::warn!("received 3rd ctrlc signal -> aborting immediately"); + std::process::abort(); } - - ctrlc_sent = true; + }; + if ctrlc_tx + .blocking_send(Timestamped { + inner: event, + timestamp: clock.new_timestamp(), + }) + .is_err() + { + tracing::error!("failed to report ctrl-c event to dora-coordinator"); } + + ctrlc_sent += 1; }) .wrap_err("failed to set ctrl-c handler")?; diff --git a/binaries/daemon/src/spawn.rs b/binaries/daemon/src/spawn.rs index 87eca5a3b..ce5c8bec5 100644 --- a/binaries/daemon/src/spawn.rs +++ b/binaries/daemon/src/spawn.rs @@ -262,6 +262,11 @@ pub async fn spawn_node( } }; + let pid = crate::ProcessId::new(child.id().context( + "Could not get the pid for the just spawned node and indicate that there is an error", + )?); + tracing::debug!("Spawned node `{dataflow_id}/{node_id}` with pid {pid:?}"); + let dataflow_dir: PathBuf = working_dir.join("out").join(dataflow_id.to_string()); if !dataflow_dir.exists() { std::fs::create_dir_all(&dataflow_dir).context("could not create dataflow_dir")?; @@ -272,9 +277,6 @@ pub async fn spawn_node( .expect("Failed to create log file"); let mut child_stdout = tokio::io::BufReader::new(child.stdout.take().expect("failed to take stdout")); - let pid = child.id().context( - "Could not get the pid for the just spawned node and indicate that there is an error", - )?; let running_node = RunningNode { pid: Some(pid), node_config,