From a75cbc8f5213275f981be37eec7b151c8295dfc8 Mon Sep 17 00:00:00 2001
From: David Anderson
Date: Wed, 2 Sep 2020 10:27:57 +0200
Subject: [PATCH] blink

---
 pom.xml                                       | 23 ++++-
 .../broadcast/BroadcastState.java             |  5 +-
 .../table_java/examples/RidesPerHour.java     |  8 +-
 .../sources/TaxiRideTableSource.java          | 29 ++++---
 .../examples/table_java/stream/Sort.java      |  6 +-
 .../popularPlaces/PopularPlacesTableApi.java  |  2 +-
 .../sources/TaxiFareSource.java               |  2 +-
 .../datastream_java/utils/ExerciseBase.java   |  4 +-
 .../basics/RideCleansingSolution.java         | 22 +++++
 .../state/RidesAndFaresSolution.java          |  3 +-
 .../ReadRidesAndFaresSnapshot.java            |  6 +-
 .../windows/HourlyTipsSolution.java           | 24 +++---
 .../popularPlaces/PopularPlacesSql.scala      | 86 -------------------
 .../popularPlaces/PopularPlacesTableApi.scala | 84 ------------------
 14 files changed, 88 insertions(+), 216 deletions(-)
 delete mode 100644 src/main/scala/com/ververica/flinktraining/examples/table_scala/stream/popularPlaces/PopularPlacesSql.scala
 delete mode 100644 src/main/scala/com/ververica/flinktraining/examples/table_scala/stream/popularPlaces/PopularPlacesTableApi.scala

diff --git a/pom.xml b/pom.xml
index aab3d17..707e328 100644
--- a/pom.xml
+++ b/pom.xml
@@ -47,7 +47,7 @@
 		UTF-8
 		1.7.19
 		1.10.0
-		2.12
+		2.11
 		4.12
 		3.11.1
@@ -110,9 +110,26 @@
 			org.apache.flink
-			flink-table-uber_${scala.binary.version}
+			flink-table-api-java-bridge_${scala.binary.version}
+			${flink.version}
+
+
+			org.apache.flink
+			flink-table-planner-blink_${scala.binary.version}
+			${flink.version}
+
+
+			org.apache.flink
+			flink-table_${scala.binary.version}
+			${flink.version}
+
+
+			org.apache.flink
+			flink-table-common
 			${flink.version}
-			provided

diff --git a/src/main/java/com/ververica/flinktraining/examples/datastream_java/broadcast/BroadcastState.java b/src/main/java/com/ververica/flinktraining/examples/datastream_java/broadcast/BroadcastState.java
index c5ee93f..08dbc1b 100644
--- a/src/main/java/com/ververica/flinktraining/examples/datastream_java/broadcast/BroadcastState.java
+++ b/src/main/java/com/ververica/flinktraining/examples/datastream_java/broadcast/BroadcastState.java
@@ -24,9 +24,11 @@
 import org.apache.flink.api.java.typeutils.EnumTypeInfo;
 import org.apache.flink.api.java.typeutils.ListTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.streaming.api.datastream.BroadcastConnectedStream;
 import org.apache.flink.streaming.api.datastream.BroadcastStream;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
 import org.apache.flink.util.Collector;
@@ -122,7 +124,8 @@ public void flatMap(Tuple2 value, Collector>
 				.setParallelism(4)
 				.broadcast(rulesStateDescriptor);

-		DataStream output = itemColorKeyedStream
+		BroadcastConnectedStream> foo = itemColorKeyedStream.connect(broadcastRulesStream);
+		SingleOutputStreamOperator output = itemColorKeyedStream
 				.connect(broadcastRulesStream)
 				.process(new MatchFunction());

diff --git a/src/main/java/com/ververica/flinktraining/examples/table_java/examples/RidesPerHour.java b/src/main/java/com/ververica/flinktraining/examples/table_java/examples/RidesPerHour.java
index e80aca8..d869656 100644
--- a/src/main/java/com/ververica/flinktraining/examples/table_java/examples/RidesPerHour.java
+++ b/src/main/java/com/ververica/flinktraining/examples/table_java/examples/RidesPerHour.java
@@ -56,13 +56,7 @@ public static void main(String[] args) throws Exception {
 		tEnv.registerFunction("toCoords", new GeoUtils.ToCoords());

 		Table results = tEnv.sqlQuery(
-				//"SELECT TUMBLE_START(eventTime, INTERVAL '1' HOUR), isStart, count(isStart) FROM TaxiRides GROUP BY isStart, TUMBLE(eventTime, INTERVAL '1' HOUR)"
-				//"SELECT avg(endTime - startTime), passengerCnt FROM TaxiRides GROUP BY passengerCnt"
-				"SELECT CAST (toCellId(endLon, endLat) AS VARCHAR), eventTime," +
-				"COUNT(*) OVER (" +
-				"PARTITION BY toCellId(endLon, endLat) ORDER BY eventTime RANGE BETWEEN INTERVAL '10' MINUTE PRECEDING AND CURRENT ROW" +
-				") " +
-				"FROM( SELECT * FROM TaxiRides WHERE not isStart AND toCellId(endLon, endLat) = 50801 )"
+				"SELECT TUMBLE_START(eventTime, INTERVAL '1' HOUR), isStart, count(isStart) FROM TaxiRides GROUP BY isStart, TUMBLE(eventTime, INTERVAL '1' HOUR)"
 		);

 		// convert Table into an append stream and print it

diff --git a/src/main/java/com/ververica/flinktraining/examples/table_java/sources/TaxiRideTableSource.java b/src/main/java/com/ververica/flinktraining/examples/table_java/sources/TaxiRideTableSource.java
index 9418f37..d59c3ad 100644
--- a/src/main/java/com/ververica/flinktraining/examples/table_java/sources/TaxiRideTableSource.java
+++ b/src/main/java/com/ververica/flinktraining/examples/table_java/sources/TaxiRideTableSource.java
@@ -24,12 +24,14 @@
 import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.sources.DefinedRowtimeAttributes;
 import org.apache.flink.table.sources.RowtimeAttributeDescriptor;
 import org.apache.flink.table.sources.StreamTableSource;
 import org.apache.flink.table.sources.tsextractors.StreamRecordTimestamp;
 import org.apache.flink.table.sources.wmstrategies.PreserveWatermarks;
+import org.apache.flink.table.types.DataType;
 import org.apache.flink.types.Row;

 import java.util.Collections;
@@ -130,17 +132,17 @@ public TypeInformation getReturnType() {

 	@Override
 	public TableSchema getTableSchema() {
-		TypeInformation[] types = new TypeInformation[] {
-			Types.LONG,
-			Types.LONG,
-			Types.LONG,
-			Types.BOOLEAN,
-			Types.FLOAT,
-			Types.FLOAT,
-			Types.FLOAT,
-			Types.FLOAT,
-			Types.SHORT,
-			Types.SQL_TIMESTAMP
+		DataType[] types = new DataType[] {
+			DataTypes.BIGINT(),
+			DataTypes.BIGINT(),
+			DataTypes.BIGINT(),
+			DataTypes.BOOLEAN(),
+			DataTypes.FLOAT(),
+			DataTypes.FLOAT(),
+			DataTypes.FLOAT(),
+			DataTypes.FLOAT(),
+			DataTypes.SMALLINT(),
+			DataTypes.TIMESTAMP()
 		};

 		String[] names = new String[]{
@@ -156,7 +158,10 @@ public TableSchema getTableSchema() {
 			"eventTime"
 		};

-		return new TableSchema(names, types);
+		return new TableSchema.Builder()
+				.fields(names, types)
+				.watermark("eventTime", "eventTime - INTERVAL '5' SECOND", DataTypes.TIMESTAMP())
+				.build();
 	}

 	@Override

diff --git a/src/main/java/com/ververica/flinktraining/examples/table_java/stream/Sort.java b/src/main/java/com/ververica/flinktraining/examples/table_java/stream/Sort.java
index 0b3622c..e766beb 100644
--- a/src/main/java/com/ververica/flinktraining/examples/table_java/stream/Sort.java
+++ b/src/main/java/com/ververica/flinktraining/examples/table_java/stream/Sort.java
@@ -47,7 +47,7 @@ public static void main(String[] args) throws Exception {
 		env.setParallelism(1);

 		DataStream eventStream = env.addSource(new OutOfOrderEventSource())
-				.assignTimestampsAndWatermarks(new TimestampsAndWatermarks());
+				.assignTimestampsAndWatermarks(new TimestampsAndWatermarks(Time.milliseconds(OUT_OF_ORDERNESS)));

 		Table events = tableEnv.fromDataStream(eventStream, "eventTime.rowtime");
 		tableEnv.registerTable("events", events);
@@ -85,8 +85,8 @@ public void cancel() {
 	}

 	private static class TimestampsAndWatermarks extends BoundedOutOfOrdernessTimestampExtractor {
-		public TimestampsAndWatermarks() {
-			super(Time.milliseconds(OUT_OF_ORDERNESS));
+		public TimestampsAndWatermarks(Time t) {
+			super(t);
 		}

 		@Override

diff --git a/src/main/java/com/ververica/flinktraining/examples/table_java/stream/popularPlaces/PopularPlacesTableApi.java b/src/main/java/com/ververica/flinktraining/examples/table_java/stream/popularPlaces/PopularPlacesTableApi.java
index 8133ab4..61689fb 100644
--- a/src/main/java/com/ververica/flinktraining/examples/table_java/stream/popularPlaces/PopularPlacesTableApi.java
+++ b/src/main/java/com/ververica/flinktraining/examples/table_java/stream/popularPlaces/PopularPlacesTableApi.java
@@ -59,7 +59,7 @@ public static void main(String[] args) throws Exception {
 		Table popPlaces = tEnv
 			// scan TaxiRides table
-			.scan("TaxiRides")
+			.from("TaxiRides")
 			// filter for valid rides
 			.filter("isInNYC(startLon, startLat) && isInNYC(endLon, endLat)")
 			// select fields and compute grid cell of departure or arrival coordinates

diff --git a/src/main/java/com/ververica/flinktraining/exercises/datastream_java/sources/TaxiFareSource.java b/src/main/java/com/ververica/flinktraining/exercises/datastream_java/sources/TaxiFareSource.java
index 1d1f9ec..31447a4 100644
--- a/src/main/java/com/ververica/flinktraining/exercises/datastream_java/sources/TaxiFareSource.java
+++ b/src/main/java/com/ververica/flinktraining/exercises/datastream_java/sources/TaxiFareSource.java
@@ -210,7 +210,7 @@ else if(head.f1 instanceof Watermark) {
 				sourceContext.emitWatermark(emitWatermark);
 				// schedule next watermark
 				long watermarkTime = delayedEventTime + watermarkDelayMSecs;
-				Watermark nextWatermark = new Watermark(watermarkTime - maxDelayMsecs - 1);
+				Watermark nextWatermark = new Watermark(watermarkTime);
 				emitSchedule.add(new Tuple2(watermarkTime, nextWatermark));
 			}
 		}

diff --git a/src/main/java/com/ververica/flinktraining/exercises/datastream_java/utils/ExerciseBase.java b/src/main/java/com/ververica/flinktraining/exercises/datastream_java/utils/ExerciseBase.java
index 50d249a..85cb3f8 100644
--- a/src/main/java/com/ververica/flinktraining/exercises/datastream_java/utils/ExerciseBase.java
+++ b/src/main/java/com/ververica/flinktraining/exercises/datastream_java/utils/ExerciseBase.java
@@ -28,8 +28,8 @@ public class ExerciseBase {
 	public static SinkFunction out = null;
 	public static int parallelism = 4;

-	public final static String pathToRideData = "/Users/david/stuff/flink-training/trainingData/nycTaxiRides.gz";
-	public final static String pathToFareData = "/Users/david/stuff/flink-training/trainingData/nycTaxiFares.gz";
+	public final static String pathToRideData = "/Users/david/stuff/flink-training-web/trainingData/nycTaxiRides.gz";
+	public final static String pathToFareData = "/Users/david/stuff/flink-training-web/trainingData/nycTaxiFares.gz";

 	public static SourceFunction rideSourceOrTest(SourceFunction source) {
 		if (rides == null) {
diff --git a/src/main/java/com/ververica/flinktraining/solutions/datastream_java/basics/RideCleansingSolution.java b/src/main/java/com/ververica/flinktraining/solutions/datastream_java/basics/RideCleansingSolution.java
index 6a3f5f0..c302015 100644
--- a/src/main/java/com/ververica/flinktraining/solutions/datastream_java/basics/RideCleansingSolution.java
+++ b/src/main/java/com/ververica/flinktraining/solutions/datastream_java/basics/RideCleansingSolution.java
@@ -21,10 +21,16 @@
 import com.ververica.flinktraining.exercises.datastream_java.utils.GeoUtils;
 import com.ververica.flinktraining.exercises.datastream_java.utils.ExerciseBase;
 import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.ObjectOutputStream;
+
 public class RideCleansingSolution extends ExerciseBase {

 	public static void main(String[] args) throws Exception {
@@ -48,6 +54,22 @@ public static void main(String[] args) throws Exception {
 		// print the filtered stream
 		printOrTest(filteredRides);

+		rides.map(new MapFunction() {
+			@Override
+			public Object map(TaxiRide value) throws Exception {
+				System.out.println(("hello"));
+				System.err.println(("world"));
+				return null;
+			}
+		});
+
+//		JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+//		final String jobGraphFilename = "job.graph";
+//		File jobGraphFile = new File(jobGraphFilename);
+//		FileOutputStream output = new FileOutputStream(jobGraphFile);
+//		ObjectOutputStream obOutput = new ObjectOutputStream(output);
+//		obOutput.writeObject(jobGraph);
+
 		// run the cleansing pipeline
 		env.execute("Taxi Ride Cleansing");
 	}

diff --git a/src/main/java/com/ververica/flinktraining/solutions/datastream_java/state/RidesAndFaresSolution.java b/src/main/java/com/ververica/flinktraining/solutions/datastream_java/state/RidesAndFaresSolution.java
index bb745ef..812b3d6 100644
--- a/src/main/java/com/ververica/flinktraining/solutions/datastream_java/state/RidesAndFaresSolution.java
+++ b/src/main/java/com/ververica/flinktraining/solutions/datastream_java/state/RidesAndFaresSolution.java
@@ -66,7 +66,8 @@ public static void main(String[] args) throws Exception {
 		env.enableCheckpointing(10000L);
 		CheckpointConfig config = env.getCheckpointConfig();
-		config.enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+		config.enableExternalizedCheckpoints(
+				CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

 		DataStream rides = env
 				.addSource(rideSourceOrTest(new TaxiRideSource(ridesFile, delay, servingSpeedFactor)))

diff --git a/src/main/java/com/ververica/flinktraining/solutions/datastream_java/state_processor/ReadRidesAndFaresSnapshot.java b/src/main/java/com/ververica/flinktraining/solutions/datastream_java/state_processor/ReadRidesAndFaresSnapshot.java
index 0f84e60..4b0fa89 100644
--- a/src/main/java/com/ververica/flinktraining/solutions/datastream_java/state_processor/ReadRidesAndFaresSnapshot.java
+++ b/src/main/java/com/ververica/flinktraining/solutions/datastream_java/state_processor/ReadRidesAndFaresSnapshot.java
@@ -21,6 +21,7 @@
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.state.memory.MemoryStateBackend;
 import org.apache.flink.state.api.ExistingSavepoint;
@@ -31,6 +32,9 @@
 import com.ververica.flinktraining.exercises.datastream_java.datatypes.TaxiFare;
 import com.ververica.flinktraining.exercises.datastream_java.datatypes.TaxiRide;

+import static org.apache.flink.api.java.aggregation.Aggregations.MIN;
+import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
+
 public class ReadRidesAndFaresSnapshot {
 	static class ReadRidesAndFares extends KeyedStateReaderFunction> {
 		ValueState ride;
@@ -59,7 +63,7 @@ public static void main(String[] args) throws Exception {
 		/***************************************************************************************
 		 Update this path to point to a checkpoint or savepoint from RidesAndFaresSolution.java
 		 ***************************************************************************************/
-		String pathToSnapshot = "file:///tmp/checkpoints/12529c8286f3bb9721dbd7076832ff06/chk-5";
+		String pathToSnapshot = "file:///tmp/checkpoints/d026157cec94402dd98c6be51d4db8ca/chk-2";

 		ExistingSavepoint sp = Savepoint.load(bEnv, pathToSnapshot, backend);

diff --git a/src/main/java/com/ververica/flinktraining/solutions/datastream_java/windows/HourlyTipsSolution.java b/src/main/java/com/ververica/flinktraining/solutions/datastream_java/windows/HourlyTipsSolution.java
index 17efc4d..b29726b 100644
--- a/src/main/java/com/ververica/flinktraining/solutions/datastream_java/windows/HourlyTipsSolution.java
+++ b/src/main/java/com/ververica/flinktraining/solutions/datastream_java/windows/HourlyTipsSolution.java
@@ -21,6 +21,7 @@
 import com.ververica.flinktraining.exercises.datastream_java.utils.ExerciseBase;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -52,31 +53,26 @@ public static void main(String[] args) throws Exception {
 		final int servingSpeedFactor = 600; // events of 10 minutes are served in 1 second

 		// set up streaming execution environment
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		Configuration conf = new Configuration();
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
 		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
 		env.setParallelism(ExerciseBase.parallelism);

 		// start the data generator
-		DataStream fares = env.addSource(fareSourceOrTest(new TaxiFareSource(input, maxEventDelay, servingSpeedFactor)));
+		DataStream fares = env.addSource(fareSourceOrTest(new TaxiFareSource(input, maxEventDelay, servingSpeedFactor))).name("testname");

 		// compute tips per hour for each driver
-		DataStream> hourlyTips = fares
+		fares
 			.keyBy((TaxiFare fare) -> fare.driverId)
 			.timeWindow(Time.hours(1))
-			.process(new AddTips());
-
-		DataStream> hourlyMax = hourlyTips
+			.process(new AddTips())
 			.timeWindowAll(Time.hours(1))
-			.maxBy(2);
-
-// You should explore how this alternative behaves. In what ways is the same as,
-// and different from, the solution above (using a timeWindowAll)?
+			.maxBy(2)
+			.print();

-//		DataStream> hourlyMax = hourlyTips
-//			.keyBy(0)
-//			.maxBy(2);
-
-		printOrTest(hourlyMax);
+//		printOrTest(hourlyMax);
+
+		System.out.println(env.getExecutionPlan());

 		// execute the transformation pipeline
 		env.execute("Hourly Tips (java)");

diff --git a/src/main/scala/com/ververica/flinktraining/examples/table_scala/stream/popularPlaces/PopularPlacesSql.scala b/src/main/scala/com/ververica/flinktraining/examples/table_scala/stream/popularPlaces/PopularPlacesSql.scala
deleted file mode 100644
index de528fe..0000000
--- a/src/main/scala/com/ververica/flinktraining/examples/table_scala/stream/popularPlaces/PopularPlacesSql.scala
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Copyright 2015 data Artisans GmbH, 2019 Ververica GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ververica.flinktraining.examples.table_scala.stream.popularPlaces
-
-import com.ververica.flinktraining.exercises.datastream_java.utils.GeoUtils.{IsInNYC, ToCellId, ToCoords}
-import com.ververica.flinktraining.examples.table_java.sources.TaxiRideTableSource
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.api.scala._
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.table.api.scala.StreamTableEnvironment
-import org.apache.flink.table.api.Table
-import org.apache.flink.types.Row
-
-object PopularPlacesSql {
-
-  def main(args: Array[String]) {
-
-    // read parameters
-    val params = ParameterTool.fromArgs(args)
-    val input = params.getRequired("input")
-
-    val maxEventDelay = 60       // events are out of order by max 60 seconds
-    val servingSpeedFactor = 600 // events of 10 minutes are served in 1 second
-
-    // set up streaming execution environment
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-
-    // create TableEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
-
-    // register TaxiRideTableSource as table "TaxiRides"
-    tEnv.registerTableSource(
-      "TaxiRides",
-      new TaxiRideTableSource(input, maxEventDelay, servingSpeedFactor))
-
-    // register user-defined functions
-    tEnv.registerFunction("isInNYC", new IsInNYC)
-    tEnv.registerFunction("toCellId", new ToCellId)
-    tEnv.registerFunction("toCoords", new ToCoords)
-
-    val results: Table = tEnv.sqlQuery(
-      """
-        |SELECT
-        |  toCoords(cell), wstart, wend, isStart, popCnt
-        |FROM
-        |  (SELECT
-        |    cell,
-        |    isStart,
-        |    HOP_START(eventTime, INTERVAL '5' MINUTE, INTERVAL '15' MINUTE) AS wstart,
-        |    HOP_END(eventTime, INTERVAL '5' MINUTE, INTERVAL '15' MINUTE) AS wend,
-        |    COUNT(isStart) AS popCnt
-        |  FROM
-        |    (SELECT
-        |      eventTime,
-        |      isStart,
-        |      CASE WHEN isStart THEN toCellId(startLon, startLat) ELSE toCellId(endLon, endLat) END AS cell
-        |    FROM TaxiRides
-        |    WHERE isInNYC(startLon, startLat) AND isInNYC(endLon, endLat))
-        |  GROUP BY cell, isStart, HOP(eventTime, INTERVAL '5' MINUTE, INTERVAL '15' MINUTE))
-        |WHERE popCnt > 20
-        |""".stripMargin)
-
-    // convert Table into an append stream and print it
-    // (if we needed a retraction stream we would use tEnv.toRetractStream)
-    tEnv.toAppendStream[Row](results).print
-
-    // execute query
-    env.execute
-  }
-
-}

diff --git a/src/main/scala/com/ververica/flinktraining/examples/table_scala/stream/popularPlaces/PopularPlacesTableApi.scala b/src/main/scala/com/ververica/flinktraining/examples/table_scala/stream/popularPlaces/PopularPlacesTableApi.scala
deleted file mode 100644
index dddb9e8..0000000
--- a/src/main/scala/com/ververica/flinktraining/examples/table_scala/stream/popularPlaces/PopularPlacesTableApi.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Copyright 2015 data Artisans GmbH, 2019 Ververica GmbH
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.ververica.flinktraining.examples.table_scala.stream.popularPlaces
-
-import com.ververica.flinktraining.exercises.datastream_java.utils.GeoUtils.{IsInNYC, ToCellId, ToCoords}
-import com.ververica.flinktraining.examples.table_java.sources.TaxiRideTableSource
-import org.apache.flink.api.java.utils.ParameterTool
-import org.apache.flink.streaming.api.TimeCharacteristic
-import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
-import org.apache.flink.types.Row
-import org.apache.flink.api.scala._
-import org.apache.flink.table.api.Slide
-import org.apache.flink.table.api.scala._
-
-object PopularPlacesTableApi {
-
-  def main(args: Array[String]) {
-
-    // read parameters
-    val params = ParameterTool.fromArgs(args)
-    val input = params.getRequired("input")
-
-    val maxEventDelay = 60       // events are out of order by max 60 seconds
-    val servingSpeedFactor = 600 // events of 10 minutes are served in 1 second
-
-    // set up streaming execution environment
-    val env = StreamExecutionEnvironment.getExecutionEnvironment
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
-
-    // create TableEnvironment
-    val tEnv = StreamTableEnvironment.create(env)
-
-    // register TaxiRideTableSource as table "TaxiRides"
-    tEnv.registerTableSource(
-      "TaxiRides",
-      new TaxiRideTableSource(input, maxEventDelay, servingSpeedFactor))
-
-    // create user-defined functions
-    val isInNYC = new IsInNYC
-    val toCellId = new ToCellId
-    val toCoords = new ToCoords
-
-    val popPlaces = tEnv
-      // scan TaxiRides table
-      .scan("TaxiRides")
-      // filter for valid rides
-      .filter(isInNYC('startLon, 'startLat) && isInNYC('endLon, 'endLat))
-      // select fields and compute grid cell of departure or arrival coordinates
-      .select(
-        'eventTime,
-        'isStart,
-        'isStart.?(toCellId('startLon, 'startLat), toCellId('endLon, 'endLat)) as 'cell)
-      // specify sliding window of 15 minutes with slide of 5 minutes
-      .window(Slide over 15.minutes every 5.minutes on 'eventTime as 'w)
-      // group by cell, isStart, and window
-      .groupBy('cell, 'isStart, 'w)
-      // count departures and arrivals per cell (location) and window (time)
-      .select('cell, 'isStart, 'w.start as 'start, 'w.end as 'end, 'isStart.count as 'popCnt)
-      // filter for popular places
-      .filter('popCnt > 20)
-      // convert cell back to coordinates
-      .select(toCoords('cell) as 'location, 'start, 'end, 'isStart, 'popCnt)
-
-    // convert Table into an append stream and print it
-    tEnv.toAppendStream[Row](popPlaces).print
-
-    // execute query
-    env.execute
-  }
-
-}
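
Note (not part of the patch itself): with flink-table-planner-blink added next to the legacy planner dependencies above, the examples still have to opt in to the Blink planner when they build their table environment, since Flink 1.10 defaults to the legacy planner. A minimal sketch of that setup, assuming Flink 1.10 and the dependencies added in this pom.xml (the class name below is only for illustration):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.java.StreamTableEnvironment;

    public class BlinkPlannerSetup {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // Opt in to the Blink planner explicitly; StreamTableEnvironment.create(env)
            // without settings still picks the legacy planner in Flink 1.10.
            EnvironmentSettings settings = EnvironmentSettings.newInstance()
                    .useBlinkPlanner()
                    .inStreamingMode()
                    .build();

            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

            // register table sources / UDFs and run queries as in the examples touched above
        }
    }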