From 6b9c69300435fbff8efeb3510251436ef2226736 Mon Sep 17 00:00:00 2001 From: Yichen TANG Date: Mon, 15 Apr 2019 16:17:10 +0200 Subject: [PATCH 1/8] Skip after() if cancel() called on future by client side. The after() would still be executed if future is cancelled by computation manager(ex: error detected).In this case, it will throw CompleteException. Fix tests: junit assert on main thread. Job id's relation should not be cleared before send scancel cmd to slurm Signed-off-by: Yichen TANG --- .../computation/slurm/SlurmUnitTests.java | 149 +++++++++++++----- .../computation/slurm/FlagFilesMonitor.java | 2 +- .../computation/slurm/ScontrolMonitor.java | 16 +- .../slurm/SlurmComputationManager.java | 60 +++++-- .../powsybl/computation/slurm/TaskStore.java | 27 ++-- .../slurm/ScontrolMonitorTest.java | 8 +- .../computation/slurm/TaskStoreTest.java | 13 +- 7 files changed, 188 insertions(+), 87 deletions(-) diff --git a/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java b/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java index c930448..87be96f 100644 --- a/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java +++ b/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java @@ -12,7 +12,6 @@ import org.apache.commons.compress.archivers.zip.ZipArchiveEntry; import org.apache.commons.compress.archivers.zip.ZipArchiveOutputStream; import org.apache.commons.io.IOUtils; -import org.junit.Assert; import org.junit.Before; import org.junit.Ignore; import org.junit.Test; @@ -28,6 +27,8 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Objects; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeUnit; @@ -35,7 +36,8 @@ import java.util.stream.IntStream; import java.util.zip.GZIPOutputStream; -import static org.junit.Assert.*; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; /** * @author Yichen TANG @@ -46,8 +48,16 @@ public class SlurmUnitTests { private static final Logger LOGGER = LoggerFactory.getLogger(SlurmUnitTests.class); private static final ExecutionEnvironment EMPTY_ENV = new ExecutionEnvironment(Collections.emptyMap(), "unit_test_", false); + private static final String EXPECTED_ERROR_JOB_MSG = "An error job found"; + private static final String EXPECTED_SCANCEL_JOB_MSG = "A CANCELLED job detected by monitor"; + private static final String EXPECTED_DEADLINE_JOB_MSG = "A DEADLINE job detected by monitor"; + private static final String EXPECTED_SUBMITTER_CANCEL_MSG = "Cancelled by submitter"; + private static final String FAILED_SEP = "******** TEST FAILED ********"; + private ModuleConfig config; + private volatile boolean failed = false; + @Before public void setup() { YamlModuleConfigRepository configRepository = new YamlModuleConfigRepository(Paths.get("src/test/resources/config.yml")); @@ -82,28 +92,20 @@ private void baseTest(TestAttribute testAttribute, Supplier completableFuture = computationManager.execute(EMPTY_ENV, supplier.get(), parameters); if (testAttribute.getType() == Type.TO_CANCEL) { - if (testAttribute.getTestName().endsWith("CancelExternal")) { - System.out.println("Go to cancel on server"); + System.out.println("Will be cancelled by junit test in 5 seconds..."); + Thread.sleep(5000); + boolean cancel = completableFuture.cancel(true); + System.out.println("Cancelled:" + cancel); + failed = !cancel; + try { completableFuture.join(); - System.out.println("Canceled:" + completableFuture.isCancelled()); - } else { - System.out.println("to cancel"); - Thread.sleep(5000); - boolean cancel = completableFuture.cancel(true); - System.out.println("Canceled:" + cancel); - Assert.assertTrue(cancel); + failed = true; + } catch (CancellationException ce) { + boolean b = failed || !ce.getMessage().equals(EXPECTED_SUBMITTER_CANCEL_MSG); + failed = b; } } else if (testAttribute.getType() == Type.TO_WAIT) { - System.out.println("to wait finish"); - if (testAttribute.getTestName().equals("deadline")) { - try { - completableFuture.join(); - } catch (CompletionException exception) { - // ignored - } - } else { - completableFuture.join(); - } + assertToWaitTest(testAttribute, completableFuture); } else if (testAttribute.getType() == Type.TO_SHUTDOWN) { System.out.println("Go shutdown JVM"); completableFuture.join(); @@ -113,6 +115,45 @@ private void baseTest(TestAttribute testAttribute, Supplier before(Path path) { @Override public Void after(Path workingDir, ExecutionReport report) { - try { - return super.after(workingDir, report); - } catch (IOException e) { - fail(); - } return null; } @@ -171,13 +207,12 @@ public Void after(Path workingDir, ExecutionReport report) { try { super.after(workingDir, report); } catch (IOException e) { - e.printStackTrace(); - fail(); + failed = true; } Path out2 = workingDir.resolve("out2.gz"); System.out.println("---------" + testAttribute.testName + "----------"); System.out.println("out2.gz should exists:" + Files.exists(out2)); - Assert.assertTrue(Files.exists(out2)); + failed = !Files.exists(out2); System.out.println("------------------------------------"); return null; } @@ -198,7 +233,7 @@ private static void generateZipFileOnRemote(String name, Path dest) { } @Test - public void testGrougCmd() throws InterruptedException { + public void testGroupCmd() throws InterruptedException { TestAttribute testAttribute = new TestAttribute(Type.TO_WAIT, "groupCmd"); Supplier> supplier = () -> new AbstractExecutionHandler() { @Override @@ -283,6 +318,12 @@ public void testLongProgramToCancel() throws InterruptedException { public List before(Path workingDir) { return longProgramToCancel(); } + + @Override + public Void after(Path workingDir, ExecutionReport report) throws IOException { + failed = true; + return null; + } }; baseTest(testAttribute, supplier); } @@ -295,6 +336,12 @@ public void testLongListProgsToCancel() throws InterruptedException { public List before(Path workingDir) { return longListProgsToCancel(); } + + @Override + public Void after(Path workingDir, ExecutionReport report) throws IOException { + failed = true; + return null; + } }; baseTest(testAttribute, supplier); } @@ -307,18 +354,29 @@ public void testMixedProgsToCancel() throws InterruptedException { public List before(Path workingDir) { return mixedProgsToCancel(); } + + @Override + public Void after(Path workingDir, ExecutionReport report) throws IOException { + failed = true; + return null; + } }; baseTest(testAttribute, supplier); } @Test public void testLongProgramToCancelExternal() throws InterruptedException { - TestAttribute testAttribute = new TestAttribute(Type.TO_CANCEL, "longProgramToCancelExternal", true); + TestAttribute testAttribute = new TestAttribute(Type.TO_WAIT, "longProgramToCancelExternal", true); Supplier> supplier = () -> new AbstractExecutionHandler() { @Override public List before(Path workingDir) { return longProgram(200); } + + @Override + public Void after(Path workingDir, ExecutionReport report) throws IOException { + return null; + } }; baseTest(testAttribute, supplier); } @@ -378,10 +436,18 @@ public List before(Path workingDir) { public Void after(Path workingDir, ExecutionReport report) throws IOException { List actual2GMd5 = Files.readAllLines(workingDir.resolve("c1_0.out")); List actual4GMd5 = Files.readAllLines(workingDir.resolve("c2_0.out")); - List expected2GMd5 = Collections.singletonList("1ea9851f9b83e9bd50b8d7577b23e14b 2GFile"); - List expected4GMd5 = Collections.singletonList("bbe2b516d690f337d8f48fc03db99c9a 4GFile"); - assertEquals(expected2GMd5, actual2GMd5); - assertEquals(expected4GMd5, actual4GMd5); + String expected2GMd5 = "1ea9851f9b83e9bd50b8d7577b23e14b 2GFile"; + String expected4GMd5 = "bbe2b516d690f337d8f48fc03db99c9a 4GFile"; + if (!Objects.equals(actual2GMd5.get(0), expected2GMd5)) { + failed = true; + System.out.println(" actuel:" + actual2GMd5.get(0)); + System.out.println("expected:" + expected2GMd5); + } + if (!Objects.equals(actual4GMd5.get(0), expected4GMd5)) { + failed = true; + System.out.println(" actuel:" + actual4GMd5.get(0)); + System.out.println("expected:" + expected4GMd5); + } return null; } }; @@ -427,7 +493,7 @@ public void testDeadline() throws InterruptedException { Supplier> supplier = () -> new AbstractExecutionHandler() { @Override public List before(Path workingDir) { - return CommandExecutionsTestFactory.longProgram(10); + return longProgram(10); } }; ComputationParametersBuilder builder = new ComputationParametersBuilder(); @@ -457,7 +523,7 @@ private void testQos(String qos) throws InterruptedException { Supplier> supplier = () -> new AbstractExecutionHandler() { @Override public List before(Path workingDir) { - return CommandExecutionsTestFactory.longProgram(10); + return longProgram(10); } }; ComputationParameters parameters = ComputationParameters.empty(); @@ -466,13 +532,16 @@ public List before(Path workingDir) { baseTest(testAttribute, supplier, parameters); } - private static void assertErrors(String testName, ExecutionReport report) { + private void assertErrors(String testName, ExecutionReport report) { System.out.println("---------" + testName + "----------"); System.out.println("Errors should exists:" + !report.getErrors().isEmpty()); - Assert.assertFalse(report.getErrors().isEmpty()); + if (report.getErrors().isEmpty()) { + failed = true; + } System.out.println("------------------------------------"); } + private static class TestAttribute { private final Type type; @@ -502,7 +571,7 @@ boolean isShortScontrolTime() { } } - private static List twoSimpleCmd() { + static List twoSimpleCmd() { Command command1 = new SimpleCommandBuilder() .id("simpleCmdId") .program("sleep") diff --git a/computation-slurm/src/main/java/com/powsybl/computation/slurm/FlagFilesMonitor.java b/computation-slurm/src/main/java/com/powsybl/computation/slurm/FlagFilesMonitor.java index 904a951..dcb7441 100644 --- a/computation-slurm/src/main/java/com/powsybl/computation/slurm/FlagFilesMonitor.java +++ b/computation-slurm/src/main/java/com/powsybl/computation/slurm/FlagFilesMonitor.java @@ -53,7 +53,7 @@ public void run() { commandRunner.execute("rm " + flagDir + "/" + line); // cancel following jobs(which depends on this job) if there are errors if (line.startsWith("myerror_")) { - taskStore.getCompletableFuture(workingDirName).cancel(true); + taskStore.getCompletableFuture(workingDirName).cancelBySlurm(new SlurmException("An error job found")); } else if (line.startsWith("mydone_")) { String id = line.substring(lastIdx + 1); taskStore.untracing(Long.parseLong(id)); diff --git a/computation-slurm/src/main/java/com/powsybl/computation/slurm/ScontrolMonitor.java b/computation-slurm/src/main/java/com/powsybl/computation/slurm/ScontrolMonitor.java index 2fbc50b..81c7841 100644 --- a/computation-slurm/src/main/java/com/powsybl/computation/slurm/ScontrolMonitor.java +++ b/computation-slurm/src/main/java/com/powsybl/computation/slurm/ScontrolMonitor.java @@ -9,8 +9,10 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.*; -import java.util.concurrent.CompletableFuture; +import java.util.HashSet; +import java.util.List; +import java.util.Optional; +import java.util.Set; import java.util.stream.Collectors; /** @@ -54,7 +56,7 @@ public void run() { try { scontrolResult = scontrolCmd.send(commandRunner); SlurmConstants.JobState jobState = scontrolResult.getJobState(); - boolean unmoral = false; + boolean unormal = false; switch (jobState) { case RUNNING: case PENDING: @@ -63,10 +65,10 @@ public void run() { case TIMEOUT: case DEADLINE: case CANCELLED: - unmoral = true; + unormal = true; LOGGER.info("JobId: {} is {}", id, jobState); - Optional unormalFuture = taskStore.getCompletableFutureByJobId(id); - unormalFuture.ifPresent(completableFuture -> completableFuture.cancel(true)); + Optional unormalFuture = taskStore.getCompletableFutureByJobId(id); + unormalFuture.ifPresent(f -> f.cancelBySlurm(new SlurmException("A " + jobState + " job detected by monitor"))); break; case COMPLETE: // this monitor found task finished before flagDirMonitor @@ -76,7 +78,7 @@ public void run() { default: LOGGER.warn("Not implemented yet {}", jobState); } - if (unmoral) { + if (unormal) { break; // restart } } catch (SlurmCmdNonZeroException e) { diff --git a/computation-slurm/src/main/java/com/powsybl/computation/slurm/SlurmComputationManager.java b/computation-slurm/src/main/java/com/powsybl/computation/slurm/SlurmComputationManager.java index 933b8da..360e547 100644 --- a/computation-slurm/src/main/java/com/powsybl/computation/slurm/SlurmComputationManager.java +++ b/computation-slurm/src/main/java/com/powsybl/computation/slurm/SlurmComputationManager.java @@ -48,6 +48,8 @@ public class SlurmComputationManager implements ComputationManager { private static final String CLOSE_START_NO_MORE_SEND_INFO = "SCM close started and no more send sbatch to slurm"; + private static final String CANCEL_BY_CALLER_MSG = "Cancelled by submitter"; + private final SlurmComputationConfig config; private final ExecutorService executorService; @@ -206,7 +208,7 @@ public CompletableFuture execute(ExecutionEnvironment environment, Execut requireNonNull(environment); requireNonNull(handler); - Mycf f = new Mycf<>(this); + SlurmCompletableFuture f = new SlurmCompletableFuture<>(this); executorService.submit(() -> { f.setThread(Thread.currentThread()); try { @@ -215,19 +217,24 @@ public CompletableFuture execute(ExecutionEnvironment environment, Execut LOGGER.error(e.toString(), e); f.completeExceptionally(e); } - taskStore.remove(f); + if (!f.isCancelled()) { + taskStore.clearBy(f); + } }); return f; } - static class Mycf extends CompletableFuture { + static class SlurmCompletableFuture extends CompletableFuture { - Thread thread; - SlurmComputationManager mgr; - volatile boolean cancel = false; + private Thread thread; + private SlurmComputationManager mgr; + private volatile boolean cancel = false; + private volatile boolean cancelledBySlurm = false; - Mycf(SlurmComputationManager manager) { - mgr = manager; + private SlurmException exception; + + SlurmCompletableFuture(SlurmComputationManager manager) { + mgr = Objects.requireNonNull(manager); } @Override @@ -235,6 +242,18 @@ public boolean isCancelled() { return cancel; } + // called by monitors, it differs from cancel() by client-side + boolean cancelBySlurm(SlurmException exception) { + Objects.requireNonNull(exception); + this.exception = exception; + cancelledBySlurm = true; + return cancel(true); + } + + private boolean isCancelledBySlurm() { + return cancelledBySlurm; + } + @Override public boolean cancel(boolean mayInterruptIfRunning) { cancel = true; @@ -244,6 +263,10 @@ public boolean cancel(boolean mayInterruptIfRunning) { return false; } + completeExceptionally(cancelledBySlurm ? + new CompletionException(exception.getMessage(), exception) : + new CancellationException(CANCEL_BY_CALLER_MSG)); + while (thread == null) { try { LOGGER.debug("Waiting submittedFuture to be set..."); @@ -272,7 +295,7 @@ public boolean cancel(boolean mayInterruptIfRunning) { } mgr.scancelCascading(jobId); - mgr.taskStore.remove(this); + mgr.taskStore.clearBy(this); return true; } @@ -302,7 +325,7 @@ private void scancel(Long jobId) { commandRunner.execute("scancel " + jobId); } - private void remoteExecute(ExecutionEnvironment environment, ExecutionHandler handler, ComputationParameters parameters, CompletableFuture f) { + private void remoteExecute(ExecutionEnvironment environment, ExecutionHandler handler, ComputationParameters parameters, SlurmCompletableFuture f) { Path remoteWorkingDir; try (WorkingDirectory remoteWorkingDirectory = new RemoteWorkingDir(baseDir, environment.getWorkingDirPrefix(), environment.isDebug())) { remoteWorkingDir = remoteWorkingDirectory.toPath(); @@ -330,14 +353,17 @@ private void remoteExecute(ExecutionEnvironment environment, ExecutionHandle Thread.currentThread().interrupt(); } - if (closeStarted) { - return; - } - SlurmExecutionReport report = generateReport(jobIdCommandMap, remoteWorkingDir); - - R result = handler.after(remoteWorkingDir, report); - f.complete(result); + if (f.isCancelled()) { + if (f.isCancelledBySlurm()) { + // some exceptions in platform + handler.after(remoteWorkingDir, report); + } else { + // client call cancel and skip after + } + } else { + f.complete(handler.after(remoteWorkingDir, report)); + } } catch (IOException e) { LOGGER.error(e.toString(), e); f.completeExceptionally(e); diff --git a/computation-slurm/src/main/java/com/powsybl/computation/slurm/TaskStore.java b/computation-slurm/src/main/java/com/powsybl/computation/slurm/TaskStore.java index 4cf605b..7c81a4a 100644 --- a/computation-slurm/src/main/java/com/powsybl/computation/slurm/TaskStore.java +++ b/computation-slurm/src/main/java/com/powsybl/computation/slurm/TaskStore.java @@ -26,8 +26,8 @@ class TaskStore { // workingDir<--->Task of computation private Map workingDirTaskMap = new HashMap<>(); private Map workingDirFirstJobMap = new HashMap<>(); - private Map futureWorkingDirMap = new HashMap<>(); - private Map workingDirFutureMap = new HashMap<>(); + private Map futureWorkingDirMap = new HashMap<>(); + private Map workingDirFutureMap = new HashMap<>(); private ReadWriteLock taskLock = new ReentrantReadWriteLock(); private Map jobDependencies = new HashMap<>(); @@ -67,7 +67,7 @@ Long getFirstJobId(CompletableFuture future) { } } - CompletableFuture getCompletableFuture(String workingDirName) { + SlurmComputationManager.SlurmCompletableFuture getCompletableFuture(String workingDirName) { taskLock.readLock().lock(); try { return workingDirFutureMap.get(workingDirName); @@ -101,7 +101,7 @@ void insert(String workingDirName, TaskCounter taskCounter, Long firstJobId) { trace(firstJobId); } - void insert(String workingDirName, CompletableFuture future) { + void insert(String workingDirName, SlurmComputationManager.SlurmCompletableFuture future) { taskLock.writeLock().lock(); try { futureWorkingDirMap.put(future, workingDirName); @@ -162,7 +162,12 @@ Set getTaskCounters() { return new HashSet<>(workingDirTaskMap.values()); } - void remove(CompletableFuture future) { + /** + * Clear all job ids and its dependency in the task store. + * Called after complete or cancel all job + * @param future all job ids belongs to the future + */ + void clearBy(CompletableFuture future) { Long firstJobId = removeTaskMaps(future); removeIds(firstJobId); } @@ -218,9 +223,9 @@ private Set removeIds(Long firstId) { } } - Optional getCompletableFutureByJobId(long id) { + Optional getCompletableFutureByJobId(long id) { // try with first id - Optional completableFuture = getFutureByFirstId(id); + Optional completableFuture = getFutureByFirstId(id); if (completableFuture.isPresent()) { return completableFuture; } @@ -234,7 +239,7 @@ Optional getCompletableFutureByJobId(long id) { return completableFuture; } - private Optional getFutureByFirstId(long firstJobId) { + private Optional getFutureByFirstId(long firstJobId) { taskLock.readLock().lock(); try { return workingDirFirstJobMap.entrySet() @@ -248,7 +253,7 @@ private Optional getFutureByFirstId(long firstJobId) { } } - private Optional getFutureByMasterId(long masterId) { + private Optional getFutureByMasterId(long masterId) { OptionalLong option = getFirstId(masterId); if (option.isPresent()) { return getFutureByFirstId(option.getAsLong()); @@ -259,7 +264,7 @@ private Optional getFutureByMasterId(long masterId) { private OptionalLong getFirstId(long masterId) { // is already a first job id - Optional completableFuture = getFutureByFirstId(masterId); + Optional completableFuture = getFutureByFirstId(masterId); if (completableFuture.isPresent()) { return OptionalLong.of(masterId); } @@ -288,7 +293,7 @@ private OptionalLong getFirstId(long masterId) { return OptionalLong.of(tmp); } - private Optional getFutureByBatchId(long batchId) { + private Optional getFutureByBatchId(long batchId) { OptionalLong optMasterId = getMasterId(batchId); if (optMasterId.isPresent()) { return getFutureByMasterId(optMasterId.getAsLong()); diff --git a/computation-slurm/src/test/java/com/powsybl/computation/slurm/ScontrolMonitorTest.java b/computation-slurm/src/test/java/com/powsybl/computation/slurm/ScontrolMonitorTest.java index 03ce124..f159e6e 100644 --- a/computation-slurm/src/test/java/com/powsybl/computation/slurm/ScontrolMonitorTest.java +++ b/computation-slurm/src/test/java/com/powsybl/computation/slurm/ScontrolMonitorTest.java @@ -55,11 +55,11 @@ public void testUnormalFound() { @Before public void setup() { - SlurmComputationManager.Mycf mycf; + SlurmComputationManager.SlurmCompletableFuture slurmCompletableFuture; slurm = mock(SlurmComputationManager.class); - mycf = new SlurmComputationManager.Mycf(slurm); - mycf.setThread(new Thread()); - ts = TaskStoreTest.generateTaskStore(mycf, false); + slurmCompletableFuture = new SlurmComputationManager.SlurmCompletableFuture(slurm); + slurmCompletableFuture.setThread(new Thread()); + ts = TaskStoreTest.generateTaskStore(slurmCompletableFuture, false); Whitebox.setInternalState(slurm, "taskStore", ts); when(slurm.getTaskStore()).thenReturn(ts); cm = mock(CommandExecutor.class); diff --git a/computation-slurm/src/test/java/com/powsybl/computation/slurm/TaskStoreTest.java b/computation-slurm/src/test/java/com/powsybl/computation/slurm/TaskStoreTest.java index 30254ca..d407a7b 100644 --- a/computation-slurm/src/test/java/com/powsybl/computation/slurm/TaskStoreTest.java +++ b/computation-slurm/src/test/java/com/powsybl/computation/slurm/TaskStoreTest.java @@ -12,7 +12,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; -import java.util.concurrent.CompletableFuture; import static org.junit.Assert.*; import static org.mockito.Mockito.mock; @@ -56,7 +55,7 @@ public void testParallelInserts() { // 3←4,5 // ↑ // 6 - static TaskStore generateTaskStore(CompletableFuture future, boolean checkTracing) { + static TaskStore generateTaskStore(SlurmComputationManager.SlurmCompletableFuture future, boolean checkTracing) { String workingDir = "a_working_dir"; TaskCounter counter = mock(TaskCounter.class); @@ -91,7 +90,7 @@ static TaskStore generateTaskStore(CompletableFuture future, boolean checkTracin @Test public void test() { - TaskStore taskStore = generateTaskStore(mock(CompletableFuture.class), false); + TaskStore taskStore = generateTaskStore(mock(SlurmComputationManager.SlurmCompletableFuture.class), false); assertEquals(Arrays.asList(3L, 6L), taskStore.getDependentJobs(1L)); assertEquals(Collections.singletonList(6L), taskStore.getDependentJobs(3L)); assertTrue(taskStore.getDependentJobs(6L).isEmpty()); @@ -102,13 +101,13 @@ public void test() { @Test public void testRemove() { - CompletableFuture future = mock(CompletableFuture.class); + SlurmComputationManager.SlurmCompletableFuture future = mock(SlurmComputationManager.SlurmCompletableFuture.class); TaskStore taskStore = generateTaskStore(future, false); assertEquals(1L, taskStore.getFirstJobId(future).longValue()); assertNotNull(taskStore.getTaskCounter(future)); assertEquals(future, taskStore.getCompletableFuture("a_working_dir")); - taskStore.remove(future); + taskStore.clearBy(future); assertNull(taskStore.getTaskCounter(future)); assertNull(taskStore.getFirstJobId(future)); @@ -121,7 +120,7 @@ public void testRemove() { @Test public void testGetFutureFromJobId() { - CompletableFuture future = mock(CompletableFuture.class); + SlurmComputationManager.SlurmCompletableFuture future = mock(SlurmComputationManager.SlurmCompletableFuture.class); TaskStore taskStore = generateTaskStore(future, false); assertEquals(future, taskStore.getCompletableFutureByJobId(1L).orElse(null)); assertEquals(future, taskStore.getCompletableFutureByJobId(2L).orElse(null)); @@ -133,6 +132,6 @@ public void testGetFutureFromJobId() { @Test public void testTracing() { - generateTaskStore(mock(CompletableFuture.class), true); + generateTaskStore(mock(SlurmComputationManager.SlurmCompletableFuture.class), true); } } From 34344afa0da486a8d3a7468a2412d09b6aaa4a60 Mon Sep 17 00:00:00 2001 From: Yichen TANG Date: Thu, 13 Jun 2019 11:43:18 +0200 Subject: [PATCH 2/8] Some fix Signed-off-by: Yichen TANG --- .../powsybl/computation/slurm/SlurmUnitTests.java | 13 ++++++++++--- .../slurm/SlurmComputationManager.java | 15 +++++++++++---- 2 files changed, 21 insertions(+), 7 deletions(-) diff --git a/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java b/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java index 87be96f..378a652 100644 --- a/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java +++ b/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java @@ -148,7 +148,7 @@ private void assertToWaitTest(TestAttribute testAttribute, CompletableFuture com // normal tests failed = true; } - if (!e.getMessage().equals(expected)) { + if (!e.getCause().getMessage().equals(expected)) { failed = true; System.out.println(FAILED_SEP); System.out.println("Actuel:" + e.getMessage()); @@ -492,8 +492,15 @@ public void testDeadline() throws InterruptedException { TestAttribute testAttribute = new TestAttribute(Type.TO_WAIT, "deadline", true); Supplier> supplier = () -> new AbstractExecutionHandler() { @Override - public List before(Path workingDir) { - return longProgram(10); + public List before(Path workingDir) throws IOException { + return CommandExecutionsTestFactory.longProgram(10); + } + + @Override + public Void after(Path workingDir, ExecutionReport report) throws IOException { + System.out.println("in deadline after"); + failed = report.getErrors().isEmpty(); + return null; } }; ComputationParametersBuilder builder = new ComputationParametersBuilder(); diff --git a/computation-slurm/src/main/java/com/powsybl/computation/slurm/SlurmComputationManager.java b/computation-slurm/src/main/java/com/powsybl/computation/slurm/SlurmComputationManager.java index 360e547..957d630 100644 --- a/computation-slurm/src/main/java/com/powsybl/computation/slurm/SlurmComputationManager.java +++ b/computation-slurm/src/main/java/com/powsybl/computation/slurm/SlurmComputationManager.java @@ -250,6 +250,10 @@ boolean cancelBySlurm(SlurmException exception) { return cancel(true); } + SlurmException getException() { + return exception; + } + private boolean isCancelledBySlurm() { return cancelledBySlurm; } @@ -263,10 +267,6 @@ public boolean cancel(boolean mayInterruptIfRunning) { return false; } - completeExceptionally(cancelledBySlurm ? - new CompletionException(exception.getMessage(), exception) : - new CancellationException(CANCEL_BY_CALLER_MSG)); - while (thread == null) { try { LOGGER.debug("Waiting submittedFuture to be set..."); @@ -358,8 +358,15 @@ private void remoteExecute(ExecutionEnvironment environment, ExecutionHandle if (f.isCancelledBySlurm()) { // some exceptions in platform handler.after(remoteWorkingDir, report); + // Normally in this case the handler.after() will throw exceptions because: + // 1. Exitcode non-zero + // 2. Result file not found + // the exception would be caught outside. + // But if the after() not throws exception, (ex: unit tests), it rethrows slurm exception + throw f.getException(); } else { // client call cancel and skip after + throw new CancellationException(CANCEL_BY_CALLER_MSG); } } else { f.complete(handler.after(remoteWorkingDir, report)); From 959a4f394a4fa023a76db4db897d98e1dfce0b28 Mon Sep 17 00:00:00 2001 From: yichen88 Date: Mon, 15 Jul 2019 12:21:08 +0200 Subject: [PATCH 3/8] Fix ignore Signed-off-by: yichen88 --- .gitignore | 2 +- .../test/java/com/powsybl/computation/slurm/SlurmUnitTests.java | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/.gitignore b/.gitignore index d99e7bf..fe683e8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,7 @@ # Maven projects /computation-mpi/target/ /computation-slurm/target/ -/computation-slurm-tests/target/ +/computation-slurm-test/target/ /distribution-hpc/target/ /target diff --git a/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java b/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java index 378a652..dd490df 100644 --- a/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java +++ b/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java @@ -548,7 +548,6 @@ private void assertErrors(String testName, ExecutionReport report) { System.out.println("------------------------------------"); } - private static class TestAttribute { private final Type type; From 1612011148bc0d316322d8f5d7d65f67682d28fc Mon Sep 17 00:00:00 2001 From: yichen88 Date: Mon, 12 Aug 2019 17:14:39 +0200 Subject: [PATCH 4/8] Fix typo --- .../java/com/powsybl/computation/slurm/ScontrolMonitor.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/computation-slurm/src/main/java/com/powsybl/computation/slurm/ScontrolMonitor.java b/computation-slurm/src/main/java/com/powsybl/computation/slurm/ScontrolMonitor.java index 81c7841..a98a193 100644 --- a/computation-slurm/src/main/java/com/powsybl/computation/slurm/ScontrolMonitor.java +++ b/computation-slurm/src/main/java/com/powsybl/computation/slurm/ScontrolMonitor.java @@ -56,7 +56,7 @@ public void run() { try { scontrolResult = scontrolCmd.send(commandRunner); SlurmConstants.JobState jobState = scontrolResult.getJobState(); - boolean unormal = false; + boolean abnormal = false; switch (jobState) { case RUNNING: case PENDING: @@ -65,7 +65,7 @@ public void run() { case TIMEOUT: case DEADLINE: case CANCELLED: - unormal = true; + abnormal = true; LOGGER.info("JobId: {} is {}", id, jobState); Optional unormalFuture = taskStore.getCompletableFutureByJobId(id); unormalFuture.ifPresent(f -> f.cancelBySlurm(new SlurmException("A " + jobState + " job detected by monitor"))); @@ -78,7 +78,7 @@ public void run() { default: LOGGER.warn("Not implemented yet {}", jobState); } - if (unormal) { + if (abnormal) { break; // restart } } catch (SlurmCmdNonZeroException e) { From c53e23aeb33ba7ab9532ba0f903a608671fdf316 Mon Sep 17 00:00:00 2001 From: Yichen TANG Date: Mon, 29 Apr 2019 13:35:12 +0200 Subject: [PATCH 5/8] Use --arrays in slurm - Remove --array=0 if count = 1(Not used in previous) - Initialize SbatchGenerator with all parameters(flagdirs, command, workingDir, env) - mydone_ flag file would be added at the last command's execution. If it is not an array job, then just one mydone_ file would be created and the countDownLatch is set to one. If is an array-job, countdownLatch set to n and wait for n mydone_ file created. - Job would be auto cancelled by slurm with a new argument(--kill-on-invalid-dep=yes) added in sbatch's cmd by default. - no more batchIds in TaskStore. Signed-off-by: Yichen TANG --- .../computation/slurm/SlurmUnitTests.java | 121 +++++++- .../src/test/resources/myapps/myecho.sh | 6 +- .../computation/slurm/CommandUtils.java | 13 +- .../computation/slurm/SbatchCmdBuilder.java | 33 ++- .../slurm/SbatchScriptGenerator.java | 258 ++++++++++++++---- .../slurm/SlurmComputationManager.java | 149 +++++----- .../powsybl/computation/slurm/TaskStore.java | 68 +---- .../slurm/CommandExecutionsTestFactory.java | 55 +++- .../computation/slurm/SbatchCmdTest.java | 20 +- .../slurm/SbatchScriptGeneratorTest.java | 85 +++--- .../slurm/ScontrolMonitorTest.java | 9 +- .../computation/slurm/TaskStoreTest.java | 27 +- .../expectedShell/commandFiles.batch | 22 ++ .../groupCmdWithArgsCount3.batch | 16 ++ .../myEchoSimpleCmdWithUnzipZip.batch | 22 ++ .../simpleArrayJobWithArgu.batch | 14 + .../expectedShell/simpleCmdWithCount3.batch | 14 + 17 files changed, 636 insertions(+), 296 deletions(-) create mode 100644 computation-slurm/src/test/resources/expectedShell/commandFiles.batch create mode 100644 computation-slurm/src/test/resources/expectedShell/groupCmdWithArgsCount3.batch create mode 100644 computation-slurm/src/test/resources/expectedShell/myEchoSimpleCmdWithUnzipZip.batch create mode 100644 computation-slurm/src/test/resources/expectedShell/simpleArrayJobWithArgu.batch create mode 100644 computation-slurm/src/test/resources/expectedShell/simpleCmdWithCount3.batch diff --git a/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java b/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java index dd490df..884295c 100644 --- a/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java +++ b/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java @@ -46,7 +46,8 @@ public class SlurmUnitTests { private static final Logger LOGGER = LoggerFactory.getLogger(SlurmUnitTests.class); - private static final ExecutionEnvironment EMPTY_ENV = new ExecutionEnvironment(Collections.emptyMap(), "unit_test_", false); + private static final boolean DEBUG = false; + private static final ExecutionEnvironment EMPTY_ENV = new ExecutionEnvironment(Collections.emptyMap(), "unit_test_", DEBUG); private static final String EXPECTED_ERROR_JOB_MSG = "An error job found"; private static final String EXPECTED_SCANCEL_JOB_MSG = "A CANCELLED job detected by monitor"; @@ -157,27 +158,46 @@ private void assertToWaitTest(TestAttribute testAttribute, CompletableFuture com } } + @Test + public void testSimpleEchoWithoutCount() throws InterruptedException { + TestAttribute testAttribute = new TestAttribute(Type.TO_WAIT, "simpleEchoWithoutCount"); + Supplier> supplier = () -> { + return new AbstractExecutionHandler() { + + @Override + public List before(Path path) { + return simpleEchoWithCount(1); + } + + // TODO verify the .out file into which "te1st0" string are echoed. + }; + }; + baseTest(testAttribute, supplier); + } + @Test public void testSimpleCmdWithCount() throws InterruptedException { - TestAttribute testAttribute = new TestAttribute(Type.TO_WAIT, "simpleCmdWithCount"); - Supplier> supplier = () -> new AbstractExecutionHandler() { - @Override - public List before(Path path) { - return simpleCmdWithCount(7); - } + TestAttribute testAttribute = new TestAttribute(Type.TO_WAIT, "simpleEchoWithCount"); + Supplier> supplier = () -> + new AbstractExecutionHandler() { + @Override + public List before(Path path) { + return simpleEchoWithCount(7); + } - @Override - public Void after(Path workingDir, ExecutionReport report) { - return null; - } + @Override + public Void after(Path workingDir, ExecutionReport report) { + return null; + } - private List simpleCmdWithCount(int count) { - return CommandExecutionsTestFactory.simpleCmdWithCount(count); - } - }; + }; baseTest(testAttribute, supplier); } + private static List simpleEchoWithCount(int count) { + return CommandExecutionsTestFactory.simpleEchoWithCount(count); + } + @Test public void testLongTask() throws InterruptedException { TestAttribute testAttribute = new TestAttribute(Type.TO_WAIT, "longTask"); @@ -232,6 +252,53 @@ private static void generateZipFileOnRemote(String name, Path dest) { } } + @Test + public void testSimpleArgs() throws InterruptedException { + TestAttribute testAttribute = new TestAttribute(Type.TO_WAIT, "simpleArgs"); + Supplier> supplier = () -> { + return new AbstractExecutionHandler() { + @Override + public List before(Path workingDir) throws IOException { + return CommandExecutionsTestFactory.oddEvenCmd(3); + } + + @Override + public Void after(Path workingDir, ExecutionReport report) throws IOException { + super.after(workingDir, report); + List lines0 = Files.readAllLines(workingDir.resolve("evenOutput0.txt")); + List lines1 = Files.readAllLines(workingDir.resolve("oddOutput1.txt")); + System.out.println(lines0); + System.out.println(lines1); + if (!lines0.get(0).equals("evenIn0") || !lines1.get(0).equals("oddIn1")) { + failed = true; + } + return null; + } + }; + }; + baseTest(testAttribute, supplier); + } + + @Test + public void testOneJobFails() throws InterruptedException { + TestAttribute testAttribute = new TestAttribute(Type.TO_WAIT, "OneJobFails"); + Supplier> supplier = () -> { + return new AbstractExecutionHandler() { + @Override + public List before(Path workingDir) throws IOException { + return CommandExecutionsTestFactory.failInOneOfArrayJob(); + } + + @Override + public Void after(Path workingDir, ExecutionReport report) { + assertErrors(testAttribute.testName, report); + return null; + } + }; + }; + baseTest(testAttribute, supplier); + } + @Test public void testGroupCmd() throws InterruptedException { TestAttribute testAttribute = new TestAttribute(Type.TO_WAIT, "groupCmd"); @@ -244,6 +311,20 @@ public List before(Path workingDir) { baseTest(testAttribute, supplier); } + @Test + public void testGroupCmdWithArgs() throws InterruptedException { + TestAttribute testAttribute = new TestAttribute(Type.TO_WAIT, "GroupCmdWithArgs"); + Supplier> supplier = () -> { + return new AbstractExecutionHandler() { + @Override + public List before(Path workingDir) { + return CommandExecutionsTestFactory.groupCmdWithArgs(3); + } + }; + }; + baseTest(testAttribute, supplier); + } + @Test public void testTwoSimpleCmd() throws InterruptedException { TestAttribute testAttribute = new TestAttribute(Type.TO_WAIT, "twoSimpleCmd"); @@ -489,6 +570,16 @@ public List before(Path workingDir) { // 2. Wait 1 mins @Test public void testDeadline() throws InterruptedException { + Thread makeSlurmBusyThread = new Thread(() -> { + try { + makeSlurmBusy(); + } catch (InterruptedException e) { + // do nothing + } + }); + makeSlurmBusyThread.start(); + TimeUnit.SECONDS.sleep(5); + TestAttribute testAttribute = new TestAttribute(Type.TO_WAIT, "deadline", true); Supplier> supplier = () -> new AbstractExecutionHandler() { @Override diff --git a/computation-slurm-test/src/test/resources/myapps/myecho.sh b/computation-slurm-test/src/test/resources/myapps/myecho.sh index 4d86ea7..ab17a36 100644 --- a/computation-slurm-test/src/test/resources/myapps/myecho.sh +++ b/computation-slurm-test/src/test/resources/myapps/myecho.sh @@ -1,2 +1,6 @@ #!/bin/sh -echo $1 > $2 +if [ $1 = "oddIn3" ]; then + echoooo +else + echo $1 > $2 +fi diff --git a/computation-slurm/src/main/java/com/powsybl/computation/slurm/CommandUtils.java b/computation-slurm/src/main/java/com/powsybl/computation/slurm/CommandUtils.java index c005b20..c30f822 100644 --- a/computation-slurm/src/main/java/com/powsybl/computation/slurm/CommandUtils.java +++ b/computation-slurm/src/main/java/com/powsybl/computation/slurm/CommandUtils.java @@ -35,7 +35,16 @@ static String commandToString(String program, Collection args) { requireNonNull(program); requireNonNull(args); - String argStr = args.stream().collect(getWrapperAndJoiner()); - return program + " " + argStr; + return program + " " + commandArgsToString(args); + } + + /** + * Generates a command's argu string, with each argument wrapped with quotes. + * @param args + * @return the argu's string + */ + static String commandArgsToString(Collection args) { + requireNonNull(args); + return args.stream().collect(getWrapperAndJoiner()); } } diff --git a/computation-slurm/src/main/java/com/powsybl/computation/slurm/SbatchCmdBuilder.java b/computation-slurm/src/main/java/com/powsybl/computation/slurm/SbatchCmdBuilder.java index 3a4667c..4992f33 100644 --- a/computation-slurm/src/main/java/com/powsybl/computation/slurm/SbatchCmdBuilder.java +++ b/computation-slurm/src/main/java/com/powsybl/computation/slurm/SbatchCmdBuilder.java @@ -33,14 +33,16 @@ SbatchCmdBuilder jobName(String jobName) { return this; } + /** + * + * @param i if equals 1, --array would not be set. If negative, exception would be thrown. + * @return this builder + */ SbatchCmdBuilder array(int i) { - if (i < 0) { + if (i <= 0) { throw new IllegalArgumentException(i + " is not validate for array."); } - if (i == 1) { - sbatchArgsByName.put("array", Integer.toString(0)); - } - if (i != 1) { + if (i > 1) { sbatchArgsByName.put("array", "0-" + (i - 1)); } return this; @@ -55,6 +57,20 @@ SbatchCmdBuilder aftercorr(List jobIds) { return this; } + SbatchCmdBuilder aftercorr(@Nullable Long preMasterJob) { + if (preMasterJob != null) { + sbatchArgsByName.put("dependency", "aftercorr:" + preMasterJob); + } + return this; + } + + SbatchCmdBuilder afternotok(Long lastMasterJob) { + if (lastMasterJob != null) { + sbatchArgsByName.put("dependency", "afternotok:" + lastMasterJob); + } + return this; + } + SbatchCmdBuilder nodes(int i) { if (i < 1) { throw new IllegalArgumentException(i + " is not validate for nodes."); @@ -84,6 +100,7 @@ SbatchCmdBuilder cpusPerTask(int i) { return this; } + // TODO test SbatchCmdBuilder error(String pattern) { Objects.requireNonNull(pattern); sbatchArgsByName.put("error", pattern); @@ -107,6 +124,11 @@ SbatchCmdBuilder oversubscribe() { return this; } + private SbatchCmdBuilder killOnInvalidDep() { + sbatchArgsByName.put("kill-on-invalid-dep", "yes"); + return this; + } + SbatchCmdBuilder script(String name) { this.script = Objects.requireNonNull(name); return this; @@ -144,6 +166,7 @@ SbatchCmdBuilder qos(String qos) { } SbatchCmd build() { + killOnInvalidDep(); validate(); return new SbatchCmd(sbatchArgsByName, sbatchArgsByCharacter, sbatchOptions, script); } diff --git a/computation-slurm/src/main/java/com/powsybl/computation/slurm/SbatchScriptGenerator.java b/computation-slurm/src/main/java/com/powsybl/computation/slurm/SbatchScriptGenerator.java index d9ccc09..15c425d 100644 --- a/computation-slurm/src/main/java/com/powsybl/computation/slurm/SbatchScriptGenerator.java +++ b/computation-slurm/src/main/java/com/powsybl/computation/slurm/SbatchScriptGenerator.java @@ -9,6 +9,7 @@ import com.powsybl.computation.*; import org.apache.commons.lang3.StringUtils; +import javax.annotation.Nullable; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; @@ -32,13 +33,63 @@ class SbatchScriptGenerator { private static final String CHECK_EXITCODE = "rc=$?; if [[ $rc != 0 ]]; then touch %s/myerror_%s_$SLURM_JOBID; exit $rc; fi"; private static final String TOUCH_MYDONE = "touch %s/mydone_%s_$SLURM_JOBID"; + private static final String INDENTATION_4 = " "; + private static final String INDENTATION_6 = " "; + private static final String SH_CASE_BREAK = INDENTATION_4 + ";;"; + private static final String SPL_CMD_ARGS = "ARGS"; + private static final String SUB_CMD_ARGS = "ARGS_"; + private static final String PRE_FILE = "PRE"; + private static final String POST_FILE = "POST"; + private final Path flagDir; + private final Path workingDir; + private final Map env; + private final CommandExecution commandExecution; + + private boolean isArrayJob; + private boolean isLast = false; - SbatchScriptGenerator(Path flagDir) { + SbatchScriptGenerator(Path flagDir, CommandExecution commandExecution, Path workingDir, Map env) { this.flagDir = Objects.requireNonNull(flagDir); + this.commandExecution = Objects.requireNonNull(commandExecution); + this.workingDir = Objects.requireNonNull(workingDir); + this.env = Objects.requireNonNull(env); + isArrayJob = commandExecution.getExecutionCount() != 1; + } + + SbatchScriptGenerator setIsLast(boolean isLast) { + this.isLast = isLast; + return this; + } + + /** + * Returns the list of commands which constitute the sbatch script. + */ + List parse() { + List shell = new ArrayList<>(); + shell.add(SHEBANG); + if (isArrayJob) { + arrayJobCase(shell); + preProcess(shell); + } else { + preProcessNonArray(shell); + } + export(shell, env); + cmd(shell); + if (isArrayJob) { + postProcess(shell); + } else { + postProcessNonArray(shell); + } + touchMydone(shell); + return shell; } - List unzipCommonInputFiles(Command command) { + /** + * Returns the list of commands which constitute the sbatch script. + * The unzip common input files script DO NOT create a flag file at the end. + */ + static List unzipCommonInputFiles(Command command) { List shell = new ArrayList<>(); shell.add(SHEBANG); command.getInputFiles() @@ -48,25 +99,133 @@ List unzipCommonInputFiles(Command command) { return shell; } - /** - * Returns the list of commands which constitute the sbatch script. - */ - List parser(Command command, int executionIndex, Path workingDir, Map env) { - List list = new ArrayList<>(); - list.add(SHEBANG); - preProcess(list, command, executionIndex); - export(list, env); - cmdWithArgu(list, command, executionIndex, workingDir); - postProcess(list, command, executionIndex); - touchMydone(list, workingDir); - return list; + private void arrayJobCase(List shell) { + Command command = commandExecution.getCommand(); + shell.add("case $SLURM_ARRAY_TASK_ID in"); + for (int caseIdx = 0; caseIdx < commandExecution.getExecutionCount(); caseIdx++) { + shell.add(INDENTATION_4 + caseIdx + ")"); + addInputFilenames(shell, caseIdx); + addOutputFilenames(shell, caseIdx); + switch (command.getType()) { + case SIMPLE: + SimpleCommand simpleCmd = (SimpleCommand) command; + String args = CommandUtils.commandArgsToString(simpleCmd.getArgs(caseIdx)); + shell.add(INDENTATION_6 + SPL_CMD_ARGS + "=\"" + args + "\""); + shell.add(SH_CASE_BREAK); + break; + case GROUP: + GroupCommand groupCommand = (GroupCommand) command; + List subCommands = groupCommand.getSubCommands(); + List subArgs = new ArrayList<>(); + for (int i = 0; i < subCommands.size(); i++) { + GroupCommand.SubCommand cmd = subCommands.get(i); + String argsSub = CommandUtils.commandArgsToString(cmd.getArgs(caseIdx)); + String de = SUB_CMD_ARGS + i + "=\"" + argsSub + "\""; + subArgs.add(de); + } + String subArgsJoined = String.join(" ", subArgs); + shell.add(INDENTATION_6 + subArgsJoined); + shell.add(SH_CASE_BREAK); + break; + default: + throw new AssertionError("Unexpected command type value: " + command.getType()); + } + } + shell.add("esac"); + } + + // TODO Input/OutputFile abstraction in core + private void addInputFilenames(List shell, int caseIdx) { + List inputFiles = commandExecution.getCommand().getInputFiles(); + List ins = new ArrayList<>(); + for (int i = 0; i < inputFiles.size(); i++) { + InputFile inputFile = inputFiles.get(i); + if (!inputFile.dependsOnExecutionNumber()) { + // skip because this file is already unzip in a previous batch + continue; + } + ins.add(PRE_FILE + i + "=" + inputFile.getName(caseIdx)); + } + if (ins.isEmpty()) { + return; + } + shell.add(INDENTATION_6 + String.join(" ", ins)); + } + + private void addOutputFilenames(List shell, int caseIdx) { + List outputFiles = commandExecution.getCommand().getOutputFiles(); + List outs = new ArrayList<>(); + for (int i = 0; i < outputFiles.size(); i++) { + OutputFile outputFile = outputFiles.get(i); + if (!outputFile.dependsOnExecutionNumber() || outputFile.getPostProcessor() == null) { + continue; + } + outs.add(POST_FILE + i + "=" + outputFile.getName(caseIdx)); + } + if (outs.isEmpty()) { + return; + } + shell.add(INDENTATION_6 + String.join(" ", outs)); + } + + private void cmd(List shell) { + Command command = commandExecution.getCommand(); + switch (command.getType()) { + case SIMPLE: + SimpleCommand simpleCmd = (SimpleCommand) command; + simpleCmdWithArgs(shell, simpleCmd); + shell.add(String.format(CHECK_EXITCODE, flagDir.toAbsolutePath(), workingDir.getFileName())); + break; + case GROUP: + GroupCommand groupCommand = (GroupCommand) command; + List subCommands = groupCommand.getSubCommands(); + for (int i = 0; i < subCommands.size(); i++) { + GroupCommand.SubCommand cmd = subCommands.get(i); + subCmdWithArgs(shell, cmd, i); + shell.add(String.format(CHECK_EXITCODE, flagDir.toAbsolutePath(), workingDir.getFileName())); + } + break; + default: + throw new AssertionError("Unexpected command type value: " + command.getType()); + } + } + + private void simpleCmdWithArgs(List list, SimpleCommand simpleCommand) { + if (isArrayJob) { + list.add(simpleCommand.getProgram() + " $" + SPL_CMD_ARGS); + } else { + list.add(CommandUtils.commandToString(simpleCommand.getProgram(), simpleCommand.getArgs(0))); // not an array job, 0 is the only number + } + } + + private void subCmdWithArgs(List list, GroupCommand.SubCommand subCommand, int idxInGroup) { + if (isArrayJob) { + list.add(subCommand.getProgram() + " $" + SUB_CMD_ARGS + idxInGroup); + } else { + list.add(CommandUtils.commandToString(subCommand.getProgram(), subCommand.getArgs(0))); // not an array job, 0 is the only number + } } // only preprocess input file which dependent on executionIdx - private void preProcess(List list, Command command, int executionIndex) { + private void preProcessNonArray(List shell) { + Command command = commandExecution.getCommand(); command.getInputFiles().stream() .filter(InputFile::dependsOnExecutionNumber) - .forEach(file -> addUnzip(list, file.getName(executionIndex), file.getPreProcessor())); + .forEach(file -> { + addUnzip(shell, file.getName(0), file.getPreProcessor()); + }); + } + + private void preProcess(List shell) { + List inputFiles = commandExecution.getCommand().getInputFiles(); + for (int i = 0; i < inputFiles.size(); i++) { + InputFile inputFile = inputFiles.get(i); + if (!inputFile.dependsOnExecutionNumber()) { + // skip because this file is already unzip in a previous batch + continue; + } + addUnzip(shell, "$" + PRE_FILE + i, inputFile.getPreProcessor()); + } } private static void addUnzip(List shell, String filename, FilePreProcessor preProcessor) { @@ -84,58 +243,61 @@ private static void addUnzip(List shell, String filename, FilePreProcess } } - private void export(List list, Map env) { + private void export(List shell, Map env) { StringBuilder sb = new StringBuilder(); for (Map.Entry entry : env.entrySet()) { String name = entry.getKey(); String value = entry.getValue(); sb.append(SH_EXPORT).append(name).append("=").append(value); if (name.endsWith("PATH")) { - sb.append(":").append("$").append(name); + sb.append(":$").append(name); } sb.append("; "); } if (!StringUtils.isEmpty(sb.toString())) { - list.add(sb.toString()); + shell.add(sb.toString()); } } - private void cmdWithArgu(List list, Command command, int executionIndex, Path workingDir) { - switch (command.getType()) { - case SIMPLE: - SimpleCommand simpleCmd = (SimpleCommand) command; - list.add(CommandUtils.commandToString(simpleCmd.getProgram(), simpleCmd.getArgs(executionIndex))); - list.add(String.format(CHECK_EXITCODE, flagDir.toAbsolutePath(), workingDir.getFileName())); - break; - case GROUP: - GroupCommand groupCommand = (GroupCommand) command; - for (GroupCommand.SubCommand subCommand : groupCommand.getSubCommands()) { - list.add(CommandUtils.commandToString(subCommand.getProgram(), subCommand.getArgs(executionIndex))); - list.add(String.format(CHECK_EXITCODE, flagDir.toAbsolutePath(), workingDir.getFileName())); + private void postProcessNonArray(List shell) { + commandExecution.getCommand().getOutputFiles().forEach(file -> { + addGzip(shell, file.getName(0), file.getPostProcessor()); + }); + } + + private void postProcess(List shell) { + if (isArrayJob) { + List outputFiles = commandExecution.getCommand().getOutputFiles(); + for (int i = 0; i < outputFiles.size(); i++) { + OutputFile outputFile = outputFiles.get(i); + if (!outputFile.dependsOnExecutionNumber()) { + // skip because this file is already unzip in a previous batch + continue; } - break; - default: - throw new AssertionError("Unexpected command type value: " + command.getType()); + FilePostProcessor postProcessor = outputFile.getPostProcessor(); + addGzip(shell, "$" + POST_FILE + i, postProcessor); + } + } else { + postProcessNonArray(shell); } } - private void postProcess(List list, Command command, int executionIndex) { - command.getOutputFiles().forEach(file -> { - FilePostProcessor postProcessor = file.getPostProcessor(); - String fileName; - fileName = file.getName(executionIndex); - if (postProcessor != null) { - if (postProcessor == FilePostProcessor.FILE_GZIP) { - list.add(SH_GZIP + fileName); - } else { - throw new AssertionError("Unexpected postProcessor type value: " + postProcessor); - } + private static void addGzip(List shell, String fileName, @Nullable FilePostProcessor postProcessor) { + if (postProcessor != null) { + if (postProcessor == FilePostProcessor.FILE_GZIP) { + shell.add(SH_GZIP + fileName); + // TODO gzip fails +// shell.add(String.format(CHECK_EXITCODE, flagDir.toAbsolutePath(), workingDir.getFileName())); + } else { + throw new AssertionError("Unexpected postProcessor type value: " + postProcessor); } - }); + } } - private void touchMydone(List list, Path workingDir) { - list.add(String.format(TOUCH_MYDONE, flagDir.toAbsolutePath(), workingDir.getFileName())); + private void touchMydone(List list) { + if (isLast) { + list.add(String.format(TOUCH_MYDONE, flagDir.toAbsolutePath(), workingDir.getFileName())); + } } } diff --git a/computation-slurm/src/main/java/com/powsybl/computation/slurm/SlurmComputationManager.java b/computation-slurm/src/main/java/com/powsybl/computation/slurm/SlurmComputationManager.java index 957d630..90b32d2 100644 --- a/computation-slurm/src/main/java/com/powsybl/computation/slurm/SlurmComputationManager.java +++ b/computation-slurm/src/main/java/com/powsybl/computation/slurm/SlurmComputationManager.java @@ -17,6 +17,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; import java.io.*; import java.net.URI; import java.net.URISyntaxException; @@ -30,6 +31,8 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import static com.powsybl.computation.slurm.SlurmConstants.ERR_EXT; +import static com.powsybl.computation.slurm.SlurmConstants.OUT_EXT; import static java.util.Objects.requireNonNull; /** @@ -41,6 +44,7 @@ public class SlurmComputationManager implements ComputationManager { private static final Logger LOGGER = LoggerFactory.getLogger(SlurmComputationManager.class); private static final String SACCT_NONZERO_JOB = "sacct --jobs=%s -n --format=\"jobid,exitcode\" | grep -v \"0:0\" | grep -v \"\\.\""; + private static final String ARRAY_INDEX_PH = "%a"; // ph=placeholder private static final Pattern DIGITAL_PATTERN = Pattern.compile("\\d+"); private static final String BATCH_EXT = ".batch"; private static final String FLAGS_DIR_PREFIX = "myflags_"; // where flag files are created and be watched @@ -157,7 +161,7 @@ private static Path initFlagDir(Path baseDir) throws IOException { Path p; long n = new SecureRandom().nextLong(); n = n == -9223372036854775808L ? 0L : Math.abs(n); - p = baseDir.resolve(FLAGS_DIR_PREFIX + Long.toString(n)); + p = baseDir.resolve(FLAGS_DIR_PREFIX + n); return Files.createDirectories(p); } @@ -288,13 +292,13 @@ public boolean cancel(boolean mayInterruptIfRunning) { // in subsequent method calls such as Files.read in executionHandler.after. LOGGER.debug("Canceled thread"); - Long jobId = mgr.taskStore.getFirstJobId(this); - if (jobId == null) { + Long firstJobId = mgr.taskStore.getFirstJobId(this); + if (firstJobId == null) { LOGGER.warn("job not be submitted yet"); return true; } - mgr.scancelCascading(jobId); + mgr.scancelCascading(firstJobId); mgr.taskStore.clearBy(this); return true; } @@ -304,25 +308,17 @@ void setThread(Thread t) { } } - private void scancelCascading(Long jobId) { + private void scancelCascading(Long firstJobId) { // For array jobs can be cancelled just by calling on master jobId - // but currently array_job in slurm is not used, so jobs should be cancelled one by one - // For dependency jobs will not be cancelled automatically and they will be pending in queue infinitely - LOGGER.debug("cancel cascading: {}", jobId); - scancel(jobId); - taskStore.getBatchIds(jobId).forEach(this::scancel); - taskStore.getDependentJobs(jobId).forEach(this::scancelWithMasterId); + LOGGER.debug("scancel first job {}", firstJobId); + scancelMasterJob(firstJobId); + taskStore.getDependentJobs(firstJobId).forEach(this::scancelMasterJob); } - private void scancelWithMasterId(Long masterid) { - scancel(masterid); - taskStore.getBatchIds(masterid).forEach(this::scancel); - } - - private void scancel(Long jobId) { - LOGGER.debug("scancel {}", jobId); - taskStore.untracing(jobId); - commandRunner.execute("scancel " + jobId); + private void scancelMasterJob(Long masterid) { + LOGGER.debug("scancel master job {}", masterid); + taskStore.untracing(masterid); + commandRunner.execute("scancel " + masterid); } private void remoteExecute(ExecutionEnvironment environment, ExecutionHandler handler, ComputationParameters parameters, SlurmCompletableFuture f) { @@ -333,8 +329,7 @@ private void remoteExecute(ExecutionEnvironment environment, ExecutionHandle taskStore.insert(remoteWorkingDir.getFileName().toString(), f); List commandExecutions = handler.before(remoteWorkingDir); - int sum = commandExecutions.stream().mapToInt(CommandExecution::getExecutionCount).sum(); - TaskCounter taskCounter = new TaskCounter(sum); + TaskCounter taskCounter = new TaskCounter(commandExecutions.get(commandExecutions.size() - 1).getExecutionCount()); Map jobIdCommandMap; jobIdCommandMap = generateSbatchAndSubmit(commandExecutions, parameters, remoteWorkingDir, environment, taskCounter, f); @@ -409,12 +404,10 @@ private Map generateSbatchAndSubmit(List comman ExecutionEnvironment environment, TaskCounter taskCounter, CompletableFuture future) throws IOException { Map jobIdCommandMap = new HashMap<>(); - Long firstJobId = null; // the first jobId submitted of List - List preJobIds = new ArrayList<>(); + Long firstJobId = null; // the first jobId submitted of List, or the unzip jobId Long preMasterJobId = null; // the masterJobId is the first id of each CommandExecution // it could be also job id for unzip common inputs - outerSendingLoop: for (int commandIdx = 0; commandIdx < commandExecutions.size(); commandIdx++) { CommandExecution commandExecution = commandExecutions.get(commandIdx); Command command = commandExecution.getCommand(); @@ -428,12 +421,11 @@ private Map generateSbatchAndSubmit(List comman .anyMatch(inputFile -> !inputFile.dependsOnExecutionNumber() && inputFile.getPreProcessor() != null)) { if (closeStarted) { LOGGER.info(CLOSE_START_NO_MORE_SEND_INFO); - break outerSendingLoop; + break; } - SbatchScriptGenerator sbatchScriptGenerator = new SbatchScriptGenerator(flagDir); - List shell = sbatchScriptGenerator.unzipCommonInputFiles(command); - copyShellToRemoteWorkingDir(shell, UNZIP_INPUTS_COMMAND_ID + "_" + commandIdx, workingDir); - cmd = buildSbatchCmd(workingDir, UNZIP_INPUTS_COMMAND_ID, commandIdx, preJobIds, parameters); + List shell = SbatchScriptGenerator.unzipCommonInputFiles(command); + copyShellToRemoteWorkingDir(shell, UNZIP_INPUTS_COMMAND_ID + commandIdx, workingDir); + cmd = buildCommonUnzipCmd(workingDir, commandIdx, preMasterJobId); if (isSendAllowed(future)) { long jobId = launchSbatch(cmd); if (firstJobId == null) { @@ -444,7 +436,6 @@ private Map generateSbatchAndSubmit(List comman if (preMasterJobId != null) { taskStore.insertDependency(preMasterJobId, jobId); } - preJobIds = Collections.singletonList(jobId); preMasterJobId = jobId; } else { logNotSendReason(future); @@ -452,42 +443,32 @@ private Map generateSbatchAndSubmit(List comman } } - // no job array --> commandId_index.batch + if (closeStarted) { + LOGGER.info(CLOSE_START_NO_MORE_SEND_INFO); + break; + } Long masterJobId = null; - for (int executionIndex = 0; executionIndex < commandExecution.getExecutionCount(); executionIndex++) { - if (closeStarted) { - LOGGER.info(CLOSE_START_NO_MORE_SEND_INFO); - break outerSendingLoop; + prepareBatch(environment, commandExecution, workingDir, commandIdx == commandExecutions.size() - 1); + cmd = buildSbatchCmd(workingDir, command.getId(), commandExecution.getExecutionCount(), preMasterJobId, parameters); + if (isSendAllowed(future)) { + long jobId = launchSbatch(cmd); + if (firstJobId == null) { + firstJobId = jobId; + taskStore.insert(workingDir.getFileName().toString(), taskCounter, firstJobId); + LOGGER.debug("First jobid : {}", firstJobId); } - prepareBatch(command, executionIndex, environment, commandExecution, workingDir); - cmd = buildSbatchCmd(workingDir, command.getId(), executionIndex, preJobIds, parameters); - if (isSendAllowed(future)) { - long jobId = launchSbatch(cmd); - if (firstJobId == null) { - firstJobId = jobId; - taskStore.insert(workingDir.getFileName().toString(), taskCounter, firstJobId); - LOGGER.debug("First jobid : {}", firstJobId); - } - if (executionIndex == 0) { - masterJobId = jobId; - if (preMasterJobId != null) { - taskStore.insertDependency(preMasterJobId, masterJobId); - } - } - taskStore.insertBatchIds(masterJobId, jobId); - } else { - logNotSendReason(future); - break; + masterJobId = jobId; + if (preMasterJobId != null) { + taskStore.insertDependency(preMasterJobId, masterJobId); } + } else { + logNotSendReason(future); + break; } - - jobIdCommandMap.put(masterJobId, command); - List jobIds = new ArrayList<>(taskStore.getBatchIds(masterJobId)); - jobIds.add(masterJobId); - preMasterJobId = masterJobId; - preJobIds = jobIds; + jobIdCommandMap.put(masterJobId, command); } + return jobIdCommandMap; } @@ -504,17 +485,11 @@ private void logNotSendReason(CompletableFuture future) { } } - private void prepareBatch(Command command, int executionIndex, ExecutionEnvironment environment, CommandExecution commandExecution, Path remoteWorkingDir) throws IOException { - // prepare sbatch script from command + private void prepareBatch(ExecutionEnvironment environment, CommandExecution commandExecution, Path remoteWorkingDir, boolean isLast) throws IOException { Map executionVariables = CommandExecution.getExecutionVariables(environment.getVariables(), commandExecution); - SbatchScriptGenerator scriptGenerator = new SbatchScriptGenerator(flagDir); - List shell = scriptGenerator.parser(command, executionIndex, remoteWorkingDir, executionVariables); - if (executionIndex == -1) { - // array job not used yet - copyShellToRemoteWorkingDir(shell, command.getId(), remoteWorkingDir); - } else { - copyShellToRemoteWorkingDir(shell, command.getId() + "_" + executionIndex, remoteWorkingDir); - } + SbatchScriptGenerator batchGen = new SbatchScriptGenerator(flagDir, commandExecution, remoteWorkingDir, executionVariables).setIsLast(isLast); + List shell = batchGen.parse(); + copyShellToRemoteWorkingDir(shell, commandExecution.getCommand().getId(), remoteWorkingDir); } private static void copyShellToRemoteWorkingDir(List shell, String batchName, Path remoteWorkingDir) throws IOException { @@ -528,21 +503,22 @@ private static void copyShellToRemoteWorkingDir(List shell, String batch } } - private SbatchCmd buildSbatchCmd(Path workingDir, String commandId, int executionIndex, List preJobIds, ComputationParameters baseParams) { + // TODO move this part into .batch file + private SbatchCmd buildSbatchCmd(Path workingDir, String commandId, int count, @Nullable Long preJobId, ComputationParameters baseParams) { // prepare sbatch cmd String baseFileName = workingDir.resolve(commandId).toAbsolutePath().toString(); + String idx = count == 1 ? "_0" : "_%a"; SbatchCmdBuilder builder = new SbatchCmdBuilder() - .script(baseFileName + "_" + executionIndex + BATCH_EXT) + .script(baseFileName + BATCH_EXT) .jobName(commandId) .workDir(workingDir) .nodes(1) .ntasks(1) .oversubscribe() - .output(baseFileName + "_" + executionIndex + SlurmConstants.OUT_EXT) - .error(baseFileName + "_" + executionIndex + SlurmConstants.ERR_EXT); - if (!preJobIds.isEmpty()) { - builder.aftercorr(preJobIds); - } + .output(baseFileName + idx + OUT_EXT) + .error(baseFileName + idx + ERR_EXT) + .aftercorr(preJobId) + .array(count); SlurmComputationParameters extension = baseParams.getExtension(SlurmComputationParameters.class); if (extension != null) { extension.getQos().ifPresent(builder::qos); @@ -551,6 +527,21 @@ private SbatchCmd buildSbatchCmd(Path workingDir, String commandId, int executio return builder.build(); } + /** + * Because UNZIP_INPUTS_COMMAND_ID is same basename, so executionIdx is needed to put cmd_Id.sbatch in same working dir + */ + private static SbatchCmd buildCommonUnzipCmd(Path workingDir, int executionIdx, @Nullable Long preJobId) { + String baseFileName = workingDir.resolve(UNZIP_INPUTS_COMMAND_ID).toAbsolutePath().toString(); + SbatchCmdBuilder builder = new SbatchCmdBuilder() + .script(baseFileName + executionIdx + BATCH_EXT) + .jobName(UNZIP_INPUTS_COMMAND_ID) + .workDir(workingDir) + .output(baseFileName + executionIdx + OUT_EXT) + .error(baseFileName + executionIdx + ERR_EXT) + .aftercorr(preJobId); + return builder.build(); + } + private long launchSbatch(SbatchCmd cmd) { try { SbatchCmdResult sbatchResult = cmd.send(commandRunner); @@ -665,7 +656,7 @@ private Thread shutdownThread() { LOGGER.info("Shutdown slurm..."); LOGGER.info("Cancel current subbmited jobs"); Set tracingIds = new HashSet<>(getTaskStore().getTracingIds()); - tracingIds.forEach(this::scancel); + tracingIds.forEach(l -> scancelMasterJob(l)); // count down task to avoid InterruptedException getTaskStore().getTaskCounters().forEach(TaskCounter::cancel); baseClose(); diff --git a/computation-slurm/src/main/java/com/powsybl/computation/slurm/TaskStore.java b/computation-slurm/src/main/java/com/powsybl/computation/slurm/TaskStore.java index 7c81a4a..ae6f92f 100644 --- a/computation-slurm/src/main/java/com/powsybl/computation/slurm/TaskStore.java +++ b/computation-slurm/src/main/java/com/powsybl/computation/slurm/TaskStore.java @@ -33,9 +33,6 @@ class TaskStore { private Map jobDependencies = new HashMap<>(); private ReadWriteLock jobDependencyLock = new ReentrantReadWriteLock(); - private Map> batchIds = new HashMap<>(); - private ReadWriteLock batchIdsLock = new ReentrantReadWriteLock(); - private Set tracingIds = Collections.newSetFromMap(new ConcurrentHashMap<>()); TaskCounter getTaskCounter(String workingDir) { @@ -122,29 +119,6 @@ void insertDependency(Long preJobId, Long jobId) { trace(jobId); } - void insertBatchIds(Long masterJobId, Long jobId) { - if (!masterJobId.equals(jobId)) { - batchIdsLock.writeLock().lock(); - try { - batchIds.computeIfAbsent(masterJobId, k -> new ArrayList<>()).add(jobId); - LOGGER.debug("batchIds: {} -> {}", masterJobId, jobId); - } finally { - batchIdsLock.writeLock().unlock(); - } - trace(jobId); - } - } - - List getBatchIds(Long masterJobId) { - batchIdsLock.readLock().lock(); - try { - List longs = batchIds.get(masterJobId); - return longs == null ? Collections.emptyList() : longs; - } finally { - batchIdsLock.readLock().unlock(); - } - } - private void trace(long id) { LOGGER.debug("tracing {}", id); tracingIds.add(id); @@ -205,21 +179,10 @@ private Set removeIds(Long firstId) { toRemoveMasterIds.add(toRemove); toRemove = jobDependencies.remove(toRemove); } - } finally { - jobDependencyLock.writeLock().unlock(); - } - allIdsFromFirstId.addAll(toRemoveMasterIds); - batchIdsLock.writeLock().lock(); - try { - toRemoveMasterIds.forEach(masterId -> { - List remove = batchIds.remove(masterId); - if (remove != null && !remove.isEmpty()) { - allIdsFromFirstId.addAll(remove); - } - }); + allIdsFromFirstId.addAll(toRemoveMasterIds); return allIdsFromFirstId; } finally { - batchIdsLock.writeLock().unlock(); + jobDependencyLock.writeLock().unlock(); } } @@ -234,8 +197,9 @@ Optional getCompletableFutureByJ if (completableFuture.isPresent()) { return completableFuture; } + // TODO get future by array job // try with batch id - completableFuture = getFutureByBatchId(id); +// completableFuture = getFutureByBatchId(id); return completableFuture; } @@ -293,28 +257,4 @@ private OptionalLong getFirstId(long masterId) { return OptionalLong.of(tmp); } - private Optional getFutureByBatchId(long batchId) { - OptionalLong optMasterId = getMasterId(batchId); - if (optMasterId.isPresent()) { - return getFutureByMasterId(optMasterId.getAsLong()); - } - return Optional.empty(); - } - - private OptionalLong getMasterId(long batchId) { - Optional>> max; - batchIdsLock.readLock().lock(); - try { - max = batchIds.entrySet().stream() - .filter(entry -> entry.getKey() < batchId) - .max(Comparator.comparingLong(Map.Entry::getKey)); - } finally { - batchIdsLock.readLock().unlock(); - } - - if (max.isPresent() && (max.get().getValue().contains(batchId))) { - return OptionalLong.of(max.get().getKey()); - } - return OptionalLong.empty(); - } } diff --git a/computation-slurm/src/test/java/com/powsybl/computation/slurm/CommandExecutionsTestFactory.java b/computation-slurm/src/test/java/com/powsybl/computation/slurm/CommandExecutionsTestFactory.java index 42897f4..608b8e7 100644 --- a/computation-slurm/src/test/java/com/powsybl/computation/slurm/CommandExecutionsTestFactory.java +++ b/computation-slurm/src/test/java/com/powsybl/computation/slurm/CommandExecutionsTestFactory.java @@ -30,7 +30,7 @@ static List simpleCmd() { return Collections.singletonList(commandExecution); } - static List simpleCmdWithCount(int executionCount) { + static List simpleEchoWithCount(int executionCount) { Command command = new SimpleCommandBuilder() .id("cmdId") .program("echo") @@ -53,6 +53,12 @@ static List myEchoSimpleCmdWithUnzipZip(int executionCount) { return Collections.singletonList(commandExecution); } + /** + * Test for: + * 1. One shared(non-execution-dependency) .zip file + * 2. Null post-process file + * @return + */ static List commandFiles(int executionCount) { InputFile stringInput = new InputFile("foo.zip", FilePreProcessor.ARCHIVE_UNZIP); InputFile functionsInput = new InputFile(integer -> "in" + integer + ".zip", FilePreProcessor.ARCHIVE_UNZIP); @@ -101,4 +107,51 @@ static List makeSlurmBusy() { .build(); return Collections.singletonList(new CommandExecution(command, 42)); } + + static List groupCmdWithArgs(int count) { + Command command = new GroupCommandBuilder() + .id("groupCmdId") + .subCommand() + .program("sleep") + .args(i -> Collections.singletonList(++i + "s")) + .add() + .subCommand() + .program("echo") + .args(i -> Collections.singletonList(echoString(++i))) + .add() + .build(); + return Collections.singletonList(new CommandExecution(command, count)); + } + + // 1->1 + // 2->22 + // 3->333 + private static String echoString(int i) { + StringBuilder sb = new StringBuilder(); + for (int j = 0; j < i; j++) { + sb.append(i); + } + return sb.toString(); + } + + static List oddEvenCmd(int executionCount) { + Command cmd = new SimpleCommandBuilder() + .id("oddEven") + .program("/home/dev-itesla/myapps/myecho.sh") + .args(i -> { + if (i % 2 == 0) { + return Arrays.asList("evenIn" + i, "evenOutput" + i + ".txt"); + } else { + return Arrays.asList("oddIn" + i, "oddOutput" + i + ".txt"); + } + }) + .build(); + return Collections.singletonList(new CommandExecution(cmd, executionCount)); + } + + static List failInOneOfArrayJob() { + // oddIn3 is not a valid input in myecho.sh + return oddEvenCmd(4); + } + } diff --git a/computation-slurm/src/test/java/com/powsybl/computation/slurm/SbatchCmdTest.java b/computation-slurm/src/test/java/com/powsybl/computation/slurm/SbatchCmdTest.java index 7830fd9..bbd8246 100644 --- a/computation-slurm/src/test/java/com/powsybl/computation/slurm/SbatchCmdTest.java +++ b/computation-slurm/src/test/java/com/powsybl/computation/slurm/SbatchCmdTest.java @@ -35,13 +35,13 @@ public void testJobArray() { .script("submit.sh") .array(3) .build(); - assertEquals("sbatch --job-name=array3 --array=0-2 submit.sh", cmd.toString()); + assertEquals("sbatch --job-name=array3 --array=0-2 --kill-on-invalid-dep=yes submit.sh", cmd.toString()); builder = new SbatchCmdBuilder(); cmd = builder.jobName("array1") .script("submit.sh") .array(1) .build(); - assertEquals("sbatch --job-name=array1 --array=0 submit.sh", cmd.toString()); + assertEquals("sbatch --job-name=array1 --kill-on-invalid-dep=yes submit.sh", cmd.toString()); } @Test @@ -61,13 +61,13 @@ public void testAftercorr() { .script("submit.sh") .aftercorr(Collections.singletonList(1111L)) .build(); - assertEquals("sbatch --job-name=jobname --dependency=aftercorr:1111 submit.sh", cmd.toString()); + assertEquals("sbatch --job-name=jobname --dependency=aftercorr:1111 --kill-on-invalid-dep=yes submit.sh", cmd.toString()); builder = new SbatchCmdBuilder(); cmd = builder.jobName("jobname") .script("submit.sh") .aftercorr(Arrays.asList(1111L, 2222L, 3333L)) .build(); - assertEquals("sbatch --job-name=jobname --dependency=aftercorr:1111:2222:3333 submit.sh", cmd.toString()); + assertEquals("sbatch --job-name=jobname --dependency=aftercorr:1111:2222:3333 --kill-on-invalid-dep=yes submit.sh", cmd.toString()); } @Test @@ -78,7 +78,7 @@ public void testOptions() { .array(1) .oversubscribe() .build(); - assertEquals("sbatch --job-name=array1 --array=0 --oversubscribe submit.sh", cmd.toString()); + assertEquals("sbatch --job-name=array1 --kill-on-invalid-dep=yes --oversubscribe submit.sh", cmd.toString()); } @Test @@ -87,13 +87,13 @@ public void testTimeout() { .script("foo.sh") .timeout("2:00") .build(); - assertEquals("sbatch --job-name=foo --time=2:00 foo.sh", cmd.toString()); + assertEquals("sbatch --job-name=foo --kill-on-invalid-dep=yes --time=2:00 foo.sh", cmd.toString()); String nullDuration = null; SbatchCmd nullableTimeout = new SbatchCmdBuilder().jobName("foo") .script("foo.sh") .timeout(nullDuration) .build(); - assertEquals("sbatch --job-name=foo --time=UNLIMITED foo.sh", nullableTimeout.toString()); + assertEquals("sbatch --job-name=foo --kill-on-invalid-dep=yes --time=UNLIMITED foo.sh", nullableTimeout.toString()); } @Test @@ -103,7 +103,7 @@ public void testQos() { .script("submit.sh") .qos("value_qos") .build(); - assertEquals("sbatch --job-name=testQos --qos=value_qos submit.sh", cmd.toString()); + assertEquals("sbatch --job-name=testQos --qos=value_qos --kill-on-invalid-dep=yes submit.sh", cmd.toString()); } @Test @@ -113,7 +113,7 @@ public void testDeadline() { .script("submit.sh") .deadline(10) .build(); - assertEquals("sbatch --job-name=dead --deadline=`date -d \"10 seconds\" \"+%Y-%m-%dT%H:%M:%S\"` submit.sh", cmd.toString()); + assertEquals("sbatch --job-name=dead --kill-on-invalid-dep=yes --deadline=`date -d \"10 seconds\" \"+%Y-%m-%dT%H:%M:%S\"` submit.sh", cmd.toString()); } @Test @@ -125,7 +125,7 @@ public void testDir() throws IOException { .script("submit.sh") .workDir(dir) .build(); - assertEquals("sbatch -D /tmp/foo --job-name=testDir submit.sh", cmd.toString()); + assertEquals("sbatch -D /tmp/foo --job-name=testDir --kill-on-invalid-dep=yes submit.sh", cmd.toString()); } } } diff --git a/computation-slurm/src/test/java/com/powsybl/computation/slurm/SbatchScriptGeneratorTest.java b/computation-slurm/src/test/java/com/powsybl/computation/slurm/SbatchScriptGeneratorTest.java index 5d60ef3..fb3e083 100644 --- a/computation-slurm/src/test/java/com/powsybl/computation/slurm/SbatchScriptGeneratorTest.java +++ b/computation-slurm/src/test/java/com/powsybl/computation/slurm/SbatchScriptGeneratorTest.java @@ -15,12 +15,17 @@ import org.junit.Before; import org.junit.Test; +import java.io.IOException; +import java.net.URISyntaxException; import java.nio.file.FileSystem; +import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import static com.powsybl.computation.slurm.CommandExecutionsTestFactory.oddEvenCmd; import static org.junit.Assert.assertEquals; /** @@ -32,8 +37,6 @@ public class SbatchScriptGeneratorTest { private Path flagPath; private Path workingPath; - private final int commandIdx = 0; - @Before public void setUp() { fileSystem = Jimfs.newFileSystem(Configuration.unix()); @@ -49,65 +52,44 @@ public void tearDown() throws Exception { @Test public void testSimpleCmd() { List commandExecutions = CommandExecutionsTestFactory.simpleCmd(); - CommandExecution commandExecution = commandExecutions.get(commandIdx); - Command command = commandExecution.getCommand(); - List shell = new SbatchScriptGenerator(flagPath).parser(command, 0, workingPath, Collections.emptyMap()); + List shell = new SbatchScriptGenerator(flagPath, commandExecutions.get(0), workingPath, Collections.emptyMap()).parse(); assertEquals(ImmutableList.of("#!/bin/sh", "echo \"test\"", - "rc=$?; if [[ $rc != 0 ]]; then touch /tmp/flags/myerror_workingPath_12345_$SLURM_JOBID; exit $rc; fi", - "touch /tmp/flags/mydone_workingPath_12345_$SLURM_JOBID"), shell); + "rc=$?; if [[ $rc != 0 ]]; then touch /tmp/flags/myerror_workingPath_12345_$SLURM_JOBID; exit $rc; fi"), shell); } @Test - public void testSimpleCmdWithCount() { - List commandExecutions = CommandExecutionsTestFactory.simpleCmdWithCount(3); - CommandExecution commandExecution = commandExecutions.get(commandIdx); - Command command = commandExecution.getCommand(); - List shell = new SbatchScriptGenerator(flagPath).parser(command, 0, workingPath, Collections.emptyMap()); + public void testLastSimpleCmd() { + List commandExecutions = CommandExecutionsTestFactory.simpleCmd(); + List shell = new SbatchScriptGenerator(flagPath, commandExecutions.get(0), workingPath, Collections.emptyMap()) + .setIsLast(true).parse(); assertEquals(ImmutableList.of("#!/bin/sh", - "echo \"te1st0\"", + "echo \"test\"", "rc=$?; if [[ $rc != 0 ]]; then touch /tmp/flags/myerror_workingPath_12345_$SLURM_JOBID; exit $rc; fi", "touch /tmp/flags/mydone_workingPath_12345_$SLURM_JOBID"), shell); } @Test - public void testMyEchoSimpleCmd() { - List commandExecutions = CommandExecutionsTestFactory.myEchoSimpleCmdWithUnzipZip(3); - CommandExecution commandExecution = commandExecutions.get(commandIdx); - Command command = commandExecution.getCommand(); - List shell = new SbatchScriptGenerator(flagPath).parser(command, 0, workingPath, Collections.emptyMap()); - assertEquals(ImmutableList.of("#!/bin/sh", - "unzip -o -q in0.zip", - "/home/dev-itesla/myapps/myecho.sh \"in0\" \"out0\"", - "rc=$?; if [[ $rc != 0 ]]; then touch /tmp/flags/myerror_workingPath_12345_$SLURM_JOBID; exit $rc; fi", - "gzip out0", - "touch /tmp/flags/mydone_workingPath_12345_$SLURM_JOBID"), shell); + public void testSimpleCmdWithCount() throws IOException, URISyntaxException { + assertCommandExecutionToShell(CommandExecutionsTestFactory.simpleEchoWithCount(3).get(0), "simpleCmdWithCount3.batch"); } @Test - public void testCommandFiles() { - List commandExecutions = CommandExecutionsTestFactory.commandFiles(3); - Command command = commandExecutions.get(0).getCommand(); - List shell = new SbatchScriptGenerator(flagPath).parser(command, 2, workingPath, Collections.emptyMap()); - assertEquals(expectedTestCommandFilesBatch(), shell); + public void testMyEchoSimpleCmd() throws IOException, URISyntaxException { + assertCommandExecutionToShell(CommandExecutionsTestFactory.myEchoSimpleCmdWithUnzipZip(3).get(0), "myEchoSimpleCmdWithUnzipZip.batch"); } - private static List expectedTestCommandFilesBatch() { - List shell = new ArrayList<>(); - shell.add("#!/bin/sh"); - shell.add("unzip -o -q in2.zip"); - shell.add("/home/dev-itesla/myapps/myecho.sh \"in2\" \"out2\""); - shell.add("rc=$?; if [[ $rc != 0 ]]; then touch /tmp/flags/myerror_workingPath_12345_$SLURM_JOBID; exit $rc; fi"); - shell.add("gzip tozip2"); - shell.add("touch /tmp/flags/mydone_workingPath_12345_$SLURM_JOBID"); - return shell; + @Test + public void testCommandFiles() throws IOException, URISyntaxException { + List commandExecutions = CommandExecutionsTestFactory.commandFiles(3); + assertCommandExecutionToShell(commandExecutions.get(0), "commandFiles.batch"); } @Test public void testOnlyUnzipBatch() { List commandExecutions = CommandExecutionsTestFactory.commandFiles(3); Command command = commandExecutions.get(0).getCommand(); - List shell = new SbatchScriptGenerator(flagPath).unzipCommonInputFiles(command); + List shell = SbatchScriptGenerator.unzipCommonInputFiles(command); assertEquals(expectedtestOnlyUnzipBatch(), shell); } @@ -121,15 +103,30 @@ private static List expectedtestOnlyUnzipBatch() { @Test public void testGroupCmd() { List commandExecutions = CommandExecutionsTestFactory.groupCmd(); - CommandExecution commandExecution = commandExecutions.get(0); - Command command = commandExecution.getCommand(); - List shell = new SbatchScriptGenerator(flagPath).parser(command, 0, workingPath, Collections.emptyMap()); + List shell = new SbatchScriptGenerator(flagPath, commandExecutions.get(0), workingPath, Collections.emptyMap()).parse(); assertEquals(ImmutableList.of("#!/bin/sh", "sleep \"5s\"", "rc=$?; if [[ $rc != 0 ]]; then touch /tmp/flags/myerror_workingPath_12345_$SLURM_JOBID; exit $rc; fi", "echo \"sub2\"", - "rc=$?; if [[ $rc != 0 ]]; then touch /tmp/flags/myerror_workingPath_12345_$SLURM_JOBID; exit $rc; fi", - "touch /tmp/flags/mydone_workingPath_12345_$SLURM_JOBID"), shell); + "rc=$?; if [[ $rc != 0 ]]; then touch /tmp/flags/myerror_workingPath_12345_$SLURM_JOBID; exit $rc; fi"), shell); } + @Test + public void testGroupCmdWithArgWithCount() throws IOException, URISyntaxException { + List commandExecutions = CommandExecutionsTestFactory.groupCmdWithArgs(3); + CommandExecution commandExecution = commandExecutions.get(0); + assertCommandExecutionToShell(commandExecution, "groupCmdWithArgsCount3.batch"); + } + + @Test + public void testSbatchGenerator() throws URISyntaxException, IOException { + assertCommandExecutionToShell(oddEvenCmd(3).get(0), "simpleArrayJobWithArgu.batch"); + } + + private void assertCommandExecutionToShell(CommandExecution commandExecution, String expected) throws URISyntaxException, IOException { + SbatchScriptGenerator shellGenerator = new SbatchScriptGenerator(flagPath, commandExecution, workingPath, Collections.emptyMap()); + List shell = shellGenerator.parse(); + List expectedShell = Files.readAllLines(Paths.get(this.getClass().getResource("/expectedShell/" + expected).toURI())); + assertEquals(expectedShell, shell); + } } diff --git a/computation-slurm/src/test/java/com/powsybl/computation/slurm/ScontrolMonitorTest.java b/computation-slurm/src/test/java/com/powsybl/computation/slurm/ScontrolMonitorTest.java index f159e6e..a5cceeb 100644 --- a/computation-slurm/src/test/java/com/powsybl/computation/slurm/ScontrolMonitorTest.java +++ b/computation-slurm/src/test/java/com/powsybl/computation/slurm/ScontrolMonitorTest.java @@ -11,7 +11,6 @@ import org.mockito.Matchers; import org.mockito.internal.util.reflection.Whitebox; - import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.anyString; @@ -47,10 +46,10 @@ public void testUnormalFound() { ScontrolMonitor monitor = new ScontrolMonitor(slurm); monitor.run(); assertTrue(ts.getTracingIds().isEmpty()); - // check scancel all 6 jobs only once - for (int i = 1; i < 7; i++) { - verify(cm, times(1)).execute("scancel " + i); - } + // check scancel all master jobs only once + verify(cm, times(1)).execute("scancel 1"); + verify(cm, times(1)).execute("scancel 3"); + verify(cm, times(1)).execute("scancel 6"); } @Before diff --git a/computation-slurm/src/test/java/com/powsybl/computation/slurm/TaskStoreTest.java b/computation-slurm/src/test/java/com/powsybl/computation/slurm/TaskStoreTest.java index d407a7b..8d0521f 100644 --- a/computation-slurm/src/test/java/com/powsybl/computation/slurm/TaskStoreTest.java +++ b/computation-slurm/src/test/java/com/powsybl/computation/slurm/TaskStoreTest.java @@ -33,7 +33,6 @@ public void testParallelInserts() { for (long l = k * nbPerThread; l < (k + 1) * nbPerThread; l++) { long v = l + 1; taskStore.insertDependency(l, v); - taskStore.insertBatchIds(l, v); } }); t.start(); @@ -50,9 +49,9 @@ public void testParallelInserts() { assertEquals(nbPerThread * threadSize - toTestId, taskStore.getDependentJobs(toTestId).size()); } - // 1←2 + // 1(2) // ↑ - // 3←4,5 + // 3(4,5) // ↑ // 6 static TaskStore generateTaskStore(SlurmComputationManager.SlurmCompletableFuture future, boolean checkTracing) { @@ -62,28 +61,19 @@ static TaskStore generateTaskStore(SlurmComputationManager.SlurmCompletableFutur TaskStore taskStore = new TaskStore(); taskStore.insert(workingDir, future); taskStore.insert(workingDir, counter, 1L); - taskStore.insertBatchIds(1L, 1L); if (checkTracing) { assertEquals(Collections.singleton(1L), taskStore.getTracingIds()); } - taskStore.insertBatchIds(1L, 2L); if (checkTracing) { - assertEquals(2, taskStore.getTracingIds().size()); + assertEquals(1, taskStore.getTracingIds().size()); } taskStore.insertDependency(1L, 3L); - taskStore.insertBatchIds(3L, 3L); - taskStore.insertBatchIds(3L, 4L); - if (checkTracing) { - assertEquals(4, taskStore.getTracingIds().size()); - } - taskStore.insertBatchIds(3L, 5L); if (checkTracing) { - assertEquals(5, taskStore.getTracingIds().size()); + assertEquals(2, taskStore.getTracingIds().size()); } taskStore.insertDependency(3L, 6L); - taskStore.insertBatchIds(6L, 6L); if (checkTracing) { - assertEquals(6, taskStore.getTracingIds().size()); + assertEquals(3, taskStore.getTracingIds().size()); } return taskStore; } @@ -94,9 +84,6 @@ public void test() { assertEquals(Arrays.asList(3L, 6L), taskStore.getDependentJobs(1L)); assertEquals(Collections.singletonList(6L), taskStore.getDependentJobs(3L)); assertTrue(taskStore.getDependentJobs(6L).isEmpty()); - assertEquals(Collections.singletonList(2L), taskStore.getBatchIds(1L)); - assertEquals(Arrays.asList(4L, 5L), taskStore.getBatchIds(3L)); - assertTrue(taskStore.getBatchIds(6L).isEmpty()); } @Test @@ -113,7 +100,6 @@ public void testRemove() { assertNull(taskStore.getFirstJobId(future)); assertNull(taskStore.getCompletableFuture("a_working_dir")); assertTrue(taskStore.getDependentJobs(1L).isEmpty()); - assertTrue(taskStore.getBatchIds(3L).isEmpty()); // tracing ids are cleaned by 1. mydone_ in flag monitor 2. scancel in scm assertFalse(taskStore.getTracingIds().isEmpty()); } @@ -123,10 +109,7 @@ public void testGetFutureFromJobId() { SlurmComputationManager.SlurmCompletableFuture future = mock(SlurmComputationManager.SlurmCompletableFuture.class); TaskStore taskStore = generateTaskStore(future, false); assertEquals(future, taskStore.getCompletableFutureByJobId(1L).orElse(null)); - assertEquals(future, taskStore.getCompletableFutureByJobId(2L).orElse(null)); assertEquals(future, taskStore.getCompletableFutureByJobId(3L).orElse(null)); - assertEquals(future, taskStore.getCompletableFutureByJobId(4L).orElse(null)); - assertEquals(future, taskStore.getCompletableFutureByJobId(5L).orElse(null)); assertEquals(future, taskStore.getCompletableFutureByJobId(6L).orElse(null)); } diff --git a/computation-slurm/src/test/resources/expectedShell/commandFiles.batch b/computation-slurm/src/test/resources/expectedShell/commandFiles.batch new file mode 100644 index 0000000..696d784 --- /dev/null +++ b/computation-slurm/src/test/resources/expectedShell/commandFiles.batch @@ -0,0 +1,22 @@ +#!/bin/sh +case $SLURM_ARRAY_TASK_ID in + 0) + PRE1=in0.zip + POST0=tozip0 + ARGS=""in0" "out0"" + ;; + 1) + PRE1=in1.zip + POST0=tozip1 + ARGS=""in1" "out1"" + ;; + 2) + PRE1=in2.zip + POST0=tozip2 + ARGS=""in2" "out2"" + ;; +esac +unzip -o -q $PRE1 +/home/dev-itesla/myapps/myecho.sh $ARGS +rc=$?; if [[ $rc != 0 ]]; then touch /tmp/flags/myerror_workingPath_12345_$SLURM_JOBID; exit $rc; fi +gzip $POST0 \ No newline at end of file diff --git a/computation-slurm/src/test/resources/expectedShell/groupCmdWithArgsCount3.batch b/computation-slurm/src/test/resources/expectedShell/groupCmdWithArgsCount3.batch new file mode 100644 index 0000000..babc618 --- /dev/null +++ b/computation-slurm/src/test/resources/expectedShell/groupCmdWithArgsCount3.batch @@ -0,0 +1,16 @@ +#!/bin/sh +case $SLURM_ARRAY_TASK_ID in + 0) + ARGS_0=""1s"" ARGS_1=""1"" + ;; + 1) + ARGS_0=""2s"" ARGS_1=""22"" + ;; + 2) + ARGS_0=""3s"" ARGS_1=""333"" + ;; +esac +sleep $ARGS_0 +rc=$?; if [[ $rc != 0 ]]; then touch /tmp/flags/myerror_workingPath_12345_$SLURM_JOBID; exit $rc; fi +echo $ARGS_1 +rc=$?; if [[ $rc != 0 ]]; then touch /tmp/flags/myerror_workingPath_12345_$SLURM_JOBID; exit $rc; fi \ No newline at end of file diff --git a/computation-slurm/src/test/resources/expectedShell/myEchoSimpleCmdWithUnzipZip.batch b/computation-slurm/src/test/resources/expectedShell/myEchoSimpleCmdWithUnzipZip.batch new file mode 100644 index 0000000..2c15e5e --- /dev/null +++ b/computation-slurm/src/test/resources/expectedShell/myEchoSimpleCmdWithUnzipZip.batch @@ -0,0 +1,22 @@ +#!/bin/sh +case $SLURM_ARRAY_TASK_ID in + 0) + PRE0=in0.zip + POST0=out0 + ARGS=""in0" "out0"" + ;; + 1) + PRE0=in1.zip + POST0=out1 + ARGS=""in1" "out1"" + ;; + 2) + PRE0=in2.zip + POST0=out2 + ARGS=""in2" "out2"" + ;; +esac +unzip -o -q $PRE0 +/home/dev-itesla/myapps/myecho.sh $ARGS +rc=$?; if [[ $rc != 0 ]]; then touch /tmp/flags/myerror_workingPath_12345_$SLURM_JOBID; exit $rc; fi +gzip $POST0 \ No newline at end of file diff --git a/computation-slurm/src/test/resources/expectedShell/simpleArrayJobWithArgu.batch b/computation-slurm/src/test/resources/expectedShell/simpleArrayJobWithArgu.batch new file mode 100644 index 0000000..9c48a9d --- /dev/null +++ b/computation-slurm/src/test/resources/expectedShell/simpleArrayJobWithArgu.batch @@ -0,0 +1,14 @@ +#!/bin/sh +case $SLURM_ARRAY_TASK_ID in + 0) + ARGS=""evenIn0" "evenOutput0.txt"" + ;; + 1) + ARGS=""oddIn1" "oddOutput1.txt"" + ;; + 2) + ARGS=""evenIn2" "evenOutput2.txt"" + ;; +esac +/home/dev-itesla/myapps/myecho.sh $ARGS +rc=$?; if [[ $rc != 0 ]]; then touch /tmp/flags/myerror_workingPath_12345_$SLURM_JOBID; exit $rc; fi \ No newline at end of file diff --git a/computation-slurm/src/test/resources/expectedShell/simpleCmdWithCount3.batch b/computation-slurm/src/test/resources/expectedShell/simpleCmdWithCount3.batch new file mode 100644 index 0000000..d1dc1b5 --- /dev/null +++ b/computation-slurm/src/test/resources/expectedShell/simpleCmdWithCount3.batch @@ -0,0 +1,14 @@ +#!/bin/sh +case $SLURM_ARRAY_TASK_ID in + 0) + ARGS=""te1st0"" + ;; + 1) + ARGS=""te1st1"" + ;; + 2) + ARGS=""te1st2"" + ;; +esac +echo $ARGS +rc=$?; if [[ $rc != 0 ]]; then touch /tmp/flags/myerror_workingPath_12345_$SLURM_JOBID; exit $rc; fi \ No newline at end of file From 1ef734632cb4d370489137f67f3b870ea477cc36 Mon Sep 17 00:00:00 2001 From: Yichen TANG Date: Wed, 5 Jun 2019 13:32:20 +0200 Subject: [PATCH 6/8] Fix failed execution idx Signed-off-by: Yichen TANG --- .../computation/slurm/SlurmUnitTests.java | 16 ++++++++++++---- .../slurm/SlurmComputationManager.java | 18 ++++++++++++------ 2 files changed, 24 insertions(+), 10 deletions(-) diff --git a/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java b/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java index 884295c..3b0c1d5 100644 --- a/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java +++ b/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java @@ -280,8 +280,8 @@ public Void after(Path workingDir, ExecutionReport report) throws IOException { } @Test - public void testOneJobFails() throws InterruptedException { - TestAttribute testAttribute = new TestAttribute(Type.TO_WAIT, "OneJobFails"); + public void testOneExecutionInvalid() throws InterruptedException { + TestAttribute testAttribute = new TestAttribute(Type.TO_WAIT, "invalidOneExecution"); Supplier> supplier = () -> { return new AbstractExecutionHandler() { @Override @@ -291,7 +291,7 @@ public List before(Path workingDir) throws IOException { @Override public Void after(Path workingDir, ExecutionReport report) { - assertErrors(testAttribute.testName, report); + assertErrors(testAttribute.testName, report, 3); return null; } }; @@ -631,10 +631,18 @@ public List before(Path workingDir) { } private void assertErrors(String testName, ExecutionReport report) { + assertErrors(testName, report, 0); + } + + private void assertErrors(String testName, ExecutionReport report, int expectedFailIdx) { System.out.println("---------" + testName + "----------"); System.out.println("Errors should exists:" + !report.getErrors().isEmpty()); if (report.getErrors().isEmpty()) { failed = true; + } else { + System.out.println("idx:" + report.getErrors().get(0).getIndex()); + System.out.println("exitcode:" + report.getErrors().get(0).getExitCode()); + failed = !(expectedFailIdx == report.getErrors().get(0).getIndex()); } System.out.println("------------------------------------"); } @@ -688,7 +696,7 @@ private static List invalidProgram() { .program("echoo") .args("hello") .build(); - return Collections.singletonList(new CommandExecution(cmd, 1)); + return Collections.singletonList(new CommandExecution(cmd, 3)); } private static List invalidProgramInGroup() { diff --git a/computation-slurm/src/main/java/com/powsybl/computation/slurm/SlurmComputationManager.java b/computation-slurm/src/main/java/com/powsybl/computation/slurm/SlurmComputationManager.java index 90b32d2..b57226d 100644 --- a/computation-slurm/src/main/java/com/powsybl/computation/slurm/SlurmComputationManager.java +++ b/computation-slurm/src/main/java/com/powsybl/computation/slurm/SlurmComputationManager.java @@ -43,8 +43,7 @@ public class SlurmComputationManager implements ComputationManager { private static final Logger LOGGER = LoggerFactory.getLogger(SlurmComputationManager.class); - private static final String SACCT_NONZERO_JOB = "sacct --jobs=%s -n --format=\"jobid,exitcode\" | grep -v \"0:0\" | grep -v \"\\.\""; - private static final String ARRAY_INDEX_PH = "%a"; // ph=placeholder + private static final String SACCT_NONZERO_JOB = "sacct --jobs=%s -n --format=\"jobid,exitcode,state\" | grep -v \"0:0\" | grep -v \"\\.\""; private static final Pattern DIGITAL_PATTERN = Pattern.compile("\\d+"); private static final String BATCH_EXT = ".batch"; private static final String FLAGS_DIR_PREFIX = "myflags_"; // where flag files are created and be watched @@ -383,13 +382,20 @@ private SlurmExecutionReport generateReport(Map jobIdCommandMap, if (sacctOutput.length() > 0) { String[] lines = sacctOutput.split("\n"); for (String line : lines) { + int executionIdx = 0; Matcher m = DIGITAL_PATTERN.matcher(line); m.find(); long jobId = Long.parseLong(m.group()); - m.find(); - int executionIdx = Integer.parseInt(m.group()); - m.find(); - int exitCode = Integer.parseInt(m.group()); + if (line.contains("_") && !line.contains("-")) { + // see https://stackoverflow.com/questions/56424735/saccts-result-on-an-arrayjob-is-not-updated + m.find(); + executionIdx = Integer.parseInt(m.group()); + } else { + // not array job and do nothing + // 9952 127:0 + } + String[] split = line.split("\\s+"); + int exitCode = Integer.parseInt(split[1].split(":")[0]); // error message ??? ExecutionError error = new ExecutionError(jobIdCommandMap.get(jobId), executionIdx, exitCode); errors.add(error); From 3529d5316e0526d0cc6495b75448c4301bca484f Mon Sep 17 00:00:00 2001 From: Yichen TANG Date: Tue, 11 Jun 2019 11:13:43 +0200 Subject: [PATCH 7/8] Move makeSlurmBusy() Signed-off-by: Yichen TANG --- .../java/com/powsybl/computation/slurm/SlurmUnitTests.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java b/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java index 3b0c1d5..a6e13d4 100644 --- a/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java +++ b/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java @@ -554,7 +554,6 @@ private static void generateGzFileOnRemote(int sizeGb, Path dest) { } } - @Test public void makeSlurmBusy() throws InterruptedException { TestAttribute testAttribute = new TestAttribute(Type.TO_WAIT, "deadline"); Supplier> supplier = () -> new AbstractExecutionHandler() { @@ -566,8 +565,7 @@ public List before(Path workingDir) { baseTest(testAttribute, supplier); } - // 1. Make slurm busy if necessary - // 2. Wait 1 mins + // TODO with makeSlurmBusy() @Test public void testDeadline() throws InterruptedException { Thread makeSlurmBusyThread = new Thread(() -> { @@ -598,6 +596,7 @@ public Void after(Path workingDir, ExecutionReport report) throws IOException { builder.setDeadline("longProgram", 12); baseTest(testAttribute, supplier, builder.build()); + makeSlurmBusyThread.interrupt(); } // sacctmgr show qos From 705c50646675e8ebe56b8265a3988b52e40b8ec9 Mon Sep 17 00:00:00 2001 From: yichen88 Date: Tue, 10 Sep 2019 14:04:49 +0200 Subject: [PATCH 8/8] Move sbatch options into script expect the deadline option Signed-off-by: yichen88 --- .../computation/slurm/SlurmUnitTests.java | 4 +- ...chCmdBuilder.java => SbatchArguments.java} | 80 +++++------- .../powsybl/computation/slurm/SbatchCmd.java | 49 +++---- .../slurm/SbatchScriptGenerator.java | 12 +- .../slurm/SlurmComputationManager.java | 41 +++--- .../slurm/SbatchArgumentsTest.java | 122 ++++++++++++++++++ .../computation/slurm/SbatchCmdTest.java | 113 +--------------- .../slurm/SbatchScriptGeneratorTest.java | 41 ++++-- .../expectedShell/commandFiles.batch | 2 + .../groupCmdWithArgsCount3.batch | 2 + .../myEchoSimpleCmdWithUnzipZip.batch | 2 + .../simpleArrayJobWithArgu.batch | 2 + .../expectedShell/simpleCmdWithCount3.batch | 2 + 13 files changed, 246 insertions(+), 226 deletions(-) rename computation-slurm/src/main/java/com/powsybl/computation/slurm/{SbatchCmdBuilder.java => SbatchArguments.java} (64%) create mode 100644 computation-slurm/src/test/java/com/powsybl/computation/slurm/SbatchArgumentsTest.java diff --git a/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java b/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java index a6e13d4..87fd693 100644 --- a/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java +++ b/computation-slurm-test/src/test/java/com/powsybl/computation/slurm/SlurmUnitTests.java @@ -46,7 +46,7 @@ public class SlurmUnitTests { private static final Logger LOGGER = LoggerFactory.getLogger(SlurmUnitTests.class); - private static final boolean DEBUG = false; + private static final boolean DEBUG = true; private static final ExecutionEnvironment EMPTY_ENV = new ExecutionEnvironment(Collections.emptyMap(), "unit_test_", DEBUG); private static final String EXPECTED_ERROR_JOB_MSG = "An error job found"; @@ -554,7 +554,7 @@ private static void generateGzFileOnRemote(int sizeGb, Path dest) { } } - public void makeSlurmBusy() throws InterruptedException { + void makeSlurmBusy() throws InterruptedException { TestAttribute testAttribute = new TestAttribute(Type.TO_WAIT, "deadline"); Supplier> supplier = () -> new AbstractExecutionHandler() { @Override diff --git a/computation-slurm/src/main/java/com/powsybl/computation/slurm/SbatchCmdBuilder.java b/computation-slurm/src/main/java/com/powsybl/computation/slurm/SbatchArguments.java similarity index 64% rename from computation-slurm/src/main/java/com/powsybl/computation/slurm/SbatchCmdBuilder.java rename to computation-slurm/src/main/java/com/powsybl/computation/slurm/SbatchArguments.java index 4992f33..f039370 100644 --- a/computation-slurm/src/main/java/com/powsybl/computation/slurm/SbatchCmdBuilder.java +++ b/computation-slurm/src/main/java/com/powsybl/computation/slurm/SbatchArguments.java @@ -6,29 +6,31 @@ */ package com.powsybl.computation.slurm; -import com.google.common.base.Preconditions; - import javax.annotation.Nullable; import java.nio.file.Path; import java.util.*; import java.util.stream.Collectors; /** - * Builds an {@link SbatchCmd}, used to submit script execution to Slurm. + * This class contains two map and one set for three types of arguments in sbatch. * @see Sbatch * * @author Yichen Tang */ -class SbatchCmdBuilder { +class SbatchArguments { - private static final String DATETIME_FORMATTER = "`date -d \"%d seconds\" \"+%%Y-%%m-%%dT%%H:%%M:%%S\"`"; + private static final String PRECEDENT = "#SBATCH "; + private static final String DENPENDENCY_OPT = "dependency"; private final Map sbatchArgsByName = new HashMap<>(); private final Map sbatchArgsByCharacter = new HashMap<>(); private final TreeSet sbatchOptions = new TreeSet<>(); - private String script; - SbatchCmdBuilder jobName(String jobName) { + SbatchArguments() { + killOnInvalidDep(); + } + + SbatchArguments jobName(String jobName) { sbatchArgsByName.put("job-name", jobName); return this; } @@ -38,7 +40,7 @@ SbatchCmdBuilder jobName(String jobName) { * @param i if equals 1, --array would not be set. If negative, exception would be thrown. * @return this builder */ - SbatchCmdBuilder array(int i) { + SbatchArguments array(int i) { if (i <= 0) { throw new IllegalArgumentException(i + " is not validate for array."); } @@ -48,30 +50,30 @@ SbatchCmdBuilder array(int i) { return this; } - SbatchCmdBuilder aftercorr(List jobIds) { + SbatchArguments aftercorr(List jobIds) { Objects.requireNonNull(jobIds); if (!jobIds.isEmpty()) { String coll = jobIds.stream().map(Object::toString).collect(Collectors.joining(":", "aftercorr:", "")); - sbatchArgsByName.put("dependency", coll); + sbatchArgsByName.put(DENPENDENCY_OPT, coll); } return this; } - SbatchCmdBuilder aftercorr(@Nullable Long preMasterJob) { + SbatchArguments aftercorr(@Nullable Long preMasterJob) { if (preMasterJob != null) { - sbatchArgsByName.put("dependency", "aftercorr:" + preMasterJob); + sbatchArgsByName.put(DENPENDENCY_OPT, "aftercorr:" + preMasterJob); } return this; } - SbatchCmdBuilder afternotok(Long lastMasterJob) { + SbatchArguments afternotok(Long lastMasterJob) { if (lastMasterJob != null) { - sbatchArgsByName.put("dependency", "afternotok:" + lastMasterJob); + sbatchArgsByName.put(DENPENDENCY_OPT, "afternotok:" + lastMasterJob); } return this; } - SbatchCmdBuilder nodes(int i) { + SbatchArguments nodes(int i) { if (i < 1) { throw new IllegalArgumentException(i + " is not validate for nodes."); } @@ -79,7 +81,7 @@ SbatchCmdBuilder nodes(int i) { return this; } - SbatchCmdBuilder ntasks(int i) { + SbatchArguments ntasks(int i) { if (i < 1) { throw new IllegalArgumentException(i + " is not validate for ntasks."); } @@ -87,7 +89,7 @@ SbatchCmdBuilder ntasks(int i) { return this; } - SbatchCmdBuilder tasksPerNode(int i) { + SbatchArguments tasksPerNode(int i) { if (i < 1) { throw new IllegalArgumentException(i + " is not validate for tasksPerNode."); } @@ -95,62 +97,51 @@ SbatchCmdBuilder tasksPerNode(int i) { return this; } - SbatchCmdBuilder cpusPerTask(int i) { + SbatchArguments cpusPerTask(int i) { sbatchArgsByName.put("cpus-per-task", Integer.toString(i)); return this; } // TODO test - SbatchCmdBuilder error(String pattern) { + SbatchArguments error(String pattern) { Objects.requireNonNull(pattern); sbatchArgsByName.put("error", pattern); return this; } - SbatchCmdBuilder output(String pattern) { + SbatchArguments output(String pattern) { Objects.requireNonNull(pattern); sbatchArgsByName.put("output", pattern); return this; } - SbatchCmdBuilder partition(String partition) { + SbatchArguments partition(String partition) { Objects.requireNonNull(partition); sbatchArgsByName.put("partition", partition); return this; } - SbatchCmdBuilder oversubscribe() { + SbatchArguments oversubscribe() { sbatchOptions.add("oversubscribe"); return this; } - private SbatchCmdBuilder killOnInvalidDep() { + private SbatchArguments killOnInvalidDep() { sbatchArgsByName.put("kill-on-invalid-dep", "yes"); return this; } - SbatchCmdBuilder script(String name) { - this.script = Objects.requireNonNull(name); - return this; - } - - SbatchCmdBuilder workDir(Path dir) { + SbatchArguments workDir(Path dir) { Objects.requireNonNull(dir); sbatchArgsByCharacter.put('D', dir.toAbsolutePath().toString()); return this; } - SbatchCmdBuilder timeout(@Nullable String duration) { + SbatchArguments timeout(@Nullable String duration) { sbatchArgsByName.put("time", checkTimeout(duration)); return this; } - SbatchCmdBuilder deadline(long seconds) { - Preconditions.checkArgument(seconds > 0, "Invalid seconds({}) for deadline: must be 1 or greater", seconds); - sbatchArgsByName.put("deadline", String.format(DATETIME_FORMATTER, seconds)); - return this; - } - private static String checkTimeout(@Nullable String duration) { if (duration == null) { return "UNLIMITED"; @@ -159,21 +150,18 @@ private static String checkTimeout(@Nullable String duration) { return duration; } - SbatchCmdBuilder qos(String qos) { + SbatchArguments qos(String qos) { Objects.requireNonNull(qos); sbatchArgsByName.put("qos", qos); return this; } - SbatchCmd build() { - killOnInvalidDep(); - validate(); - return new SbatchCmd(sbatchArgsByName, sbatchArgsByCharacter, sbatchOptions, script); + List toScript() { + List list = new ArrayList<>(); + sbatchArgsByCharacter.forEach((k, v) -> list.add(PRECEDENT + "-" + k + " " + v)); + sbatchArgsByName.forEach((k, v) -> list.add(PRECEDENT + "--" + k + "=" + v)); + sbatchOptions.forEach(o -> list.add(PRECEDENT + "--" + o)); + return list; } - private void validate() { - if (null == script) { - throw new SlurmException("Script is null in cmd"); - } - } } diff --git a/computation-slurm/src/main/java/com/powsybl/computation/slurm/SbatchCmd.java b/computation-slurm/src/main/java/com/powsybl/computation/slurm/SbatchCmd.java index 2e2da20..f596cc7 100644 --- a/computation-slurm/src/main/java/com/powsybl/computation/slurm/SbatchCmd.java +++ b/computation-slurm/src/main/java/com/powsybl/computation/slurm/SbatchCmd.java @@ -6,12 +6,12 @@ */ package com.powsybl.computation.slurm; -import java.util.Map; +import com.google.common.base.Preconditions; + import java.util.Objects; -import java.util.TreeSet; /** - * A submission command to Slurm: sbatch options scriptName. + * A submission command to Slurm: sbatch [--deadline] scriptName. * * Sbatch scripts will be submitted using that kind of command. * @@ -19,22 +19,21 @@ */ class SbatchCmd extends AbstractSlurmCmd { - private static final String SBATCH = "sbatch"; - - private final Map argsByName; - private final Map argsByChar; - - private final TreeSet options; + private static final String SBATCH = "sbatch "; + private static final String DATETIME_FORMATTER = "--deadline=`date -d \"%d seconds\" \"+%%Y-%%m-%%dT%%H:%%M:%%S\"` "; private final String scriptName; - private String cmd; + private long deadLine = 0; - SbatchCmd(Map argsByName, Map argsByChar, TreeSet options, String scriptName) { - this.argsByName = Objects.requireNonNull(argsByName); - this.argsByChar = Objects.requireNonNull(argsByChar); + SbatchCmd(String scriptName) { this.scriptName = Objects.requireNonNull(scriptName); - this.options = Objects.requireNonNull(options); + } + + SbatchCmd deadLine(long seconds) { + Preconditions.checkArgument(seconds > 0, "Invalid seconds({}) for deadline: must be 1 or greater", seconds); + deadLine = seconds; + return this; } SbatchCmdResult send(CommandExecutor commandExecutor) throws SlurmCmdNonZeroException { @@ -44,24 +43,10 @@ SbatchCmdResult send(CommandExecutor commandExecutor) throws SlurmCmdNonZeroExce @Override public String toString() { - if (cmd == null) { - StringBuilder sb = new StringBuilder(); - sb.append(SBATCH); - argsByChar.forEach((c, v) -> { - sb.append(" -").append(c); - sb.append(" ").append(v); - }); - argsByName.forEach((k, v) -> { - sb.append(" --").append(k); - sb.append("=").append(v); - }); - options.forEach(opt -> { - sb.append(" --"); - sb.append(opt); - }); - sb.append(" ").append(scriptName); - cmd = sb.toString(); + StringBuilder sb = new StringBuilder().append(SBATCH); + if (deadLine > 0) { + sb.append(String.format(DATETIME_FORMATTER, deadLine)); } - return cmd; + return sb.append(scriptName).toString(); } } diff --git a/computation-slurm/src/main/java/com/powsybl/computation/slurm/SbatchScriptGenerator.java b/computation-slurm/src/main/java/com/powsybl/computation/slurm/SbatchScriptGenerator.java index 15c425d..ef98379 100644 --- a/computation-slurm/src/main/java/com/powsybl/computation/slurm/SbatchScriptGenerator.java +++ b/computation-slurm/src/main/java/com/powsybl/computation/slurm/SbatchScriptGenerator.java @@ -46,14 +46,17 @@ class SbatchScriptGenerator { private final Map env; private final CommandExecution commandExecution; + private final SbatchArguments arguments; + private boolean isArrayJob; private boolean isLast = false; - SbatchScriptGenerator(Path flagDir, CommandExecution commandExecution, Path workingDir, Map env) { + SbatchScriptGenerator(Path flagDir, CommandExecution commandExecution, Path workingDir, Map env, SbatchArguments sbatchArguments) { this.flagDir = Objects.requireNonNull(flagDir); this.commandExecution = Objects.requireNonNull(commandExecution); this.workingDir = Objects.requireNonNull(workingDir); this.env = Objects.requireNonNull(env); + arguments = Objects.requireNonNull(sbatchArguments); isArrayJob = commandExecution.getExecutionCount() != 1; } @@ -68,6 +71,7 @@ SbatchScriptGenerator setIsLast(boolean isLast) { List parse() { List shell = new ArrayList<>(); shell.add(SHEBANG); + shell.addAll(arguments.toScript()); if (isArrayJob) { arrayJobCase(shell); preProcess(shell); @@ -88,10 +92,14 @@ List parse() { /** * Returns the list of commands which constitute the sbatch script. * The unzip common input files script DO NOT create a flag file at the end. + * @param command Contains the shared input files to be unzipped first + * @param sbatchArguments + * @return Sbatch shell */ - static List unzipCommonInputFiles(Command command) { + static List unzipCommonInputFiles(Command command, SbatchArguments sbatchArguments) { List shell = new ArrayList<>(); shell.add(SHEBANG); + shell.addAll(sbatchArguments.toScript()); command.getInputFiles() .stream().filter(inputFile -> !inputFile.dependsOnExecutionNumber()) // common .filter(inputFile -> inputFile.getPreProcessor() != null) // to unzip diff --git a/computation-slurm/src/main/java/com/powsybl/computation/slurm/SlurmComputationManager.java b/computation-slurm/src/main/java/com/powsybl/computation/slurm/SlurmComputationManager.java index b57226d..6cd73a0 100644 --- a/computation-slurm/src/main/java/com/powsybl/computation/slurm/SlurmComputationManager.java +++ b/computation-slurm/src/main/java/com/powsybl/computation/slurm/SlurmComputationManager.java @@ -429,9 +429,10 @@ private Map generateSbatchAndSubmit(List comman LOGGER.info(CLOSE_START_NO_MORE_SEND_INFO); break; } - List shell = SbatchScriptGenerator.unzipCommonInputFiles(command); - copyShellToRemoteWorkingDir(shell, UNZIP_INPUTS_COMMAND_ID + commandIdx, workingDir); - cmd = buildCommonUnzipCmd(workingDir, commandIdx, preMasterJobId); + SbatchArguments sbatchArguments = prepareSbatchArguments(workingDir, commandIdx, preMasterJobId); + List shell = SbatchScriptGenerator.unzipCommonInputFiles(command, sbatchArguments); + Path shellPath = copyShellToRemoteWorkingDir(shell, UNZIP_INPUTS_COMMAND_ID + commandIdx, workingDir); + cmd = new SbatchCmd(shellPath.toAbsolutePath().toString()); if (isSendAllowed(future)) { long jobId = launchSbatch(cmd); if (firstJobId == null) { @@ -454,8 +455,10 @@ private Map generateSbatchAndSubmit(List comman break; } Long masterJobId = null; - prepareBatch(environment, commandExecution, workingDir, commandIdx == commandExecutions.size() - 1); - cmd = buildSbatchCmd(workingDir, command.getId(), commandExecution.getExecutionCount(), preMasterJobId, parameters); + SbatchArguments sbatchArguments = convert2Builder(workingDir, command.getId(), commandExecution.getExecutionCount(), preMasterJobId, parameters); + Path shellPath = prepareBatch(environment, commandExecution, workingDir, commandIdx == commandExecutions.size() - 1, sbatchArguments); + cmd = new SbatchCmd(shellPath.toAbsolutePath().toString()); + parameters.getDeadline(command.getId()).ifPresent(cmd::deadLine); if (isSendAllowed(future)) { long jobId = launchSbatch(cmd); if (firstJobId == null) { @@ -491,31 +494,28 @@ private void logNotSendReason(CompletableFuture future) { } } - private void prepareBatch(ExecutionEnvironment environment, CommandExecution commandExecution, Path remoteWorkingDir, boolean isLast) throws IOException { + private Path prepareBatch(ExecutionEnvironment environment, CommandExecution commandExecution, Path remoteWorkingDir, boolean isLast, SbatchArguments builder) throws IOException { Map executionVariables = CommandExecution.getExecutionVariables(environment.getVariables(), commandExecution); - SbatchScriptGenerator batchGen = new SbatchScriptGenerator(flagDir, commandExecution, remoteWorkingDir, executionVariables).setIsLast(isLast); + SbatchScriptGenerator batchGen = new SbatchScriptGenerator(flagDir, commandExecution, remoteWorkingDir, executionVariables, builder).setIsLast(isLast); List shell = batchGen.parse(); - copyShellToRemoteWorkingDir(shell, commandExecution.getCommand().getId(), remoteWorkingDir); + return copyShellToRemoteWorkingDir(shell, commandExecution.getCommand().getId(), remoteWorkingDir); } - private static void copyShellToRemoteWorkingDir(List shell, String batchName, Path remoteWorkingDir) throws IOException { + private static Path copyShellToRemoteWorkingDir(List shell, String batchName, Path remoteWorkingDir) throws IOException { StringBuilder sb = new StringBuilder(); shell.forEach(line -> sb.append(line).append('\n')); String str = sb.toString(); - Path batch; - batch = remoteWorkingDir.resolve(batchName + BATCH_EXT); + Path batch = remoteWorkingDir.resolve(batchName + BATCH_EXT); try (InputStream targetStream = new ByteArrayInputStream(str.getBytes())) { Files.copy(targetStream, batch); } + return batch; } - // TODO move this part into .batch file - private SbatchCmd buildSbatchCmd(Path workingDir, String commandId, int count, @Nullable Long preJobId, ComputationParameters baseParams) { - // prepare sbatch cmd + private SbatchArguments convert2Builder(Path workingDir, String commandId, int count, @Nullable Long preJobId, ComputationParameters baseParams) { String baseFileName = workingDir.resolve(commandId).toAbsolutePath().toString(); String idx = count == 1 ? "_0" : "_%a"; - SbatchCmdBuilder builder = new SbatchCmdBuilder() - .script(baseFileName + BATCH_EXT) + SbatchArguments builder = new SbatchArguments() .jobName(commandId) .workDir(workingDir) .nodes(1) @@ -529,23 +529,20 @@ private SbatchCmd buildSbatchCmd(Path workingDir, String commandId, int count, @ if (extension != null) { extension.getQos().ifPresent(builder::qos); } - baseParams.getDeadline(commandId).ifPresent(builder::deadline); - return builder.build(); + return builder; } /** * Because UNZIP_INPUTS_COMMAND_ID is same basename, so executionIdx is needed to put cmd_Id.sbatch in same working dir */ - private static SbatchCmd buildCommonUnzipCmd(Path workingDir, int executionIdx, @Nullable Long preJobId) { + private static SbatchArguments prepareSbatchArguments(Path workingDir, int executionIdx, @Nullable Long preJobId) { String baseFileName = workingDir.resolve(UNZIP_INPUTS_COMMAND_ID).toAbsolutePath().toString(); - SbatchCmdBuilder builder = new SbatchCmdBuilder() - .script(baseFileName + executionIdx + BATCH_EXT) + return new SbatchArguments() .jobName(UNZIP_INPUTS_COMMAND_ID) .workDir(workingDir) .output(baseFileName + executionIdx + OUT_EXT) .error(baseFileName + executionIdx + ERR_EXT) .aftercorr(preJobId); - return builder.build(); } private long launchSbatch(SbatchCmd cmd) { diff --git a/computation-slurm/src/test/java/com/powsybl/computation/slurm/SbatchArgumentsTest.java b/computation-slurm/src/test/java/com/powsybl/computation/slurm/SbatchArgumentsTest.java new file mode 100644 index 0000000..fd3a4ff --- /dev/null +++ b/computation-slurm/src/test/java/com/powsybl/computation/slurm/SbatchArgumentsTest.java @@ -0,0 +1,122 @@ +/** + * Copyright (c) 2019, RTE (http://www.rte-france.com) + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + */ +package com.powsybl.computation.slurm; + +import com.google.common.jimfs.Configuration; +import com.google.common.jimfs.Jimfs; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +import java.io.IOException; +import java.nio.file.FileSystem; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * @author Yichen TANG + */ +public class SbatchArgumentsTest { + + private static final String INVALID_DEP_YES = "#SBATCH --kill-on-invalid-dep=yes"; + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testJobArray() { + SbatchArguments builder = new SbatchArguments(); + List script = builder.jobName("array3") + .array(3) + .toScript(); + assertEquals(Arrays.asList("#SBATCH --job-name=array3", "#SBATCH --array=0-2", INVALID_DEP_YES), script); + builder = new SbatchArguments(); + script = builder.jobName("array1") + .array(1) + .toScript(); + assertEquals(Arrays.asList("#SBATCH --job-name=array1", INVALID_DEP_YES), script); + } + + @Test + public void invalidJobArray() { + thrown.expect(IllegalArgumentException.class); + SbatchArguments builder = new SbatchArguments(); + builder.jobName("test") + .array(-1) + .toScript(); + } + + @Test + public void testAftercorr() { + SbatchArguments builder = new SbatchArguments(); + List script = builder.jobName("jobname") + .aftercorr(1111L) + .toScript(); + assertEquals(Arrays.asList("#SBATCH --job-name=jobname", "#SBATCH --dependency=aftercorr:1111", INVALID_DEP_YES), script); + builder = new SbatchArguments(); + script = builder.jobName("jobname") + .aftercorr(Arrays.asList(1111L, 2222L, 3333L)) + .toScript(); + assertEquals(Arrays.asList("#SBATCH --job-name=jobname", "#SBATCH --dependency=aftercorr:1111:2222:3333", INVALID_DEP_YES), script); + } + + @Test + public void testOptions() { + SbatchArguments builder = new SbatchArguments(); + List script = builder.jobName("array1") + .array(1) + .oversubscribe() + .toScript(); + assertEquals(Arrays.asList("#SBATCH --job-name=array1", INVALID_DEP_YES, "#SBATCH --oversubscribe"), script); + } + + @Test + public void testTimeout() { + List script = new SbatchArguments().jobName("foo") + .timeout("2:00") + .toScript(); + assertEquals(Arrays.asList("#SBATCH --job-name=foo", "#SBATCH --kill-on-invalid-dep=yes", "#SBATCH --time=2:00"), script); + String nullDuration = null; + List nullableTimeout = new SbatchArguments().jobName("foo") + .timeout(nullDuration) + .toScript(); + assertEquals(Arrays.asList("#SBATCH --job-name=foo", "#SBATCH --kill-on-invalid-dep=yes", "#SBATCH --time=UNLIMITED"), nullableTimeout); + } + + @Test + public void testQos() { + SbatchArguments builder = new SbatchArguments(); + List script = builder.jobName("testQos") + .qos("value_qos") + .toScript(); + assertEquals(Arrays.asList("#SBATCH --job-name=testQos", "#SBATCH --qos=value_qos", INVALID_DEP_YES), script); + } + +// @Test +// public void testDeadline() { +// SbatchArguments builder = new SbatchArguments(); +// List script = builder.jobName("dead") +// .deadline(10) +// .toScript(); +// assertEquals(Arrays.asList("#SBATCH --job-name=dead --kill-on-invalid-dep=yes --deadline=`date -d \"10 seconds\" \"+%Y-%m-%dT%H:%M:%S\"` submit.sh", script); +// } + + @Test + public void testDir() throws IOException { + try (FileSystem fileSystem = Jimfs.newFileSystem(Configuration.unix())) { + Path dir = fileSystem.getPath("/tmp/foo"); + SbatchArguments builder = new SbatchArguments(); + List script = builder.jobName("testDir") + .workDir(dir) + .toScript(); + assertEquals(Arrays.asList("#SBATCH -D /tmp/foo", "#SBATCH --job-name=testDir", INVALID_DEP_YES), script); + } + } +} diff --git a/computation-slurm/src/test/java/com/powsybl/computation/slurm/SbatchCmdTest.java b/computation-slurm/src/test/java/com/powsybl/computation/slurm/SbatchCmdTest.java index bbd8246..a5389ff 100644 --- a/computation-slurm/src/test/java/com/powsybl/computation/slurm/SbatchCmdTest.java +++ b/computation-slurm/src/test/java/com/powsybl/computation/slurm/SbatchCmdTest.java @@ -6,17 +6,7 @@ */ package com.powsybl.computation.slurm; -import com.google.common.jimfs.Configuration; -import com.google.common.jimfs.Jimfs; -import org.junit.Rule; import org.junit.Test; -import org.junit.rules.ExpectedException; - -import java.io.IOException; -import java.nio.file.FileSystem; -import java.nio.file.Path; -import java.util.Arrays; -import java.util.Collections; import static org.junit.Assert.assertEquals; @@ -25,107 +15,12 @@ */ public class SbatchCmdTest { - @Rule - public ExpectedException thrown = ExpectedException.none(); - - @Test - public void testJobArray() { - SbatchCmdBuilder builder = new SbatchCmdBuilder(); - SbatchCmd cmd = builder.jobName("array3") - .script("submit.sh") - .array(3) - .build(); - assertEquals("sbatch --job-name=array3 --array=0-2 --kill-on-invalid-dep=yes submit.sh", cmd.toString()); - builder = new SbatchCmdBuilder(); - cmd = builder.jobName("array1") - .script("submit.sh") - .array(1) - .build(); - assertEquals("sbatch --job-name=array1 --kill-on-invalid-dep=yes submit.sh", cmd.toString()); - } - - @Test - public void invalidJobArray() { - thrown.expect(IllegalArgumentException.class); - SbatchCmdBuilder builder = new SbatchCmdBuilder(); - builder.jobName("test") - .script("submit.sh") - .array(-1) - .build(); - } - - @Test - public void testAftercorr() { - SbatchCmdBuilder builder = new SbatchCmdBuilder(); - SbatchCmd cmd = builder.jobName("jobname") - .script("submit.sh") - .aftercorr(Collections.singletonList(1111L)) - .build(); - assertEquals("sbatch --job-name=jobname --dependency=aftercorr:1111 --kill-on-invalid-dep=yes submit.sh", cmd.toString()); - builder = new SbatchCmdBuilder(); - cmd = builder.jobName("jobname") - .script("submit.sh") - .aftercorr(Arrays.asList(1111L, 2222L, 3333L)) - .build(); - assertEquals("sbatch --job-name=jobname --dependency=aftercorr:1111:2222:3333 --kill-on-invalid-dep=yes submit.sh", cmd.toString()); - } - - @Test - public void testOptions() { - SbatchCmdBuilder builder = new SbatchCmdBuilder(); - SbatchCmd cmd = builder.jobName("array1") - .script("submit.sh") - .array(1) - .oversubscribe() - .build(); - assertEquals("sbatch --job-name=array1 --kill-on-invalid-dep=yes --oversubscribe submit.sh", cmd.toString()); - } - - @Test - public void testTimeout() { - SbatchCmd cmd = new SbatchCmdBuilder().jobName("foo") - .script("foo.sh") - .timeout("2:00") - .build(); - assertEquals("sbatch --job-name=foo --kill-on-invalid-dep=yes --time=2:00 foo.sh", cmd.toString()); - String nullDuration = null; - SbatchCmd nullableTimeout = new SbatchCmdBuilder().jobName("foo") - .script("foo.sh") - .timeout(nullDuration) - .build(); - assertEquals("sbatch --job-name=foo --kill-on-invalid-dep=yes --time=UNLIMITED foo.sh", nullableTimeout.toString()); - } - - @Test - public void testQos() { - SbatchCmdBuilder builder = new SbatchCmdBuilder(); - SbatchCmd cmd = builder.jobName("testQos") - .script("submit.sh") - .qos("value_qos") - .build(); - assertEquals("sbatch --job-name=testQos --qos=value_qos --kill-on-invalid-dep=yes submit.sh", cmd.toString()); - } - @Test public void testDeadline() { - SbatchCmdBuilder builder = new SbatchCmdBuilder(); - SbatchCmd cmd = builder.jobName("dead") - .script("submit.sh") - .deadline(10) - .build(); - assertEquals("sbatch --job-name=dead --kill-on-invalid-dep=yes --deadline=`date -d \"10 seconds\" \"+%Y-%m-%dT%H:%M:%S\"` submit.sh", cmd.toString()); - } + SbatchCmd sbatchCmd = new SbatchCmd("foo.batch"); + assertEquals("sbatch foo.batch", sbatchCmd.toString()); - @Test - public void testDir() throws IOException { - try (FileSystem fileSystem = Jimfs.newFileSystem(Configuration.unix())) { - Path dir = fileSystem.getPath("/tmp/foo"); - SbatchCmdBuilder builder = new SbatchCmdBuilder(); - SbatchCmd cmd = builder.jobName("testDir") - .script("submit.sh") - .workDir(dir) - .build(); - assertEquals("sbatch -D /tmp/foo --job-name=testDir --kill-on-invalid-dep=yes submit.sh", cmd.toString()); - } + sbatchCmd.deadLine(10); + assertEquals("sbatch --deadline=`date -d \"10 seconds\" \"+%Y-%m-%dT%H:%M:%S\"` foo.batch", sbatchCmd.toString()); } } diff --git a/computation-slurm/src/test/java/com/powsybl/computation/slurm/SbatchScriptGeneratorTest.java b/computation-slurm/src/test/java/com/powsybl/computation/slurm/SbatchScriptGeneratorTest.java index fb3e083..3c0e7b4 100644 --- a/computation-slurm/src/test/java/com/powsybl/computation/slurm/SbatchScriptGeneratorTest.java +++ b/computation-slurm/src/test/java/com/powsybl/computation/slurm/SbatchScriptGeneratorTest.java @@ -33,15 +33,22 @@ */ public class SbatchScriptGeneratorTest { + private static final String SHEBANG = "#!/bin/sh"; + private static final String SBATCH_ARGU_KILL = "#SBATCH --kill-on-invalid-dep=yes"; + private static final String SBATCH_ARGU_DIR = "#SBATCH -D /home/test/workingPath_12345"; + private static final String CHECK_ERROR = "rc=$?; if [[ $rc != 0 ]]; then touch /tmp/flags/myerror_workingPath_12345_$SLURM_JOBID; exit $rc; fi"; + private FileSystem fileSystem; private Path flagPath; private Path workingPath; + private SbatchArguments arguments; @Before public void setUp() { fileSystem = Jimfs.newFileSystem(Configuration.unix()); flagPath = fileSystem.getPath("/tmp/flags"); workingPath = fileSystem.getPath("/home/test/workingPath_12345"); + arguments = new SbatchArguments().workDir(workingPath); } @After @@ -52,20 +59,24 @@ public void tearDown() throws Exception { @Test public void testSimpleCmd() { List commandExecutions = CommandExecutionsTestFactory.simpleCmd(); - List shell = new SbatchScriptGenerator(flagPath, commandExecutions.get(0), workingPath, Collections.emptyMap()).parse(); - assertEquals(ImmutableList.of("#!/bin/sh", + List shell = new SbatchScriptGenerator(flagPath, commandExecutions.get(0), workingPath, Collections.emptyMap(), arguments).parse(); + assertEquals(ImmutableList.of(SHEBANG, + SBATCH_ARGU_DIR, + SBATCH_ARGU_KILL, "echo \"test\"", - "rc=$?; if [[ $rc != 0 ]]; then touch /tmp/flags/myerror_workingPath_12345_$SLURM_JOBID; exit $rc; fi"), shell); + CHECK_ERROR), shell); } @Test public void testLastSimpleCmd() { List commandExecutions = CommandExecutionsTestFactory.simpleCmd(); - List shell = new SbatchScriptGenerator(flagPath, commandExecutions.get(0), workingPath, Collections.emptyMap()) + List shell = new SbatchScriptGenerator(flagPath, commandExecutions.get(0), workingPath, Collections.emptyMap(), arguments) .setIsLast(true).parse(); - assertEquals(ImmutableList.of("#!/bin/sh", + assertEquals(ImmutableList.of(SHEBANG, + SBATCH_ARGU_DIR, + SBATCH_ARGU_KILL, "echo \"test\"", - "rc=$?; if [[ $rc != 0 ]]; then touch /tmp/flags/myerror_workingPath_12345_$SLURM_JOBID; exit $rc; fi", + CHECK_ERROR, "touch /tmp/flags/mydone_workingPath_12345_$SLURM_JOBID"), shell); } @@ -89,13 +100,15 @@ public void testCommandFiles() throws IOException, URISyntaxException { public void testOnlyUnzipBatch() { List commandExecutions = CommandExecutionsTestFactory.commandFiles(3); Command command = commandExecutions.get(0).getCommand(); - List shell = SbatchScriptGenerator.unzipCommonInputFiles(command); + List shell = SbatchScriptGenerator.unzipCommonInputFiles(command, arguments); assertEquals(expectedtestOnlyUnzipBatch(), shell); } private static List expectedtestOnlyUnzipBatch() { List shell = new ArrayList<>(); - shell.add("#!/bin/sh"); + shell.add(SHEBANG); + shell.add(SBATCH_ARGU_DIR); + shell.add(SBATCH_ARGU_KILL); shell.add("unzip -o -q foo.zip"); return shell; } @@ -103,12 +116,14 @@ private static List expectedtestOnlyUnzipBatch() { @Test public void testGroupCmd() { List commandExecutions = CommandExecutionsTestFactory.groupCmd(); - List shell = new SbatchScriptGenerator(flagPath, commandExecutions.get(0), workingPath, Collections.emptyMap()).parse(); - assertEquals(ImmutableList.of("#!/bin/sh", + List shell = new SbatchScriptGenerator(flagPath, commandExecutions.get(0), workingPath, Collections.emptyMap(), arguments).parse(); + assertEquals(ImmutableList.of(SHEBANG, + SBATCH_ARGU_DIR, + SBATCH_ARGU_KILL, "sleep \"5s\"", - "rc=$?; if [[ $rc != 0 ]]; then touch /tmp/flags/myerror_workingPath_12345_$SLURM_JOBID; exit $rc; fi", + CHECK_ERROR, "echo \"sub2\"", - "rc=$?; if [[ $rc != 0 ]]; then touch /tmp/flags/myerror_workingPath_12345_$SLURM_JOBID; exit $rc; fi"), shell); + CHECK_ERROR), shell); } @Test @@ -124,7 +139,7 @@ public void testSbatchGenerator() throws URISyntaxException, IOException { } private void assertCommandExecutionToShell(CommandExecution commandExecution, String expected) throws URISyntaxException, IOException { - SbatchScriptGenerator shellGenerator = new SbatchScriptGenerator(flagPath, commandExecution, workingPath, Collections.emptyMap()); + SbatchScriptGenerator shellGenerator = new SbatchScriptGenerator(flagPath, commandExecution, workingPath, Collections.emptyMap(), arguments); List shell = shellGenerator.parse(); List expectedShell = Files.readAllLines(Paths.get(this.getClass().getResource("/expectedShell/" + expected).toURI())); assertEquals(expectedShell, shell); diff --git a/computation-slurm/src/test/resources/expectedShell/commandFiles.batch b/computation-slurm/src/test/resources/expectedShell/commandFiles.batch index 696d784..3ad7649 100644 --- a/computation-slurm/src/test/resources/expectedShell/commandFiles.batch +++ b/computation-slurm/src/test/resources/expectedShell/commandFiles.batch @@ -1,4 +1,6 @@ #!/bin/sh +#SBATCH -D /home/test/workingPath_12345 +#SBATCH --kill-on-invalid-dep=yes case $SLURM_ARRAY_TASK_ID in 0) PRE1=in0.zip diff --git a/computation-slurm/src/test/resources/expectedShell/groupCmdWithArgsCount3.batch b/computation-slurm/src/test/resources/expectedShell/groupCmdWithArgsCount3.batch index babc618..d90a66e 100644 --- a/computation-slurm/src/test/resources/expectedShell/groupCmdWithArgsCount3.batch +++ b/computation-slurm/src/test/resources/expectedShell/groupCmdWithArgsCount3.batch @@ -1,4 +1,6 @@ #!/bin/sh +#SBATCH -D /home/test/workingPath_12345 +#SBATCH --kill-on-invalid-dep=yes case $SLURM_ARRAY_TASK_ID in 0) ARGS_0=""1s"" ARGS_1=""1"" diff --git a/computation-slurm/src/test/resources/expectedShell/myEchoSimpleCmdWithUnzipZip.batch b/computation-slurm/src/test/resources/expectedShell/myEchoSimpleCmdWithUnzipZip.batch index 2c15e5e..8b8cf83 100644 --- a/computation-slurm/src/test/resources/expectedShell/myEchoSimpleCmdWithUnzipZip.batch +++ b/computation-slurm/src/test/resources/expectedShell/myEchoSimpleCmdWithUnzipZip.batch @@ -1,4 +1,6 @@ #!/bin/sh +#SBATCH -D /home/test/workingPath_12345 +#SBATCH --kill-on-invalid-dep=yes case $SLURM_ARRAY_TASK_ID in 0) PRE0=in0.zip diff --git a/computation-slurm/src/test/resources/expectedShell/simpleArrayJobWithArgu.batch b/computation-slurm/src/test/resources/expectedShell/simpleArrayJobWithArgu.batch index 9c48a9d..457a80e 100644 --- a/computation-slurm/src/test/resources/expectedShell/simpleArrayJobWithArgu.batch +++ b/computation-slurm/src/test/resources/expectedShell/simpleArrayJobWithArgu.batch @@ -1,4 +1,6 @@ #!/bin/sh +#SBATCH -D /home/test/workingPath_12345 +#SBATCH --kill-on-invalid-dep=yes case $SLURM_ARRAY_TASK_ID in 0) ARGS=""evenIn0" "evenOutput0.txt"" diff --git a/computation-slurm/src/test/resources/expectedShell/simpleCmdWithCount3.batch b/computation-slurm/src/test/resources/expectedShell/simpleCmdWithCount3.batch index d1dc1b5..2a245a1 100644 --- a/computation-slurm/src/test/resources/expectedShell/simpleCmdWithCount3.batch +++ b/computation-slurm/src/test/resources/expectedShell/simpleCmdWithCount3.batch @@ -1,4 +1,6 @@ #!/bin/sh +#SBATCH -D /home/test/workingPath_12345 +#SBATCH --kill-on-invalid-dep=yes case $SLURM_ARRAY_TASK_ID in 0) ARGS=""te1st0""