From 914c6040ae88bf0ddaf5ebef664743bd92640354 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Dan=20S=C3=B8ndergaard?= Date: Sun, 15 Dec 2024 20:14:12 +0100 Subject: [PATCH] Add --force and --no-deps flags to run command The --force flag forces submission of targets, without performing any IO (so not stat'ing the filesystem). The --no-deps flag will ignore dependencies, so only the targets specified (by default the endpoints) will be submitted. Closes #417. --- src/gwf/plugins/run.py | 6 +++++- src/gwf/scheduling.py | 39 +++++++++++++++++++++++++++++++++------ 2 files changed, 38 insertions(+), 7 deletions(-) diff --git a/src/gwf/plugins/run.py b/src/gwf/plugins/run.py index f72522ec..45586e91 100644 --- a/src/gwf/plugins/run.py +++ b/src/gwf/plugins/run.py @@ -31,8 +31,10 @@ def clean_logs(working_dir, graph): @click.command() @click.argument("targets", nargs=-1) @click.option("-d", "--dry-run", is_flag=True, default=False) +@click.option("-f", "--force", is_flag=True, default=False) +@click.option("-n", "--no-deps", is_flag=True, default=False) @pass_context -def run(ctx, targets, dry_run): +def run(ctx, targets, dry_run, force, no_deps): """Run the specified workflow.""" workflow = Workflow.from_context(ctx) @@ -56,4 +58,6 @@ def run(ctx, targets, dry_run): spec_hashes, backend, dry_run=dry_run, + force=force, + no_deps=no_deps, ) diff --git a/src/gwf/scheduling.py b/src/gwf/scheduling.py index 5dffe0d0..fcd05951 100644 --- a/src/gwf/scheduling.py +++ b/src/gwf/scheduling.py @@ -52,13 +52,29 @@ def should_run(target, fs, spec_hashes): return False -def schedule(endpoints, graph, fs, spec_hashes, status_func, submit_func): +def schedule( + endpoints, + graph, + fs, + spec_hashes, + status_func, + submit_func, + force=False, + no_deps=False, +): def _schedule(target): submitted_deps = [] - for dep in sorted(graph.dependencies[target], key=lambda t: t.name): - status = _cached_schedule(dep) - if status in SUBMITTED_STATES: - submitted_deps.append(dep) + + if not no_deps: + for dep in sorted(graph.dependencies[target], key=lambda t: t.name): + status = _cached_schedule(dep) + if status in SUBMITTED_STATES: + submitted_deps.append(dep) + + if force: + logger.debug("Target %s is being forcibly submitted", target) + submit_func(target, dependencies=submitted_deps) + return Status.SHOULDRUN if status_func(target) == BackendStatus.SUBMITTED: logger.debug("Target %s is already submitted", target) @@ -147,7 +163,16 @@ def submit_backend(target, dependencies, backend, spec_hashes): spec_hashes.update(target) -def submit_workflow(endpoints, graph, fs, spec_hashes, backend, dry_run=False): +def submit_workflow( + endpoints, + graph, + fs, + spec_hashes, + backend, + dry_run=False, + force=False, + no_deps=False, +): """Submit a workflow to a backend.""" submit_func = partial( _submit_dryrun if dry_run else submit_backend, @@ -161,6 +186,8 @@ def submit_workflow(endpoints, graph, fs, spec_hashes, backend, dry_run=False): spec_hashes, status_func=backend.status, submit_func=submit_func, + force=force, + no_deps=no_deps, )