Skip to content

Commit

Permalink
Implement dora run command
Browse files Browse the repository at this point in the history
Runs a dataflow locally, without requiring any any daemon or coordinator processes.

This exposes the internal `dora daemon --run-dataflow` command that we use for testing.

This addition was proposed in https://github.com/orgs/dora-rs/discussions/698#discussioncomment-11125465 .
  • Loading branch information
phil-opp committed Nov 6, 2024
1 parent 1d33d0e commit b1b0e46
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 10 deletions.
39 changes: 36 additions & 3 deletions binaries/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use dora_message::{
};
#[cfg(feature = "tracing")]
use dora_tracing::set_up_tracing;
use dora_tracing::set_up_tracing_opts;
use dora_tracing::{set_up_tracing_opts, FileLogging};
use duration_str::parse;
use eyre::{bail, Context};
use formatting::FormatDataflowError;
Expand All @@ -30,6 +30,7 @@ use std::{
};
use tabwriter::TabWriter;
use tokio::runtime::Builder;
use tracing::level_filters::LevelFilter;
use uuid::Uuid;

mod attach;
Expand Down Expand Up @@ -91,6 +92,15 @@ enum Command {
#[clap(hide = true, long)]
internal_create_with_path_dependencies: bool,
},
/// Run a dataflow locally.
///
/// Directly runs the given dataflow without connecting to a dora
/// coordinator or daemon. The dataflow is executed on the local machine.
Run {
/// Path to the dataflow descriptor file
#[clap(value_name = "PATH")]
dataflow: String,
},
/// Spawn coordinator and daemon in local mode (with default config)
Up {
/// Use a custom configuration
Expand Down Expand Up @@ -277,15 +287,29 @@ fn run() -> eyre::Result<()> {
.as_ref()
.map(|id| format!("{name}-{id}"))
.unwrap_or(name.to_string());
set_up_tracing_opts(name, !quiet, Some(&filename))
let stdout = (!quiet).then_some(LevelFilter::WARN);
let file = Some(FileLogging {
file_name: filename,
filter: LevelFilter::INFO,
});
set_up_tracing_opts(name, stdout, file)
.context("failed to set up tracing subscriber")?;
}
Command::Runtime => {
// Do not set the runtime in the cli.
}
Command::Coordinator { quiet, .. } => {
let name = "dora-coordinator";
set_up_tracing_opts(name, !quiet, Some(name))
let stdout = (!quiet).then_some(LevelFilter::WARN);
let file = Some(FileLogging {
file_name: name.to_owned(),
filter: LevelFilter::INFO,
});
set_up_tracing_opts(name, stdout, file)
.context("failed to set up tracing subscriber")?;
}
Command::Run { .. } => {
set_up_tracing_opts("run", Some(LevelFilter::INFO), None)
.context("failed to set up tracing subscriber")?;
}
_ => {
Expand Down Expand Up @@ -331,6 +355,15 @@ fn run() -> eyre::Result<()> {
args,
internal_create_with_path_dependencies,
} => template::create(args, internal_create_with_path_dependencies)?,
Command::Run { dataflow } => {
let dataflow_path = resolve_dataflow(dataflow).context("could not resolve dataflow")?;
let rt = Builder::new_multi_thread()
.enable_all()
.build()
.context("tokio runtime failed")?;
let result = rt.block_on(Daemon::run_dataflow(&dataflow_path))?;
handle_dataflow_result(result, None)?
}
Command::Up { config } => {
up::up(config.as_deref())?;
}
Expand Down
24 changes: 17 additions & 7 deletions libraries/extensions/telemetry/tracing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,25 +16,35 @@ use tracing_subscriber::Registry;
pub mod telemetry;

pub fn set_up_tracing(name: &str) -> eyre::Result<()> {
set_up_tracing_opts(name, true, None)
set_up_tracing_opts(name, Some(LevelFilter::WARN), None)
}

pub fn set_up_tracing_opts(name: &str, stdout: bool, filename: Option<&str>) -> eyre::Result<()> {
pub struct FileLogging {
pub file_name: String,
pub filter: LevelFilter,
}

pub fn set_up_tracing_opts(
name: &str,
stdout: Option<LevelFilter>,
file: Option<FileLogging>,
) -> eyre::Result<()> {
let mut layers = Vec::new();

if stdout {
if let Some(level) = stdout {
// Filter log using `RUST_LOG`. More useful for CLI.
let env_filter = EnvFilter::from_default_env().or(LevelFilter::WARN);
let env_filter = EnvFilter::from_default_env().or(level);
let layer = tracing_subscriber::fmt::layer()
.compact()
.with_filter(env_filter);
layers.push(layer.boxed());
}

if let Some(filename) = filename {
if let Some(file) = file {
let FileLogging { file_name, filter } = file;
let out_dir = Path::new("out");
std::fs::create_dir_all(out_dir).context("failed to create `out` directory")?;
let path = out_dir.join(filename).with_extension("txt");
let path = out_dir.join(file_name).with_extension("txt");
let file = std::fs::OpenOptions::new()
.create(true)
.append(true)
Expand All @@ -44,7 +54,7 @@ pub fn set_up_tracing_opts(name: &str, stdout: bool, filename: Option<&str>) ->
let layer = tracing_subscriber::fmt::layer()
.with_ansi(false)
.with_writer(file)
.with_filter(LevelFilter::INFO);
.with_filter(filter);
layers.push(layer.boxed());
}

Expand Down

0 comments on commit b1b0e46

Please sign in to comment.