Skip to content

Commit

Permalink
Merge pull request #55 from christopherwharrop/feature/squeue
Browse files Browse the repository at this point in the history
Feature/squeue
  • Loading branch information
samtrahan authored Apr 26, 2019
2 parents dcafdb0 + d02af92 commit c174b8e
Showing 1 changed file with 93 additions and 62 deletions.
155 changes: 93 additions & 62 deletions lib/workflowmgr/slurmbatchsystem.rb
Original file line number Diff line number Diff line change
Expand Up @@ -40,36 +40,6 @@ def initialize(slurm_root=nil)
end


#####################################################
#
# statuses
#
#####################################################
def statuses(jobids)

begin

raise WorkflowMgr::SchedulerDown unless @schedup

# Initialize statuses to UNAVAILABLE
jobStatuses={}
jobids.each do |jobid|
jobStatuses[jobid] = { :jobid => jobid, :state => "UNAVAILABLE", :native_state => "Unavailable" }
end

jobids.each do |jobid|
jobStatuses[jobid] = self.status(jobid)
end

rescue WorkflowMgr::SchedulerDown
@schedup=false
ensure
return jobStatuses
end

end


#####################################################
#
# status
Expand Down Expand Up @@ -110,6 +80,58 @@ def status(jobid)
end


#####################################################
#
# statuses
#
#####################################################
def statuses(jobids)

begin

raise WorkflowMgr::SchedulerDown unless @schedup

# Initialize statuses to UNAVAILABLE
jobStatuses={}
jobids.each do |jobid|
jobStatuses[jobid] = { :jobid => jobid, :state => "UNAVAILABLE", :native_state => "Unavailable" }
end

# Populate the job status table if it is empty
refresh_jobqueue(jobids) if @jobqueue.empty?

# Check to see if status info is missing for any job and populate jobacct record if necessary
if jobids.any? { |jobid| !@jobqueue.has_key?(jobid) }

refresh_jobacct(1) if @jobacct_duration<1

# Check again, and re-populate over a longer history if necessary
if jobids.any? { |jobid| !@jobqueue.has_key?(jobid) && !@jobacct.has_key?(jobid) }
refresh_jobacct(5) if @jobacct_duration<5
end
end

# Collect the statuses of the jobs
jobids.each do |jobid|
if @jobqueue.has_key?(jobid)
jobStatuses[jobid] = @jobqueue[jobid]
elsif @jobacct.has_key?(jobid)
jobStatuses[jobid] = @jobacct[jobid]
else
# We didn't find the job, so return an uknown status record
jobStatuses[jobid] = { :jobid => jobid, :state => "UNKNOWN", :native_state => "Unknown" }
end
end

rescue WorkflowMgr::SchedulerDown
@schedup=false
ensure
return jobStatuses
end

end


#####################################################
#
# submit
Expand All @@ -131,11 +153,11 @@ def submit(task)
end
case option
when :account
input += "#SBATCH --account #{value}\n"
input += "#SBATCH --account=#{value}\n"
when :queue
input += "#SBATCH --qos #{value}\n"
input += "#SBATCH --qos=#{value}\n"
when :partition
input += "#SBATCH --partition #{value.gsub(":",",")}\n"
input += "#SBATCH --partition=#{value.gsub(":",",")}\n"
when :cores
# Ignore this attribute if the "nodes" attribute is present
next unless task.attributes[:nodes].nil?
Expand Down Expand Up @@ -218,7 +240,7 @@ def submit(task)
when :join
input += "#SBATCH -o #{value}\n"
when :jobname
input += "#SBATCH --job-name #{value}\n"
input += "#SBATCH --job-name=#{value}\n"
end
end

Expand Down Expand Up @@ -277,7 +299,7 @@ def delete(jobid)
# refresh_jobqueue
#
#####################################################
def refresh_jobqueue
def refresh_jobqueue(jobids)

begin

Expand All @@ -288,7 +310,13 @@ def refresh_jobqueue
queued_jobs=""
errors=""
exit_status=0
queued_jobs,errors,exit_status=WorkflowMgr.run4("scontrol -o show job",30)

if jobids.nil?
queued_jobs,errors,exit_status=WorkflowMgr.run4("squeue -u #{username} -M all -t all -O jobid:40,username:40,numcpus:10,partition:20,submittime:30,starttime:30,endtime:30,priority:30,exit_code:10,state:30,name:200",45)
else
joblist = jobids.join(",")
queued_jobs,errors,exit_status=WorkflowMgr.run4("squeue --jobs=#{joblist} -M all -t all -O jobid:40,username:40,numcpus:10,partition:20,submittime:30,starttime:30,endtime:30,priority:30,exit_code:10,state:30,name:200",45)
end

