Skip to content

Commit

Permalink
Add --force and --no-deps flags to run command
Browse files Browse the repository at this point in the history
The --force flag forces submission of targets,  without performing any
IO (so not stat'ing the filesystem).

The --no-deps flag will ignore dependencies, so only the targets
specified (by default the endpoints) will be submitted.

Closes #417.
  • Loading branch information
dansondergaard committed Dec 15, 2024
1 parent e2e1033 commit 914c604
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 7 deletions.
6 changes: 5 additions & 1 deletion src/gwf/plugins/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ def clean_logs(working_dir, graph):
@click.command()
@click.argument("targets", nargs=-1)
@click.option("-d", "--dry-run", is_flag=True, default=False)
@click.option("-f", "--force", is_flag=True, default=False)
@click.option("-n", "--no-deps", is_flag=True, default=False)
@pass_context
def run(ctx, targets, dry_run):
def run(ctx, targets, dry_run, force, no_deps):
"""Run the specified workflow."""
workflow = Workflow.from_context(ctx)

Expand All @@ -56,4 +58,6 @@ def run(ctx, targets, dry_run):
spec_hashes,
backend,
dry_run=dry_run,
force=force,
no_deps=no_deps,
)
39 changes: 33 additions & 6 deletions src/gwf/scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,29 @@ def should_run(target, fs, spec_hashes):
return False


def schedule(endpoints, graph, fs, spec_hashes, status_func, submit_func):
def schedule(
endpoints,
graph,
fs,
spec_hashes,
status_func,
submit_func,
force=False,
no_deps=False,
):
def _schedule(target):
submitted_deps = []
for dep in sorted(graph.dependencies[target], key=lambda t: t.name):
status = _cached_schedule(dep)
if status in SUBMITTED_STATES:
submitted_deps.append(dep)

if not no_deps:
for dep in sorted(graph.dependencies[target], key=lambda t: t.name):
status = _cached_schedule(dep)
if status in SUBMITTED_STATES:
submitted_deps.append(dep)

if force:
logger.debug("Target %s is being forcibly submitted", target)
submit_func(target, dependencies=submitted_deps)
return Status.SHOULDRUN

if status_func(target) == BackendStatus.SUBMITTED:
logger.debug("Target %s is already submitted", target)
Expand Down Expand Up @@ -147,7 +163,16 @@ def submit_backend(target, dependencies, backend, spec_hashes):
spec_hashes.update(target)


def submit_workflow(endpoints, graph, fs, spec_hashes, backend, dry_run=False):
def submit_workflow(
endpoints,
graph,
fs,
spec_hashes,
backend,
dry_run=False,
force=False,
no_deps=False,
):
"""Submit a workflow to a backend."""
submit_func = partial(
_submit_dryrun if dry_run else submit_backend,
Expand All @@ -161,6 +186,8 @@ def submit_workflow(endpoints, graph, fs, spec_hashes, backend, dry_run=False):
spec_hashes,
status_func=backend.status,
submit_func=submit_func,
force=force,
no_deps=no_deps,
)


Expand Down

0 comments on commit 914c604

Please sign in to comment.