From b1b0e4688c87a3fbe5e922965b45cbf523be0ff0 Mon Sep 17 00:00:00 2001 From: Philipp Oppermann Date: Wed, 6 Nov 2024 13:23:49 +0100 Subject: [PATCH] Implement `dora run` command Runs a dataflow locally, without requiring any any daemon or coordinator processes. This exposes the internal `dora daemon --run-dataflow` command that we use for testing. This addition was proposed in https://github.com/orgs/dora-rs/discussions/698#discussioncomment-11125465 . --- binaries/cli/src/main.rs | 39 +++++++++++++++++-- .../extensions/telemetry/tracing/src/lib.rs | 24 ++++++++---- 2 files changed, 53 insertions(+), 10 deletions(-) diff --git a/binaries/cli/src/main.rs b/binaries/cli/src/main.rs index f6197724e..8683b8cf9 100644 --- a/binaries/cli/src/main.rs +++ b/binaries/cli/src/main.rs @@ -18,7 +18,7 @@ use dora_message::{ }; #[cfg(feature = "tracing")] use dora_tracing::set_up_tracing; -use dora_tracing::set_up_tracing_opts; +use dora_tracing::{set_up_tracing_opts, FileLogging}; use duration_str::parse; use eyre::{bail, Context}; use formatting::FormatDataflowError; @@ -30,6 +30,7 @@ use std::{ }; use tabwriter::TabWriter; use tokio::runtime::Builder; +use tracing::level_filters::LevelFilter; use uuid::Uuid; mod attach; @@ -91,6 +92,15 @@ enum Command { #[clap(hide = true, long)] internal_create_with_path_dependencies: bool, }, + /// Run a dataflow locally. + /// + /// Directly runs the given dataflow without connecting to a dora + /// coordinator or daemon. The dataflow is executed on the local machine. + Run { + /// Path to the dataflow descriptor file + #[clap(value_name = "PATH")] + dataflow: String, + }, /// Spawn coordinator and daemon in local mode (with default config) Up { /// Use a custom configuration @@ -277,7 +287,12 @@ fn run() -> eyre::Result<()> { .as_ref() .map(|id| format!("{name}-{id}")) .unwrap_or(name.to_string()); - set_up_tracing_opts(name, !quiet, Some(&filename)) + let stdout = (!quiet).then_some(LevelFilter::WARN); + let file = Some(FileLogging { + file_name: filename, + filter: LevelFilter::INFO, + }); + set_up_tracing_opts(name, stdout, file) .context("failed to set up tracing subscriber")?; } Command::Runtime => { @@ -285,7 +300,16 @@ fn run() -> eyre::Result<()> { } Command::Coordinator { quiet, .. } => { let name = "dora-coordinator"; - set_up_tracing_opts(name, !quiet, Some(name)) + let stdout = (!quiet).then_some(LevelFilter::WARN); + let file = Some(FileLogging { + file_name: name.to_owned(), + filter: LevelFilter::INFO, + }); + set_up_tracing_opts(name, stdout, file) + .context("failed to set up tracing subscriber")?; + } + Command::Run { .. } => { + set_up_tracing_opts("run", Some(LevelFilter::INFO), None) .context("failed to set up tracing subscriber")?; } _ => { @@ -331,6 +355,15 @@ fn run() -> eyre::Result<()> { args, internal_create_with_path_dependencies, } => template::create(args, internal_create_with_path_dependencies)?, + Command::Run { dataflow } => { + let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?; + let rt = Builder::new_multi_thread() + .enable_all() + .build() + .context("tokio runtime failed")?; + let result = rt.block_on(Daemon::run_dataflow(&dataflow_path))?; + handle_dataflow_result(result, None)? + } Command::Up { config } => { up::up(config.as_deref())?; } diff --git a/libraries/extensions/telemetry/tracing/src/lib.rs b/libraries/extensions/telemetry/tracing/src/lib.rs index 10c277237..48fa6820f 100644 --- a/libraries/extensions/telemetry/tracing/src/lib.rs +++ b/libraries/extensions/telemetry/tracing/src/lib.rs @@ -16,25 +16,35 @@ use tracing_subscriber::Registry; pub mod telemetry; pub fn set_up_tracing(name: &str) -> eyre::Result<()> { - set_up_tracing_opts(name, true, None) + set_up_tracing_opts(name, Some(LevelFilter::WARN), None) } -pub fn set_up_tracing_opts(name: &str, stdout: bool, filename: Option<&str>) -> eyre::Result<()> { +pub struct FileLogging { + pub file_name: String, + pub filter: LevelFilter, +} + +pub fn set_up_tracing_opts( + name: &str, + stdout: Option, + file: Option, +) -> eyre::Result<()> { let mut layers = Vec::new(); - if stdout { + if let Some(level) = stdout { // Filter log using `RUST_LOG`. More useful for CLI. - let env_filter = EnvFilter::from_default_env().or(LevelFilter::WARN); + let env_filter = EnvFilter::from_default_env().or(level); let layer = tracing_subscriber::fmt::layer() .compact() .with_filter(env_filter); layers.push(layer.boxed()); } - if let Some(filename) = filename { + if let Some(file) = file { + let FileLogging { file_name, filter } = file; let out_dir = Path::new("out"); std::fs::create_dir_all(out_dir).context("failed to create `out` directory")?; - let path = out_dir.join(filename).with_extension("txt"); + let path = out_dir.join(file_name).with_extension("txt"); let file = std::fs::OpenOptions::new() .create(true) .append(true) @@ -44,7 +54,7 @@ pub fn set_up_tracing_opts(name: &str, stdout: bool, filename: Option<&str>) -> let layer = tracing_subscriber::fmt::layer() .with_ansi(false) .with_writer(file) - .with_filter(LevelFilter::INFO); + .with_filter(filter); layers.push(layer.boxed()); }