From 2ccba2b953de977419d91aa9c056fc14b64f046c Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 30 May 2024 13:27:17 +0200 Subject: [PATCH 1/5] Allow setting custom control port in coordinator --- binaries/cli/src/main.rs | 41 +++++++++++++++++++------- binaries/coordinator/src/lib.rs | 52 +++++++++++++++------------------ libraries/core/src/topics.rs | 17 ++--------- 3 files changed, 56 insertions(+), 54 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 2f36fc34d..c1393cc9f 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -5,8 +5,8 @@ use dora_coordinator::Event; use dora_core::{ descriptor::Descriptor, topics::{ - control_socket_addr, ControlRequest, ControlRequestReply, DataflowId, - DORA_COORDINATOR_PORT_CONTROL, DORA_COORDINATOR_PORT_DEFAULT, + ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, + DORA_COORDINATOR_PORT_DEFAULT, }, }; use dora_daemon::Daemon; @@ -140,10 +140,18 @@ enum Command { Runtime, /// Run coordinator Coordinator { - #[clap(long, default_value_t = SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), DORA_COORDINATOR_PORT_DEFAULT) - )] - addr: SocketAddr, + /// Network interface to bind to for daemon communication + #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))] + interface: IpAddr, + /// Port number to bind to for daemon communication + #[clap(long, default_value_t = DORA_COORDINATOR_PORT_DEFAULT)] + port: u16, + /// Network interface to bind to for control communication + #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))] + control_interface: IpAddr, + /// Port number to bind to for control communication + #[clap(long, default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + control_port: u16, }, } @@ -319,14 +327,22 @@ fn run() -> eyre::Result<()> { config, coordinator_addr, } => up::destroy(config.as_deref(), coordinator_addr)?, - Command::Coordinator { addr } => { + Command::Coordinator { + interface, + port, + control_interface, + control_port, + } => { let rt = Builder::new_multi_thread() .enable_all() .build() .context("tokio runtime failed")?; rt.block_on(async { - let (_port, task) = - dora_coordinator::start(addr, futures::stream::empty::()).await?; + let bind = SocketAddr::new(interface, port); + let bind_control = SocketAddr::new(control_interface, control_port); + let (port, task) = + dora_coordinator::start(bind, bind_control, futures::stream::empty::()) + .await?; task.await }) .context("failed to run dora-coordinator")? @@ -504,9 +520,12 @@ fn connect_to_coordinator( if let Some(coordinator_addr) = coordinator_addr { TcpLayer::new().connect(SocketAddr::new( coordinator_addr, - DORA_COORDINATOR_PORT_CONTROL, + DORA_COORDINATOR_PORT_CONTROL_DEFAULT, )) } else { - TcpLayer::new().connect(control_socket_addr()) + TcpLayer::new().connect(SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + DORA_COORDINATOR_PORT_CONTROL_DEFAULT, + )) } } diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 0a30ac103..5032d6a5f 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -9,7 +9,7 @@ use dora_core::{ daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped}, descriptor::{Descriptor, ResolvedNode}, message::uhlc::{self, HLC}, - topics::{control_socket_addr, ControlRequest, ControlRequestReply, DataflowId}, + topics::{ControlRequest, ControlRequestReply, DataflowId}, }; use eyre::{bail, eyre, ContextCompat, WrapErr}; use futures::{stream::FuturesUnordered, Future, Stream, StreamExt}; @@ -22,11 +22,7 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; -use tokio::{ - net::{TcpListener, TcpStream}, - sync::mpsc, - task::JoinHandle, -}; +use tokio::{net::TcpStream, sync::mpsc, task::JoinHandle}; use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream}; use uuid::Uuid; @@ -37,6 +33,7 @@ mod tcp_utils; pub async fn start( bind: SocketAddr, + bind_control: SocketAddr, external_events: impl Stream + Unpin, ) -> Result<(u16, impl Future>), eyre::ErrReport> { let listener = listener::create_listener(bind).await?; @@ -44,13 +41,30 @@ pub async fn start( .local_addr() .wrap_err("failed to get local addr of listener")? .port(); + let new_daemon_connections = TcpListenerStream::new(listener).map(|c| { + c.map(Event::NewDaemonConnection) + .wrap_err("failed to open connection") + .unwrap_or_else(Event::DaemonConnectError) + }); + let mut tasks = FuturesUnordered::new(); + let control_events = control::control_events(bind_control, &tasks) + .await + .wrap_err("failed to create control events")?; // Setup ctrl-c handler let ctrlc_events = set_up_ctrlc_handler()?; + let events = ( + external_events, + new_daemon_connections, + control_events, + ctrlc_events, + ) + .merge(); + let future = async move { - start_inner(listener, &tasks, (ctrlc_events, external_events).merge()).await?; + start_inner(events, &tasks).await?; tracing::debug!("coordinator main loop finished, waiting on spawned tasks"); while let Some(join_result) = tasks.next().await { @@ -100,40 +114,22 @@ fn resolve_name( } async fn start_inner( - listener: TcpListener, + events: impl Stream + Unpin, tasks: &FuturesUnordered>, - external_events: impl Stream + Unpin, ) -> eyre::Result<()> { let clock = Arc::new(HLC::default()); - let new_daemon_connections = TcpListenerStream::new(listener).map(|c| { - c.map(Event::NewDaemonConnection) - .wrap_err("failed to open connection") - .unwrap_or_else(Event::DaemonConnectError) - }); - let (daemon_events_tx, daemon_events) = tokio::sync::mpsc::channel(2); let mut daemon_events_tx = Some(daemon_events_tx); let daemon_events = ReceiverStream::new(daemon_events); - let control_events = control::control_events(control_socket_addr(), tasks) - .await - .wrap_err("failed to create control events")?; - let daemon_heartbeat_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(3))) .map(|_| Event::DaemonHeartbeatInterval); // events that should be aborted on `dora destroy` - let (abortable_events, abort_handle) = futures::stream::abortable( - ( - control_events, - new_daemon_connections, - external_events, - daemon_heartbeat_interval, - ) - .merge(), - ); + let (abortable_events, abort_handle) = + futures::stream::abortable((events, daemon_heartbeat_interval).merge()); let mut events = (abortable_events, daemon_events).merge(); diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 69ad75a5c..506c1b420 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -1,10 +1,4 @@ -use std::{ - collections::BTreeSet, - fmt::Display, - net::{Ipv4Addr, SocketAddr}, - path::PathBuf, - time::Duration, -}; +use std::{collections::BTreeSet, fmt::Display, path::PathBuf, time::Duration}; use uuid::Uuid; use crate::{ @@ -13,17 +7,10 @@ use crate::{ }; pub const DORA_COORDINATOR_PORT_DEFAULT: u16 = 0xD02A; -pub const DORA_COORDINATOR_PORT_CONTROL: u16 = 0x177C; +pub const DORA_COORDINATOR_PORT_CONTROL_DEFAULT: u16 = 0x177C; pub const MANUAL_STOP: &str = "dora/stop"; -pub fn control_socket_addr() -> SocketAddr { - SocketAddr::new( - Ipv4Addr::new(127, 0, 0, 1).into(), - DORA_COORDINATOR_PORT_CONTROL, - ) -} - #[derive(Debug, serde::Deserialize, serde::Serialize)] pub enum ControlRequest { Start { From 88d43d8b0cb9ee21a11f1a918303943648ebde98 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 30 May 2024 13:31:26 +0200 Subject: [PATCH 2/5] Set default in CLI parsing This way, the default address is shown as part of the `--help` output. --- binaries/cli/src/check.rs | 2 +- binaries/cli/src/main.rs | 45 +++++++++++++++++---------------------- binaries/cli/src/up.rs | 4 ++-- 3 files changed, 22 insertions(+), 29 deletions(-) diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/check.rs index 277dd002c..fea9dfc46 100644 --- a/binaries/cli/src/check.rs +++ b/binaries/cli/src/check.rs @@ -8,7 +8,7 @@ use std::{ }; use termcolor::{Color, ColorChoice, ColorSpec, WriteColor}; -pub fn check_environment(coordinator_addr: Option) -> eyre::Result<()> { +pub fn check_environment(coordinator_addr: IpAddr) -> eyre::Result<()> { let mut error_occured = false; let color_choice = if std::io::stdout().is_terminal() { diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index c1393cc9f..f4b2c4bb4 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -45,8 +45,8 @@ enum Command { Check { #[clap(long)] dataflow: Option, - #[clap(long)] - coordinator_addr: Option, + #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + coordinator_addr: IpAddr, }, /// Generate a visualization of the given graph using mermaid.js. Use --open to open browser. Graph { @@ -69,23 +69,23 @@ enum Command { Up { #[clap(long)] config: Option, - #[clap(long)] - coordinator_addr: Option, + #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + coordinator_addr: IpAddr, }, /// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first. Destroy { #[clap(long)] config: Option, - #[clap(long)] - coordinator_addr: Option, + #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + coordinator_addr: IpAddr, }, /// Start the given dataflow path. Attach a name to the running dataflow by using --name. Start { dataflow: PathBuf, #[clap(long)] name: Option, - #[clap(long)] - coordinator_addr: Option, + #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + coordinator_addr: IpAddr, #[clap(long, action)] attach: bool, #[clap(long, action)] @@ -99,13 +99,13 @@ enum Command { #[clap(long)] #[arg(value_parser = parse)] grace_duration: Option, - #[clap(long)] - coordinator_addr: Option, + #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + coordinator_addr: IpAddr, }, /// List running dataflows. List { - #[clap(long)] - coordinator_addr: Option, + #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + coordinator_addr: IpAddr, }, // Planned for future releases: // Dashboard, @@ -114,8 +114,8 @@ enum Command { Logs { dataflow: Option, node: String, - #[clap(long)] - coordinator_addr: Option, + #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + coordinator_addr: IpAddr, }, // Metrics, // Stats, @@ -515,17 +515,10 @@ fn query_running_dataflows( } fn connect_to_coordinator( - coordinator_addr: Option, + coordinator_addr: IpAddr, ) -> std::io::Result> { - if let Some(coordinator_addr) = coordinator_addr { - TcpLayer::new().connect(SocketAddr::new( - coordinator_addr, - DORA_COORDINATOR_PORT_CONTROL_DEFAULT, - )) - } else { - TcpLayer::new().connect(SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - DORA_COORDINATOR_PORT_CONTROL_DEFAULT, - )) - } + TcpLayer::new().connect(SocketAddr::new( + coordinator_addr, + DORA_COORDINATOR_PORT_CONTROL_DEFAULT, + )) } diff --git a/binaries/cli/src/up.rs b/binaries/cli/src/up.rs index b404eaa73..4816a01ca 100644 --- a/binaries/cli/src/up.rs +++ b/binaries/cli/src/up.rs @@ -5,7 +5,7 @@ use std::{fs, net::IpAddr, path::Path, process::Command, time::Duration}; #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] struct UpConfig {} -pub(crate) fn up(config_path: Option<&Path>, coordinator_addr: Option) -> eyre::Result<()> { +pub(crate) fn up(config_path: Option<&Path>, coordinator_addr: IpAddr) -> eyre::Result<()> { let UpConfig {} = parse_dora_config(config_path)?; let mut session = match connect_to_coordinator(coordinator_addr) { Ok(session) => session, @@ -47,7 +47,7 @@ pub(crate) fn up(config_path: Option<&Path>, coordinator_addr: Option) - pub(crate) fn destroy( config_path: Option<&Path>, - coordinator_addr: Option, + coordinator_addr: IpAddr, ) -> Result<(), eyre::ErrReport> { let UpConfig {} = parse_dora_config(config_path)?; match connect_to_coordinator(coordinator_addr) { From 142d2186109c42915de58e4fef2e868f45461edc Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Thu, 30 May 2024 13:49:15 +0200 Subject: [PATCH 3/5] Add support for custom coordinator control port numbers in CLI --- binaries/cli/src/check.rs | 4 +- binaries/cli/src/main.rs | 98 ++++++++++++++++++++++---------- binaries/cli/src/up.rs | 6 +- examples/multiple-daemons/run.rs | 18 ++++-- 4 files changed, 88 insertions(+), 38 deletions(-) diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/check.rs index fea9dfc46..14a3cd359 100644 --- a/binaries/cli/src/check.rs +++ b/binaries/cli/src/check.rs @@ -4,11 +4,11 @@ use dora_core::topics::{ControlRequest, ControlRequestReply}; use eyre::{bail, Context}; use std::{ io::{IsTerminal, Write}, - net::IpAddr, + net::SocketAddr, }; use termcolor::{Color, ColorChoice, ColorSpec, WriteColor}; -pub fn check_environment(coordinator_addr: IpAddr) -> eyre::Result<()> { +pub fn check_environment(coordinator_addr: SocketAddr) -> eyre::Result<()> { let mut error_occured = false; let color_choice = if std::io::stdout().is_terminal() { diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index f4b2c4bb4..92d08b874 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -45,8 +45,12 @@ enum Command { Check { #[clap(long)] dataflow: Option, - #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, }, /// Generate a visualization of the given graph using mermaid.js. Use --open to open browser. Graph { @@ -69,23 +73,35 @@ enum Command { Up { #[clap(long)] config: Option, - #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, }, /// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first. Destroy { #[clap(long)] config: Option, - #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, }, /// Start the given dataflow path. Attach a name to the running dataflow by using --name. Start { dataflow: PathBuf, #[clap(long)] name: Option, - #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, #[clap(long, action)] attach: bool, #[clap(long, action)] @@ -99,13 +115,21 @@ enum Command { #[clap(long)] #[arg(value_parser = parse)] grace_duration: Option, - #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, }, /// List running dataflows. List { - #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, }, // Planned for future releases: // Dashboard, @@ -114,8 +138,12 @@ enum Command { Logs { dataflow: Option, node: String, - #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, }, // Metrics, // Stats, @@ -130,9 +158,9 @@ enum Command { IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0) )] addr: SocketAddr, - #[clap(long)] - coordinator_addr: Option, - + /// Address and port number of the dora coordinator + #[clap(long, default_value_t = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), DORA_COORDINATOR_PORT_DEFAULT))] + coordinator_addr: SocketAddr, #[clap(long)] run_dataflow: Option, }, @@ -210,6 +238,7 @@ fn run() -> eyre::Result<()> { Command::Check { dataflow, coordinator_addr, + coordinator_port, } => match dataflow { Some(dataflow) => { let working_dir = dataflow @@ -219,9 +248,9 @@ fn run() -> eyre::Result<()> { .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? .to_owned(); Descriptor::blocking_read(&dataflow)?.check(&working_dir)?; - check::check_environment(coordinator_addr)? + check::check_environment((coordinator_addr, coordinator_port).into())? } - None => check::check_environment(coordinator_addr)?, + None => check::check_environment((coordinator_addr, coordinator_port).into())?, }, Command::Graph { dataflow, @@ -240,15 +269,20 @@ fn run() -> eyre::Result<()> { Command::Up { config, coordinator_addr, + coordinator_port, } => { - up::up(config.as_deref(), coordinator_addr)?; + up::up( + config.as_deref(), + (coordinator_addr, coordinator_port).into(), + )?; } Command::Logs { dataflow, node, coordinator_addr, + coordinator_port, } => { - let mut session = connect_to_coordinator(coordinator_addr) + let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) .wrap_err("failed to connect to dora coordinator")?; let uuids = query_running_dataflows(&mut *session) .wrap_err("failed to query running dataflows")?; @@ -269,6 +303,7 @@ fn run() -> eyre::Result<()> { dataflow, name, coordinator_addr, + coordinator_port, attach, hot_reload, } => { @@ -284,7 +319,7 @@ fn run() -> eyre::Result<()> { .check(&working_dir) .wrap_err("Could not validate yaml")?; - let mut session = connect_to_coordinator(coordinator_addr) + let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) .wrap_err("failed to connect to dora coordinator")?; let dataflow_id = start_dataflow( dataflow_descriptor.clone(), @@ -303,7 +338,10 @@ fn run() -> eyre::Result<()> { )? } } - Command::List { coordinator_addr } => match connect_to_coordinator(coordinator_addr) { + Command::List { + coordinator_addr, + coordinator_port, + } => match connect_to_coordinator((coordinator_addr, coordinator_port).into()) { Ok(mut session) => list(&mut *session)?, Err(_) => { bail!("No dora coordinator seems to be running."); @@ -314,8 +352,9 @@ fn run() -> eyre::Result<()> { name, grace_duration, coordinator_addr, + coordinator_port, } => { - let mut session = connect_to_coordinator(coordinator_addr) + let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) .wrap_err("could not connect to dora coordinator")?; match (uuid, name) { (Some(uuid), _) => stop_dataflow(uuid, grace_duration, &mut *session)?, @@ -326,7 +365,11 @@ fn run() -> eyre::Result<()> { Command::Destroy { config, coordinator_addr, - } => up::destroy(config.as_deref(), coordinator_addr)?, + coordinator_port, + } => up::destroy( + config.as_deref(), + (coordinator_addr, coordinator_port).into(), + )?, Command::Coordinator { interface, port, @@ -343,6 +386,7 @@ fn run() -> eyre::Result<()> { let (port, task) = dora_coordinator::start(bind, bind_control, futures::stream::empty::()) .await?; + println!("Listening for incoming daemon connection on {port}"); task.await }) .context("failed to run dora-coordinator")? @@ -357,11 +401,12 @@ fn run() -> eyre::Result<()> { .enable_all() .build() .context("tokio runtime failed")?; + let localhost = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); rt.block_on(async { match run_dataflow { Some(dataflow_path) => { tracing::info!("Starting dataflow `{}`", dataflow_path.display()); - if let Some(coordinator_addr) = coordinator_addr { + if coordinator_addr != SocketAddr::new(localhost, DORA_COORDINATOR_PORT_DEFAULT){ tracing::info!( "Not using coordinator addr {} as `run_dataflow` is for local dataflow only. Please use the `start` command for remote coordinator", coordinator_addr @@ -371,12 +416,10 @@ fn run() -> eyre::Result<()> { Daemon::run_dataflow(&dataflow_path).await } None => { - let coordination_addr = coordinator_addr.unwrap_or_else(|| { + if coordinator_addr.ip() == localhost { tracing::info!("Starting in local mode"); - let localhost = Ipv4Addr::new(127, 0, 0, 1); - (localhost, DORA_COORDINATOR_PORT_DEFAULT).into() - }); - Daemon::run(coordination_addr, machine_id.unwrap_or_default(), addr).await + } + Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), addr).await } } }) @@ -515,10 +558,7 @@ fn query_running_dataflows( } fn connect_to_coordinator( - coordinator_addr: IpAddr, + coordinator_addr: SocketAddr, ) -> std::io::Result> { - TcpLayer::new().connect(SocketAddr::new( - coordinator_addr, - DORA_COORDINATOR_PORT_CONTROL_DEFAULT, - )) + TcpLayer::new().connect(coordinator_addr) } diff --git a/binaries/cli/src/up.rs b/binaries/cli/src/up.rs index 4816a01ca..38889edf0 100644 --- a/binaries/cli/src/up.rs +++ b/binaries/cli/src/up.rs @@ -1,11 +1,11 @@ use crate::{check::daemon_running, connect_to_coordinator}; use dora_core::topics::ControlRequest; use eyre::Context; -use std::{fs, net::IpAddr, path::Path, process::Command, time::Duration}; +use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration}; #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] struct UpConfig {} -pub(crate) fn up(config_path: Option<&Path>, coordinator_addr: IpAddr) -> eyre::Result<()> { +pub(crate) fn up(config_path: Option<&Path>, coordinator_addr: SocketAddr) -> eyre::Result<()> { let UpConfig {} = parse_dora_config(config_path)?; let mut session = match connect_to_coordinator(coordinator_addr) { Ok(session) => session, @@ -47,7 +47,7 @@ pub(crate) fn up(config_path: Option<&Path>, coordinator_addr: IpAddr) -> eyre:: pub(crate) fn destroy( config_path: Option<&Path>, - coordinator_addr: IpAddr, + coordinator_addr: SocketAddr, ) -> Result<(), eyre::ErrReport> { let UpConfig {} = parse_dora_config(config_path)?; match connect_to_coordinator(coordinator_addr) { diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index 6f9bcd4e5..32a695cc5 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -1,7 +1,10 @@ use dora_coordinator::{ControlEvent, Event}; use dora_core::{ descriptor::Descriptor, - topics::{ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_DEFAULT}, + topics::{ + ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, + DORA_COORDINATOR_PORT_DEFAULT, + }, }; use dora_tracing::set_up_tracing; use eyre::{bail, Context}; @@ -38,9 +41,16 @@ async fn main() -> eyre::Result<()> { IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), DORA_COORDINATOR_PORT_DEFAULT, ); - let (coordinator_port, coordinator) = - dora_coordinator::start(coordinator_bind, ReceiverStream::new(coordinator_events_rx)) - .await?; + let coordinator_control_bind = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + DORA_COORDINATOR_PORT_CONTROL_DEFAULT, + ); + let (coordinator_port, coordinator) = dora_coordinator::start( + coordinator_bind, + coordinator_control_bind, + ReceiverStream::new(coordinator_events_rx), + ) + .await?; let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port); let daemon_a = run_daemon(coordinator_addr.to_string(), "A"); let daemon_b = run_daemon(coordinator_addr.to_string(), "B"); From 02b5c7d4437a1b881530544acd88cfb1d9ff5de7 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 31 May 2024 18:23:16 +0200 Subject: [PATCH 4/5] Remove `coordinator_addr` and `coordinator_port` arguments from `dora up` The `dora up` command always starts a coordinator and daemon in local mode using the default config. For non-standard config and for remote setups, users need to use `dora coordinator` and `dora daemon` directly. --- binaries/cli/src/main.rs | 19 +++---------------- binaries/cli/src/up.rs | 17 ++++++++++++++--- 2 files changed, 17 insertions(+), 19 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index f3ac54a05..0ddbc3ed5 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -78,17 +78,11 @@ enum Command { #[clap(hide = true, long)] internal_create_with_path_dependencies: bool, }, - /// Spawn a coordinator and a daemon. + /// Spawn coordinator and daemon in local mode (with default config) Up { /// Use a custom configuration #[clap(long, hide = true, value_name = "PATH", value_hint = clap::ValueHint::FilePath)] config: Option, - /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] - coordinator_addr: IpAddr, - /// Port number of the coordinator control server - #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] - coordinator_port: u16, }, /// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first. Destroy { @@ -295,15 +289,8 @@ fn run() -> eyre::Result<()> { args, internal_create_with_path_dependencies, } => template::create(args, internal_create_with_path_dependencies)?, - Command::Up { - config, - coordinator_addr, - coordinator_port, - } => { - up::up( - config.as_deref(), - (coordinator_addr, coordinator_port).into(), - )?; + Command::Up { config } => { + up::up(config.as_deref())?; } Command::Logs { dataflow, diff --git a/binaries/cli/src/up.rs b/binaries/cli/src/up.rs index 38889edf0..732bf85c8 100644 --- a/binaries/cli/src/up.rs +++ b/binaries/cli/src/up.rs @@ -1,12 +1,23 @@ use crate::{check::daemon_running, connect_to_coordinator}; -use dora_core::topics::ControlRequest; +use dora_core::topics::{ControlRequest, DORA_COORDINATOR_PORT_CONTROL_DEFAULT}; use eyre::Context; -use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration}; +use std::{ + fs, + net::{IpAddr, Ipv4Addr, SocketAddr}, + path::Path, + process::Command, + time::Duration, +}; #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] struct UpConfig {} -pub(crate) fn up(config_path: Option<&Path>, coordinator_addr: SocketAddr) -> eyre::Result<()> { +pub(crate) fn up(config_path: Option<&Path>) -> eyre::Result<()> { let UpConfig {} = parse_dora_config(config_path)?; + let coordinator_addr = ( + IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), + DORA_COORDINATOR_PORT_CONTROL_DEFAULT, + ) + .into(); let mut session = match connect_to_coordinator(coordinator_addr) { Ok(session) => session, Err(_) => { From 89b97d75f20407528518f340ed6a989e1f75c7e6 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Fri, 31 May 2024 18:28:11 +0200 Subject: [PATCH 5/5] Add `LOCALHOST` and `LISTEN_WILDCARD` constants to simplify code --- binaries/cli/src/main.rs | 30 +++++++++++++++--------------- binaries/cli/src/up.rs | 16 +++------------- 2 files changed, 18 insertions(+), 28 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 0ddbc3ed5..099aed2e8 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -31,6 +31,9 @@ mod logs; mod template; mod up; +const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); +const LISTEN_WILDCARD: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)); + #[derive(Debug, clap::Parser)] #[clap(version)] struct Args { @@ -47,7 +50,7 @@ enum Command { #[clap(long, value_name = "PATH", value_hint = clap::ValueHint::FilePath)] dataflow: Option, /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] coordinator_addr: IpAddr, /// Port number of the coordinator control server #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] @@ -90,7 +93,7 @@ enum Command { #[clap(long, hide = true)] config: Option, /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] coordinator_addr: IpAddr, /// Port number of the coordinator control server #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] @@ -105,7 +108,7 @@ enum Command { #[clap(long)] name: Option, /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] coordinator_addr: IpAddr, /// Port number of the coordinator control server #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] @@ -129,7 +132,7 @@ enum Command { #[arg(value_parser = parse)] grace_duration: Option, /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] coordinator_addr: IpAddr, /// Port number of the coordinator control server #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] @@ -138,7 +141,7 @@ enum Command { /// List running dataflows. List { /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] coordinator_addr: IpAddr, /// Port number of the coordinator control server #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] @@ -156,7 +159,7 @@ enum Command { #[clap(value_name = "NAME")] node: String, /// Address of the dora coordinator - #[clap(long, value_name = "IP", default_value_t = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)))] + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] coordinator_addr: IpAddr, /// Port number of the coordinator control server #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] @@ -172,12 +175,10 @@ enum Command { #[clap(long)] machine_id: Option, /// The IP address and port this daemon will bind to. - #[clap(long, default_value_t = SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0) - )] + #[clap(long, default_value_t = SocketAddr::new(LISTEN_WILDCARD, 0))] addr: SocketAddr, /// Address and port number of the dora coordinator - #[clap(long, default_value_t = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), DORA_COORDINATOR_PORT_DEFAULT))] + #[clap(long, default_value_t = SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT))] coordinator_addr: SocketAddr, #[clap(long, hide = true)] run_dataflow: Option, @@ -187,13 +188,13 @@ enum Command { /// Run coordinator Coordinator { /// Network interface to bind to for daemon communication - #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))] + #[clap(long, default_value_t = LISTEN_WILDCARD)] interface: IpAddr, /// Port number to bind to for daemon communication #[clap(long, default_value_t = DORA_COORDINATOR_PORT_DEFAULT)] port: u16, /// Network interface to bind to for control communication - #[clap(long, default_value_t = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))] + #[clap(long, default_value_t = LISTEN_WILDCARD)] control_interface: IpAddr, /// Port number to bind to for control communication #[clap(long, default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] @@ -417,12 +418,11 @@ fn run() -> eyre::Result<()> { .enable_all() .build() .context("tokio runtime failed")?; - let localhost = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); rt.block_on(async { match run_dataflow { Some(dataflow_path) => { tracing::info!("Starting dataflow `{}`", dataflow_path.display()); - if coordinator_addr != SocketAddr::new(localhost, DORA_COORDINATOR_PORT_DEFAULT){ + if coordinator_addr != SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT){ tracing::info!( "Not using coordinator addr {} as `run_dataflow` is for local dataflow only. Please use the `start` command for remote coordinator", coordinator_addr @@ -432,7 +432,7 @@ fn run() -> eyre::Result<()> { Daemon::run_dataflow(&dataflow_path).await } None => { - if coordinator_addr.ip() == localhost { + if coordinator_addr.ip() == LOCALHOST { tracing::info!("Starting in local mode"); } Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), addr).await diff --git a/binaries/cli/src/up.rs b/binaries/cli/src/up.rs index 732bf85c8..66b2bd20d 100644 --- a/binaries/cli/src/up.rs +++ b/binaries/cli/src/up.rs @@ -1,23 +1,13 @@ -use crate::{check::daemon_running, connect_to_coordinator}; +use crate::{check::daemon_running, connect_to_coordinator, LOCALHOST}; use dora_core::topics::{ControlRequest, DORA_COORDINATOR_PORT_CONTROL_DEFAULT}; use eyre::Context; -use std::{ - fs, - net::{IpAddr, Ipv4Addr, SocketAddr}, - path::Path, - process::Command, - time::Duration, -}; +use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration}; #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] struct UpConfig {} pub(crate) fn up(config_path: Option<&Path>) -> eyre::Result<()> { let UpConfig {} = parse_dora_config(config_path)?; - let coordinator_addr = ( - IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), - DORA_COORDINATOR_PORT_CONTROL_DEFAULT, - ) - .into(); + let coordinator_addr = (LOCALHOST, DORA_COORDINATOR_PORT_CONTROL_DEFAULT).into(); let mut session = match connect_to_coordinator(coordinator_addr) { Ok(session) => session, Err(_) => {