diff --git a/lib/workflowmgr/slurmbatchsystem.rb b/lib/workflowmgr/slurmbatchsystem.rb index 3cd59fd..0f6eb76 100644 --- a/lib/workflowmgr/slurmbatchsystem.rb +++ b/lib/workflowmgr/slurmbatchsystem.rb @@ -40,36 +40,6 @@ def initialize(slurm_root=nil) end - ##################################################### - # - # statuses - # - ##################################################### - def statuses(jobids) - - begin - - raise WorkflowMgr::SchedulerDown unless @schedup - - # Initialize statuses to UNAVAILABLE - jobStatuses={} - jobids.each do |jobid| - jobStatuses[jobid] = { :jobid => jobid, :state => "UNAVAILABLE", :native_state => "Unavailable" } - end - - jobids.each do |jobid| - jobStatuses[jobid] = self.status(jobid) - end - - rescue WorkflowMgr::SchedulerDown - @schedup=false - ensure - return jobStatuses - end - - end - - ##################################################### # # status @@ -110,6 +80,58 @@ def status(jobid) end + ##################################################### + # + # statuses + # + ##################################################### + def statuses(jobids) + + begin + + raise WorkflowMgr::SchedulerDown unless @schedup + + # Initialize statuses to UNAVAILABLE + jobStatuses={} + jobids.each do |jobid| + jobStatuses[jobid] = { :jobid => jobid, :state => "UNAVAILABLE", :native_state => "Unavailable" } + end + + # Populate the job status table if it is empty + refresh_jobqueue(jobids) if @jobqueue.empty? + + # Check to see if status info is missing for any job and populate jobacct record if necessary + if jobids.any? { |jobid| !@jobqueue.has_key?(jobid) } + + refresh_jobacct(1) if @jobacct_duration<1 + + # Check again, and re-populate over a longer history if necessary + if jobids.any? 
{ |jobid| !@jobqueue.has_key?(jobid) && !@jobacct.has_key?(jobid) } + refresh_jobacct(5) if @jobacct_duration<5 + end + end + + # Collect the statuses of the jobs + jobids.each do |jobid| + if @jobqueue.has_key?(jobid) + jobStatuses[jobid] = @jobqueue[jobid] + elsif @jobacct.has_key?(jobid) + jobStatuses[jobid] = @jobacct[jobid] + else + # We didn't find the job, so return an unknown status record + jobStatuses[jobid] = { :jobid => jobid, :state => "UNKNOWN", :native_state => "Unknown" } + end + end + + rescue WorkflowMgr::SchedulerDown + @schedup=false + ensure + return jobStatuses + end + + end + + ##################################################### # # submit @@ -131,11 +153,11 @@ def submit(task) end case option when :account - input += "#SBATCH --account #{value}\n" + input += "#SBATCH --account=#{value}\n" when :queue - input += "#SBATCH --qos #{value}\n" + input += "#SBATCH --qos=#{value}\n" when :partition - input += "#SBATCH --partition #{value.gsub(":",",")}\n" + input += "#SBATCH --partition=#{value.gsub(":",",")}\n" when :cores # Ignore this attribute if the "nodes" attribute is present next unless task.attributes[:nodes].nil? @@ -218,7 +240,7 @@ def submit(task) when :join input += "#SBATCH -o #{value}\n" when :jobname - input += "#SBATCH --job-name #{value}\n" + input += "#SBATCH --job-name=#{value}\n" end end @@ -277,7 +299,7 @@ def delete(jobid) # refresh_jobqueue # ##################################################### - def refresh_jobqueue + def refresh_jobqueue(jobids=nil) begin @@ -288,7 +310,13 @@ def refresh_jobqueue queued_jobs="" errors="" exit_status=0 - queued_jobs,errors,exit_status=WorkflowMgr.run4("scontrol -o show job",30) + + if jobids.nil? 
+ queued_jobs,errors,exit_status=WorkflowMgr.run4("squeue -u #{username} -M all -t all -O jobid:40,username:40,numcpus:10,partition:20,submittime:30,starttime:30,endtime:30,priority:30,exit_code:10,state:30,name:200",45) + else + joblist = jobids.join(",") + queued_jobs,errors,exit_status=WorkflowMgr.run4("squeue --jobs=#{joblist} -M all -t all -O jobid:40,username:40,numcpus:10,partition:20,submittime:30,starttime:30,endtime:30,priority:30,exit_code:10,state:30,name:200",45) + end # Raise SchedulerDown if the command failed raise WorkflowMgr::SchedulerDown,errors unless exit_status==0 @@ -310,52 +338,56 @@ def refresh_jobqueue # For each job, find the various attributes and create a job record queued_jobs.split("\n").each { |job| + # Skip headings + next if job[0..4] == 'JOBID' + next if job[0..7] == 'CLUSTER:' + # Initialize an empty job record record={} - # Look at all the attributes for this job and build the record - jobfields=Hash[job.split.collect {|f| f.split("=")}.collect{|f| f.length == 2 ? 
f : [f[0], '']}] - - # Skip records for other users - next unless jobfields["UserId"] =~/^#{username}\(/ - # Extract job id - record[:jobid]=jobfields["JobId"] + record[:jobid]=job[0..39].strip # Extract job name - record[:jobname]=jobfields["Name"] + record[:jobname]=job[270..job.length].strip # Extract job owner - record[:user]=jobfields["UserId"].split("(").first + record[:user]=job[40..79].strip # Extract core count - record[:cores]=jobfields["NumCPUs"].to_i + record[:cores]=job[80..89].strip.to_i # Extract the partition - record[:queue]=jobfields["Partition"] + record[:queue]=job[90..109].strip # Extract the submit time - record[:submit_time]=Time.local(*jobfields["SubmitTime"].split(/[-:T]/)).getgm + record[:submit_time]=Time.local(*job[110..139].strip.split(/[-:T]/)).getgm # Extract the start time - record[:start_time]=Time.local(*jobfields["StartTime"].split(/[-:T]/)).getgm + record[:start_time]=Time.local(*job[140..169].strip.split(/[-:T]/)).getgm # Extract the end time - record[:end_time]=Time.local(*jobfields["EndTime"].split(/[-:T]/)).getgm + record[:end_time]=Time.local(*job[170..199].strip.split(/[-:T]/)).getgm # Extract the priority - record[:priority]=jobfields["Priority"] + record[:priority]=job[200..229].strip # Extract the exit status - code,signal=jobfields["ExitCode"].split(":").collect {|i| i.to_i} - if code==0 - record[:exit_status]=signal + code_signal=job[230..239].strip + if code_signal=~ /:/ + + code,signal=code_signal.split(":").collect {|i| i.to_i} + if code==0 + record[:exit_status]=signal + else + record[:exit_status]=code + end else - record[:exit_status]=code + record[:exit_status]=code_signal.to_i end # Extract job state - case jobfields["JobState"] + case job[240..269].strip when /^(CONFIGURING|PENDING|SUSPENDED|RESV_DEL_HOLD|REQUEUE_FED|REQUEUE_HOLD|REQUEUED|SPECIAL_EXIT|SUSPENDED)$/ record[:state]="QUEUED" when /^(RUNNING|COMPLETING|RESIZING|SIGNALING|STAGE_OUT|STOPPED)$/ @@ -372,8 +404,7 @@ def refresh_jobqueue else 
record[:state]="UNKNOWN" end - record[:native_state]=jobfields["JobState"] - + record[:native_state]=job[240..269].strip # Add record to job queue @jobqueue[record[:jobid]]=record @@ -402,7 +433,7 @@ def refresh_jobacct(delta_days) exit_status=0 mmddyy=(DateTime.now-delta_days).strftime('%m%d%y') cmd="sacct -S #{mmddyy} -L -o jobid,user%30,jobname%30,partition%20,priority,submit,start,end,ncpus,exitcode,state%12 -P" - completed_jobs,errors,exit_status=WorkflowMgr.run4(cmd,30) + completed_jobs,errors,exit_status=WorkflowMgr.run4(cmd,45) return if errors=~/SLURM accounting storage is disabled/ @@ -467,11 +498,11 @@ def refresh_jobacct(delta_days) # Extract job state case jobfields[10] - when /^CONFIGURING$/,/^PENDING$/,/^SUSPENDED$/,/^REQUEUED$/ + when /^(CONFIGURING|PENDING|SUSPENDED|RESV_DEL_HOLD|REQUEUE_FED|REQUEUE_HOLD|REQUEUED|SPECIAL_EXIT|SUSPENDED)$/ record[:state]="QUEUED" - when /^RUNNING$/,/^COMPLETING$/ + when /^(RUNNING|COMPLETING|RESIZING|SIGNALING|STAGE_OUT|STOPPED)$/ record[:state]="RUNNING" - when /^CANCELLED$/,/^FAILED$/,/^NODE_FAIL$/,/^PREEMPTED$/,/^TIMEOUT$/,/^OUT_OF_MEMORY$/,/^BOOT_FAIL$/,/^DEADLINE$/ + when /^(CANCELLED|FAILED|NODE_FAIL|PREEMPTED|TIMEOUT|BOOT_FAIL|DEADLINE|OUT_OF_MEMORY|REVOKED)$/ record[:state]="FAILED" record[:exit_status]=255 if record[:exit_status]==0 # Override exit status of 0 for "failed" jobs when /^COMPLETED$/