forked from confluentinc/demo-scene
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathdemo.ksql
37 lines (30 loc) · 1.91 KB
/
demo.ksql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
-- Session setting: start reading each topic from its earliest available
-- offset, so the statements below also see messages already in the topic.
SET 'auto.offset.reset' = 'earliest';
-- Source stream over the raw tweet topic 'twitter_json_01' (JSON values,
-- topic declared with 12 partitions).
-- Only CreatedAt and Id are typed BIGINT; every other field is read as
-- VARCHAR. User is JSON text; later statements in this file pull fields out
-- of it with EXTRACTJSONFIELD.
CREATE STREAM twitter_raw ( \
    CreatedAt BIGINT, \
    Id BIGINT, \
    Text VARCHAR, \
    SOURCE VARCHAR, \
    Truncated VARCHAR, \
    InReplyToStatusId VARCHAR, \
    InReplyToUserId VARCHAR, \
    InReplyToScreenName VARCHAR, \
    GeoLocation VARCHAR, \
    Place VARCHAR, \
    Favorited VARCHAR, \
    Retweeted VARCHAR, \
    FavoriteCount VARCHAR, \
    User VARCHAR, \
    Retweet VARCHAR, \
    Contributors VARCHAR, \
    RetweetCount VARCHAR, \
    RetweetedByMe VARCHAR, \
    CurrentUserRetweetId VARCHAR, \
    PossiblySensitive VARCHAR, \
    Lang VARCHAR, \
    WithheldInCountries VARCHAR, \
    HashtagEntities VARCHAR, \
    UserMentionEntities VARCHAR, \
    MediaEntities VARCHAR, \
    SymbolEntities VARCHAR, \
    URLEntities VARCHAR \
) WITH ( \
    KAFKA_TOPIC = 'twitter_json_01', \
    PARTITIONS = 12, \
    VALUE_FORMAT = 'JSON' \
);
-- Derived stream limited to tweets whose hashtag entities contain
-- 'kafkasummit' (matched after lower-casing with LCASE).
-- CreatedAt is rendered as a formatted string, and selected profile fields
-- are extracted from the User JSON text into flat columns.
CREATE STREAM twitter AS \
    SELECT \
        TIMESTAMPTOSTRING(CreatedAt, 'yyyy-MM-dd HH:mm:ss.SSS') AS CreatedAt, \
        EXTRACTJSONFIELD(User, '$.Name') AS user_name, \
        EXTRACTJSONFIELD(User, '$.ScreenName') AS user_screenname, \
        EXTRACTJSONFIELD(User, '$.Location') AS user_location, \
        EXTRACTJSONFIELD(User, '$.Description') AS user_description, \
        Text, \
        HashtagEntities, \
        Lang \
    FROM twitter_raw \
    WHERE LCASE(HashtagEntities) LIKE '%kafkasummit%';
-- Continuous query: author name and text of each tweet in the twitter stream
-- whose lower-cased text contains 'gamussa'.
SELECT \
    user_name, \
    text \
FROM twitter \
WHERE LCASE(text) LIKE '%gamussa%';
-- Tweet counts per screen name over 1-hour tumbling windows, restricted to
-- tweets whose lower-cased text contains 'gamussa'.
CREATE TABLE user_tweet_count AS \
    SELECT \
        user_screenname, \
        COUNT(*) AS tweet_count \
    FROM twitter \
    WINDOW TUMBLING (SIZE 1 HOUR) \
    WHERE LCASE(text) LIKE '%gamussa%' \
    GROUP BY user_screenname;
-- Display-oriented copy of user_tweet_count: adds a formatted timestamp
-- column next to the screen name and the count.
-- NOTE(review): WINDOW_START is derived from ROWTIME; whether ROWTIME equals
-- the start of the tumbling window depends on the KSQL version. Confirm.
CREATE TABLE USER_TWEET_COUNT_DISPLAY AS \
    SELECT \
        TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS window_start, \
        user_screenname, \
        tweet_count \
    FROM user_tweet_count;
-- Rows of USER_TWEET_COUNT_DISPLAY whose tweet count is strictly greater
-- than 3.
CREATE TABLE more_than_3_tweets_kafkasummit AS \
    SELECT \
        window_start, \
        user_screenname, \
        tweet_count \
    FROM USER_TWEET_COUNT_DISPLAY \
    WHERE tweet_count > 3;
-- Continuous query: screen name and tweet count from
-- more_than_3_tweets_kafkasummit.
SELECT \
    user_screenname, \
    tweet_count \
FROM more_than_3_tweets_kafkasummit;