Skip to content

Commit

Permalink
Merge pull request #520 from dora-rs/improve-coordinator-port-config
Browse files Browse the repository at this point in the history
Improve coordinator port config
  • Loading branch information
haixuanTao authored Jun 4, 2024
2 parents d89ba3b + 89b97d7 commit d4ff586
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 113 deletions.
4 changes: 2 additions & 2 deletions binaries/cli/src/check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@ use dora_core::topics::{ControlRequest, ControlRequestReply};
use eyre::{bail, Context};
use std::{
io::{IsTerminal, Write},
net::IpAddr,
net::SocketAddr,
};
use termcolor::{Color, ColorChoice, ColorSpec, WriteColor};

pub fn check_environment(coordinator_addr: Option<IpAddr>) -> eyre::Result<()> {
pub fn check_environment(coordinator_addr: SocketAddr) -> eyre::Result<()> {
let mut error_occured = false;

let color_choice = if std::io::stdout().is_terminal() {
Expand Down
155 changes: 96 additions & 59 deletions binaries/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use dora_coordinator::Event;
use dora_core::{
descriptor::Descriptor,
topics::{
control_socket_addr, ControlRequest, ControlRequestReply, DataflowId,
DORA_COORDINATOR_PORT_CONTROL, DORA_COORDINATOR_PORT_DEFAULT,
ControlRequest, ControlRequestReply, DataflowId, DORA_COORDINATOR_PORT_CONTROL_DEFAULT,
DORA_COORDINATOR_PORT_DEFAULT,
},
};
use dora_daemon::Daemon;
Expand All @@ -31,6 +31,9 @@ mod logs;
mod template;
mod up;

const LOCALHOST: IpAddr = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1));
const LISTEN_WILDCARD: IpAddr = IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0));

#[derive(Debug, clap::Parser)]
#[clap(version)]
struct Args {
Expand All @@ -46,8 +49,12 @@ enum Command {
/// Path to the dataflow descriptor file (enables additional checks)
#[clap(long, value_name = "PATH", value_hint = clap::ValueHint::FilePath)]
dataflow: Option<PathBuf>,
#[clap(long)]
coordinator_addr: Option<IpAddr>,
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
},
/// Generate a visualization of the given graph using mermaid.js. Use --open to open browser.
Graph {
Expand All @@ -74,23 +81,23 @@ enum Command {
#[clap(hide = true, long)]
internal_create_with_path_dependencies: bool,
},
/// Spawn a coordinator and a daemon.
/// Spawn coordinator and daemon in local mode (with default config)
Up {
/// Use a custom configuration
#[clap(long, hide = true, value_name = "PATH", value_hint = clap::ValueHint::FilePath)]
config: Option<PathBuf>,
// TODO
#[clap(long)]
coordinator_addr: Option<IpAddr>,
},
/// Destroy running coordinator and daemon. If some dataflows are still running, they will be stopped first.
Destroy {
/// Use a custom configuration
#[clap(long, hide = true)]
config: Option<PathBuf>,
// TODO
#[clap(long)]
coordinator_addr: Option<IpAddr>,
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
},
/// Start the given dataflow path. Attach a name to the running dataflow by using --name.
Start {
Expand All @@ -100,8 +107,12 @@ enum Command {
/// Assign a name to the dataflow
#[clap(long)]
name: Option<String>,
#[clap(long)]
coordinator_addr: Option<IpAddr>,
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
/// Attach to the dataflow and wait for its completion
#[clap(long, action)]
attach: bool,
Expand All @@ -120,13 +131,21 @@ enum Command {
#[clap(long, value_name = "DURATION")]
#[arg(value_parser = parse)]
grace_duration: Option<Duration>,
#[clap(long)]
coordinator_addr: Option<IpAddr>,
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
},
/// List running dataflows.
List {
#[clap(long)]
coordinator_addr: Option<IpAddr>,
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
},
// Planned for future releases:
// Dashboard,
Expand All @@ -139,8 +158,12 @@ enum Command {
/// Show logs for the given node
#[clap(value_name = "NAME")]
node: String,
#[clap(long)]
coordinator_addr: Option<IpAddr>,
/// Address of the dora coordinator
#[clap(long, value_name = "IP", default_value_t = LOCALHOST)]
coordinator_addr: IpAddr,
/// Port number of the coordinator control server
#[clap(long, value_name = "PORT", default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
coordinator_port: u16,
},
// Metrics,
// Stats,
Expand All @@ -152,24 +175,30 @@ enum Command {
#[clap(long)]
machine_id: Option<String>,
/// The IP address and port this daemon will bind to.
#[clap(long, default_value_t = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), 0)
)]
#[clap(long, default_value_t = SocketAddr::new(LISTEN_WILDCARD, 0))]
addr: SocketAddr,
#[clap(long)]
coordinator_addr: Option<SocketAddr>,

/// Address and port number of the dora coordinator
#[clap(long, default_value_t = SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT))]
coordinator_addr: SocketAddr,
#[clap(long, hide = true)]
run_dataflow: Option<PathBuf>,
},
/// Run runtime
Runtime,
/// Run coordinator
Coordinator {
#[clap(long, default_value_t = SocketAddr::new(
IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)), DORA_COORDINATOR_PORT_DEFAULT)
)]
addr: SocketAddr,
/// Network interface to bind to for daemon communication
#[clap(long, default_value_t = LISTEN_WILDCARD)]
interface: IpAddr,
/// Port number to bind to for daemon communication
#[clap(long, default_value_t = DORA_COORDINATOR_PORT_DEFAULT)]
port: u16,
/// Network interface to bind to for control communication
#[clap(long, default_value_t = LISTEN_WILDCARD)]
control_interface: IpAddr,
/// Port number to bind to for control communication
#[clap(long, default_value_t = DORA_COORDINATOR_PORT_CONTROL_DEFAULT)]
control_port: u16,
},
}