# Raise SchedulerDown if the command failed
raise WorkflowMgr::SchedulerDown,errors unless exit_status==0
Expand All @@ -310,52 +338,56 @@ def refresh_jobqueue
# For each job, find the various attributes and create a job record
queued_jobs.split("\n").each { |job|

# Skip headings
next if job[0..4] == 'JOBID'
next if job[0..7] == 'CLUSTER:'

# Initialize an empty job record
record={}

# Look at all the attributes for this job and build the record
jobfields=Hash[job.split.collect {|f| f.split("=")}.collect{|f| f.length == 2 ? f : [f[0], '']}]

# Skip records for other users
next unless jobfields["UserId"] =~/^#{username}\(/

# Extract job id
record[:jobid]=jobfields["JobId"]
record[:jobid]=job[0..39].strip

# Extract job name
record[:jobname]=jobfields["Name"]
record[:jobname]=job[270..job.length].strip

# Extract job owner
record[:user]=jobfields["UserId"].split("(").first
record[:user]=job[40..79].strip

# Extract core count
record[:cores]=jobfields["NumCPUs"].to_i
record[:cores]=job[80..89].strip.to_i

# Extract the partition
record[:queue]=jobfields["Partition"]
record[:queue]=job[90..109].strip

# Extract the submit time
record[:submit_time]=Time.local(*jobfields["SubmitTime"].split(/[-:T]/)).getgm
record[:submit_time]=Time.local(*job[110..139].strip.split(/[-:T]/)).getgm

# Extract the start time
record[:start_time]=Time.local(*jobfields["StartTime"].split(/[-:T]/)).getgm
record[:start_time]=Time.local(*job[140..169].strip.split(/[-:T]/)).getgm

# Extract the end time
record[:end_time]=Time.local(*jobfields["EndTime"].split(/[-:T]/)).getgm
record[:end_time]=Time.local(*job[170..199].strip.split(/[-:T]/)).getgm

# Extract the priority
record[:priority]=jobfields["Priority"]
record[:priority]=job[200..229].strip

# Extract the exit status
code,signal=jobfields["ExitCode"].split(":").collect {|i| i.to_i}
if code==0
record[:exit_status]=signal
code_signal=job[230..239].strip
if code_signal=~ /:/

code,signal=core_signal.split(":").collect {|i| i.to_i}
if code==0
record[:exit_status]=signal
else
record[:exit_status]=code
end
else
record[:exit_status]=code
record[:exit_status]=code_signal.to_i
end

# Extract job state
case jobfields["JobState"]
case job[240..269].strip
when /^(CONFIGURING|PENDING|SUSPENDED|RESV_DEL_HOLD|REQUEUE_FED|REQUEUE_HOLD|REQUEUED|SPECIAL_EXIT|SUSPENDED)$/
record[:state]="QUEUED"
when /^(RUNNING|COMPLETING|RESIZING|SIGNALING|STAGE_OUT|STOPPED)$/
Expand All @@ -372,8 +404,7 @@ def refresh_jobqueue
else
record[:state]="UNKNOWN"
end
record[:native_state]=jobfields["JobState"]

record[:native_state]=job[240..269].strip
# Add record to job queue
@jobqueue[record[:jobid]]=record

Expand Down Expand Up @@ -402,7 +433,7 @@ def refresh_jobacct(delta_days)
exit_status=0
mmddyy=(DateTime.now-delta_days).strftime('%m%d%y')
cmd="sacct -S #{mmddyy} -L -o jobid,user%30,jobname%30,partition%20,priority,submit,start,end,ncpus,exitcode,state%12 -P"
completed_jobs,errors,exit_status=WorkflowMgr.run4(cmd,30)
completed_jobs,errors,exit_status=WorkflowMgr.run4(cmd,45)

return if errors=~/SLURM accounting storage is disabled/

Expand Down Expand Up @@ -467,11 +498,11 @@ def refresh_jobacct(delta_days)

# Extract job state
case jobfields[10]
when /^CONFIGURING$/,/^PENDING$/,/^SUSPENDED$/,/^REQUEUED$/
when /^(CONFIGURING|PENDING|SUSPENDED|RESV_DEL_HOLD|REQUEUE_FED|REQUEUE_HOLD|REQUEUED|SPECIAL_EXIT|SUSPENDED)$/
record[:state]="QUEUED"
when /^RUNNING$/,/^COMPLETING$/
when /^(RUNNING|COMPLETING|RESIZING|SIGNALING|STAGE_OUT|STOPPED)$/
record[:state]="RUNNING"
when /^CANCELLED$/,/^FAILED$/,/^NODE_FAIL$/,/^PREEMPTED$/,/^TIMEOUT$/,/^OUT_OF_MEMORY$/,/^BOOT_FAIL$/,/^DEADLINE$/
when /^(CANCELLED|FAILED|NODE_FAIL|PREEMPTED|TIMEOUT|BOOT_FAIL|DEADLINE|OUT_OF_MEMORY|REVOKED)$/
record[:state]="FAILED"
record[:exit_status]=255 if record[:exit_status]==0 # Override exit status of 0 for "failed" jobs
when /^COMPLETED$/
Expand Down

0 comments on commit c174b8e

Please sign in to comment.