From 91026245d9217dab2a4dba8bcf9cbd69558b577e Mon Sep 17 00:00:00 2001 From: Sanrag Sood Date: Fri, 20 Sep 2013 16:04:08 -0700 Subject: [PATCH 1/2] PipelineIteration refactor. Make PipelineIteration less dependent on the specifics of the stage it is executing. There are a small number of TODOs which will be cleaned up in a subsequent change. --- .../groningen/PipelineIteration.java | 71 +++++++++---------- .../groningen/PipelineStageInfo.java | 25 +++++-- .../groningen/executor/Executor.java | 7 +- .../groningen/executor/ExecutorTest.java | 3 +- 4 files changed, 58 insertions(+), 48 deletions(-) diff --git a/src/main/java/org/arbeitspferde/groningen/PipelineIteration.java b/src/main/java/org/arbeitspferde/groningen/PipelineIteration.java index 390ff99..c976d8e 100644 --- a/src/main/java/org/arbeitspferde/groningen/PipelineIteration.java +++ b/src/main/java/org/arbeitspferde/groningen/PipelineIteration.java @@ -15,20 +15,22 @@ package org.arbeitspferde.groningen; +import com.google.common.collect.Lists; import com.google.inject.Inject; + import org.arbeitspferde.groningen.config.GroningenConfig; import org.arbeitspferde.groningen.config.PipelineIterationScoped; import org.arbeitspferde.groningen.executor.Executor; import org.arbeitspferde.groningen.experimentdb.ExperimentDb; import org.arbeitspferde.groningen.generator.Generator; import org.arbeitspferde.groningen.hypothesizer.Hypothesizer; +import org.arbeitspferde.groningen.profiling.ProfilingRunnable; import org.arbeitspferde.groningen.scorer.IterationScorer; import org.arbeitspferde.groningen.utility.Metric; import org.arbeitspferde.groningen.utility.MetricExporter; import org.arbeitspferde.groningen.validator.Validator; import java.util.concurrent.atomic.AtomicInteger; -import java.util.logging.Level; import java.util.logging.Logger; /** @@ -45,11 +47,8 @@ public class PipelineIteration { private final GroningenConfig config; private final PipelineSynchronizer pipelineSynchronizer; - private final Executor 
executor; - private final Generator generator; - private final Hypothesizer hypothesizer; - private final Validator validator; - private final IterationScorer scorer; + private final ProfilingRunnable[] pipelineStages; + private final PipelineStageInfo pipelineStageInfo; /** @@ -73,11 +72,8 @@ public PipelineIteration(final GroningenConfig config, this.config = config; this.pipelineSynchronizer = pipelineSynchronizer; - this.executor = executor; - this.generator = generator; - this.hypothesizer = hypothesizer; - this.validator = validator; - this.scorer = scorer; + this.pipelineStages = Lists.newArrayList(hypothesizer, generator, executor, validator, scorer) + .toArray(new ProfilingRunnable[0]); this.pipelineStageInfo = pipelineStageInfo; metricExporter.register( @@ -100,41 +96,38 @@ public int getStage() { * valid window. */ public int getRemainingExperimentalSecs() { - return (int) executor.getRemainingDurationSeconds(); + // TODO(sanragsood): Clean this up, PipelineIteration should be unaware of the specifics of + // the stage pipeline iteration is in. 
+ for (ProfilingRunnable stage : pipelineStages) { + if (stage instanceof Executor) { + return (int)((Executor)stage).getRemainingDurationSeconds(); + } + } + return 0; } public boolean run() { - executor.startUp(); - generator.startUp(); - hypothesizer.startUp(); - validator.startUp(); - scorer.startUp(); - - currentPipelineStage.set(0); + for (ProfilingRunnable stage : pipelineStages) { + stage.startUp(); + } // Synchronization within the pipeline iteration - after the config is updated pipelineSynchronizer.iterationStartHook(); - pipelineStageInfo.set(PipelineStageState.HYPOTHESIZER); - hypothesizer.run(config); - boolean notComplete = hypothesizer.notComplete(); - - if (notComplete) { - currentPipelineStage.set(1); - pipelineStageInfo.set(PipelineStageState.GENERATOR); - generator.run(config); - - // This stage returns only when all experiments are complete - currentPipelineStage.set(2); - pipelineSynchronizer.executorStartHook(); - executor.run(config); - - currentPipelineStage.set(3); - pipelineStageInfo.set(PipelineStageState.SCORING); - validator.run(config); - - currentPipelineStage.set(4); - scorer.run(config); + int stageCount = 0; + boolean notComplete = true; + for (ProfilingRunnable stage : pipelineStages) { + currentPipelineStage.set(stageCount++); + pipelineStageInfo.setStageFromRunnable(stage); + stage.run(config); + // TODO(sanragsood): Clean this up, remove dependency on stage specifics from pipeline + // iteration + if (stage instanceof Hypothesizer) { + notComplete = ((Hypothesizer)stage).notComplete(); + if (!notComplete) { + break; + } + } } return notComplete; diff --git a/src/main/java/org/arbeitspferde/groningen/PipelineStageInfo.java b/src/main/java/org/arbeitspferde/groningen/PipelineStageInfo.java index 97d05e2..1d19a8a 100644 --- a/src/main/java/org/arbeitspferde/groningen/PipelineStageInfo.java +++ b/src/main/java/org/arbeitspferde/groningen/PipelineStageInfo.java @@ -2,6 +2,11 @@ import com.google.inject.Inject; +import 
org.arbeitspferde.groningen.generator.Generator; +import org.arbeitspferde.groningen.hypothesizer.Hypothesizer; +import org.arbeitspferde.groningen.profiling.ProfilingRunnable; +import org.arbeitspferde.groningen.scorer.IterationScorer; + /** * Ties together the iteration count and pipeline state into a value object in which the count * and state can be operated/retrieved atomically. Encapsulating them allows for state to be @@ -11,22 +16,32 @@ public class PipelineStageInfo { private int iterationNumber; private PipelineStageState state; - + @Inject public PipelineStageInfo() { this(0, PipelineStageState.INITIALIZED); } - + public PipelineStageInfo(int iterationNumber, PipelineStageState state) { this.iterationNumber = iterationNumber; this.state = state; } - + /** Set only the pipeline state. */ public synchronized void set(PipelineStageState state) { this.state = state; } + public synchronized void setStageFromRunnable(ProfilingRunnable runnable) { + if (runnable instanceof Hypothesizer) { + set(PipelineStageState.HYPOTHESIZER); + } else if (runnable instanceof Generator) { + set(PipelineStageState.GENERATOR); + } else if (runnable instanceof IterationScorer) { + set(PipelineStageState.SCORING); + } + } + /** Increment the iteration count and set the stage atomically. */ public synchronized void incrementIterationAndSetState(PipelineStageState state) { iterationNumber++; @@ -35,7 +50,7 @@ public synchronized void incrementIterationAndSetState(PipelineStageState state) /** * Get a tuple of the iteration count and state atomically. 
- * + * * @return the iteration count and state */ public synchronized ImmutablePipelineStageInfo getImmutableValueCopy() { @@ -48,7 +63,7 @@ public synchronized ImmutablePipelineStageInfo getImmutableValueCopy() { public static class ImmutablePipelineStageInfo { public final int iterationNumber; public final PipelineStageState state; - + private ImmutablePipelineStageInfo(int iterationNumber, PipelineStageState state) { this.iterationNumber = iterationNumber; this.state = state; diff --git a/src/main/java/org/arbeitspferde/groningen/executor/Executor.java b/src/main/java/org/arbeitspferde/groningen/executor/Executor.java index f924f57..82e2d2a 100644 --- a/src/main/java/org/arbeitspferde/groningen/executor/Executor.java +++ b/src/main/java/org/arbeitspferde/groningen/executor/Executor.java @@ -335,7 +335,7 @@ private void checkCommandLine(String commandLine) { } } } - + /** * Provide the remaining time within the run of the experiment. * @@ -359,6 +359,7 @@ public long getRemainingDurationSeconds() { @Override public void profiledRun(GroningenConfig config) throws RuntimeException { + pipelineSynchronizer.executorStartHook(); preSteps(config); steadyState(config); postSteps(config); @@ -484,8 +485,8 @@ private void postSteps(GroningenConfig config) { // Clear the JVM settings protobuf to cause subjects to restart with default JVM settings subjectSettingsFileManager.delete(subject.getAssociatedSubject().getExpSettingsFile()); } - - pipelineStageInfo.set(PipelineStageState.FINAL_TASK_RESTART); + + pipelineStageInfo.set(PipelineStageState.FINAL_TASK_RESTART); restartAllGroups(config); for (SubjectStateBridge subject : subjects) { log.info(String.format("Running extractor thread on %s.", subject.getHumanIdentifier())); diff --git a/src/test/java/org/arbeitspferde/groningen/executor/ExecutorTest.java b/src/test/java/org/arbeitspferde/groningen/executor/ExecutorTest.java index d160dce..310cd82 100644 --- 
a/src/test/java/org/arbeitspferde/groningen/executor/ExecutorTest.java +++ b/src/test/java/org/arbeitspferde/groningen/executor/ExecutorTest.java @@ -61,6 +61,7 @@ protected void setUp() throws Exception { EasyMock.replay(mockSubjectInterrogator); mockPipelineSynchronizer = EasyMock.createMock(PipelineSynchronizer.class); + mockPipelineSynchronizer.executorStartHook(); EasyMock.replay(mockPipelineSynchronizer); mockSubjectSettingsFileManager = EasyMock.createMock(SubjectSettingsFileManager.class); @@ -80,7 +81,7 @@ protected void setUp() throws Exception { EasyMock.replay(mockCollectionLogAddressor); final PipelineStageInfo pipelineStageInfo = new PipelineStageInfo(); - + executor = new Executor(clock, monitor, experimentDb, mockManipulator, mockHealthQuerier, mockSubjectInterrogator, mockPipelineSynchronizer, mockSubjectSettingsFileManager, mockMetricExporter, mockFileFactory, new NullServingAddressGenerator(), From 71dcda98efc473d446b2cd7a913f52af0160aaa4 Mon Sep 17 00:00:00 2001 From: Sanrag Sood Date: Sat, 21 Sep 2013 15:36:32 -0700 Subject: [PATCH 2/2] Builder pattern for PipelineIteration. This allows PipelineIteration instances to be constructed in a much more flexible way, making it a general-purpose pipeline framework. 
--- .../arbeitspferde/groningen/BaseModule.java | 2 + .../groningen/PipelineIteration.java | 50 ++++++++++++------- .../groningen/PipelineIterationProvider.java | 37 ++++++++++++++ .../groningen/WorkhorseFactory.java | 11 +++- .../groningen/generator/Generator.java | 2 +- 5 files changed, 82 insertions(+), 20 deletions(-) create mode 100644 src/main/java/org/arbeitspferde/groningen/PipelineIterationProvider.java diff --git a/src/main/java/org/arbeitspferde/groningen/BaseModule.java b/src/main/java/org/arbeitspferde/groningen/BaseModule.java index 019d051..6b1afee 100644 --- a/src/main/java/org/arbeitspferde/groningen/BaseModule.java +++ b/src/main/java/org/arbeitspferde/groningen/BaseModule.java @@ -108,6 +108,8 @@ protected void configure() { .in(PipelineScoped.class); // PipelineIteration-scoped bindings + bind(PipelineIteration.class).toProvider(PipelineIterationProvider.class) + .in(PipelineIterationScoped.class); bind(Generator.class).in(PipelineIterationScoped.class); bind(Validator.class).in(PipelineIterationScoped.class); bind(IterationScorer.class).in(PipelineIterationScoped.class); diff --git a/src/main/java/org/arbeitspferde/groningen/PipelineIteration.java b/src/main/java/org/arbeitspferde/groningen/PipelineIteration.java index c976d8e..c976d98 100644 --- a/src/main/java/org/arbeitspferde/groningen/PipelineIteration.java +++ b/src/main/java/org/arbeitspferde/groningen/PipelineIteration.java @@ -16,20 +16,17 @@ package org.arbeitspferde.groningen; import com.google.common.collect.Lists; -import com.google.inject.Inject; +import com.google.inject.Injector; import org.arbeitspferde.groningen.config.GroningenConfig; import org.arbeitspferde.groningen.config.PipelineIterationScoped; import org.arbeitspferde.groningen.executor.Executor; -import org.arbeitspferde.groningen.experimentdb.ExperimentDb; -import org.arbeitspferde.groningen.generator.Generator; import org.arbeitspferde.groningen.hypothesizer.Hypothesizer; import 
org.arbeitspferde.groningen.profiling.ProfilingRunnable; -import org.arbeitspferde.groningen.scorer.IterationScorer; import org.arbeitspferde.groningen.utility.Metric; import org.arbeitspferde.groningen.utility.MetricExporter; -import org.arbeitspferde.groningen.validator.Validator; +import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import java.util.logging.Logger; @@ -45,6 +42,30 @@ public class PipelineIteration { private static final Logger log = Logger.getLogger(PipelineIteration.class.getCanonicalName()); + public static final class Builder { + + private List pipelineStagesList = Lists.newArrayList(); + + public Builder addPipelineStage(ProfilingRunnable pipelineStage) { + pipelineStagesList.add(pipelineStage); + return this; + } + + public PipelineIteration build() { + Injector injector = WorkhorseFactory.getInjector(); + return new PipelineIteration( + injector.getInstance(GroningenConfig.class), + injector.getInstance(PipelineSynchronizer.class), + injector.getInstance(MetricExporter.class), + injector.getInstance(PipelineStageInfo.class), + pipelineStagesList.toArray(new ProfilingRunnable[0])); + } + } + + public static Builder builder() { + return new Builder(); + } + private final GroningenConfig config; private final PipelineSynchronizer pipelineSynchronizer; private final ProfilingRunnable[] pipelineStages; @@ -59,22 +80,15 @@ public class PipelineIteration { **/ private static AtomicInteger currentPipelineStage = new AtomicInteger(); - @Inject - public PipelineIteration(final GroningenConfig config, - final PipelineSynchronizer pipelineSynchronizer, - final Executor executor, - final Generator generator, - final Hypothesizer hypothesizer, - final Validator validator, - final IterationScorer scorer, - final MetricExporter metricExporter, - final PipelineStageInfo pipelineStageInfo) { - + private PipelineIteration(final GroningenConfig config, + final PipelineSynchronizer pipelineSynchronizer, + final MetricExporter 
metricExporter, + final PipelineStageInfo pipelineStageInfo, + final ProfilingRunnable[] pipelineStages) { this.config = config; this.pipelineSynchronizer = pipelineSynchronizer; - this.pipelineStages = Lists.newArrayList(hypothesizer, generator, executor, validator, scorer) - .toArray(new ProfilingRunnable[0]); this.pipelineStageInfo = pipelineStageInfo; + this.pipelineStages = pipelineStages; metricExporter.register( "current_pipeline_stage", diff --git a/src/main/java/org/arbeitspferde/groningen/PipelineIterationProvider.java b/src/main/java/org/arbeitspferde/groningen/PipelineIterationProvider.java new file mode 100644 index 0000000..9f150a7 --- /dev/null +++ b/src/main/java/org/arbeitspferde/groningen/PipelineIterationProvider.java @@ -0,0 +1,37 @@ +package org.arbeitspferde.groningen; + +import org.arbeitspferde.groningen.config.PipelineIterationScoped; +import org.arbeitspferde.groningen.executor.Executor; +import org.arbeitspferde.groningen.generator.Generator; +import org.arbeitspferde.groningen.hypothesizer.Hypothesizer; +import org.arbeitspferde.groningen.scorer.IterationScorer; +import org.arbeitspferde.groningen.validator.Validator; + +import com.google.inject.Inject; +import com.google.inject.Provider; + +@PipelineIterationScoped +public class PipelineIterationProvider implements Provider { + + private final PipelineIteration.Builder builder; + + @Inject + public PipelineIterationProvider(final Hypothesizer hypothesizer, + final Generator generator, + final Executor executor, + final Validator validator, + final IterationScorer scorer) { + builder = PipelineIteration.builder() + .addPipelineStage(hypothesizer) + .addPipelineStage(generator) + .addPipelineStage(executor) + .addPipelineStage(validator) + .addPipelineStage(scorer); + } + + @Override + public PipelineIteration get() { + return builder.build(); + } + +} diff --git a/src/main/java/org/arbeitspferde/groningen/WorkhorseFactory.java 
b/src/main/java/org/arbeitspferde/groningen/WorkhorseFactory.java index 321d890..b9c5de8 100644 --- a/src/main/java/org/arbeitspferde/groningen/WorkhorseFactory.java +++ b/src/main/java/org/arbeitspferde/groningen/WorkhorseFactory.java @@ -15,6 +15,7 @@ package org.arbeitspferde.groningen; import com.google.inject.Guice; +import com.google.inject.Injector; import com.google.inject.Module; /** @@ -23,7 +24,15 @@ * it would be for another user of Groningen to extend the test bed for new purposes. */ public class WorkhorseFactory { + + private static Injector injector; + + public static Injector getInjector() { + return injector; + } + public GroningenWorkhorse workhorseFor(final Module... modules) { - return Guice.createInjector(modules).getInstance(GroningenWorkhorse.class); + injector = Guice.createInjector(modules); + return injector.getInstance(GroningenWorkhorse.class); } } diff --git a/src/main/java/org/arbeitspferde/groningen/generator/Generator.java b/src/main/java/org/arbeitspferde/groningen/generator/Generator.java index df2ab2f..8a6c0ef 100644 --- a/src/main/java/org/arbeitspferde/groningen/generator/Generator.java +++ b/src/main/java/org/arbeitspferde/groningen/generator/Generator.java @@ -65,7 +65,7 @@ public class Generator extends ProfilingRunnable { private final PipelineId pipelineId; - /** The delay period in millis betwen retries of failing Generator runs */ + /** The delay period in millis between retries of failing Generator runs */ private static final long RUN_GENERATOR_RETRY_DELAY = 60000L; /**