Expand Down Expand Up @@ -233,6 +262,7 @@ fn run() -> eyre::Result<()> {
Command::Check {
dataflow,
coordinator_addr,
coordinator_port,
} => match dataflow {
Some(dataflow) => {
let working_dir = dataflow
Expand All @@ -242,9 +272,9 @@ fn run() -> eyre::Result<()> {
.ok_or_else(|| eyre::eyre!("dataflow path has no parent dir"))?
.to_owned();
Descriptor::blocking_read(&dataflow)?.check(&working_dir)?;
check::check_environment(coordinator_addr)?
check::check_environment((coordinator_addr, coordinator_port).into())?
}
None => check::check_environment(coordinator_addr)?,
None => check::check_environment((coordinator_addr, coordinator_port).into())?,
},
Command::Graph {
dataflow,
Expand All @@ -260,18 +290,16 @@ fn run() -> eyre::Result<()> {
args,
internal_create_with_path_dependencies,
} => template::create(args, internal_create_with_path_dependencies)?,
Command::Up {
config,
coordinator_addr,
} => {
up::up(config.as_deref(), coordinator_addr)?;
Command::Up { config } => {
up::up(config.as_deref())?;
}
Command::Logs {
dataflow,
node,
coordinator_addr,
coordinator_port,
} => {
let mut session = connect_to_coordinator(coordinator_addr)
let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into())
.wrap_err("failed to connect to dora coordinator")?;
let uuids = query_running_dataflows(&mut *session)
.wrap_err("failed to query running dataflows")?;
Expand All @@ -292,6 +320,7 @@ fn run() -> eyre::Result<()> {
dataflow,
name,
coordinator_addr,
coordinator_port,
attach,
hot_reload,
} => {
Expand All @@ -307,7 +336,7 @@ fn run() -> eyre::Result<()> {
.check(&working_dir)
.wrap_err("Could not validate yaml")?;

let mut session = connect_to_coordinator(coordinator_addr)
let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into())
.wrap_err("failed to connect to dora coordinator")?;
let dataflow_id = start_dataflow(
dataflow_descriptor.clone(),
Expand All @@ -326,7 +355,10 @@ fn run() -> eyre::Result<()> {
)?
}
}
Command::List { coordinator_addr } => match connect_to_coordinator(coordinator_addr) {
Command::List {
coordinator_addr,
coordinator_port,
} => match connect_to_coordinator((coordinator_addr, coordinator_port).into()) {
Ok(mut session) => list(&mut *session)?,
Err(_) => {
bail!("No dora coordinator seems to be running.");
Expand All @@ -337,8 +369,9 @@ fn run() -> eyre::Result<()> {
name,
grace_duration,
coordinator_addr,
coordinator_port,
} => {
let mut session = connect_to_coordinator(coordinator_addr)
let mut session = connect_to_coordinator((coordinator_addr, coordinator_port).into())
.wrap_err("could not connect to dora coordinator")?;
match (uuid, name) {
(Some(uuid), _) => stop_dataflow(uuid, grace_duration, &mut *session)?,
Expand All @@ -349,15 +382,28 @@ fn run() -> eyre::Result<()> {
Command::Destroy {
config,
coordinator_addr,
} => up::destroy(config.as_deref(), coordinator_addr)?,
Command::Coordinator { addr } => {
coordinator_port,
} => up::destroy(
config.as_deref(),
(coordinator_addr, coordinator_port).into(),
)?,
Command::Coordinator {
interface,
port,
control_interface,
control_port,
} => {
let rt = Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
rt.block_on(async {
let (_port, task) =
dora_coordinator::start(addr, futures::stream::empty::<Event>()).await?;
let bind = SocketAddr::new(interface, port);
let bind_control = SocketAddr::new(control_interface, control_port);
let (port, task) =
dora_coordinator::start(bind, bind_control, futures::stream::empty::<Event>())
.await?;
println!("Listening for incoming daemon connection on {port}");
task.await
})
.context("failed to run dora-coordinator")?
Expand All @@ -376,7 +422,7 @@ fn run() -> eyre::Result<()> {
match run_dataflow {
Some(dataflow_path) => {
tracing::info!("Starting dataflow `{}`", dataflow_path.display());
if let Some(coordinator_addr) = coordinator_addr {
if coordinator_addr != SocketAddr::new(LOCALHOST, DORA_COORDINATOR_PORT_DEFAULT){
tracing::info!(
"Not using coordinator addr {} as `run_dataflow` is for local dataflow only. Please use the `start` command for remote coordinator",
coordinator_addr
Expand All @@ -386,12 +432,10 @@ fn run() -> eyre::Result<()> {
Daemon::run_dataflow(&dataflow_path).await
}
None => {
let coordination_addr = coordinator_addr.unwrap_or_else(|| {
if coordinator_addr.ip() == LOCALHOST {
tracing::info!("Starting in local mode");
let localhost = Ipv4Addr::new(127, 0, 0, 1);
(localhost, DORA_COORDINATOR_PORT_DEFAULT).into()
});
Daemon::run(coordination_addr, machine_id.unwrap_or_default(), addr).await
}
Daemon::run(coordinator_addr, machine_id.unwrap_or_default(), addr).await
}
}
})
Expand Down Expand Up @@ -530,14 +574,7 @@ fn query_running_dataflows(
}

fn connect_to_coordinator(
coordinator_addr: Option<IpAddr>,
coordinator_addr: SocketAddr,
) -> std::io::Result<Box<TcpRequestReplyConnection>> {
if let Some(coordinator_addr) = coordinator_addr {
TcpLayer::new().connect(SocketAddr::new(
coordinator_addr,
DORA_COORDINATOR_PORT_CONTROL,
))
} else {
TcpLayer::new().connect(control_socket_addr())
}
TcpLayer::new().connect(coordinator_addr)
}
11 changes: 6 additions & 5 deletions binaries/cli/src/up.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use crate::{check::daemon_running, connect_to_coordinator};
use dora_core::topics::ControlRequest;
use crate::{check::daemon_running, connect_to_coordinator, LOCALHOST};
use dora_core::topics::{ControlRequest, DORA_COORDINATOR_PORT_CONTROL_DEFAULT};
use eyre::Context;
use std::{fs, net::IpAddr, path::Path, process::Command, time::Duration};
use std::{fs, net::SocketAddr, path::Path, process::Command, time::Duration};
#[derive(Debug, Default, serde::Serialize, serde::Deserialize)]
struct UpConfig {}

pub(crate) fn up(config_path: Option<&Path>, coordinator_addr: Option<IpAddr>) -> eyre::Result<()> {
pub(crate) fn up(config_path: Option<&Path>) -> eyre::Result<()> {
let UpConfig {} = parse_dora_config(config_path)?;
let coordinator_addr = (LOCALHOST, DORA_COORDINATOR_PORT_CONTROL_DEFAULT).into();
let mut session = match connect_to_coordinator(coordinator_addr) {
Ok(session) => session,
Err(_) => {
Expand Down Expand Up @@ -47,7 +48,7 @@ pub(crate) fn up(config_path: Option<&Path>, coordinator_addr: Option<IpAddr>) -

pub(crate) fn destroy(
config_path: Option<&Path>,
coordinator_addr: Option<IpAddr>,
coordinator_addr: SocketAddr,
) -> Result<(), eyre::ErrReport> {
let UpConfig {} = parse_dora_config(config_path)?;
match connect_to_coordinator(coordinator_addr) {
Expand Down
Loading

0 comments on commit d4ff586

Please sign in to comment.