stream-processing.py
import atexit
import json
import logging
import sys
import time

from kafka import KafkaProducer
from kafka.errors import KafkaError
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

logger_format = '%(asctime)-15s %(message)s'
logging.basicConfig(format=logger_format)
logger = logging.getLogger('stream-processing')
logger.setLevel(logging.INFO)

# Shared state, populated in __main__ and used by the foreachRDD callback.
topic = None
target_topic = None
brokers = None
kafka_producer = None


def shutdown_hook(producer):
    """
    A shutdown hook to be called before shutdown.
    :param producer: instance of a Kafka producer
    :return: None
    """
    try:
        logger.info('Flushing pending messages to kafka, timeout is set to 10s')
        producer.flush(10)
        logger.info('Finished flushing pending messages to kafka')
    except KafkaError as kafka_error:
        logger.warning('Failed to flush pending messages to kafka, caused by: %s', kafka_error)
    finally:
        try:
            logger.info('Closing kafka connection')
            producer.close(10)
        except Exception as e:
            logger.warning('Failed to close kafka connection, caused by: %s', e)


def process_stream(stream):
    """
    Compute the average trade price per stock symbol for each micro-batch
    and forward the results to the target Kafka topic.
    :param stream: a DStream of Kafka (key, value) pairs
    :return: None
    """
    def send_to_kafka(rdd):
        # foreachRDD callbacks run on the driver, so collect() is safe here
        # as long as each batch's per-symbol result set stays small.
        results = rdd.collect()
        for r in results:
            data = json.dumps(
                {
                    'symbol': r[0],
                    'timestamp': time.time(),
                    'average': r[1]
                }
            )
            try:
                logger.info('Sending average price %s to kafka', data)
                kafka_producer.send(target_topic, value=data.encode('utf-8'))
            except KafkaError as error:
                logger.warning('Failed to send average stock price to kafka, caused by: %s', error)

    def pair(data):
        # Each message value is a JSON array holding a single record; the
        # direct stream already decodes values to str by default.
        record = json.loads(data[1])[0]
        return record.get('StockSymbol'), (float(record.get('LastTradePrice')), 1)

    # Map each record to (symbol, (price, 1)), sum prices and counts per
    # symbol, then divide to get the per-batch average.
    (stream.map(pair)
           .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))
           .map(lambda kv: (kv[0], kv[1][0] / kv[1][1]))
           .foreachRDD(send_to_kafka))
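
# For illustration with hypothetical values: if a batch contains
# ('AAPL', (150.0, 1)) and ('AAPL', (152.0, 1)), reduceByKey combines them
# into ('AAPL', (302.0, 2)), and the final map emits the average
# ('AAPL', 151.0), which send_to_kafka then serializes and publishes.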


if __name__ == '__main__':
    if len(sys.argv) != 4:
        print("Usage: stream-processing.py [topic] [target-topic] [broker-list]")
        sys.exit(1)

    # - create SparkContext and StreamingContext
    sc = SparkContext("local[2]", "StockAveragePrice")
    sc.setLogLevel('ERROR')
    ssc = StreamingContext(sc, 5)

    topic, target_topic, brokers = sys.argv[1:]

    # - instantiate a kafka stream for processing
    directKafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {'metadata.broker.list': brokers})
    process_stream(directKafkaStream)

    # - instantiate a simple kafka producer
    kafka_producer = KafkaProducer(
        bootstrap_servers=brokers
    )
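    # Note: an equivalent setup could pass a value_serializer to KafkaProducer
    # (e.g. value_serializer=lambda v: v.encode('utf-8')) instead of encoding
    # manually in send_to_kafka; this script encodes explicitly.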

    # - setup proper shutdown hook
    atexit.register(shutdown_hook, kafka_producer)

    ssc.start()
    ssc.awaitTermination()
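
# Usage sketch (assumptions: a Spark build with the external
# spark-streaming-kafka-0-8 connector available, and brokers reachable at the
# given address; the topic names and versions below are only examples):
#
#   spark-submit \
#     --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.0 \
#     stream-processing.py stock-analyzer average-stock-price localhost:9092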