Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PipelineIteration refactor. #17

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 32 additions & 39 deletions src/main/java/org/arbeitspferde/groningen/PipelineIteration.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,22 @@

package org.arbeitspferde.groningen;

import com.google.common.collect.Lists;
import com.google.inject.Inject;

import org.arbeitspferde.groningen.config.GroningenConfig;
import org.arbeitspferde.groningen.config.PipelineIterationScoped;
import org.arbeitspferde.groningen.executor.Executor;
import org.arbeitspferde.groningen.experimentdb.ExperimentDb;
import org.arbeitspferde.groningen.generator.Generator;
import org.arbeitspferde.groningen.hypothesizer.Hypothesizer;
import org.arbeitspferde.groningen.profiling.ProfilingRunnable;
import org.arbeitspferde.groningen.scorer.IterationScorer;
import org.arbeitspferde.groningen.utility.Metric;
import org.arbeitspferde.groningen.utility.MetricExporter;
import org.arbeitspferde.groningen.validator.Validator;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
Expand All @@ -45,11 +47,8 @@ public class PipelineIteration {

private final GroningenConfig config;
private final PipelineSynchronizer pipelineSynchronizer;
private final Executor executor;
private final Generator generator;
private final Hypothesizer hypothesizer;
private final Validator validator;
private final IterationScorer scorer;
private final ProfilingRunnable[] pipelineStages;

private final PipelineStageInfo pipelineStageInfo;

/**
Expand All @@ -73,11 +72,8 @@ public PipelineIteration(final GroningenConfig config,

this.config = config;
this.pipelineSynchronizer = pipelineSynchronizer;
this.executor = executor;
this.generator = generator;
this.hypothesizer = hypothesizer;
this.validator = validator;
this.scorer = scorer;
this.pipelineStages = Lists.newArrayList(hypothesizer, generator, executor, validator, scorer)
.toArray(new ProfilingRunnable[0]);
this.pipelineStageInfo = pipelineStageInfo;

metricExporter.register(
Expand All @@ -100,41 +96,38 @@ public int getStage() {
* valid window.
*/
public int getRemainingExperimentalSecs() {
return (int) executor.getRemainingDurationSeconds();
// TODO(sanragsood): Clean this up, PipelineIteration should be unaware of the specifics of
// the stage pipeline iteration is in.
for (ProfilingRunnable stage : pipelineStages) {
if (stage instanceof Executor) {
return (int)((Executor)stage).getRemainingDurationSeconds();
}
}
return 0;
}

public boolean run() {
executor.startUp();
generator.startUp();
hypothesizer.startUp();
validator.startUp();
scorer.startUp();

currentPipelineStage.set(0);
for (ProfilingRunnable stage : pipelineStages) {
stage.startUp();
}

// Synchronization within the pipeline iteration - after the config is updated
pipelineSynchronizer.iterationStartHook();

pipelineStageInfo.set(PipelineStageState.HYPOTHESIZER);
hypothesizer.run(config);
boolean notComplete = hypothesizer.notComplete();

if (notComplete) {
currentPipelineStage.set(1);
pipelineStageInfo.set(PipelineStageState.GENERATOR);
generator.run(config);

// This stage returns only when all experiments are complete
currentPipelineStage.set(2);
pipelineSynchronizer.executorStartHook();
executor.run(config);

currentPipelineStage.set(3);
pipelineStageInfo.set(PipelineStageState.SCORING);
validator.run(config);

currentPipelineStage.set(4);
scorer.run(config);
int stageCount = 0;
boolean notComplete = true;
for (ProfilingRunnable stage : pipelineStages) {
currentPipelineStage.set(stageCount++);
pipelineStageInfo.setStageFromRunnable(stage);
stage.run(config);
// TODO(sanragsood): Clean this up, remove dependency on stage specifics from pipeline
// iteration
if (stage instanceof Hypothesizer) {
notComplete = ((Hypothesizer)stage).notComplete();
if (!notComplete) {
break;
}
}
}

return notComplete;
Expand Down
25 changes: 20 additions & 5 deletions src/main/java/org/arbeitspferde/groningen/PipelineStageInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

import com.google.inject.Inject;

import org.arbeitspferde.groningen.generator.Generator;
import org.arbeitspferde.groningen.hypothesizer.Hypothesizer;
import org.arbeitspferde.groningen.profiling.ProfilingRunnable;
import org.arbeitspferde.groningen.scorer.IterationScorer;

/**
* Ties together the iteration count and pipeline state into a value object in which the count
* and state can be operated/retrieved atomically. Encapsulating them allows for state to be
Expand All @@ -11,22 +16,32 @@
public class PipelineStageInfo {
private int iterationNumber;
private PipelineStageState state;

@Inject
public PipelineStageInfo() {
this(0, PipelineStageState.INITIALIZED);
}

public PipelineStageInfo(int iterationNumber, PipelineStageState state) {
this.iterationNumber = iterationNumber;
this.state = state;
}

/** Set only the pipeline state. */
public synchronized void set(PipelineStageState state) {
this.state = state;
}

public synchronized void setStageFromRunnable(ProfilingRunnable runnable) {
if (runnable instanceof Hypothesizer) {
set(PipelineStageState.HYPOTHESIZER);
} else if (runnable instanceof Generator) {
set(PipelineStageState.GENERATOR);
} else if (runnable instanceof IterationScorer) {
set(PipelineStageState.SCORING);
}
}

/** Increment the iteration count and set the stage atomically. */
public synchronized void incrementIterationAndSetState(PipelineStageState state) {
iterationNumber++;
Expand All @@ -35,7 +50,7 @@ public synchronized void incrementIterationAndSetState(PipelineStageState state)

/**
* Get a tuple of the iteration count and state atomically.
*
*
* @return the iteration count and state
*/
public synchronized ImmutablePipelineStageInfo getImmutableValueCopy() {
Expand All @@ -48,7 +63,7 @@ public synchronized ImmutablePipelineStageInfo getImmutableValueCopy() {
public static class ImmutablePipelineStageInfo {
public final int iterationNumber;
public final PipelineStageState state;

private ImmutablePipelineStageInfo(int iterationNumber, PipelineStageState state) {
this.iterationNumber = iterationNumber;
this.state = state;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ private void checkCommandLine(String commandLine) {
}
}
}

/**
* Provide the remaining time within the run of the experiment.
*
Expand All @@ -359,6 +359,7 @@ public long getRemainingDurationSeconds() {

@Override
public void profiledRun(GroningenConfig config) throws RuntimeException {
pipelineSynchronizer.executorStartHook();
preSteps(config);
steadyState(config);
postSteps(config);
Expand Down Expand Up @@ -484,8 +485,8 @@ private void postSteps(GroningenConfig config) {
// Clear the JVM settings protobuf to cause subjects to restart with default JVM settings
subjectSettingsFileManager.delete(subject.getAssociatedSubject().getExpSettingsFile());
}
pipelineStageInfo.set(PipelineStageState.FINAL_TASK_RESTART);

pipelineStageInfo.set(PipelineStageState.FINAL_TASK_RESTART);
restartAllGroups(config);
for (SubjectStateBridge subject : subjects) {
log.info(String.format("Running extractor thread on %s.", subject.getHumanIdentifier()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ protected void setUp() throws Exception {
EasyMock.replay(mockSubjectInterrogator);

mockPipelineSynchronizer = EasyMock.createMock(PipelineSynchronizer.class);
mockPipelineSynchronizer.executorStartHook();
EasyMock.replay(mockPipelineSynchronizer);

mockSubjectSettingsFileManager = EasyMock.createMock(SubjectSettingsFileManager.class);
Expand All @@ -80,7 +81,7 @@ protected void setUp() throws Exception {
EasyMock.replay(mockCollectionLogAddressor);

final PipelineStageInfo pipelineStageInfo = new PipelineStageInfo();

executor = new Executor(clock, monitor, experimentDb, mockManipulator, mockHealthQuerier,
mockSubjectInterrogator, mockPipelineSynchronizer, mockSubjectSettingsFileManager,
mockMetricExporter, mockFileFactory, new NullServingAddressGenerator(),
Expand Down