Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pipeline iteration builder #21

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/main/java/org/arbeitspferde/groningen/BaseModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ protected void configure() {
.in(PipelineScoped.class);

// PipelineIteration-scoped bindings
bind(PipelineIteration.class).toProvider(PipelineIterationProvider.class)
.in(PipelineIterationScoped.class);
bind(Generator.class).in(PipelineIterationScoped.class);
bind(Validator.class).in(PipelineIterationScoped.class);
bind(IterationScorer.class).in(PipelineIterationScoped.class);
Expand Down
117 changes: 62 additions & 55 deletions src/main/java/org/arbeitspferde/groningen/PipelineIteration.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,19 @@

package org.arbeitspferde.groningen;

import com.google.inject.Inject;
import com.google.common.collect.Lists;
import com.google.inject.Injector;

import org.arbeitspferde.groningen.config.GroningenConfig;
import org.arbeitspferde.groningen.config.PipelineIterationScoped;
import org.arbeitspferde.groningen.executor.Executor;
import org.arbeitspferde.groningen.experimentdb.ExperimentDb;
import org.arbeitspferde.groningen.generator.Generator;
import org.arbeitspferde.groningen.hypothesizer.Hypothesizer;
import org.arbeitspferde.groningen.scorer.IterationScorer;
import org.arbeitspferde.groningen.profiling.ProfilingRunnable;
import org.arbeitspferde.groningen.utility.Metric;
import org.arbeitspferde.groningen.utility.MetricExporter;
import org.arbeitspferde.groningen.validator.Validator;

import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
Expand All @@ -43,13 +42,34 @@
public class PipelineIteration {
private static final Logger log = Logger.getLogger(PipelineIteration.class.getCanonicalName());

public static final class Builder {

private List<ProfilingRunnable> pipelineStagesList = Lists.newArrayList();

public Builder addPipelineStage(ProfilingRunnable pipelineStage) {
pipelineStagesList.add(pipelineStage);
return this;
}

public PipelineIteration build() {
Injector injector = WorkhorseFactory.getInjector();
return new PipelineIteration(
injector.getInstance(GroningenConfig.class),
injector.getInstance(PipelineSynchronizer.class),
injector.getInstance(MetricExporter.class),
injector.getInstance(PipelineStageInfo.class),
pipelineStagesList.toArray(new ProfilingRunnable[0]));
}
}

public static Builder builder() {
return new Builder();
}

private final GroningenConfig config;
private final PipelineSynchronizer pipelineSynchronizer;
private final Executor executor;
private final Generator generator;
private final Hypothesizer hypothesizer;
private final Validator validator;
private final IterationScorer scorer;
private final ProfilingRunnable[] pipelineStages;

private final PipelineStageInfo pipelineStageInfo;

/**
Expand All @@ -60,25 +80,15 @@ public class PipelineIteration {
**/
private static AtomicInteger currentPipelineStage = new AtomicInteger();

@Inject
public PipelineIteration(final GroningenConfig config,
final PipelineSynchronizer pipelineSynchronizer,
final Executor executor,
final Generator generator,
final Hypothesizer hypothesizer,
final Validator validator,
final IterationScorer scorer,
final MetricExporter metricExporter,
final PipelineStageInfo pipelineStageInfo) {

private PipelineIteration(final GroningenConfig config,
final PipelineSynchronizer pipelineSynchronizer,
final MetricExporter metricExporter,
final PipelineStageInfo pipelineStageInfo,
final ProfilingRunnable[] pipelineStages) {
this.config = config;
this.pipelineSynchronizer = pipelineSynchronizer;
this.executor = executor;
this.generator = generator;
this.hypothesizer = hypothesizer;
this.validator = validator;
this.scorer = scorer;
this.pipelineStageInfo = pipelineStageInfo;
this.pipelineStages = pipelineStages;

metricExporter.register(
"current_pipeline_stage",
Expand All @@ -100,41 +110,38 @@ public int getStage() {
* valid window.
*/
public int getRemainingExperimentalSecs() {
return (int) executor.getRemainingDurationSeconds();
// TODO(sanragsood): Clean this up, PipelineIteration should be unaware of the specifics of
// the stage pipeline iteration is in.
for (ProfilingRunnable stage : pipelineStages) {
if (stage instanceof Executor) {
return (int)((Executor)stage).getRemainingDurationSeconds();
}
}
return 0;
}

public boolean run() {
executor.startUp();
generator.startUp();
hypothesizer.startUp();
validator.startUp();
scorer.startUp();

currentPipelineStage.set(0);
for (ProfilingRunnable stage : pipelineStages) {
stage.startUp();
}

// Synchronization within the pipeline iteration - after the config is updated
pipelineSynchronizer.iterationStartHook();

pipelineStageInfo.set(PipelineStageState.HYPOTHESIZER);
hypothesizer.run(config);
boolean notComplete = hypothesizer.notComplete();

if (notComplete) {
currentPipelineStage.set(1);
pipelineStageInfo.set(PipelineStageState.GENERATOR);
generator.run(config);

// This stage returns only when all experiments are complete
currentPipelineStage.set(2);
pipelineSynchronizer.executorStartHook();
executor.run(config);

currentPipelineStage.set(3);
pipelineStageInfo.set(PipelineStageState.SCORING);
validator.run(config);

currentPipelineStage.set(4);
scorer.run(config);
int stageCount = 0;
boolean notComplete = true;
for (ProfilingRunnable stage : pipelineStages) {
currentPipelineStage.set(stageCount++);
pipelineStageInfo.setStageFromRunnable(stage);
stage.run(config);
// TODO(sanragsood): Clean this up, remove dependency on stage specifics from pipeline
// iteration
if (stage instanceof Hypothesizer) {
notComplete = ((Hypothesizer)stage).notComplete();
if (!notComplete) {
break;
}
}
}

return notComplete;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package org.arbeitspferde.groningen;

import org.arbeitspferde.groningen.config.PipelineIterationScoped;
import org.arbeitspferde.groningen.executor.Executor;
import org.arbeitspferde.groningen.generator.Generator;
import org.arbeitspferde.groningen.hypothesizer.Hypothesizer;
import org.arbeitspferde.groningen.scorer.IterationScorer;
import org.arbeitspferde.groningen.validator.Validator;

import com.google.inject.Inject;
import com.google.inject.Provider;

@PipelineIterationScoped
public class PipelineIterationProvider implements Provider<PipelineIteration> {

private final PipelineIteration.Builder builder;

@Inject
public PipelineIterationProvider(final Hypothesizer hypothesizer,
final Generator generator,
final Executor executor,
final Validator validator,
final IterationScorer scorer) {
builder = PipelineIteration.builder()
.addPipelineStage(hypothesizer)
.addPipelineStage(generator)
.addPipelineStage(executor)
.addPipelineStage(validator)
.addPipelineStage(scorer);
}

@Override
public PipelineIteration get() {
return builder.build();
}

}
25 changes: 20 additions & 5 deletions src/main/java/org/arbeitspferde/groningen/PipelineStageInfo.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

import com.google.inject.Inject;

import org.arbeitspferde.groningen.generator.Generator;
import org.arbeitspferde.groningen.hypothesizer.Hypothesizer;
import org.arbeitspferde.groningen.profiling.ProfilingRunnable;
import org.arbeitspferde.groningen.scorer.IterationScorer;

/**
* Ties together the iteration count and pipeline state into a value object in which the count
* and state can be operated/retrieved atomically. Encapsulating them allows for state to be
Expand All @@ -11,22 +16,32 @@
public class PipelineStageInfo {
private int iterationNumber;
private PipelineStageState state;

@Inject
public PipelineStageInfo() {
this(0, PipelineStageState.INITIALIZED);
}

public PipelineStageInfo(int iterationNumber, PipelineStageState state) {
this.iterationNumber = iterationNumber;
this.state = state;
}

/** Set only the pipeline state. */
public synchronized void set(PipelineStageState state) {
this.state = state;
}

public synchronized void setStageFromRunnable(ProfilingRunnable runnable) {
if (runnable instanceof Hypothesizer) {
set(PipelineStageState.HYPOTHESIZER);
} else if (runnable instanceof Generator) {
set(PipelineStageState.GENERATOR);
} else if (runnable instanceof IterationScorer) {
set(PipelineStageState.SCORING);
}
}

/** Increment the iteration count and set the stage atomically. */
public synchronized void incrementIterationAndSetState(PipelineStageState state) {
iterationNumber++;
Expand All @@ -35,7 +50,7 @@ public synchronized void incrementIterationAndSetState(PipelineStageState state)

/**
* Get a tuple of the iteration count and state atomically.
*
*
* @return the iteration count and state
*/
public synchronized ImmutablePipelineStageInfo getImmutableValueCopy() {
Expand All @@ -48,7 +63,7 @@ public synchronized ImmutablePipelineStageInfo getImmutableValueCopy() {
public static class ImmutablePipelineStageInfo {
public final int iterationNumber;
public final PipelineStageState state;

private ImmutablePipelineStageInfo(int iterationNumber, PipelineStageState state) {
this.iterationNumber = iterationNumber;
this.state = state;
Expand Down
11 changes: 10 additions & 1 deletion src/main/java/org/arbeitspferde/groningen/WorkhorseFactory.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

package org.arbeitspferde.groningen;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;

/**
Expand All @@ -23,7 +24,15 @@
* it would be for another user of Groningen to extend the test bed for new purposes.
*/
public class WorkhorseFactory {

private static Injector injector;

public static Injector getInjector() {
return injector;
}

public GroningenWorkhorse workhorseFor(final Module... modules) {
return Guice.createInjector(modules).getInstance(GroningenWorkhorse.class);
injector = Guice.createInjector(modules);
return injector.getInstance(GroningenWorkhorse.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ private void checkCommandLine(String commandLine) {
}
}
}

/**
* Provide the remaining time within the run of the experiment.
*
Expand All @@ -359,6 +359,7 @@ public long getRemainingDurationSeconds() {

@Override
public void profiledRun(GroningenConfig config) throws RuntimeException {
pipelineSynchronizer.executorStartHook();
preSteps(config);
steadyState(config);
postSteps(config);
Expand Down Expand Up @@ -484,8 +485,8 @@ private void postSteps(GroningenConfig config) {
// Clear the JVM settings protobuf to cause subjects to restart with default JVM settings
subjectSettingsFileManager.delete(subject.getAssociatedSubject().getExpSettingsFile());
}
pipelineStageInfo.set(PipelineStageState.FINAL_TASK_RESTART);

pipelineStageInfo.set(PipelineStageState.FINAL_TASK_RESTART);
restartAllGroups(config);
for (SubjectStateBridge subject : subjects) {
log.info(String.format("Running extractor thread on %s.", subject.getHumanIdentifier()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ public class Generator extends ProfilingRunnable {

private final PipelineId pipelineId;

/** The delay period in millis betwen retries of failing Generator runs */
/** The delay period in millis between retries of failing Generator runs */
private static final long RUN_GENERATOR_RETRY_DELAY = 60000L;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ protected void setUp() throws Exception {
EasyMock.replay(mockSubjectInterrogator);

mockPipelineSynchronizer = EasyMock.createMock(PipelineSynchronizer.class);
mockPipelineSynchronizer.executorStartHook();
EasyMock.replay(mockPipelineSynchronizer);

mockSubjectSettingsFileManager = EasyMock.createMock(SubjectSettingsFileManager.class);
Expand All @@ -80,7 +81,7 @@ protected void setUp() throws Exception {
EasyMock.replay(mockCollectionLogAddressor);

final PipelineStageInfo pipelineStageInfo = new PipelineStageInfo();

executor = new Executor(clock, monitor, experimentDb, mockManipulator, mockHealthQuerier,
mockSubjectInterrogator, mockPipelineSynchronizer, mockSubjectSettingsFileManager,
mockMetricExporter, mockFileFactory, new NullServingAddressGenerator(),
Expand Down