Skip to content
This repository has been archived by the owner on Nov 26, 2021. It is now read-only.

Commit

Permalink
blink
Browse files Browse the repository at this point in the history
  • Loading branch information
alpinegizmo committed Sep 2, 2020
1 parent ad16768 commit a75cbc8
Show file tree
Hide file tree
Showing 14 changed files with 88 additions and 216 deletions.
23 changes: 20 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<slf4j.version>1.7.19</slf4j.version>
<flink.version>1.10.0</flink.version>
<scala.binary.version>2.12</scala.binary.version>
<scala.binary.version>2.11</scala.binary.version>
<junit.version>4.12</junit.version>
<assertj.version>3.11.1</assertj.version>
</properties>
Expand Down Expand Up @@ -110,9 +110,26 @@

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-uber_${scala.binary.version}</artifactId>
<artifactId>flink-table-api-java-bridge_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import org.apache.flink.api.java.typeutils.EnumTypeInfo;
import org.apache.flink.api.java.typeutils.ListTypeInfo;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;
Expand Down Expand Up @@ -122,7 +124,8 @@ public void flatMap(Tuple2<Shape, Shape> value, Collector<Tuple2<Shape, Shape>>
.setParallelism(4)
.broadcast(rulesStateDescriptor);

DataStream<String> output = itemColorKeyedStream
BroadcastConnectedStream<Item, Tuple2<Shape, Shape>> foo = itemColorKeyedStream.connect(broadcastRulesStream);
SingleOutputStreamOperator<String> output = itemColorKeyedStream
.connect(broadcastRulesStream)
.process(new MatchFunction());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,7 @@ public static void main(String[] args) throws Exception {
tEnv.registerFunction("toCoords", new GeoUtils.ToCoords());

Table results = tEnv.sqlQuery(
//"SELECT TUMBLE_START(eventTime, INTERVAL '1' HOUR), isStart, count(isStart) FROM TaxiRides GROUP BY isStart, TUMBLE(eventTime, INTERVAL '1' HOUR)"
//"SELECT avg(endTime - startTime), passengerCnt FROM TaxiRides GROUP BY passengerCnt"
"SELECT CAST (toCellId(endLon, endLat) AS VARCHAR), eventTime," +
"COUNT(*) OVER (" +
"PARTITION BY toCellId(endLon, endLat) ORDER BY eventTime RANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW" +
") " +
"FROM( SELECT * FROM TaxiRides WHERE not isStart AND toCellId(endLon, endLat) = 50801 )"
"SELECT TUMBLE_START(eventTime, INTERVAL '1' HOUR), isStart, count(isStart) FROM TaxiRides GROUP BY isStart, TUMBLE(eventTime, INTERVAL '1' HOUR)"
);

// convert Table into an append stream and print it
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,14 @@
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.sources.DefinedRowtimeAttributes;
import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
import org.apache.flink.table.sources.StreamTableSource;
import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp;
import org.apache.flink.table.sources.wmstrategies.PreserveWatermarks;
import org.apache.flink.table.types.DataType;
import org.apache.flink.types.Row;

import java.util.Collections;
Expand Down Expand Up @@ -130,17 +132,17 @@ public TypeInformation<Row> getReturnType() {

@Override
public TableSchema getTableSchema() {
TypeInformation<?>[] types = new TypeInformation[] {
Types.LONG,
Types.LONG,
Types.LONG,
Types.BOOLEAN,
Types.FLOAT,
Types.FLOAT,
Types.FLOAT,
Types.FLOAT,
Types.SHORT,
Types.SQL_TIMESTAMP
DataType[] types = new DataType[] {
DataTypes.BIGINT(),
DataTypes.BIGINT(),
DataTypes.BIGINT(),
DataTypes.BOOLEAN(),
DataTypes.FLOAT(),
DataTypes.FLOAT(),
DataTypes.FLOAT(),
DataTypes.FLOAT(),
DataTypes.SMALLINT(),
DataTypes.TIMESTAMP()
};

String[] names = new String[]{
Expand All @@ -156,7 +158,10 @@ public TableSchema getTableSchema() {
"eventTime"
};

return new TableSchema(names, types);
return new TableSchema.Builder()
.fields(names, types)
.watermark("eventTime", "eventTime - INTERVAL '5' SECOND", DataTypes.TIMESTAMP())
.build();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ public static void main(String[] args) throws Exception {
env.setParallelism(1);

DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
.assignTimestampsAndWatermarks(new TimestampsAndWatermarks());
.assignTimestampsAndWatermarks(new TimestampsAndWatermarks(Time.milliseconds(OUT_OF_ORDERNESS)));

Table events = tableEnv.fromDataStream(eventStream, "eventTime.rowtime");
tableEnv.registerTable("events", events);
Expand Down Expand Up @@ -85,8 +85,8 @@ public void cancel() {
}

private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor<Event> {
public TimestampsAndWatermarks() {
super(Time.milliseconds(OUT_OF_ORDERNESS));
public TimestampsAndWatermarks(Time t) {
super(t);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public static void main(String[] args) throws Exception {

Table popPlaces = tEnv
// scan TaxiRides table
.scan("TaxiRides")
.from("TaxiRides")
// filter for valid rides
.filter("isInNYC(startLon, startLat) && isInNYC(endLon, endLat)")
// select fields and compute grid cell of departure or arrival coordinates
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ else if(head.f1 instanceof Watermark) {
sourceContext.emitWatermark(emitWatermark);
// schedule next watermark
long watermarkTime = delayedEventTime + watermarkDelayMSecs;
Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
Watermark nextWatermark = new Watermark(watermarkTime);
emitSchedule.add(new Tuple2<Long, Object>(watermarkTime, nextWatermark));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ public class ExerciseBase {
public static SinkFunction out = null;
public static int parallelism = 4;

public final static String pathToRideData = "/Users/david/stuff/flink-training/trainingData/nycTaxiRides.gz";
public final static String pathToFareData = "/Users/david/stuff/flink-training/trainingData/nycTaxiFares.gz";
public final static String pathToRideData = "/Users/david/stuff/flink-training-web/trainingData/nycTaxiRides.gz";
public final static String pathToFareData = "/Users/david/stuff/flink-training-web/trainingData/nycTaxiFares.gz";

public static SourceFunction<TaxiRide> rideSourceOrTest(SourceFunction<TaxiRide> source) {
if (rides == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,16 @@
import com.ververica.flinktraining.exercises.datastream_java.utils.GeoUtils;
import com.ververica.flinktraining.exercises.datastream_java.utils.ExerciseBase;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.io.File;
import java.io.FileOutputStream;
import java.io.ObjectOutputStream;

public class RideCleansingSolution extends ExerciseBase {
public static void main(String[] args) throws Exception {

Expand All @@ -48,6 +54,22 @@ public static void main(String[] args) throws Exception {
// print the filtered stream
printOrTest(filteredRides);

rides.map(new MapFunction<TaxiRide, Object>() {
@Override
public Object map(TaxiRide value) throws Exception {
System.out.println(("hello"));
System.err.println(("world"));
return null;
}
});

//JobGraph jobGraph = env.getStreamGraph().getJobGraph();
//final String jobGraphFilename = "job.graph";
//File jobGraphFile = new File(jobGraphFilename);
//FileOutputStream output = new FileOutputStream(jobGraphFile);
//ObjectOutputStream obOutput = new ObjectOutputStream(output);
//obOutput.writeObject(jobGraph);

// run the cleansing pipeline
env.execute("Taxi Ride Cleansing");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ public static void main(String[] args) throws Exception {

env.enableCheckpointing(10000L);
CheckpointConfig config = env.getCheckpointConfig();
config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
config.enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

DataStream<TaxiRide> rides = env
.addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.state.api.ExistingSavepoint;
Expand All @@ -31,6 +32,9 @@
import com.ververica.flinktraining.exercises.datastream_java.datatypes.TaxiFare;
import com.ververica.flinktraining.exercises.datastream_java.datatypes.TaxiRide;

import static org.apache.flink.api.java.aggregation.Aggregations.MIN;
import static org.apache.flink.api.java.aggregation.Aggregations.SUM;

public class ReadRidesAndFaresSnapshot {
static class ReadRidesAndFares extends KeyedStateReaderFunction<Long, Tuple2<TaxiRide, TaxiFare>> {
ValueState<TaxiRide> ride;
Expand Down Expand Up @@ -59,7 +63,7 @@ public static void main(String[] args) throws Exception {
/***************************************************************************************
Update this path to point to a checkpoint or savepoint from RidesAndFaresSolution.java
***************************************************************************************/
String pathToSnapshot = "file:///tmp/checkpoints/12529c8286f3bb9721dbd7076832ff06/chk-5";
String pathToSnapshot = "file:///tmp/checkpoints/d026157cec94402dd98c6be51d4db8ca/chk-2";

ExistingSavepoint sp = Savepoint.load(bEnv, pathToSnapshot, backend);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.ververica.flinktraining.exercises.datastream_java.utils.ExerciseBase;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand Down Expand Up @@ -52,31 +53,26 @@ public static void main(String[] args) throws Exception {
final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second

// set up streaming execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration conf = new Configuration();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(ExerciseBase.parallelism);

// start the data generator
DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new TaxiFareSource(input, maxEventDelay, servingSpeedFactor)));
DataStream<TaxiFare> fares = env.addSource(fareSourceOrTest(new TaxiFareSource(input, maxEventDelay, servingSpeedFactor))).name("testname");

// compute tips per hour for each driver
DataStream<Tuple3<Long, Long, Float>> hourlyTips = fares
fares
.keyBy((TaxiFare fare) -> fare.driverId)
.timeWindow(Time.hours(1))
.process(new AddTips());

DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips
.process(new AddTips())
.timeWindowAll(Time.hours(1))
.maxBy(2);

// You should explore how this alternative behaves. In what ways is the same as,
// and different from, the solution above (using a timeWindowAll)?
.maxBy(2)
.print();

// DataStream<Tuple3<Long, Long, Float>> hourlyMax = hourlyTips
// .keyBy(0)
// .maxBy(2);
// printOrTest(hourlyMax);

printOrTest(hourlyMax);
System.out.println(env.getExecutionPlan());

// execute the transformation pipeline
env.execute("Hourly Tips (java)");
Expand Down

This file was deleted.

Loading

0 comments on commit a75cbc8

Please sign in to comment.