diff --git a/binaries/cli/src/check.rs b/binaries/cli/src/check.rs index 277dd002c..14a3cd359 100644 --- a/binaries/cli/src/check.rs +++ b/binaries/cli/src/check.rs @@ -4,11 +4,11 @@ use dora_core::topics::{ControlRequest, ControlRequestReply}; use eyre::{bail, Context}; use std::{ io::{IsTerminal, Write}, - net::IpAddr, + net::SocketAddr, }; use termcolor::{Color, ColorChoice, ColorSpec, WriteColor}; -pub fn check_environment(coordinator_addr: Option) -> eyre::Result<()> { +pub fn check_environment(coordinator_addr: SocketAddr) -> eyre::Result<()> { let mut error_occured = false; let color_choice = if std::io::stdout().is_terminal() { diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index 2fb20c9a4..099aed2e8 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -5,8 +5,8 @@ use dora_coordinator::Event; use dora_core::{ descriptor::Descriptor, topics::{ - control_socket_addr, ControlRequest, ControlRequestReply, DataflowId, - DORA_COORDINATOR_PORT_CONTROL, DORA_COORDINATOR_PORT_DEFAULT, + ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, + DORA_COORDINATOR_PORT_DEFAULT, }, }; use dora_daemon::Daemon; @@ -31,6 +31,9 @@ mod logs; mod template; mod up; +const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); +const LISTEN_WILDCARD: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)); + #[derive(Debug, clap::Parser)] #[clap(version)] struct Args { @@ -46,8 +49,12 @@ enum Command { /// Path to the dataflow descriptor file (enables additional checks) #[clap(long, value_name = "PATH", value_hint = clap::ValueHint::FilePath)] dataflow: Option, - #[clap(long)] - coordinator_addr: Option, + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, }, /// Generate a visualization of the given graph using mermaid.js. Use --open to open browser. Graph { @@ -74,23 +81,23 @@ enum Command { #[clap(hide = true, long)] internal_create_with_path_dependencies: bool, }, - /// Spawn a coordinator and a daemon. + /// Spawn coordinator and daemon in local mode (with default config) Up { /// Use a custom configuration #[clap(long, hide = true, value_name = "PATH", value_hint = clap::ValueHint::FilePath)] config: Option, - // TODO - #[clap(long)] - coordinator_addr: Option, }, /// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first. Destroy { /// Use a custom configuration #[clap(long, hide = true)] config: Option, - // TODO - #[clap(long)] - coordinator_addr: Option, + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, }, /// Start the given dataflow path. Attach a name to the running dataflow by using --name. Start { @@ -100,8 +107,12 @@ enum Command { /// Assign a name to the dataflow #[clap(long)] name: Option, - #[clap(long)] - coordinator_addr: Option, + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, /// Attach to the dataflow and wait for its completion #[clap(long, action)] attach: bool, @@ -120,13 +131,21 @@ enum Command { #[clap(long, value_name = "DURATION")] #[arg(value_parser = parse)] grace_duration: Option, - #[clap(long)] - coordinator_addr: Option, + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, }, /// List running dataflows. List { - #[clap(long)] - coordinator_addr: Option, + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, }, // Planned for future releases: // Dashboard, @@ -139,8 +158,12 @@ enum Command { /// Show logs for the given node #[clap(value_name = "NAME")] node: String, - #[clap(long)] - coordinator_addr: Option, + /// Address of the dora coordinator + #[clap(long, value_name = "IP", default_value_t = LOCALHOST)] + coordinator_addr: IpAddr, + /// Port number of the coordinator control server + #[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + coordinator_port: u16, }, // Metrics, // Stats, @@ -152,13 +175,11 @@ enum Command { #[clap(long)] machine_id: Option, /// The IP address and port this daemon will bind to. - #[clap(long, default_value_t = SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0) - )] + #[clap(long, default_value_t = SocketAddr::new(LISTEN_WILDCARD, 0))] addr: SocketAddr, - #[clap(long)] - coordinator_addr: Option, - + /// Address and port number of the dora coordinator + #[clap(long, default_value_t = SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT))] + coordinator_addr: SocketAddr, #[clap(long, hide = true)] run_dataflow: Option, }, @@ -166,10 +187,18 @@ enum Command { Runtime, /// Run coordinator Coordinator { - #[clap(long, default_value_t = SocketAddr::new( - IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), DORA_COORDINATOR_PORT_DEFAULT) - )] - addr: SocketAddr, + /// Network interface to bind to for daemon communication + #[clap(long, default_value_t = LISTEN_WILDCARD)] + interface: IpAddr, + /// Port number to bind to for daemon communication + #[clap(long, default_value_t = DORA_COORDINATOR_PORT_DEFAULT)] + port: u16, + /// Network interface to bind to for control communication + #[clap(long, default_value_t = LISTEN_WILDCARD)] + control_interface: IpAddr, + /// Port number to bind to for control communication + #[clap(long, default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)] + control_port: u16, }, } @@ -233,6 +262,7 @@ fn run() -> eyre::Result<()> { Command::Check { dataflow, coordinator_addr, + coordinator_port, } => match dataflow { Some(dataflow) => { let working_dir = dataflow @@ -242,9 +272,9 @@ fn run() -> eyre::Result<()> { .ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))? .to_owned(); Descriptor::blocking_read(&dataflow)?.check(&working_dir)?; - check::check_environment(coordinator_addr)? + check::check_environment((coordinator_addr, coordinator_port).into())? } - None => check::check_environment(coordinator_addr)?, + None => check::check_environment((coordinator_addr, coordinator_port).into())?, }, Command::Graph { dataflow, @@ -260,18 +290,16 @@ fn run() -> eyre::Result<()> { args, internal_create_with_path_dependencies, } => template::create(args, internal_create_with_path_dependencies)?, - Command::Up { - config, - coordinator_addr, - } => { - up::up(config.as_deref(), coordinator_addr)?; + Command::Up { config } => { + up::up(config.as_deref())?; } Command::Logs { dataflow, node, coordinator_addr, + coordinator_port, } => { - let mut session = connect_to_coordinator(coordinator_addr) + let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) .wrap_err("failed to connect to dora coordinator")?; let uuids = query_running_dataflows(&mut *session) .wrap_err("failed to query running dataflows")?; @@ -292,6 +320,7 @@ fn run() -> eyre::Result<()> { dataflow, name, coordinator_addr, + coordinator_port, attach, hot_reload, } => { @@ -307,7 +336,7 @@ fn run() -> eyre::Result<()> { .check(&working_dir) .wrap_err("Could not validate yaml")?; - let mut session = connect_to_coordinator(coordinator_addr) + let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) .wrap_err("failed to connect to dora coordinator")?; let dataflow_id = start_dataflow( dataflow_descriptor.clone(), @@ -326,7 +355,10 @@ fn run() -> eyre::Result<()> { )? } } - Command::List { coordinator_addr } => match connect_to_coordinator(coordinator_addr) { + Command::List { + coordinator_addr, + coordinator_port, + } => match connect_to_coordinator((coordinator_addr, coordinator_port).into()) { Ok(mut session) => list(&mut *session)?, Err(_) => { bail!("No dora coordinator seems to be running."); @@ -337,8 +369,9 @@ fn run() -> eyre::Result<()> { name, grace_duration, coordinator_addr, + coordinator_port, } => { - let mut session = connect_to_coordinator(coordinator_addr) + let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into()) .wrap_err("could not connect to dora coordinator")?; match (uuid, name) { (Some(uuid), _) => stop_dataflow(uuid, grace_duration, &mut *session)?, @@ -349,15 +382,28 @@ fn run() -> eyre::Result<()> { Command::Destroy { config, coordinator_addr, - } => up::destroy(config.as_deref(), coordinator_addr)?, - Command::Coordinator { addr } => { + coordinator_port, + } => up::destroy( + config.as_deref(), + (coordinator_addr, coordinator_port).into(), + )?, + Command::Coordinator { + interface, + port, + control_interface, + control_port, + } => { let rt = Builder::new_multi_thread() .enable_all() .build() .context("tokio runtime failed")?; rt.block_on(async { - let (_port, task) = - dora_coordinator::start(addr, futures::stream::empty::()).await?; + let bind = SocketAddr::new(interface, port); + let bind_control = SocketAddr::new(control_interface, control_port); + let (port, task) = + dora_coordinator::start(bind, bind_control, futures::stream::empty::()) + .await?; + println!("Listening for incoming daemon connection on {port}"); task.await }) .context("failed to run dora-coordinator")? @@ -376,7 +422,7 @@ fn run() -> eyre::Result<()> { match run_dataflow { Some(dataflow_path) => { tracing::info!("Starting dataflow `{}`", dataflow_path.display()); - if let Some(coordinator_addr) = coordinator_addr { + if coordinator_addr != SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT){ tracing::info!( "Not using coordinator addr {} as `run_dataflow` is for local dataflow only. Please use the `start` command for remote coordinator", coordinator_addr @@ -386,12 +432,10 @@ fn run() -> eyre::Result<()> { Daemon::run_dataflow(&dataflow_path).await } None => { - let coordination_addr = coordinator_addr.unwrap_or_else(|| { + if coordinator_addr.ip() == LOCALHOST { tracing::info!("Starting in local mode"); - let localhost = Ipv4Addr::new(127, 0, 0, 1); - (localhost, DORA_COORDINATOR_PORT_DEFAULT).into() - }); - Daemon::run(coordination_addr, machine_id.unwrap_or_default(), addr).await + } + Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), addr).await } } }) @@ -530,14 +574,7 @@ fn query_running_dataflows( } fn connect_to_coordinator( - coordinator_addr: Option, + coordinator_addr: SocketAddr, ) -> std::io::Result> { - if let Some(coordinator_addr) = coordinator_addr { - TcpLayer::new().connect(SocketAddr::new( - coordinator_addr, - DORA_COORDINATOR_PORT_CONTROL, - )) - } else { - TcpLayer::new().connect(control_socket_addr()) - } + TcpLayer::new().connect(coordinator_addr) } diff --git a/binaries/cli/src/up.rs b/binaries/cli/src/up.rs index b404eaa73..66b2bd20d 100644 --- a/binaries/cli/src/up.rs +++ b/binaries/cli/src/up.rs @@ -1,12 +1,13 @@ -use crate::{check::daemon_running, connect_to_coordinator}; -use dora_core::topics::ControlRequest; +use crate::{check::daemon_running, connect_to_coordinator, LOCALHOST}; +use dora_core::topics::{ControlRequest, DORA_COORDINATOR_PORT_CONTROL_DEFAULT}; use eyre::Context; -use std::{fs, net::IpAddr, path::Path, process::Command, time::Duration}; +use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration}; #[derive(Debug, Default, serde::Serialize, serde::Deserialize)] struct UpConfig {} -pub(crate) fn up(config_path: Option<&Path>, coordinator_addr: Option) -> eyre::Result<()> { +pub(crate) fn up(config_path: Option<&Path>) -> eyre::Result<()> { let UpConfig {} = parse_dora_config(config_path)?; + let coordinator_addr = (LOCALHOST, DORA_COORDINATOR_PORT_CONTROL_DEFAULT).into(); let mut session = match connect_to_coordinator(coordinator_addr) { Ok(session) => session, Err(_) => { @@ -47,7 +48,7 @@ pub(crate) fn up(config_path: Option<&Path>, coordinator_addr: Option) - pub(crate) fn destroy( config_path: Option<&Path>, - coordinator_addr: Option, + coordinator_addr: SocketAddr, ) -> Result<(), eyre::ErrReport> { let UpConfig {} = parse_dora_config(config_path)?; match connect_to_coordinator(coordinator_addr) { diff --git a/binaries/coordinator/src/lib.rs b/binaries/coordinator/src/lib.rs index 0a30ac103..5032d6a5f 100644 --- a/binaries/coordinator/src/lib.rs +++ b/binaries/coordinator/src/lib.rs @@ -9,7 +9,7 @@ use dora_core::{ daemon_messages::{DaemonCoordinatorEvent, DaemonCoordinatorReply, Timestamped}, descriptor::{Descriptor, ResolvedNode}, message::uhlc::{self, HLC}, - topics::{control_socket_addr, ControlRequest, ControlRequestReply, DataflowId}, + topics::{ControlRequest, ControlRequestReply, DataflowId}, }; use eyre::{bail, eyre, ContextCompat, WrapErr}; use futures::{stream::FuturesUnordered, Future, Stream, StreamExt}; @@ -22,11 +22,7 @@ use std::{ sync::Arc, time::{Duration, Instant}, }; -use tokio::{ - net::{TcpListener, TcpStream}, - sync::mpsc, - task::JoinHandle, -}; +use tokio::{net::TcpStream, sync::mpsc, task::JoinHandle}; use tokio_stream::wrappers::{ReceiverStream, TcpListenerStream}; use uuid::Uuid; @@ -37,6 +33,7 @@ mod tcp_utils; pub async fn start( bind: SocketAddr, + bind_control: SocketAddr, external_events: impl Stream + Unpin, ) -> Result<(u16, impl Future>), eyre::ErrReport> { let listener = listener::create_listener(bind).await?; @@ -44,13 +41,30 @@ pub async fn start( .local_addr() .wrap_err("failed to get local addr of listener")? .port(); + let new_daemon_connections = TcpListenerStream::new(listener).map(|c| { + c.map(Event::NewDaemonConnection) + .wrap_err("failed to open connection") + .unwrap_or_else(Event::DaemonConnectError) + }); + let mut tasks = FuturesUnordered::new(); + let control_events = control::control_events(bind_control, &tasks) + .await + .wrap_err("failed to create control events")?; // Setup ctrl-c handler let ctrlc_events = set_up_ctrlc_handler()?; + let events = ( + external_events, + new_daemon_connections, + control_events, + ctrlc_events, + ) + .merge(); + let future = async move { - start_inner(listener, &tasks, (ctrlc_events, external_events).merge()).await?; + start_inner(events, &tasks).await?; tracing::debug!("coordinator main loop finished, waiting on spawned tasks"); while let Some(join_result) = tasks.next().await { @@ -100,40 +114,22 @@ fn resolve_name( } async fn start_inner( - listener: TcpListener, + events: impl Stream + Unpin, tasks: &FuturesUnordered>, - external_events: impl Stream + Unpin, ) -> eyre::Result<()> { let clock = Arc::new(HLC::default()); - let new_daemon_connections = TcpListenerStream::new(listener).map(|c| { - c.map(Event::NewDaemonConnection) - .wrap_err("failed to open connection") - .unwrap_or_else(Event::DaemonConnectError) - }); - let (daemon_events_tx, daemon_events) = tokio::sync::mpsc::channel(2); let mut daemon_events_tx = Some(daemon_events_tx); let daemon_events = ReceiverStream::new(daemon_events); - let control_events = control::control_events(control_socket_addr(), tasks) - .await - .wrap_err("failed to create control events")?; - let daemon_heartbeat_interval = tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(Duration::from_secs(3))) .map(|_| Event::DaemonHeartbeatInterval); // events that should be aborted on `dora destroy` - let (abortable_events, abort_handle) = futures::stream::abortable( - ( - control_events, - new_daemon_connections, - external_events, - daemon_heartbeat_interval, - ) - .merge(), - ); + let (abortable_events, abort_handle) = + futures::stream::abortable((events, daemon_heartbeat_interval).merge()); let mut events = (abortable_events, daemon_events).merge(); diff --git a/examples/multiple-daemons/run.rs b/examples/multiple-daemons/run.rs index 6f9bcd4e5..32a695cc5 100644 --- a/examples/multiple-daemons/run.rs +++ b/examples/multiple-daemons/run.rs @@ -1,7 +1,10 @@ use dora_coordinator::{ControlEvent, Event}; use dora_core::{ descriptor::Descriptor, - topics::{ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_DEFAULT}, + topics::{ + ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT, + DORA_COORDINATOR_PORT_DEFAULT, + }, }; use dora_tracing::set_up_tracing; use eyre::{bail, Context}; @@ -38,9 +41,16 @@ async fn main() -> eyre::Result<()> { IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), DORA_COORDINATOR_PORT_DEFAULT, ); - let (coordinator_port, coordinator) = - dora_coordinator::start(coordinator_bind, ReceiverStream::new(coordinator_events_rx)) - .await?; + let coordinator_control_bind = SocketAddr::new( + IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), + DORA_COORDINATOR_PORT_CONTROL_DEFAULT, + ); + let (coordinator_port, coordinator) = dora_coordinator::start( + coordinator_bind, + coordinator_control_bind, + ReceiverStream::new(coordinator_events_rx), + ) + .await?; let coordinator_addr = SocketAddr::new(Ipv4Addr::LOCALHOST.into(), coordinator_port); let daemon_a = run_daemon(coordinator_addr.to_string(), "A"); let daemon_b = run_daemon(coordinator_addr.to_string(), "B"); diff --git a/libraries/core/src/topics.rs b/libraries/core/src/topics.rs index 69ad75a5c..506c1b420 100644 --- a/libraries/core/src/topics.rs +++ b/libraries/core/src/topics.rs @@ -1,10 +1,4 @@ -use std::{ - collections::BTreeSet, - fmt::Display, - net::{Ipv4Addr, SocketAddr}, - path::PathBuf, - time::Duration, -}; +use std::{collections::BTreeSet, fmt::Display, path::PathBuf, time::Duration}; use uuid::Uuid; use crate::{ @@ -13,17 +7,10 @@ use crate::{ }; pub const DORA_COORDINATOR_PORT_DEFAULT: u16 = 0xD02A; -pub const DORA_COORDINATOR_PORT_CONTROL: u16 = 0x177C; +pub const DORA_COORDINATOR_PORT_CONTROL_DEFAULT: u16 = 0x177C; pub const MANUAL_STOP: &str = "dora/stop"; -pub fn control_socket_addr() -> SocketAddr { - SocketAddr::new( - Ipv4Addr::new(127, 0, 0, 1).into(), - DORA_COORDINATOR_PORT_CONTROL, - ) -} - #[derive(Debug, serde::Deserialize, serde::Serialize)] pub enum ControlRequest { Start {