Skip to content
Albert Bifet edited this page Oct 18, 2013 · 2 revisions

TopologyBuilder is a builder class which builds physical units of the topology and assemble them together. Each topology has a name. Following code snippet shows all the steps of creating a topology with one EntrancePI, two PIs and a few streams.

TopologyBuilder builder = new TopologyBuilder(factory) // ComponentFactory factory
builder.initTopology("Parma Topology"); //initiates an empty topology with a name
//********************************Topology building***********************************
StreamSourceP sourceProcessor = new StreamSourceP(inputPath,d,sampleSize,fpmGap,epsilon,phi,numSamples);
	
starter = new FpmTopologyStarter(ss);
builder.addEntranceProcessor(ss, starter);
Stream sourceDataStream = builder.createStream(sourceProcessor);
sourceProcessor.setDataStream(sourceDataStream);
Stream sourceControlStream = builder.createStream(sourceProcessor);
sourceProcessor.setControlStream(sourceControlStream);

SamplerP samplerP = new SamplerP(minFreqPercent,sampleSize,(float)epsilon,outputPath,sampler);
builder.addProcessor(samplerP, numSamples);
builder.connectInputAllStream(sourceControlStream, samplerP);
builder.connectInputShuffleStream(sourceDataStream, samplerP);

Stream samplerDataStream = builder.createStream(samplerP);
samplerP.setSamplerDataStream(samplerDataStream);
Stream samplerControlStream = builder.createStream(samplerP);
samplerP.setSamplerControlStream(samplerControlStream);

AggregatorP aggregatorProcessor = new AggregatorP(outputPath,(long)numSamples,(long)sampleSize,(long)reqApproxNum,(float)epsilon);
builder.addProcessor(aggregatorProcessor, numAggs);
builder.connectInputKeyStream(samplerDataStream, aggregatorProcessor);
builder.connectInputAllStream(samplerControlStream, aggregatorProcessor);