From 22c7100859c93364e53996c8744417bd54bfddc7 Mon Sep 17 00:00:00 2001
From: Christopher Harrop
Date: Tue, 5 Feb 2019 10:39:09 -0700
Subject: [PATCH 01/29] Update VERSION

---
 VERSION | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/VERSION b/VERSION
index e8ea05d..f0bb29e 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.2.4
+1.3.0

From f49c98e362f55d9405812186042b5b6da66156c9 Mon Sep 17 00:00:00 2001
From: Christopher Harrop
Date: Tue, 5 Feb 2019 10:42:05 -0700
Subject: [PATCH 02/29] Update RELEASE_NOTES.md

---
 RELEASE_NOTES.md | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index 14d8006..fe913a4 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -3,6 +3,8 @@
 ## New for Version 1.3.0
 
 * Update SLURM support to handle pack groups
+* Update SLURM support to map <queue> to --qos instead of --partition
+* Update SLURM support to map <partition> to --partition
 * Update LSF support to handle additional methods LSF uses to report the exit status
 * All rocoto commands have the same -a, -c, -m, and -t options
 * The -c and -t options can now select by cycledefs and attributes (ie. final)

From 3ed48b8ed4592c54ee6acfce6e34ef7781a70eb7 Mon Sep 17 00:00:00 2001
From: Christopher Harrop
Date: Tue, 5 Feb 2019 10:43:55 -0700
Subject: [PATCH 03/29] Update RELEASE_NOTES.md

---
 RELEASE_NOTES.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md
index fe913a4..53689c6 100644
--- a/RELEASE_NOTES.md
+++ b/RELEASE_NOTES.md
@@ -3,8 +3,8 @@
 ## New for Version 1.3.0
 
 * Update SLURM support to handle pack groups
-* Update SLURM support to map <queue> to --qos instead of --partition
-* Update SLURM support to map <partition> to --partition
+* Update SLURM support to map `<queue>` to --qos instead of --partition
+* Update SLURM support to map `<partition>` to --partition
 * Update LSF support to handle additional methods LSF uses to report the exit status
 * All rocoto commands have the same -a, -c, -m, and -t options
 * The -c and -t options can now select by cycledefs and attributes (ie. final)

From bdcf0dd2637cac4fec4017990dcabd45eecb063a Mon Sep 17 00:00:00 2001
From: Samuel Trahan
Date: Fri, 22 Feb 2019 16:26:51 +0000
Subject: [PATCH 04/29] On SLURM, some jobs were ending up in "UNAVAILABLE" status because:

1. The job disappeared from scontrol before rocotorun was run, and
2. The sacct command, by default, only prints history since midnight system time.

Thus, any jobs that finish a little bit before midnight, that scontrol does not report to rocotorun, will never be seen by Rocoto.

The fix:

1. The first time sacct is run, it is requested to search back one full day of history, and
2. If the job still is not found, then it will search back several days.
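In outline, the lookback behaves like the sketch below. This is illustrative Ruby only, not the code in this patch: the helper names and the trimmed sacct field list are assumptions, and the shell-out via backticks stands in for Rocoto's WorkflowMgr.run4 wrapper. The one-day and several-day windows match the refresh_jobacct(1)/refresh_jobacct(5) calls in the diff.

    require 'date'

    # sacct only reports history since midnight by default; -S pushes the
    # search window back.  Two passes: one day first, then several days
    # if the job is still missing.
    def sacct_lines(days_back)
      start = (DateTime.now - days_back).strftime('%m%d%y')
      `sacct -S #{start} -o jobid,state%12 -P`.split("\n")
    end

    def find_accounting_record(jobid)
      hit = sacct_lines(1).find { |line| line.start_with?("#{jobid}|") }
      hit ||= sacct_lines(5).find { |line| line.start_with?("#{jobid}|") }
      hit   # nil means neither scontrol nor sacct knows about the job
    end

The real change also records how far back it has already searched (@jobacct_duration), so the wider query is only issued when a job is still unaccounted for.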
--- lib/workflowmgr/slurmbatchsystem.rb | 42 ++++++++++++++++++++++++----- 1 file changed, 36 insertions(+), 6 deletions(-) diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb index 81b56b9..622094c 100644 --- a/lib/workflowmgr/slurmbatchsystem.rb +++ b/lib/workflowmgr/slurmbatchsystem.rb @@ -7,6 +7,8 @@ module WorkflowMgr require 'workflowmgr/batchsystem' + require 'date' + ########################################## # # Class SLURMBatchSystem @@ -31,6 +33,7 @@ def initialize(slurm_root=nil) # Initialize an empty hash for job accounting records @jobacct={} + @jobacct_duration=0 # Assume the scheduler is up @schedup=true @@ -89,7 +92,13 @@ def status(jobid) return @jobqueue[jobid] if @jobqueue.has_key?(jobid) # Populate the job accounting log table if it is empty - refresh_jobacct if @jobacct.empty? + refresh_jobacct(1) if @jobacct_duration<1 + + # Return the jobacct record if there is one + return @jobacct[jobid] if @jobacct.has_key?(jobid) + + # Now re-populate over a longer history: + refresh_jobacct(5) if @jobacct_duration<5 # Return the jobacct record if there is one return @jobacct[jobid] if @jobacct.has_key?(jobid) @@ -372,6 +381,8 @@ def refresh_jobqueue # For each job, find the various attributes and create a job record queued_jobs.split("\n").each { |job| + WorkflowMgr.stderr("sbatch line is #{job}", 11) + # Initialize an empty job record record={} @@ -436,6 +447,9 @@ def refresh_jobqueue end record[:native_state]=jobfields["JobState"] + WorkflowMgr.stderr("From sbatch, job #{record[:jobid]} name #{record[:jobname]} for user #{record[:user]} state #{record[:state]} from native state #{record[:native_state]}.", 11) + + # Add record to job queue @jobqueue[record[:jobid]]=record @@ -449,7 +463,7 @@ def refresh_jobqueue # refresh_jobacct # ##################################################### - def refresh_jobacct + def refresh_jobacct(delta_days) begin @@ -460,15 +474,26 @@ def refresh_jobacct completed_jobs="" errors="" exit_status=0 - completed_jobs,errors,exit_status=WorkflowMgr.run4("sacct -L -o jobid,user%30,jobname%30,partition%20,priority,submit,start,end,ncpus,exitcode,state%12 -P",30) + mmddyy=(DateTime.now-delta_days).strftime('%m%d%y') + cmd="sacct -S #{mmddyy} -L -o jobid,user%30,jobname%30,partition%20,priority,submit,start,end,ncpus,exitcode,state%12 -P" + completed_jobs,errors,exit_status=WorkflowMgr.run4(cmd,30) - return if errors=~/SLURM accounting storage is disabled/ + if errors=~/SLURM accounting storage is disabled/ + WorkflowMgr.stderr("SLURM accounting storage is disabled, so I will not check sacct.",11) + return + end # Raise SchedulerDown if the command failed - raise WorkflowMgr::SchedulerDown,errors unless exit_status==0 + if exit_status != 0 + WorkflowMgr.stderr("Running sacct failed: #{cmd}: #{errors}",11) + raise WorkflowMgr::SchedulerDown,errors + end # Return if the output is empty - return if completed_jobs.empty? + if completed_jobs.empty? 
+ WorkflowMgr.stderr("Got no output from sacct: #{cmd}",11) + return + end rescue Timeout::Error,WorkflowMgr::SchedulerDown WorkflowMgr.log("#{$!}") @@ -476,8 +501,11 @@ def refresh_jobacct raise WorkflowMgr::SchedulerDown end + WorkflowMgr.stderr("Running sacct for 0..#{delta_days} days ago.",11) + # For each job, find the various attributes and create a job record completed_jobs.split("\n").each { |job| + WorkflowMgr.stderr("sacct line is #{job}", 11) # Initialize an empty job record record={} @@ -543,6 +571,8 @@ def refresh_jobacct end record[:native_state]=jobfields[10] + WorkflowMgr.stderr("From sacct, job #{record[:jobid]} name #{record[:jobname]} for user #{record[:user]} state #{record[:state]} from native state #{record[:native_state]}.", 11) + # Add record to job queue @jobacct[record[:jobid]]=record From 5840be19347b88dee5d94aaefd016b9dfa0fb426 Mon Sep 17 00:00:00 2001 From: "samuel.trahan" Date: Fri, 22 Feb 2019 16:50:08 +0000 Subject: [PATCH 05/29] Remove some extra debugging code --- lib/workflowmgr/slurmbatchsystem.rb | 20 +++----------------- 1 file changed, 3 insertions(+), 17 deletions(-) diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb index 622094c..558cac9 100644 --- a/lib/workflowmgr/slurmbatchsystem.rb +++ b/lib/workflowmgr/slurmbatchsystem.rb @@ -447,9 +447,6 @@ def refresh_jobqueue end record[:native_state]=jobfields["JobState"] - WorkflowMgr.stderr("From sbatch, job #{record[:jobid]} name #{record[:jobname]} for user #{record[:user]} state #{record[:state]} from native state #{record[:native_state]}.", 11) - - # Add record to job queue @jobqueue[record[:jobid]]=record @@ -478,22 +475,16 @@ def refresh_jobacct(delta_days) cmd="sacct -S #{mmddyy} -L -o jobid,user%30,jobname%30,partition%20,priority,submit,start,end,ncpus,exitcode,state%12 -P" completed_jobs,errors,exit_status=WorkflowMgr.run4(cmd,30) - if errors=~/SLURM accounting storage is disabled/ - WorkflowMgr.stderr("SLURM accounting storage is disabled, so I will not check sacct.",11) - return - end + return if errors=~/SLURM accounting storage is disabled/ # Raise SchedulerDown if the command failed if exit_status != 0 - WorkflowMgr.stderr("Running sacct failed: #{cmd}: #{errors}",11) + WorkflowMgr.stderr("Running sacct failed with status #{exit_status}: #{cmd}: #{errors}",9) raise WorkflowMgr::SchedulerDown,errors end # Return if the output is empty - if completed_jobs.empty? - WorkflowMgr.stderr("Got no output from sacct: #{cmd}",11) - return - end + return if completed_jobs.empty? 
rescue Timeout::Error,WorkflowMgr::SchedulerDown WorkflowMgr.log("#{$!}") @@ -501,11 +492,8 @@ def refresh_jobacct(delta_days) raise WorkflowMgr::SchedulerDown end - WorkflowMgr.stderr("Running sacct for 0..#{delta_days} days ago.",11) - # For each job, find the various attributes and create a job record completed_jobs.split("\n").each { |job| - WorkflowMgr.stderr("sacct line is #{job}", 11) # Initialize an empty job record record={} @@ -571,8 +559,6 @@ def refresh_jobacct(delta_days) end record[:native_state]=jobfields[10] - WorkflowMgr.stderr("From sacct, job #{record[:jobid]} name #{record[:jobname]} for user #{record[:user]} state #{record[:state]} from native state #{record[:native_state]}.", 11) - # Add record to job queue @jobacct[record[:jobid]]=record From a1d32f3ae5aa4dec6f0818aa909c7ed84347a71b Mon Sep 17 00:00:00 2001 From: "samuel.trahan" Date: Fri, 22 Feb 2019 16:51:03 +0000 Subject: [PATCH 06/29] Remove some extra debugging code and one blank line --- lib/workflowmgr/slurmbatchsystem.rb | 3 --- 1 file changed, 3 deletions(-) diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb index 558cac9..69f0bb9 100644 --- a/lib/workflowmgr/slurmbatchsystem.rb +++ b/lib/workflowmgr/slurmbatchsystem.rb @@ -6,7 +6,6 @@ module WorkflowMgr require 'workflowmgr/batchsystem' - require 'date' ########################################## @@ -381,8 +380,6 @@ def refresh_jobqueue # For each job, find the various attributes and create a job record queued_jobs.split("\n").each { |job| - WorkflowMgr.stderr("sbatch line is #{job}", 11) - # Initialize an empty job record record={} From 103acb7d14da9c1b6c0bc67799ebd591db8e4fbe Mon Sep 17 00:00:00 2001 From: "samuel.trahan" Date: Fri, 22 Feb 2019 16:52:25 +0000 Subject: [PATCH 07/29] Remove one more block of debugging code --- lib/workflowmgr/slurmbatchsystem.rb | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb index 69f0bb9..4964d4d 100644 --- a/lib/workflowmgr/slurmbatchsystem.rb +++ b/lib/workflowmgr/slurmbatchsystem.rb @@ -475,10 +475,7 @@ def refresh_jobacct(delta_days) return if errors=~/SLURM accounting storage is disabled/ # Raise SchedulerDown if the command failed - if exit_status != 0 - WorkflowMgr.stderr("Running sacct failed with status #{exit_status}: #{cmd}: #{errors}",9) - raise WorkflowMgr::SchedulerDown,errors - end + raise WorkflowMgr::SchedulerDown,errors unless exit_status==0 # Return if the output is empty return if completed_jobs.empty? From 4b24e39216fddf7c8c5122d792e5826ecb703708 Mon Sep 17 00:00:00 2001 From: "samuel.trahan" Date: Fri, 22 Feb 2019 16:56:55 +0000 Subject: [PATCH 08/29] In SLURM, by default, all environment variables are exported to the job, leading to a variety of confusing errors. This change disables exporting of variables so that only the ones in blocks are exported. --- lib/workflowmgr/slurmbatchsystem.rb | 3 +++ 1 file changed, 3 insertions(+) diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb index 81b56b9..c40cf12 100644 --- a/lib/workflowmgr/slurmbatchsystem.rb +++ b/lib/workflowmgr/slurmbatchsystem.rb @@ -287,6 +287,9 @@ def submit(task) input += this_group_nodes end + # Do not export variables by default + varinput+="--export=NONE\n" + # Add export commands to pass environment vars to the job unless task.envars.empty? 
varinput='' From 9b24286dfca02f7c2fc5ca74bdc0adb7cfa67c55 Mon Sep 17 00:00:00 2001 From: "samuel.trahan" Date: Fri, 22 Feb 2019 19:03:29 +0000 Subject: [PATCH 09/29] fix incorrect variable name in this branch --- lib/workflowmgr/slurmbatchsystem.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb index c40cf12..3cf357d 100644 --- a/lib/workflowmgr/slurmbatchsystem.rb +++ b/lib/workflowmgr/slurmbatchsystem.rb @@ -288,7 +288,7 @@ def submit(task) end # Do not export variables by default - varinput+="--export=NONE\n" + input+="--export=NONE\n" # Add export commands to pass environment vars to the job unless task.envars.empty? From 692b5b43350e20793e32dcce35e3aa113618be5b Mon Sep 17 00:00:00 2001 From: "samuel.trahan" Date: Mon, 25 Feb 2019 18:53:42 +0000 Subject: [PATCH 10/29] Fix a syntax error in the generated sbatch cards from slurmbatchsystem.rb --- lib/workflowmgr/slurmbatchsystem.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb index 3c7ea1f..bf72e9a 100644 --- a/lib/workflowmgr/slurmbatchsystem.rb +++ b/lib/workflowmgr/slurmbatchsystem.rb @@ -296,7 +296,7 @@ def submit(task) end # Do not export variables by default - input+="--export=NONE\n" + input+="#SBATCH --export=NONE\n" # Add export commands to pass environment vars to the job unless task.envars.empty? From cd979b3171b183761bc19a9a1537f44fe091f147 Mon Sep 17 00:00:00 2001 From: "samuel.trahan" Date: Mon, 25 Feb 2019 19:00:31 +0000 Subject: [PATCH 11/29] In slurmbatchsystem.rb, set the jobacct_duration when calling refresh_jobacct. This fixes a bug wherein sacct is called for every job that does not show up in scontrol. --- lib/workflowmgr/slurmbatchsystem.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb index 3c7ea1f..f2981f1 100644 --- a/lib/workflowmgr/slurmbatchsystem.rb +++ b/lib/workflowmgr/slurmbatchsystem.rb @@ -462,6 +462,8 @@ def refresh_jobqueue ##################################################### def refresh_jobacct(delta_days) + @jobacct_duration=delta_days + begin # Get the username of this process From 6ae055ec24e49197e7c8368eeec4e3169594b8cd Mon Sep 17 00:00:00 2001 From: Christopher Harrop Date: Wed, 6 Mar 2019 17:17:17 +0000 Subject: [PATCH 12/29] This reverts the pull request that causes environment variables NOT to be exported by default. The problem is that the setting is inherited by subsequent srun commands in the job script. That, in turn, causes the environment created by module commands in job scripts not to be propagated to the execution environment of binaries. This is unintuitive and undesirable behavior that causes huge usability problems. --- lib/workflowmgr/slurmbatchsystem.rb | 3 --- 1 file changed, 3 deletions(-) diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb index 7215e2b..40c7e2e 100644 --- a/lib/workflowmgr/slurmbatchsystem.rb +++ b/lib/workflowmgr/slurmbatchsystem.rb @@ -295,9 +295,6 @@ def submit(task) input += this_group_nodes end - # Do not export variables by default - input+="#SBATCH --export=NONE\n" - # Add export commands to pass environment vars to the job unless task.envars.empty? 
varinput='' From 9d6873fe8a7b7551ea183fc594b648c2bfde9d28 Mon Sep 17 00:00:00 2001 From: Christopher Harrop Date: Tue, 2 Apr 2019 17:08:21 +0000 Subject: [PATCH 13/29] Added support for colon delimited partition lists for Slurm. This enhances workflow portability so users don't have to change between commas and colons when switching batch systems. It will cause problems for Slurm if a partition name contains a colon. --- lib/workflowmgr/slurmbatchsystem.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb index 81b56b9..c1d4f34 100644 --- a/lib/workflowmgr/slurmbatchsystem.rb +++ b/lib/workflowmgr/slurmbatchsystem.rb @@ -153,7 +153,7 @@ def submit(task) when :queue per_pack_group_input += "#SBATCH --qos #{value}\n" when :partition - per_pack_group_input += "#SBATCH --partition #{value}\n" + per_pack_group_input += "#SBATCH --partition #{value.gsub(":",",")}\n" when :cores # Ignore this attribute if the "nodes" attribute is present next unless task.attributes[:nodes].nil? From f35e2c9f2cc0c7009d696b02c1476a7c63f9d86d Mon Sep 17 00:00:00 2001 From: Christopher Harrop Date: Mon, 15 Apr 2019 19:52:29 +0000 Subject: [PATCH 14/29] Remove support for Slurm heterogeneous jobs. Convert all Slurm heterogeneous requests to a homogeneous request. The user is responsible for ensuring the job launch and hostfile are modified according to the user's wishes. --- lib/workflowmgr/slurmbatchsystem.rb | 178 +++++++++------------------- 1 file changed, 54 insertions(+), 124 deletions(-) diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb index d4b3fe9..b173f78 100644 --- a/lib/workflowmgr/slurmbatchsystem.rb +++ b/lib/workflowmgr/slurmbatchsystem.rb @@ -37,9 +37,6 @@ def initialize(slurm_root=nil) # Assume the scheduler is up @schedup=true - # Set heterogeneous job support to nil (it will be set once in submit) - @heterogeneous_job_support - end @@ -120,36 +117,13 @@ def status(jobid) ##################################################### def submit(task) - # Check if heterogeneous jobs are supported - if @heterogeneous_job_support.nil? - - # Get version of sbatch being used - version,errors,exit_status=WorkflowMgr.run4("sbatch --version",30) - - # Raise SchedulerDown if the command failed - raise WorkflowMgr::SchedulerDown,errors unless exit_status==0 - - # Get first four digits of version as an integer - @version = version.gsub(/[slurm.\s]/,"")[0..3].to_i - - # Check for heterogeneous job support - @heterogeneous_job_support = false - if @version >= 1808 - @heterogeneous_job_support = true - end - - end - # Initialize the submit command cmd="sbatch" input="#! /bin/sh\n" - per_pack_group_input="" - pack_group_nodes=Array.new - # Add Slurm batch system options translated from the generic options specification task.attributes.each do |option,value| - if value.is_a?(String) + if value.is_a?(String) if value.empty? 
WorkflowMgr.stderr("WARNING: <#{option}> has empty content and is ignored", 1) next @@ -157,99 +131,67 @@ def submit(task) end case option when :account - per_pack_group_input += "#SBATCH --account #{value}\n" + input += "#SBATCH --account #{value}\n" when :queue - per_pack_group_input += "#SBATCH --qos #{value}\n" + input += "#SBATCH --qos #{value}\n" when :partition - per_pack_group_input += "#SBATCH --partition #{value.gsub(":",",")}\n" + input += "#SBATCH --partition #{value.gsub(":",",")}\n" when :cores # Ignore this attribute if the "nodes" attribute is present next unless task.attributes[:nodes].nil? - if @heterogeneous_job_support - pack_group_nodes << "#SBATCH --ntasks=#{value}\n" - else - pack_group_nodes = ["#SBATCH --ntasks=#{value}\n"] - end + input += "#SBATCH --ntasks=#{value}\n" when :nodes - # Make sure exclusive access to nodes is enforced -# per_pack_group_input += "#SBATCH --exclusive\n" - - if @heterogeneous_job_support - - first_spec = true - nodespecs=value.split("+") - nodespecs.each { |nodespec| - resources=nodespec.split(":") - nnodes=resources.shift.to_i - ppn=0 - resources.each { |resource| - case resource - when /ppn=(\d+)/ - ppn=$1.to_i - when /tpp=(\d+)/ - tpp=$1.to_i - end - } - - # Request for this resource - pack_group_nodes << "#SBATCH --ntasks=#{nnodes*ppn} --tasks-per-node=#{ppn}\n" - - first_spec = false + # Rocoto does not currently support Slurm heterogeneous jobs. However, it does + # support requests for non-uniform processor geometries. To accomplish this, + # Rocoto will use sbatch to submit a job with the smallest uniform resource + # request that can accommodate the non-uniform request. It is up to the user to + # use the appropriate tools to manipulate the host file and use the appropriate + # MPI launcher command to specify the desired processor layout for the executable + # in the job script. + + # Get the total nodes and max ppn requested + maxppn=1 + nnodes=0 + nodespecs=value.split("+") + nodespecs.each { |nodespec| + resources=nodespec.split(":") + nnodes+=resources.shift.to_i + ppn=0 + resources.each { |resource| + case resource + when /ppn=(\d+)/ + ppn=$1.to_i + when /tpp=(\d+)/ + tpp=$1.to_i + end } - - else - - # This version of SLURM (< version 18.08) does not support submission of jobs - # (via sbatch) with non-uniform processor geometries. SLURM refers to these as - # "heterogenous jobs". To work around this, we will use sbatch to submit a job - # with the smallest uniform resource request that can accommodate the - # heterogeneous request. It is up to the user to use the appropriate host file - # manipulation and/or MPI launcher command to specify the desired processor layout - # for the executable in the job script. 
- - # Get the total nodes and max ppn requested - maxppn=1 - nnodes=0 - nodespecs=value.split("+") - nodespecs.each { |nodespec| - resources=nodespec.split(":") - nnodes+=resources.shift.to_i - ppn=0 - resources.each { |resource| - case resource - when /ppn=(\d+)/ - ppn=$1.to_i - when /tpp=(\d+)/ - tpp=$1.to_i - end - } - maxppn=ppn if ppn > maxppn - } - - # Request total number of nodes - node_input = "#SBATCH --nodes=#{nnodes}-#{nnodes}\n" - - # Request max tasks per node - node_input += "#SBATCH --tasks-per-node=#{maxppn}\n" - - pack_group_nodes = [ node_input ] # ensure only one "pack group" - - # Print a warning if multiple nodespecs are specified - if nodespecs.size > 1 - WorkflowMgr.stderr("WARNING: SLURM < 18.08 does not support requests for non-unifortm task geometries",1) - WorkflowMgr.stderr("WARNING: during batch job submission You must use the -m option of the srun command",1) - WorkflowMgr.stderr("WARNING: in your script to launch your code with an arbitrary distribution of tasks",1) - WorkflowMgr.stderr("WARNING: Please see https://slurm.schedmd.com/faq.html#arbitrary for details",1) - WorkflowMgr.stderr("WARNING: Rocoto has automatically converted '#{value}' to '#{nnodes}:ppn=#{maxppn}'",1) - WorkflowMgr.stderr("WARNING: to facilitate the desired arbitrary task distribution. Use",1) - WorkflowMgr.stderr("WARNING: #{nnodes}:ppn=#{maxppn} in your workflow to eliminate this warning message.",1) - end - + maxppn=ppn if ppn > maxppn + } + + # Request total number of nodes + input += "#SBATCH --nodes=#{nnodes}-#{nnodes}\n" + + # Request max tasks per node + input += "#SBATCH --tasks-per-node=#{maxppn}\n" + + # Print a warning if multiple nodespecs are specified + if nodespecs.size > 1 + WorkflowMgr.stderr("WARNING: Rocoto does not support Slurm's heterogeneous job feature. However, Rocoto", 1) + WorkflowMgr.stderr("WARNING: does support Slurm requests for non-unifortm task geometries. It does this by", 1) + WorkflowMgr.stderr("WARNING: converting non-uniform requests into the smallest uniform task geometry", 1) + WorkflowMgr.stderr("WARNING: request that can accommodate the non-uniform one during batch job submission.", 1) + WorkflowMgr.stderr("WARNING: It is up to the user to use the -m option of the srun command, or other", 1) + WorkflowMgr.stderr("WARNING: appropriate tools, in the job script to launch executables with an arbitrary", 1) + WorkflowMgr.stderr("WARNING: distribution of tasks.", 1) + WorkflowMgr.stderr("WARNING: Please see https://slurm.schedmd.com/faq.html#arbitrary for details", 1) + WorkflowMgr.stderr("WARNING: Rocoto has automatically converted '#{value}' to '#{nnodes}:ppn=#{maxppn}' for this job", 1) + WorkflowMgr.stderr("WARNING: to facilitate the desired arbitrary task distribution. 
Use", 1) + WorkflowMgr.stderr("WARNING: #{nnodes}:ppn=#{maxppn} in your workflow to eliminate this warning message.", 1) end when :walltime # Make sure format is dd-hh:mm:ss if days are included - per_pack_group_input += "#SBATCH -t #{value.sub(/^(\d+):(\d+:\d+:\d+)$/,'\1-\2')}\n" + input += "#SBATCH -t #{value.sub(/^(\d+):(\d+:\d+:\d+)$/,'\1-\2')}\n" when :memory m=/^([\.\d]+)([\D]*)$/.match(value) amount=m[1].to_f @@ -267,7 +209,7 @@ def submit(task) amount=(amount / 1024.0 / 1024.0).ceil end if amount > 0 - per_pack_group_input += "#SBATCH --mem=#{amount}\n" + input += "#SBATCH --mem=#{amount}\n" end when :stdout input += "#SBATCH -o #{value}\n" @@ -281,18 +223,7 @@ def submit(task) end task.each_native do |value| - per_pack_group_input += "#SBATCH #{value}\n" - end - - first=true - pack_group_nodes.each do |this_group_nodes| - if first - first=false - else - input += "\n#SBATCH packjob\n\n" - end - input += per_pack_group_input - input += this_group_nodes + input += "#SBATCH #{value}\n" end # Add export commands to pass environment vars to the job @@ -303,7 +234,6 @@ def submit(task) } input += varinput end - input+="set -x\n" # Add the command to execute input += task.attributes[:command] @@ -313,7 +243,7 @@ def submit(task) tf.write(input) tf.flush() - WorkflowMgr.stderr("Submitting #{task.attributes[:name]} using #{cmd} < #{tf.path} with input {{#{input}}}",4) + WorkflowMgr.stderr("Submitting #{task.attributes[:name]} using #{cmd} < #{tf.path} with input\n{{\n#{input}\n}}",4) # Run the submit command output=`#{cmd} < #{tf.path} 2>&1`.chomp() From 9f7bd51564287e931312c889e225556854a5257b Mon Sep 17 00:00:00 2001 From: Christopher Harrop Date: Mon, 15 Apr 2019 20:15:37 +0000 Subject: [PATCH 15/29] Add missing option for sorting rocotostat output by task instead of cycle. --- lib/wfmstat/wfmstatoption.rb | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/lib/wfmstat/wfmstatoption.rb b/lib/wfmstat/wfmstatoption.rb index 78e27f5..dc7c2cd 100644 --- a/lib/wfmstat/wfmstatoption.rb +++ b/lib/wfmstat/wfmstatoption.rb @@ -53,6 +53,11 @@ def add_opts(opts) @summary=true end + # task order + opts.on("-T","--task-sort","Sort by task") do + @taskfirst=true + end + end # add_opts def make_selection() From 3ea74038c2e7f6f0a8beca816252045dd1b9e3ff Mon Sep 17 00:00:00 2001 From: Christopher Harrop Date: Tue, 16 Apr 2019 16:35:02 +0000 Subject: [PATCH 16/29] Revert "Remove support for Slurm heterogeneous jobs." This was mistakenly merged before creating a 1.3.0-RC3 tag. This reverts commit f35e2c9f2cc0c7009d696b02c1476a7c63f9d86d. --- lib/workflowmgr/slurmbatchsystem.rb | 178 +++++++++++++++++++--------- 1 file changed, 124 insertions(+), 54 deletions(-) diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb index b173f78..d4b3fe9 100644 --- a/lib/workflowmgr/slurmbatchsystem.rb +++ b/lib/workflowmgr/slurmbatchsystem.rb @@ -37,6 +37,9 @@ def initialize(slurm_root=nil) # Assume the scheduler is up @schedup=true + # Set heterogeneous job support to nil (it will be set once in submit) + @heterogeneous_job_support + end @@ -117,13 +120,36 @@ def status(jobid) ##################################################### def submit(task) + # Check if heterogeneous jobs are supported + if @heterogeneous_job_support.nil? 
+ + # Get version of sbatch being used + version,errors,exit_status=WorkflowMgr.run4("sbatch --version",30) + + # Raise SchedulerDown if the command failed + raise WorkflowMgr::SchedulerDown,errors unless exit_status==0 + + # Get first four digits of version as an integer + @version = version.gsub(/[slurm.\s]/,"")[0..3].to_i + + # Check for heterogeneous job support + @heterogeneous_job_support = false + if @version >= 1808 + @heterogeneous_job_support = true + end + + end + # Initialize the submit command cmd="sbatch" input="#! /bin/sh\n" + per_pack_group_input="" + pack_group_nodes=Array.new + # Add Slurm batch system options translated from the generic options specification task.attributes.each do |option,value| - if value.is_a?(String) + if value.is_a?(String) if value.empty? WorkflowMgr.stderr("WARNING: <#{option}> has empty content and is ignored", 1) next @@ -131,67 +157,99 @@ def submit(task) end case option when :account - input += "#SBATCH --account #{value}\n" + per_pack_group_input += "#SBATCH --account #{value}\n" when :queue - input += "#SBATCH --qos #{value}\n" + per_pack_group_input += "#SBATCH --qos #{value}\n" when :partition - input += "#SBATCH --partition #{value.gsub(":",",")}\n" + per_pack_group_input += "#SBATCH --partition #{value.gsub(":",",")}\n" when :cores # Ignore this attribute if the "nodes" attribute is present next unless task.attributes[:nodes].nil? - input += "#SBATCH --ntasks=#{value}\n" + if @heterogeneous_job_support + pack_group_nodes << "#SBATCH --ntasks=#{value}\n" + else + pack_group_nodes = ["#SBATCH --ntasks=#{value}\n"] + end when :nodes - # Rocoto does not currently support Slurm heterogeneous jobs. However, it does - # support requests for non-uniform processor geometries. To accomplish this, - # Rocoto will use sbatch to submit a job with the smallest uniform resource - # request that can accommodate the non-uniform request. It is up to the user to - # use the appropriate tools to manipulate the host file and use the appropriate - # MPI launcher command to specify the desired processor layout for the executable - # in the job script. - - # Get the total nodes and max ppn requested - maxppn=1 - nnodes=0 - nodespecs=value.split("+") - nodespecs.each { |nodespec| - resources=nodespec.split(":") - nnodes+=resources.shift.to_i - ppn=0 - resources.each { |resource| - case resource - when /ppn=(\d+)/ - ppn=$1.to_i - when /tpp=(\d+)/ - tpp=$1.to_i - end + # Make sure exclusive access to nodes is enforced +# per_pack_group_input += "#SBATCH --exclusive\n" + + if @heterogeneous_job_support + + first_spec = true + nodespecs=value.split("+") + nodespecs.each { |nodespec| + resources=nodespec.split(":") + nnodes=resources.shift.to_i + ppn=0 + resources.each { |resource| + case resource + when /ppn=(\d+)/ + ppn=$1.to_i + when /tpp=(\d+)/ + tpp=$1.to_i + end + } + + # Request for this resource + pack_group_nodes << "#SBATCH --ntasks=#{nnodes*ppn} --tasks-per-node=#{ppn}\n" + + first_spec = false } - maxppn=ppn if ppn > maxppn - } - - # Request total number of nodes - input += "#SBATCH --nodes=#{nnodes}-#{nnodes}\n" - - # Request max tasks per node - input += "#SBATCH --tasks-per-node=#{maxppn}\n" - - # Print a warning if multiple nodespecs are specified - if nodespecs.size > 1 - WorkflowMgr.stderr("WARNING: Rocoto does not support Slurm's heterogeneous job feature. However, Rocoto", 1) - WorkflowMgr.stderr("WARNING: does support Slurm requests for non-unifortm task geometries. 
It does this by", 1) - WorkflowMgr.stderr("WARNING: converting non-uniform requests into the smallest uniform task geometry", 1) - WorkflowMgr.stderr("WARNING: request that can accommodate the non-uniform one during batch job submission.", 1) - WorkflowMgr.stderr("WARNING: It is up to the user to use the -m option of the srun command, or other", 1) - WorkflowMgr.stderr("WARNING: appropriate tools, in the job script to launch executables with an arbitrary", 1) - WorkflowMgr.stderr("WARNING: distribution of tasks.", 1) - WorkflowMgr.stderr("WARNING: Please see https://slurm.schedmd.com/faq.html#arbitrary for details", 1) - WorkflowMgr.stderr("WARNING: Rocoto has automatically converted '#{value}' to '#{nnodes}:ppn=#{maxppn}' for this job", 1) - WorkflowMgr.stderr("WARNING: to facilitate the desired arbitrary task distribution. Use", 1) - WorkflowMgr.stderr("WARNING: #{nnodes}:ppn=#{maxppn} in your workflow to eliminate this warning message.", 1) + + else + + # This version of SLURM (< version 18.08) does not support submission of jobs + # (via sbatch) with non-uniform processor geometries. SLURM refers to these as + # "heterogenous jobs". To work around this, we will use sbatch to submit a job + # with the smallest uniform resource request that can accommodate the + # heterogeneous request. It is up to the user to use the appropriate host file + # manipulation and/or MPI launcher command to specify the desired processor layout + # for the executable in the job script. + + # Get the total nodes and max ppn requested + maxppn=1 + nnodes=0 + nodespecs=value.split("+") + nodespecs.each { |nodespec| + resources=nodespec.split(":") + nnodes+=resources.shift.to_i + ppn=0 + resources.each { |resource| + case resource + when /ppn=(\d+)/ + ppn=$1.to_i + when /tpp=(\d+)/ + tpp=$1.to_i + end + } + maxppn=ppn if ppn > maxppn + } + + # Request total number of nodes + node_input = "#SBATCH --nodes=#{nnodes}-#{nnodes}\n" + + # Request max tasks per node + node_input += "#SBATCH --tasks-per-node=#{maxppn}\n" + + pack_group_nodes = [ node_input ] # ensure only one "pack group" + + # Print a warning if multiple nodespecs are specified + if nodespecs.size > 1 + WorkflowMgr.stderr("WARNING: SLURM < 18.08 does not support requests for non-unifortm task geometries",1) + WorkflowMgr.stderr("WARNING: during batch job submission You must use the -m option of the srun command",1) + WorkflowMgr.stderr("WARNING: in your script to launch your code with an arbitrary distribution of tasks",1) + WorkflowMgr.stderr("WARNING: Please see https://slurm.schedmd.com/faq.html#arbitrary for details",1) + WorkflowMgr.stderr("WARNING: Rocoto has automatically converted '#{value}' to '#{nnodes}:ppn=#{maxppn}'",1) + WorkflowMgr.stderr("WARNING: to facilitate the desired arbitrary task distribution. 
Use",1) + WorkflowMgr.stderr("WARNING: #{nnodes}:ppn=#{maxppn} in your workflow to eliminate this warning message.",1) + end + end when :walltime # Make sure format is dd-hh:mm:ss if days are included - input += "#SBATCH -t #{value.sub(/^(\d+):(\d+:\d+:\d+)$/,'\1-\2')}\n" + per_pack_group_input += "#SBATCH -t #{value.sub(/^(\d+):(\d+:\d+:\d+)$/,'\1-\2')}\n" when :memory m=/^([\.\d]+)([\D]*)$/.match(value) amount=m[1].to_f @@ -209,7 +267,7 @@ def submit(task) amount=(amount / 1024.0 / 1024.0).ceil end if amount > 0 - input += "#SBATCH --mem=#{amount}\n" + per_pack_group_input += "#SBATCH --mem=#{amount}\n" end when :stdout input += "#SBATCH -o #{value}\n" @@ -223,7 +281,18 @@ def submit(task) end task.each_native do |value| - input += "#SBATCH #{value}\n" + per_pack_group_input += "#SBATCH #{value}\n" + end + + first=true + pack_group_nodes.each do |this_group_nodes| + if first + first=false + else + input += "\n#SBATCH packjob\n\n" + end + input += per_pack_group_input + input += this_group_nodes end # Add export commands to pass environment vars to the job @@ -234,6 +303,7 @@ def submit(task) } input += varinput end + input+="set -x\n" # Add the command to execute input += task.attributes[:command] @@ -243,7 +313,7 @@ def submit(task) tf.write(input) tf.flush() - WorkflowMgr.stderr("Submitting #{task.attributes[:name]} using #{cmd} < #{tf.path} with input\n{{\n#{input}\n}}",4) + WorkflowMgr.stderr("Submitting #{task.attributes[:name]} using #{cmd} < #{tf.path} with input {{#{input}}}",4) # Run the submit command output=`#{cmd} < #{tf.path} 2>&1`.chomp() From 6a7e0e5fd72aa3ae098f380c06753f38dc0f5b99 Mon Sep 17 00:00:00 2001 From: Christopher Harrop Date: Tue, 16 Apr 2019 17:20:10 +0000 Subject: [PATCH 17/29] Remove support for Slurm heterogeneous jobs. This was previously committed in the wrong order. This reverts the revert of that previous commit. --- lib/workflowmgr/slurmbatchsystem.rb | 178 +++++++++------------------- 1 file changed, 54 insertions(+), 124 deletions(-) diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb index d4b3fe9..0cea5b8 100644 --- a/lib/workflowmgr/slurmbatchsystem.rb +++ b/lib/workflowmgr/slurmbatchsystem.rb @@ -37,9 +37,6 @@ def initialize(slurm_root=nil) # Assume the scheduler is up @schedup=true - # Set heterogeneous job support to nil (it will be set once in submit) - @heterogeneous_job_support - end @@ -120,36 +117,13 @@ def status(jobid) ##################################################### def submit(task) - # Check if heterogeneous jobs are supported - if @heterogeneous_job_support.nil? - - # Get version of sbatch being used - version,errors,exit_status=WorkflowMgr.run4("sbatch --version",30) - - # Raise SchedulerDown if the command failed - raise WorkflowMgr::SchedulerDown,errors unless exit_status==0 - - # Get first four digits of version as an integer - @version = version.gsub(/[slurm.\s]/,"")[0..3].to_i - - # Check for heterogeneous job support - @heterogeneous_job_support = false - if @version >= 1808 - @heterogeneous_job_support = true - end - - end - # Initialize the submit command cmd="sbatch" input="#! /bin/sh\n" - per_pack_group_input="" - pack_group_nodes=Array.new - # Add Slurm batch system options translated from the generic options specification task.attributes.each do |option,value| - if value.is_a?(String) + if value.is_a?(String) if value.empty? 
WorkflowMgr.stderr("WARNING: <#{option}> has empty content and is ignored", 1) next @@ -157,99 +131,67 @@ def submit(task) end case option when :account - per_pack_group_input += "#SBATCH --account #{value}\n" + input += "#SBATCH --account #{value}\n" when :queue - per_pack_group_input += "#SBATCH --qos #{value}\n" + input += "#SBATCH --qos #{value}\n" when :partition - per_pack_group_input += "#SBATCH --partition #{value.gsub(":",",")}\n" + input += "#SBATCH --partition #{value.gsub(":",",")}\n" when :cores # Ignore this attribute if the "nodes" attribute is present next unless task.attributes[:nodes].nil? - if @heterogeneous_job_support - pack_group_nodes << "#SBATCH --ntasks=#{value}\n" - else - pack_group_nodes = ["#SBATCH --ntasks=#{value}\n"] - end + input += "#SBATCH --ntasks=#{value}\n" when :nodes - # Make sure exclusive access to nodes is enforced -# per_pack_group_input += "#SBATCH --exclusive\n" - - if @heterogeneous_job_support - - first_spec = true - nodespecs=value.split("+") - nodespecs.each { |nodespec| - resources=nodespec.split(":") - nnodes=resources.shift.to_i - ppn=0 - resources.each { |resource| - case resource - when /ppn=(\d+)/ - ppn=$1.to_i - when /tpp=(\d+)/ - tpp=$1.to_i - end - } - - # Request for this resource - pack_group_nodes << "#SBATCH --ntasks=#{nnodes*ppn} --tasks-per-node=#{ppn}\n" - - first_spec = false + # Rocoto does not currently support Slurm heterogeneous jobs. However, it does + # support requests for non-uniform processor geometries. To accomplish this, + # Rocoto will use sbatch to submit a job with the smallest uniform resource + # request that can accommodate the non-uniform request. It is up to the user to + # use the appropriate tools to manipulate the host file and use the appropriate + # MPI launcher command to specify the desired processor layout for the executable + # in the job script. + + # Get the total nodes and max ppn requested + maxppn=1 + nnodes=0 + nodespecs=value.split("+") + nodespecs.each { |nodespec| + resources=nodespec.split(":") + nnodes+=resources.shift.to_i + ppn=0 + resources.each { |resource| + case resource + when /ppn=(\d+)/ + ppn=$1.to_i + when /tpp=(\d+)/ + tpp=$1.to_i + end } - - else - - # This version of SLURM (< version 18.08) does not support submission of jobs - # (via sbatch) with non-uniform processor geometries. SLURM refers to these as - # "heterogenous jobs". To work around this, we will use sbatch to submit a job - # with the smallest uniform resource request that can accommodate the - # heterogeneous request. It is up to the user to use the appropriate host file - # manipulation and/or MPI launcher command to specify the desired processor layout - # for the executable in the job script. 
- - # Get the total nodes and max ppn requested - maxppn=1 - nnodes=0 - nodespecs=value.split("+") - nodespecs.each { |nodespec| - resources=nodespec.split(":") - nnodes+=resources.shift.to_i - ppn=0 - resources.each { |resource| - case resource - when /ppn=(\d+)/ - ppn=$1.to_i - when /tpp=(\d+)/ - tpp=$1.to_i - end - } - maxppn=ppn if ppn > maxppn - } - - # Request total number of nodes - node_input = "#SBATCH --nodes=#{nnodes}-#{nnodes}\n" - - # Request max tasks per node - node_input += "#SBATCH --tasks-per-node=#{maxppn}\n" - - pack_group_nodes = [ node_input ] # ensure only one "pack group" - - # Print a warning if multiple nodespecs are specified - if nodespecs.size > 1 - WorkflowMgr.stderr("WARNING: SLURM < 18.08 does not support requests for non-unifortm task geometries",1) - WorkflowMgr.stderr("WARNING: during batch job submission You must use the -m option of the srun command",1) - WorkflowMgr.stderr("WARNING: in your script to launch your code with an arbitrary distribution of tasks",1) - WorkflowMgr.stderr("WARNING: Please see https://slurm.schedmd.com/faq.html#arbitrary for details",1) - WorkflowMgr.stderr("WARNING: Rocoto has automatically converted '#{value}' to '#{nnodes}:ppn=#{maxppn}'",1) - WorkflowMgr.stderr("WARNING: to facilitate the desired arbitrary task distribution. Use",1) - WorkflowMgr.stderr("WARNING: #{nnodes}:ppn=#{maxppn} in your workflow to eliminate this warning message.",1) - end - + maxppn=ppn if ppn > maxppn + } + + # Request total number of nodes + input += "#SBATCH --nodes=#{nnodes}-#{nnodes}\n" + + # Request max tasks per node + input += "#SBATCH --tasks-per-node=#{maxppn}\n" + + # Print a warning if multiple nodespecs are specified + if nodespecs.size > 1 + WorkflowMgr.stderr("WARNING: Rocoto does not support Slurm's heterogeneous job feature. However, Rocoto", 1) + WorkflowMgr.stderr("WARNING: does support Slurm requests for non-unifortm task geometries. It does this by", 1) + WorkflowMgr.stderr("WARNING: converting non-uniform requests into the smallest uniform task geometry", 1) + WorkflowMgr.stderr("WARNING: request that can accommodate the non-uniform one during batch job submission.", 1) + WorkflowMgr.stderr("WARNING: It is up to the user to use the -m option of the srun command, or other", 1) + WorkflowMgr.stderr("WARNING: appropriate tools, in the job script to launch executables with an arbitrary", 1) + WorkflowMgr.stderr("WARNING: distribution of tasks.", 1) + WorkflowMgr.stderr("WARNING: Please see https://slurm.schedmd.com/faq.html#arbitrary for details", 1) + WorkflowMgr.stderr("WARNING: Rocoto has automatically converted '#{value}' to '#{nnodes}:ppn=#{maxppn}' for this job", 1) + WorkflowMgr.stderr("WARNING: to facilitate the desired arbitrary task distribution. 
Use", 1) + WorkflowMgr.stderr("WARNING: #{nnodes}:ppn=#{maxppn} in your workflow to eliminate this warning message.", 1) end when :walltime # Make sure format is dd-hh:mm:ss if days are included - per_pack_group_input += "#SBATCH -t #{value.sub(/^(\d+):(\d+:\d+:\d+)$/,'\1-\2')}\n" + input += "#SBATCH -t #{value.sub(/^(\d+):(\d+:\d+:\d+)$/,'\1-\2')}\n" when :memory m=/^([\.\d]+)([\D]*)$/.match(value) amount=m[1].to_f @@ -267,7 +209,7 @@ def submit(task) amount=(amount / 1024.0 / 1024.0).ceil end if amount > 0 - per_pack_group_input += "#SBATCH --mem=#{amount}\n" + input += "#SBATCH --mem=#{amount}\n" end when :stdout input += "#SBATCH -o #{value}\n" @@ -281,18 +223,7 @@ def submit(task) end task.each_native do |value| - per_pack_group_input += "#SBATCH #{value}\n" - end - - first=true - pack_group_nodes.each do |this_group_nodes| - if first - first=false - else - input += "\n#SBATCH packjob\n\n" - end - input += per_pack_group_input - input += this_group_nodes + input += "#SBATCH #{value}\n" end # Add export commands to pass environment vars to the job @@ -303,7 +234,6 @@ def submit(task) } input += varinput end - input+="set -x\n" # Add the command to execute input += task.attributes[:command] @@ -313,7 +243,7 @@ def submit(task) tf.write(input) tf.flush() - WorkflowMgr.stderr("Submitting #{task.attributes[:name]} using #{cmd} < #{tf.path} with input {{#{input}}}",4) + WorkflowMgr.stderr("Submitting #{task.attributes[:name]} using #{cmd} < #{tf.path} with input\n{{\n#{input}\n}}",4) # Run the submit command output=`#{cmd} < #{tf.path} 2>&1`.chomp() From 8f345addf37545187897e2ebf94604aa9d5d5700 Mon Sep 17 00:00:00 2001 From: Christopher Harrop Date: Thu, 18 Apr 2019 18:08:49 +0000 Subject: [PATCH 18/29] Fix bug that caused temporary rocotorc files to be left behind in ~/.rocoto --- lib/workflowmgr/workflowconfig.rb | 65 +++++++++++++++---------------- 1 file changed, 31 insertions(+), 34 deletions(-) diff --git a/lib/workflowmgr/workflowconfig.rb b/lib/workflowmgr/workflowconfig.rb index 848f2d2..ed48d16 100644 --- a/lib/workflowmgr/workflowconfig.rb +++ b/lib/workflowmgr/workflowconfig.rb @@ -43,48 +43,45 @@ def initialize # Load the configuration begin - @config=WorkflowMgr.forkit(10) do - - # Create a .rocoto directory if one does not already exist - FileUtils.mkdir_p(@config_dir) unless File.exists?(@config_dir) - - # Create a .rocoto tmp dir if one does not already exist - FileUtils.mkdir_p(@config_tmp) unless File.exists?(@config_tmp) - - # Move the legacy .wfmrc file to rocotorc file if it exists - FileUtils.mv("#{ENV['HOME']}/.wfmrc",@config_file) if File.exists?("#{ENV['HOME']}/.wfmrc") - - # Load the rocotorc config if one exists - if File.exists?(@config_file) && !File.zero?(@config_file) - config=YAML.load_file(@config_file) - if config.is_a?(Hash) - # Merge default config into rocotorc config if there are unspecified config options - if config.keys.collect {|c| c.to_s}.sort != DEFAULT_CONFIG.keys.collect {|c| c.to_s}.sort - config=DEFAULT_CONFIG.merge(config).delete_if { |k,v| !DEFAULT_CONFIG.has_key?(k) } - File.open("#{@config_file}.#{Process.ppid}","w") { |f| YAML.dump(config,f) } - end - config - else - WorkflowMgr.log("WARNING! Reverted corrupted configuration in #{@config_file} to default.") - WorkflowMgr.stderr("WARNING! 
Reverted corrupted configuration in #{@config_file} to default.") - File.open("#{@config_file}.#{Process.ppid}","w") { |f| YAML.dump(DEFAULT_CONFIG,f) } - DEFAULT_CONFIG + + # Create a .rocoto directory if one does not already exist + FileUtils.mkdir_p(@config_dir) unless File.exists?(@config_dir) + + # Create a .rocoto tmp dir if one does not already exist + FileUtils.mkdir_p(@config_tmp) unless File.exists?(@config_tmp) + + # Move the legacy .wfmrc file to rocotorc file if it exists + FileUtils.mv("#{ENV['HOME']}/.wfmrc",@config_file) if File.exists?("#{ENV['HOME']}/.wfmrc") + + # Load the rocotorc config if one exists + if File.exists?(@config_file) && !File.zero?(@config_file) + config=YAML.load_file(@config_file) + if config.is_a?(Hash) + # Merge default config into rocotorc config if there are unspecified config options + if config.keys.collect {|c| c.to_s}.sort != DEFAULT_CONFIG.keys.collect {|c| c.to_s}.sort + config=DEFAULT_CONFIG.merge(config).delete_if { |k,v| !DEFAULT_CONFIG.has_key?(k) } + File.open("#{@config_file}.#{Process.pid}","w") { |f| YAML.dump(config,f) } end + @config = config else - # Create a rocotorc file with default settings if it does not exist - File.open("#{@config_file}.#{Process.ppid}","w") { |f| YAML.dump(DEFAULT_CONFIG,f) } - DEFAULT_CONFIG + WorkflowMgr.log("WARNING! Reverted corrupted configuration in #{@config_file} to default.") + WorkflowMgr.stderr("WARNING! Reverted corrupted configuration in #{@config_file} to default.") + File.open("#{@config_file}.#{Process.pid}","w") { |f| YAML.dump(DEFAULT_CONFIG,f) } + @config = DEFAULT_CONFIG end - - end # @config do - - # Update the config file in a quasi-atomic way. - FileUtils.mv("#{@config_file}.#{Process.pid}", @config_file) if File.exists?("#{@config_file}.#{Process.pid}") + else + # Create a rocotorc file with default settings if it does not exist + File.open("#{@config_file}.#{Process.pid}","w") { |f| YAML.dump(DEFAULT_CONFIG,f) } + @config = DEFAULT_CONFIG + end rescue WorkflowMgr::ForkitTimeoutException msg="ERROR: An I/O operation timed out while reading, writing, or testing for the existence of '#{@config_file}'" WorkflowMgr.log(msg) raise msg + ensure + # Update the config file in a quasi-atomic way. + FileUtils.mv("#{@config_file}.#{Process.pid}", @config_file) if File.exists?("#{@config_file}.#{Process.pid}") end end # initialize From 4a3228a1115fb22f4e316aed5cfc95c69e9edaed Mon Sep 17 00:00:00 2001 From: Samuel Trahan Date: Thu, 25 Apr 2019 22:45:24 +0000 Subject: [PATCH 19/29] Add all known SLURM job states, and a best guess translation to Rocoto's queued, running, failed, and completed states. 
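As a rough picture of that translation, the grouping amounts to the table below. This is an illustrative Ruby sketch, not the code in this patch; the authoritative mapping is the set of regular expressions in the diff that follows, which sends COMPLETED to Rocoto's SUCCEEDED state.

    # Illustrative grouping of SLURM native states into Rocoto states.
    SLURM_TO_ROCOTO = {
      "QUEUED"    => %w[CONFIGURING PENDING SUSPENDED RESV_DEL_HOLD REQUEUE_FED
                        REQUEUE_HOLD REQUEUED SPECIAL_EXIT],
      "RUNNING"   => %w[RUNNING COMPLETING RESIZING SIGNALING STAGE_OUT STOPPED],
      "FAILED"    => %w[CANCELLED FAILED NODE_FAIL PREEMPTED TIMEOUT BOOT_FAIL
                        DEADLINE OUT_OF_MEMORY REVOKED],
      "SUCCEEDED" => %w[COMPLETED]
    }

    def rocoto_state(native_state)
      pair = SLURM_TO_ROCOTO.find { |_rocoto, natives| natives.include?(native_state) }
      pair ? pair.first : "UNKNOWN"   # e.g. rocoto_state("NODE_FAIL") => "FAILED"
    end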
--- lib/workflowmgr/slurmbatchsystem.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb index 0cea5b8..3cd59fd 100644 --- a/lib/workflowmgr/slurmbatchsystem.rb +++ b/lib/workflowmgr/slurmbatchsystem.rb @@ -356,11 +356,11 @@ def refresh_jobqueue # Extract job state case jobfields["JobState"] - when /^CONFIGURING$/,/^PENDING$/,/^SUSPENDED$/ + when /^(CONFIGURING|PENDING|SUSPENDED|RESV_DEL_HOLD|REQUEUE_FED|REQUEUE_HOLD|REQUEUED|SPECIAL_EXIT|SUSPENDED)$/ record[:state]="QUEUED" - when /^RUNNING$/,/^COMPLETING$/ + when /^(RUNNING|COMPLETING|RESIZING|SIGNALING|STAGE_OUT|STOPPED)$/ record[:state]="RUNNING" - when /^CANCELLED$/,/^FAILED$/,/^NODE_FAIL$/,/^PREEMPTED$/,/^TIMEOUT$/ + when /^(CANCELLED|FAILED|NODE_FAIL|PREEMPTED|TIMEOUT|BOOT_FAIL|DEADLINE|OUT_OF_MEMORY|REVOKED)$/ record[:state]="FAILED" record[:exit_status]=255 if record[:exit_status]==0 # Override exit status of 0 for "failed" jobs when /^COMPLETED$/ From e6c477d0435e77f0150f2564bd21e87a1041f338 Mon Sep 17 00:00:00 2001 From: Samuel Trahan Date: Thu, 25 Apr 2019 22:47:30 +0000 Subject: [PATCH 20/29] Switch SLURM queue monitoring from scontrol to squeue, subsetted by the userid. Also, raise timeouts for slurm commands from 30 seconds to 45. These changes are on top of feature/more-slurm-states out of necessity, as tests would fail without those. --- lib/workflowmgr/slurmbatchsystem.rb | 61 ++++++++++++++++------------- 1 file changed, 34 insertions(+), 27 deletions(-) diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb index 0cea5b8..f701a2e 100644 --- a/lib/workflowmgr/slurmbatchsystem.rb +++ b/lib/workflowmgr/slurmbatchsystem.rb @@ -272,6 +272,11 @@ def delete(jobid) private + ##################################################### + # + # refresh_jobqueue + # + ##################################################### ##################################################### # # refresh_jobqueue @@ -288,7 +293,7 @@ def refresh_jobqueue queued_jobs="" errors="" exit_status=0 - queued_jobs,errors,exit_status=WorkflowMgr.run4("scontrol -o show job",30) + queued_jobs,errors,exit_status=WorkflowMgr.run4("squeue -u #{username} -t all -O jobid:40,username:40,numcpus:10,partition:20,submittime:30,starttime:30,endtime:30,priority:30,exit_code:10,state:30,name:200",45) # Raise SchedulerDown if the command failed raise WorkflowMgr::SchedulerDown,errors unless exit_status==0 @@ -310,57 +315,59 @@ def refresh_jobqueue # For each job, find the various attributes and create a job record queued_jobs.split("\n").each { |job| + next if job[0..4]=='JOBID' # skip heading + # Initialize an empty job record record={} - # Look at all the attributes for this job and build the record - jobfields=Hash[job.split.collect {|f| f.split("=")}.collect{|f| f.length == 2 ? 
f : [f[0], '']}] - - # Skip records for other users - next unless jobfields["UserId"] =~/^#{username}\(/ - # Extract job id - record[:jobid]=jobfields["JobId"] + record[:jobid]=job[0..39].strip # Extract job name - record[:jobname]=jobfields["Name"] + record[:jobname]=job[270..job.length].strip # Extract job owner - record[:user]=jobfields["UserId"].split("(").first + record[:user]=job[40..79].strip # Extract core count - record[:cores]=jobfields["NumCPUs"].to_i + record[:cores]=job[80..89].strip # Extract the partition - record[:queue]=jobfields["Partition"] + record[:queue]=job[90..109].strip # Extract the submit time - record[:submit_time]=Time.local(*jobfields["SubmitTime"].split(/[-:T]/)).getgm + record[:submit_time]=Time.local(*job[110..139].strip.split(/[-:T]/)).getgm # Extract the start time - record[:start_time]=Time.local(*jobfields["StartTime"].split(/[-:T]/)).getgm + record[:start_time]=Time.local(*job[140..169].strip.split(/[-:T]/)).getgm # Extract the end time - record[:end_time]=Time.local(*jobfields["EndTime"].split(/[-:T]/)).getgm + record[:end_time]=Time.local(*job[170..199].strip.split(/[-:T]/)).getgm # Extract the priority - record[:priority]=jobfields["Priority"] + record[:priority]=job[200..229].strip # Extract the exit status - code,signal=jobfields["ExitCode"].split(":").collect {|i| i.to_i} - if code==0 - record[:exit_status]=signal + code_signal=job[230..239].strip + if code_signal=~ /:/ + + code,signal=core_signal.split(":").collect {|i| i.to_i} + if code==0 + record[:exit_status]=signal + else + record[:exit_status]=code + end else - record[:exit_status]=code + record[:exit_status]=code_signal end # Extract job state - case jobfields["JobState"] - when /^CONFIGURING$/,/^PENDING$/,/^SUSPENDED$/ + case job[240..269].strip + when /^(CONFIGURING|PENDING|SUSPENDED|RESV_DEL_HOLD|REQUEUE_FED|REQUEUE_HOLD|REQUEUED|SPECIAL_EXIT|SUSPENDED)$/ record[:state]="QUEUED" - when /^RUNNING$/,/^COMPLETING$/ + when /^(RUNNING|COMPLETING|RESIZING|SIGNALING|STAGE_OUT|STOPPED)$/ record[:state]="RUNNING" - when /^CANCELLED$/,/^FAILED$/,/^NODE_FAIL$/,/^PREEMPTED$/,/^TIMEOUT$/ + when /^(CANCELLED|FAILED|NODE_FAIL|PREEMPTED|TIMEOUT|BOOT_FAIL|DEADLINE|OUT_OF_MEMORY|REVOKED)$/ record[:state]="FAILED" record[:exit_status]=255 if record[:exit_status]==0 # Override exit status of 0 for "failed" jobs when /^COMPLETED$/ @@ -372,10 +379,10 @@ def refresh_jobqueue else record[:state]="UNKNOWN" end - record[:native_state]=jobfields["JobState"] - + record[:native_state]=job[240..269].strip # Add record to job queue @jobqueue[record[:jobid]]=record + WorkflowMgr.stderr("String [[#{job}]] becomes record #{record}",1) } # queued_jobs.find @@ -402,7 +409,7 @@ def refresh_jobacct(delta_days) exit_status=0 mmddyy=(DateTime.now-delta_days).strftime('%m%d%y') cmd="sacct -S #{mmddyy} -L -o jobid,user%30,jobname%30,partition%20,priority,submit,start,end,ncpus,exitcode,state%12 -P" - completed_jobs,errors,exit_status=WorkflowMgr.run4(cmd,30) + completed_jobs,errors,exit_status=WorkflowMgr.run4(cmd,45) return if errors=~/SLURM accounting storage is disabled/ From 5ff066e9e3f3e70ade869603e2016e473520934d Mon Sep 17 00:00:00 2001 From: Samuel Trahan Date: Thu, 25 Apr 2019 22:59:25 +0000 Subject: [PATCH 21/29] remove a duplicate function comment heading --- lib/workflowmgr/slurmbatchsystem.rb | 5 ----- 1 file changed, 5 deletions(-) diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb index f701a2e..d6e0554 100644 --- a/lib/workflowmgr/slurmbatchsystem.rb +++ 
b/lib/workflowmgr/slurmbatchsystem.rb @@ -272,11 +272,6 @@ def delete(jobid) private - ##################################################### - # - # refresh_jobqueue - # - ##################################################### ##################################################### # # refresh_jobqueue From 03025efff9485f65c1a706b8e78221e6e609823b Mon Sep 17 00:00:00 2001 From: Samuel Trahan Date: Thu, 25 Apr 2019 23:13:02 +0000 Subject: [PATCH 22/29] Remove a debug line that generates lots of output --- lib/workflowmgr/slurmbatchsystem.rb | 1 - 1 file changed, 1 deletion(-) diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb index d6e0554..51de5cf 100644 --- a/lib/workflowmgr/slurmbatchsystem.rb +++ b/lib/workflowmgr/slurmbatchsystem.rb @@ -377,7 +377,6 @@ def refresh_jobqueue record[:native_state]=job[240..269].strip # Add record to job queue @jobqueue[record[:jobid]]=record - WorkflowMgr.stderr("String [[#{job}]] becomes record #{record}",1) } # queued_jobs.find From 23eef5450daaedbf4cdc626d6ca90cb5a7a8842f Mon Sep 17 00:00:00 2001 From: Christopher Harrop Date: Fri, 26 Apr 2019 16:45:51 +0000 Subject: [PATCH 23/29] Update Slurm batch system to query specific list of relevant jobs. Other minor tweaks. --- lib/workflowmgr/slurmbatchsystem.rb | 114 ++++++++++++++++++---------- 1 file changed, 72 insertions(+), 42 deletions(-) diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb index 51de5cf..0f6eb76 100644 --- a/lib/workflowmgr/slurmbatchsystem.rb +++ b/lib/workflowmgr/slurmbatchsystem.rb @@ -40,36 +40,6 @@ def initialize(slurm_root=nil) end - ##################################################### - # - # statuses - # - ##################################################### - def statuses(jobids) - - begin - - raise WorkflowMgr::SchedulerDown unless @schedup - - # Initialize statuses to UNAVAILABLE - jobStatuses={} - jobids.each do |jobid| - jobStatuses[jobid] = { :jobid => jobid, :state => "UNAVAILABLE", :native_state => "Unavailable" } - end - - jobids.each do |jobid| - jobStatuses[jobid] = self.status(jobid) - end - - rescue WorkflowMgr::SchedulerDown - @schedup=false - ensure - return jobStatuses - end - - end - - ##################################################### # # status @@ -110,6 +80,58 @@ def status(jobid) end + ##################################################### + # + # statuses + # + ##################################################### + def statuses(jobids) + + begin + + raise WorkflowMgr::SchedulerDown unless @schedup + + # Initialize statuses to UNAVAILABLE + jobStatuses={} + jobids.each do |jobid| + jobStatuses[jobid] = { :jobid => jobid, :state => "UNAVAILABLE", :native_state => "Unavailable" } + end + + # Populate the job status table if it is empty + refresh_jobqueue(jobids) if @jobqueue.empty? + + # Check to see if status info is missing for any job and populate jobacct record if necessary + if jobids.any? { |jobid| !@jobqueue.has_key?(jobid) } + + refresh_jobacct(1) if @jobacct_duration<1 + + # Check again, and re-populate over a longer history if necessary + if jobids.any? 
{ |jobid| !@jobqueue.has_key?(jobid) && !@jobacct.has_key?(jobid) } + refresh_jobacct(5) if @jobacct_duration<5 + end + end + + # Collect the statuses of the jobs + jobids.each do |jobid| + if @jobqueue.has_key?(jobid) + jobStatuses[jobid] = @jobqueue[jobid] + elsif @jobacct.has_key?(jobid) + jobStatuses[jobid] = @jobacct[jobid] + else + # We didn't find the job, so return an uknown status record + jobStatuses[jobid] = { :jobid => jobid, :state => "UNKNOWN", :native_state => "Unknown" } + end + end + + rescue WorkflowMgr::SchedulerDown + @schedup=false + ensure + return jobStatuses + end + + end + + ##################################################### # # submit @@ -131,11 +153,11 @@ def submit(task) end case option when :account - input += "#SBATCH --account #{value}\n" + input += "#SBATCH --account=#{value}\n" when :queue - input += "#SBATCH --qos #{value}\n" + input += "#SBATCH --qos=#{value}\n" when :partition - input += "#SBATCH --partition #{value.gsub(":",",")}\n" + input += "#SBATCH --partition=#{value.gsub(":",",")}\n" when :cores # Ignore this attribute if the "nodes" attribute is present next unless task.attributes[:nodes].nil? @@ -218,7 +240,7 @@ def submit(task) when :join input += "#SBATCH -o #{value}\n" when :jobname - input += "#SBATCH --job-name #{value}\n" + input += "#SBATCH --job-name=#{value}\n" end end @@ -277,7 +299,7 @@ def delete(jobid) # refresh_jobqueue # ##################################################### - def refresh_jobqueue + def refresh_jobqueue(jobids) begin @@ -288,7 +310,13 @@ def refresh_jobqueue queued_jobs="" errors="" exit_status=0 - queued_jobs,errors,exit_status=WorkflowMgr.run4("squeue -u #{username} -t all -O jobid:40,username:40,numcpus:10,partition:20,submittime:30,starttime:30,endtime:30,priority:30,exit_code:10,state:30,name:200",45) + + if jobids.nil? 
+ queued_jobs,errors,exit_status=WorkflowMgr.run4("squeue -u #{username} -M all -t all -O jobid:40,username:40,numcpus:10,partition:20,submittime:30,starttime:30,endtime:30,priority:30,exit_code:10,state:30,name:200",45) + else + joblist = jobids.join(",") + queued_jobs,errors,exit_status=WorkflowMgr.run4("squeue --jobs=#{joblist} -M all -t all -O jobid:40,username:40,numcpus:10,partition:20,submittime:30,starttime:30,endtime:30,priority:30,exit_code:10,state:30,name:200",45) + end # Raise SchedulerDown if the command failed raise WorkflowMgr::SchedulerDown,errors unless exit_status==0 @@ -310,7 +338,9 @@ def refresh_jobqueue # For each job, find the various attributes and create a job record queued_jobs.split("\n").each { |job| - next if job[0..4]=='JOBID' # skip heading + # Skip headings + next if job[0..4] == 'JOBID' + next if job[0..7] == 'CLUSTER:' # Initialize an empty job record record={} @@ -325,7 +355,7 @@ def refresh_jobqueue record[:user]=job[40..79].strip # Extract core count - record[:cores]=job[80..89].strip + record[:cores]=job[80..89].strip.to_i # Extract the partition record[:queue]=job[90..109].strip @@ -353,7 +383,7 @@ def refresh_jobqueue record[:exit_status]=code end else - record[:exit_status]=code_signal + record[:exit_status]=code_signal.to_i end # Extract job state @@ -468,11 +498,11 @@ def refresh_jobacct(delta_days) # Extract job state case jobfields[10] - when /^CONFIGURING$/,/^PENDING$/,/^SUSPENDED$/,/^REQUEUED$/ + when /^(CONFIGURING|PENDING|SUSPENDED|RESV_DEL_HOLD|REQUEUE_FED|REQUEUE_HOLD|REQUEUED|SPECIAL_EXIT|SUSPENDED)$/ record[:state]="QUEUED" - when /^RUNNING$/,/^COMPLETING$/ + when /^(RUNNING|COMPLETING|RESIZING|SIGNALING|STAGE_OUT|STOPPED)$/ record[:state]="RUNNING" - when /^CANCELLED$/,/^FAILED$/,/^NODE_FAIL$/,/^PREEMPTED$/,/^TIMEOUT$/,/^OUT_OF_MEMORY$/,/^BOOT_FAIL$/,/^DEADLINE$/ + when /^(CANCELLED|FAILED|NODE_FAIL|PREEMPTED|TIMEOUT|BOOT_FAIL|DEADLINE|OUT_OF_MEMORY|REVOKED)$/ record[:state]="FAILED" record[:exit_status]=255 if record[:exit_status]==0 # Override exit status of 0 for "failed" jobs when /^COMPLETED$/ From 1eb6bf02f9c6e251c40adb051b2357b2b292611b Mon Sep 17 00:00:00 2001 From: Christopher Harrop Date: Fri, 26 Apr 2019 16:49:56 +0000 Subject: [PATCH 24/29] Prevent --- lib/workflowmgr/workflowengine.rb | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/lib/workflowmgr/workflowengine.rb b/lib/workflowmgr/workflowengine.rb index d424fd8..2ff8b17 100644 --- a/lib/workflowmgr/workflowengine.rb +++ b/lib/workflowmgr/workflowengine.rb @@ -499,7 +499,9 @@ def boot # Roll the log file (if it already exists) @workflowIOServer.roll_log(value) end - @workflowIOServer.mkdir_p(outdir) + unless outdir.empty? + @workflowIOServer.mkdir_p(outdir) + end end end From 5bf3b88a571f6f878b0ecf716e9ccfa072407f21 Mon Sep 17 00:00:00 2001 From: Samuel Trahan Date: Mon, 29 Apr 2019 21:01:14 +0000 Subject: [PATCH 25/29] Do not use more than 64 characters to --jobs option of squeue --- lib/workflowmgr/slurmbatchsystem.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb index 0f6eb76..39c1279 100644 --- a/lib/workflowmgr/slurmbatchsystem.rb +++ b/lib/workflowmgr/slurmbatchsystem.rb @@ -311,7 +311,7 @@ def refresh_jobqueue(jobids) errors="" exit_status=0 - if jobids.nil? + if jobids.nil? 
or jobids.join(',').length>64 queued_jobs,errors,exit_status=WorkflowMgr.run4("squeue -u #{username} -M all -t all -O jobid:40,username:40,numcpus:10,partition:20,submittime:30,starttime:30,endtime:30,priority:30,exit_code:10,state:30,name:200",45) else joblist = jobids.join(",") From 978d6daf39510161340306a785d233566a639c91 Mon Sep 17 00:00:00 2001 From: Christopher Harrop Date: Tue, 30 Apr 2019 16:44:53 +0000 Subject: [PATCH 26/29] The --cpus-per-task Slurm option is important for MPI/Hybrid jobs to ensure all cores can be used. This turns that on when tpp is used in the specification, but only if exactly one nodespec is used. If multiple nodespecs are used, it is a "complex" job, which must be handled in a custom fashion, in their job script. --- lib/workflowmgr/slurmbatchsystem.rb | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb index 0f6eb76..a0e68c1 100644 --- a/lib/workflowmgr/slurmbatchsystem.rb +++ b/lib/workflowmgr/slurmbatchsystem.rb @@ -102,7 +102,6 @@ def statuses(jobids) # Check to see if status info is missing for any job and populate jobacct record if necessary if jobids.any? { |jobid| !@jobqueue.has_key?(jobid) } - refresh_jobacct(1) if @jobacct_duration<1 # Check again, and re-populate over a longer history if necessary @@ -174,6 +173,7 @@ def submit(task) # Get the total nodes and max ppn requested maxppn=1 nnodes=0 + tpp=0 nodespecs=value.split("+") nodespecs.each { |nodespec| resources=nodespec.split(":") @@ -196,6 +196,11 @@ def submit(task) # Request max tasks per node input += "#SBATCH --tasks-per-node=#{maxppn}\n" + # Request cpus per task if only one nodespec is specified and tpp was specified + if nodespecs.size == 1 && !tpp.zero? + input += "#SBATCH --cpus-per-task=#{tpp}\n" + end + # Print a warning if multiple nodespecs are specified if nodespecs.size > 1 WorkflowMgr.stderr("WARNING: Rocoto does not support Slurm's heterogeneous job feature. However, Rocoto", 1) From 7c9641d134f4f97361e505d4344a50a96188f469 Mon Sep 17 00:00:00 2001 From: Christopher Harrop Date: Fri, 17 May 2019 22:17:21 +0000 Subject: [PATCH 27/29] First try at adding a recovery mechanism for sbatch commands that erroneously report failure after a successful job submission. --- lib/workflowmgr/slurmbatchsystem.rb | 43 +++++++++++++++++++++++++++++ 1 file changed, 43 insertions(+) diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb index a0e68c1..ab1f59e 100644 --- a/lib/workflowmgr/slurmbatchsystem.rb +++ b/lib/workflowmgr/slurmbatchsystem.rb @@ -18,6 +18,7 @@ class SLURMBatchSystem < BatchSystem require 'etc' require 'parsedate' require 'libxml' + erquire 'securerandom' require 'workflowmgr/utilities' ##################################################### @@ -253,6 +254,10 @@ def submit(task) input += "#SBATCH #{value}\n" end + # Add secret identifier for later retrieval + randomID = SecureRandom.hex + input += "#SBATCH #{randomID}\n" + # Add export commands to pass environment vars to the job unless task.envars.empty? 
varinput='' @@ -278,6 +283,44 @@ def submit(task) # Parse the output of the submit command if output=~/^Submitted batch job (\d+)/ return $1,output + elsif output=~/Batch job submission failed: Socket timed out on send\/recv operation/ + begin + # Get the username of this process + username=Etc.getpwuid(Process.uid).name + + queued_jobs,errors,exit_status=WorkflowMgr.run4("squeue -u #{username} -M all -t all -O jobid:40,comment:32",45) + + # Raise SchedulerDown if the command failed + raise WorkflowMgr::SchedulerDown,errors unless exit_status==0 + + # Return if the output is empty + return nil,output if queued_jobs.empty? + + rescue Timeout::Error,WorkflowMgr::SchedulerDown + WorkflowMgr.log("#{$!}") + WorkflowMgr.stderr("#{$!}",3) + raise WorkflowMgr::SchedulerDown + end + + # Make sure queued_jobs is properly encoded + if String.method_defined? :encode + queued_jobs = queued_jobs.encode('UTF-8', 'binary', {:invalid => :replace, :undef => :replace, :replace => ''}) + end + + # Look for a job that matches the randomID we inserted into the comment + queued_jobs.split("\n").each { |job| + + # Extract job id + jobid=job[0..39].strip + + # Extract randomID + if randomID == job[40..71].strip + WorkflowMgr.log("WARNING: Retrieved jobid=#{jobid} when submitting #{task.attributes[:name]} after sbatch socket time out") + WorkflowMgr.stderr("WARNING: Retrieved jobid=#{jobid} when submitting #{task.attributes[:name]} after sbatch socket time out".1) + return jobid, output + end + } + else return nil,output end From fac5d7441d6a832a4c61ce5817cf75519c7bfc2b Mon Sep 17 00:00:00 2001 From: Christopher Harrop Date: Wed, 29 May 2019 17:52:44 +0000 Subject: [PATCH 28/29] Fix bugs in recovery of jobids when Slurm sbatch fails with socket error. --- lib/workflowmgr/slurmbatchsystem.rb | 27 +++++++++++++++++++-------- lib/workflowmgr/workflowengine.rb | 2 +- 2 files changed, 20 insertions(+), 9 deletions(-) diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb index ab1f59e..ea45aa7 100644 --- a/lib/workflowmgr/slurmbatchsystem.rb +++ b/lib/workflowmgr/slurmbatchsystem.rb @@ -18,7 +18,7 @@ class SLURMBatchSystem < BatchSystem require 'etc' require 'parsedate' require 'libxml' - erquire 'securerandom' + require 'securerandom' require 'workflowmgr/utilities' ##################################################### @@ -256,7 +256,7 @@ def submit(task) # Add secret identifier for later retrieval randomID = SecureRandom.hex - input += "#SBATCH #{randomID}\n" + input += "#SBATCH --comment=#{randomID}\n" # Add export commands to pass environment vars to the job unless task.envars.empty? 
@@ -275,7 +275,7 @@ def submit(task) tf.write(input) tf.flush() - WorkflowMgr.stderr("Submitting #{task.attributes[:name]} using #{cmd} < #{tf.path} with input\n{{\n#{input}\n}}",4) + WorkflowMgr.stderr("Submitting #{task.attributes[:name]} using #{cmd} < #{tf.path} with input\n{{\n#{input}\n}}", 4) # Run the submit command output=`#{cmd} < #{tf.path} 2>&1`.chomp() @@ -283,12 +283,18 @@ def submit(task) # Parse the output of the submit command if output=~/^Submitted batch job (\d+)/ return $1,output - elsif output=~/Batch job submission failed: Socket timed out on send\/recv operation/ + elsif output=~/Batch job submission failed: Socket timed out/ + + WorkflowMgr.stderr("WARNING: '#{output}', looking to see if job was submitted anyway...", 1) + queued_jobs="" + errors="" + exit_status=0 begin + # Get the username of this process username=Etc.getpwuid(Process.uid).name - - queued_jobs,errors,exit_status=WorkflowMgr.run4("squeue -u #{username} -M all -t all -O jobid:40,comment:32",45) + + queued_jobs,errors,exit_status=WorkflowMgr.run4("squeue -u #{username} -M all -t all -O jobid:40,comment:32", 45) # Raise SchedulerDown if the command failed raise WorkflowMgr::SchedulerDown,errors unless exit_status==0 @@ -310,18 +316,23 @@ def submit(task) # Look for a job that matches the randomID we inserted into the comment queued_jobs.split("\n").each { |job| + # Skip headers + next if job=~/CLUSTER/ + next if job=~/JOBID/ + # Extract job id jobid=job[0..39].strip # Extract randomID if randomID == job[40..71].strip - WorkflowMgr.log("WARNING: Retrieved jobid=#{jobid} when submitting #{task.attributes[:name]} after sbatch socket time out") - WorkflowMgr.stderr("WARNING: Retrieved jobid=#{jobid} when submitting #{task.attributes[:name]} after sbatch socket time out".1) + WorkflowMgr.log("WARNING: Retrieved jobid=#{jobid} when submitting #{task.attributes[:name]} after sbatch failed with socket time out") + WorkflowMgr.stderr("WARNING: Retrieved jobid=#{jobid} when submitting #{task.attributes[:name]} after sbatch failed with socket time out",1) return jobid, output end } else + WorkflowMgr.stderr("WARNING: job submission failed: #{output}", 1) return nil,output end diff --git a/lib/workflowmgr/workflowengine.rb b/lib/workflowmgr/workflowengine.rb index 2ff8b17..9ca3294 100644 --- a/lib/workflowmgr/workflowengine.rb +++ b/lib/workflowmgr/workflowengine.rb @@ -1615,7 +1615,7 @@ def expire_cycles expired_cycles.each do |cycle| @active_jobs.keys.each do |taskname| next if @active_jobs[taskname][cycle.cycle].nil? - unless @active_jobs[taskname][cycle.cycle].state == "SUCCEEDED" || @active_jobs[taskname][cycle.cycle].state == "FAILED" || @active_jobs[taskname][cycle.cycle].state == "DEAD" || @active_jobs[taskname][cycle.cycle].state == "EXPIRED" + unless @active_jobs[taskname][cycle.cycle].state == "SUCCEEDED" || @active_jobs[taskname][cycle.cycle].state == "FAILED" || @active_jobs[taskname][cycle.cycle].state == "DEAD" || @active_jobs[taskname][cycle.cycle].state == "EXPIRED" || @active_jobs[taskname][cycle.cycle].state == "SUBMITTING" @logServer.log(cycle.cycle,"Deleting #{taskname} job #{@active_jobs[taskname][cycle.cycle].id} because this cycle has expired!") @bqServer.delete(@active_jobs[taskname][cycle.cycle].id) end From 6f8ae275eee1e29c81c8f8bb33945e9438b1ac29 Mon Sep 17 00:00:00 2001 From: Christopher Harrop Date: Wed, 29 May 2019 17:54:43 +0000 Subject: [PATCH 29/29] Enable capability to provide cores/nodes request via a tag. 
---
 lib/workflowmgr/schema_with_metatasks.rng    | 39 ++++++++++++--------
 lib/workflowmgr/schema_without_metatasks.rng | 39 ++++++++++++--------
 2 files changed, 46 insertions(+), 32 deletions(-)

diff --git a/lib/workflowmgr/schema_with_metatasks.rng b/lib/workflowmgr/schema_with_metatasks.rng
index cd75076..d653ee6 100644
--- a/lib/workflowmgr/schema_with_metatasks.rng
+++ b/lib/workflowmgr/schema_with_metatasks.rng
@@ -465,14 +465,30 @@
 [hunk body not recoverable: the RNG element markup was stripped from this text]
@@ -518,15 +534,6 @@
 [hunk body not recoverable: the RNG element markup was stripped from this text]
diff --git a/lib/workflowmgr/schema_without_metatasks.rng b/lib/workflowmgr/schema_without_metatasks.rng
index 2b9d97c..79cab40 100644
--- a/lib/workflowmgr/schema_without_metatasks.rng
+++ b/lib/workflowmgr/schema_without_metatasks.rng
@@ -446,14 +446,30 @@
 [hunk body not recoverable: the RNG element markup was stripped from this text]
@@ -499,15 +515,6 @@
 [hunk body not recoverable: the RNG element markup was stripped from this text]
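
Patches 27 and 28 above work around sbatch runs that report a socket timeout even though Slurm actually accepted the job: every submission carries a random identifier in the Slurm --comment field, and on a timeout that identifier is looked up in squeue output to recover the real job id. The following is a minimal standalone sketch of that idea in plain Ruby. The command strings and fixed-width column layout follow the diffs above, but the method name, the Open3 calls, and the simplified error handling are assumptions made only for illustration; Rocoto's actual logic lives in WorkflowMgr::SLURMBatchSystem#submit.

require 'securerandom'
require 'open3'
require 'etc'

# Sketch only: tag the submission with a random identifier so the job can be
# found again if sbatch times out after Slurm has already accepted the job.
def submit_with_recovery(script_path)
  random_id = SecureRandom.hex

  # Record the identifier in the job's comment field at submission time
  output, _status = Open3.capture2e("sbatch --comment=#{random_id} #{script_path}")

  # Normal case: sbatch printed the new job id
  return $1 if output =~ /^Submitted batch job (\d+)/

  # Recovery case: the socket timed out, but the job may exist anyway
  if output =~ /Socket timed out/
    username = Etc.getpwuid(Process.uid).name
    queued, _status = Open3.capture2e(
      "squeue -u #{username} -M all -t all -O jobid:40,comment:32")

    queued.each_line do |line|
      next if line =~ /JOBID/ || line =~ /CLUSTER/   # skip squeue header lines
      jobid   = line[0..39].to_s.strip               # fixed-width jobid column
      comment = line[40..71].to_s.strip              # fixed-width comment column
      return jobid if comment == random_id
    end
  end

  nil  # the submission really failed, or the job never appeared in squeue
end

A caller would use it as jobid = submit_with_recovery('job.sh'), treating a nil return as a genuine submission failure.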