From 5774a49f7b82871ee4bbe88ea7da651f419d0f6e Mon Sep 17 00:00:00 2001 From: Sanrag Sood Date: Fri, 20 Sep 2013 16:04:08 -0700 Subject: [PATCH] PipelineIteration refactor. Make PipelineIteration less dependent on the specifics of the stage it is executing. There are small number of TODOs which would be cleaned up in a subsequent change --- .../groningen/PipelineIteration.java | 71 +++++++++---------- .../groningen/PipelineStageInfo.java | 25 +++++-- .../groningen/executor/Executor.java | 7 +- .../groningen/executor/ExecutorTest.java | 3 +- 4 files changed, 58 insertions(+), 48 deletions(-) diff --git a/src/main/java/org/arbeitspferde/groningen/PipelineIteration.java b/src/main/java/org/arbeitspferde/groningen/PipelineIteration.java index 390ff99..c976d8e 100644 --- a/src/main/java/org/arbeitspferde/groningen/PipelineIteration.java +++ b/src/main/java/org/arbeitspferde/groningen/PipelineIteration.java @@ -15,20 +15,22 @@ package org.arbeitspferde.groningen; +import com.google.common.collect.Lists; import com.google.inject.Inject; + import org.arbeitspferde.groningen.config.GroningenConfig; import org.arbeitspferde.groningen.config.PipelineIterationScoped; import org.arbeitspferde.groningen.executor.Executor; import org.arbeitspferde.groningen.experimentdb.ExperimentDb; import org.arbeitspferde.groningen.generator.Generator; import org.arbeitspferde.groningen.hypothesizer.Hypothesizer; +import org.arbeitspferde.groningen.profiling.ProfilingRunnable; import org.arbeitspferde.groningen.scorer.IterationScorer; import org.arbeitspferde.groningen.utility.Metric; import org.arbeitspferde.groningen.utility.MetricExporter; import org.arbeitspferde.groningen.validator.Validator; import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Level; import java.util.logging.Logger; /** @@ -45,11 +47,8 @@ public class PipelineIteration { private final GroningenConfig config; private final PipelineSynchronizer pipelineSynchronizer; - private final Executor executor; - private final Generator generator; - private final Hypothesizer hypothesizer; - private final Validator validator; - private final IterationScorer scorer; + private final ProfilingRunnable[] pipelineStages; + private final PipelineStageInfo pipelineStageInfo; /** @@ -73,11 +72,8 @@ public PipelineIteration(final GroningenConfig config, this.config = config; this.pipelineSynchronizer = pipelineSynchronizer; - this.executor = executor; - this.generator = generator; - this.hypothesizer = hypothesizer; - this.validator = validator; - this.scorer = scorer; + this.pipelineStages = Lists.newArrayList(hypothesizer, generator, executor, validator, scorer) + .toArray(new ProfilingRunnable[0]); this.pipelineStageInfo = pipelineStageInfo; metricExporter.register( @@ -100,41 +96,38 @@ public int getStage() { * valid window. */ public int getRemainingExperimentalSecs() { - return (int) executor.getRemainingDurationSeconds(); + // TODO(sanragsood): Clean this up, PipelineIteration should be unaware of the specifics of + // the stage pipeline iteration is in. + for (ProfilingRunnable stage : pipelineStages) { + if (stage instanceof Executor) { + return (int)((Executor)stage).getRemainingDurationSeconds(); + } + } + return 0; } public boolean run() { - executor.startUp(); - generator.startUp(); - hypothesizer.startUp(); - validator.startUp(); - scorer.startUp(); - - currentPipelineStage.set(0); + for (ProfilingRunnable stage : pipelineStages) { + stage.startUp(); + } // Synchronization within the pipeline iteration - after the config is updated pipelineSynchronizer.iterationStartHook(); - pipelineStageInfo.set(PipelineStageState.HYPOTHESIZER); - hypothesizer.run(config); - boolean notComplete = hypothesizer.notComplete(); - - if (notComplete) { - currentPipelineStage.set(1); - pipelineStageInfo.set(PipelineStageState.GENERATOR); - generator.run(config); - - // This stage returns only when all experiments are complete - currentPipelineStage.set(2); - pipelineSynchronizer.executorStartHook(); - executor.run(config); - - currentPipelineStage.set(3); - pipelineStageInfo.set(PipelineStageState.SCORING); - validator.run(config); - - currentPipelineStage.set(4); - scorer.run(config); + int stageCount = 0; + boolean notComplete = true; + for (ProfilingRunnable stage : pipelineStages) { + currentPipelineStage.set(stageCount++); + pipelineStageInfo.setStageFromRunnable(stage); + stage.run(config); + // TODO(sanragsood): Clean this up, remove dependency on stage specifics from pipeline + // iteration + if (stage instanceof Hypothesizer) { + notComplete = ((Hypothesizer)stage).notComplete(); + if (!notComplete) { + break; + } + } } return notComplete; diff --git a/src/main/java/org/arbeitspferde/groningen/PipelineStageInfo.java b/src/main/java/org/arbeitspferde/groningen/PipelineStageInfo.java index 97d05e2..1d19a8a 100644 --- a/src/main/java/org/arbeitspferde/groningen/PipelineStageInfo.java +++ b/src/main/java/org/arbeitspferde/groningen/PipelineStageInfo.java @@ -2,6 +2,11 @@ import com.google.inject.Inject; +import org.arbeitspferde.groningen.generator.Generator; +import org.arbeitspferde.groningen.hypothesizer.Hypothesizer; +import org.arbeitspferde.groningen.profiling.ProfilingRunnable; +import org.arbeitspferde.groningen.scorer.IterationScorer; + /** * Ties together the iteration count and pipeline state into a value object in which the count * and state can be operated/retrieved atomically. Encapsulating them allows for state to be @@ -11,22 +16,32 @@ public class PipelineStageInfo { private int iterationNumber; private PipelineStageState state; - + @Inject public PipelineStageInfo() { this(0, PipelineStageState.INITIALIZED); } - + public PipelineStageInfo(int iterationNumber, PipelineStageState state) { this.iterationNumber = iterationNumber; this.state = state; } - + /** Set only the pipeline state. */ public synchronized void set(PipelineStageState state) { this.state = state; } + public synchronized void setStageFromRunnable(ProfilingRunnable runnable) { + if (runnable instanceof Hypothesizer) { + set(PipelineStageState.HYPOTHESIZER); + } else if (runnable instanceof Generator) { + set(PipelineStageState.GENERATOR); + } else if (runnable instanceof IterationScorer) { + set(PipelineStageState.SCORING); + } + } + /** Increment the iteration count and set the stage atomically. */ public synchronized void incrementIterationAndSetState(PipelineStageState state) { iterationNumber++; @@ -35,7 +50,7 @@ public synchronized void incrementIterationAndSetState(PipelineStageState state) /** * Get a tuple of the iteration count and state atomically. - * + * * @return the iteration count and state */ public synchronized ImmutablePipelineStageInfo getImmutableValueCopy() { @@ -48,7 +63,7 @@ public synchronized ImmutablePipelineStageInfo getImmutableValueCopy() { public static class ImmutablePipelineStageInfo { public final int iterationNumber; public final PipelineStageState state; - + private ImmutablePipelineStageInfo(int iterationNumber, PipelineStageState state) { this.iterationNumber = iterationNumber; this.state = state; diff --git a/src/main/java/org/arbeitspferde/groningen/executor/Executor.java b/src/main/java/org/arbeitspferde/groningen/executor/Executor.java index f924f57..82e2d2a 100644 --- a/src/main/java/org/arbeitspferde/groningen/executor/Executor.java +++ b/src/main/java/org/arbeitspferde/groningen/executor/Executor.java @@ -335,7 +335,7 @@ private void checkCommandLine(String commandLine) { } } } - + /** * Provide the remaining time within the run of the experiment. * @@ -359,6 +359,7 @@ public long getRemainingDurationSeconds() { @Override public void profiledRun(GroningenConfig config) throws RuntimeException { + pipelineSynchronizer.executorStartHook(); preSteps(config); steadyState(config); postSteps(config); @@ -484,8 +485,8 @@ private void postSteps(GroningenConfig config) { // Clear the JVM settings protobuf to cause subjects to restart with default JVM settings subjectSettingsFileManager.delete(subject.getAssociatedSubject().getExpSettingsFile()); } - - pipelineStageInfo.set(PipelineStageState.FINAL_TASK_RESTART); + + pipelineStageInfo.set(PipelineStageState.FINAL_TASK_RESTART); restartAllGroups(config); for (SubjectStateBridge subject : subjects) { log.info(String.format("Running extractor thread on %s.", subject.getHumanIdentifier())); diff --git a/src/test/java/org/arbeitspferde/groningen/executor/ExecutorTest.java b/src/test/java/org/arbeitspferde/groningen/executor/ExecutorTest.java index d160dce..310cd82 100644 --- a/src/test/java/org/arbeitspferde/groningen/executor/ExecutorTest.java +++ b/src/test/java/org/arbeitspferde/groningen/executor/ExecutorTest.java @@ -61,6 +61,7 @@ protected void setUp() throws Exception { EasyMock.replay(mockSubjectInterrogator); mockPipelineSynchronizer = EasyMock.createMock(PipelineSynchronizer.class); + mockPipelineSynchronizer.executorStartHook(); EasyMock.replay(mockPipelineSynchronizer); mockSubjectSettingsFileManager = EasyMock.createMock(SubjectSettingsFileManager.class); @@ -80,7 +81,7 @@ protected void setUp() throws Exception { EasyMock.replay(mockCollectionLogAddressor); final PipelineStageInfo pipelineStageInfo = new PipelineStageInfo(); - + executor = new Executor(clock, monitor, experimentDb, mockManipulator, mockHealthQuerier, mockSubjectInterrogator, mockPipelineSynchronizer, mockSubjectSettingsFileManager, mockMetricExporter, mockFileFactory, new NullServingAddressGenerator(),