Testing a proof-of-concept pipeline that produces and consumes messages using the openFDA API.
I will use a Kafka cluster running on AWS EC2 to build a Kafka application that produces and consumes messages. The application will be written in Python.
To produce messages, I will use the openFDA API, which lets clients receive data in near real time. Any developer can request access to the API and build applications with it.
Apache Kafka is a distributed streaming platform used to build real-time streaming data pipelines and applications that adapt to data streams.
openFDA provides APIs and raw download access to a number of high-value, high-priority, and scalable structured datasets, including adverse events, drug product labeling, and recall enforcement reports.
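As a rough illustration of how records could be pulled from openFDA before being sent to Kafka, the sketch below builds a query URL and fetches one page of results. The helper names (`build_openfda_url`, `fetch_results`) and the choice of the `drug/event` (adverse events) endpoint are assumptions made for this example; they are not taken from the project code.

```python
import json
from urllib.parse import urlencode
from urllib.request import urlopen

OPENFDA_BASE = "https://api.fda.gov"


def build_openfda_url(endpoint="drug/event", search=None, limit=5):
    """Build an openFDA query URL (hypothetical helper for this sketch)."""
    params = {}
    if search:
        params["search"] = search
    params["limit"] = limit
    return f"{OPENFDA_BASE}/{endpoint}.json?{urlencode(params)}"


def fetch_results(url, timeout=10):
    """Fetch one page and return the list under the 'results' key."""
    with urlopen(url, timeout=timeout) as response:
        payload = json.load(response)
    return payload.get("results", [])


if __name__ == "__main__":
    # Only prints the URL; call fetch_results(url) to hit the network.
    print(build_openfda_url(limit=1))
```

Each item returned by `fetch_results` is a dictionary, so it can be serialized to JSON and sent as a single Kafka message.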
Languages
- Python
- Pandas
Technologies
- Kafka
- AWS EC2
- Jupyter
Third-Party Libraries
- AWS CLI
- s3fs
- kafka-python
1. Install and configure AWS CLI
2. Assign AmazonEC2FullAccess to your user on IAM
3. Follow this to use SSH to connect to your instance.
ssh -i "kafka-fda-project.pem" {user}@{Public DNS of your EC2 Instance}
wget https://downloads.apache.org/kafka/3.3.1/kafka_2.12-3.3.1.tgz
tar -xvf kafka_2.12-3.3.1.tgz
sudo yum install java-1.8.0-openjdk
java -version
cd kafka_2.12-3.3.1
Note: This is a demo POC. In real-world use, it is best practice to run ZooKeeper, the producer, and the consumer on separate clusters.
bin/zookeeper-server-start.sh config/zookeeper.properties
Open a new terminal window and duplicate the session
export KAFKA_HEAP_OPTS="-Xmx256M -Xms128M"
cd kafka_2.12-3.3.1
bin/kafka-server-start.sh config/server.properties
Note: By default the broker points to the instance's private address, so we need to change server.properties to make it reachable on the public IP. You cannot access your AWS instance by its EC2 hostname or private IP DNS name from outside the AWS VPC.
sudo vi config/server.properties
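Uncomment advertised.listeners and set it to the public IP of the EC2 instance, for example: advertised.listeners=PLAINTEXT://{Put the Public IP of your EC2 Instance:9092}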
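If you would rather script this change than edit the file by hand, something like the following could work. It is a minimal sketch: the function name `set_advertised_listener` is made up for this example, it assumes a PLAINTEXT listener on port 9092, and `203.0.113.10` is a documentation placeholder address, not a real instance.

```python
import re


def set_advertised_listener(properties_text, public_ip, port=9092):
    """Return server.properties text with advertised.listeners set.

    Replaces the first (possibly commented-out) advertised.listeners
    line, or appends one if the file has no such line.
    """
    new_line = f"advertised.listeners=PLAINTEXT://{public_ip}:{port}"
    pattern = re.compile(r"^#?[ \t]*advertised\.listeners=.*$", re.MULTILINE)
    if pattern.search(properties_text):
        # A function replacement avoids backslash handling in new_line.
        return pattern.sub(lambda _: new_line, properties_text, count=1)
    return properties_text.rstrip("\n") + "\n" + new_line + "\n"


if __name__ == "__main__":
    sample = (
        "broker.id=0\n"
        "#advertised.listeners=PLAINTEXT://your.host.name:9092\n"
        "num.network.threads=3\n"
    )
    print(set_advertised_listener(sample, "203.0.113.10"))
```

To apply it, read config/server.properties, pass the text through the function, and write the result back. Restart the broker afterwards so the new setting takes effect.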
15. On your AWS EC2 console, edit EC2 > Security Groups > Inbound Rules > Allow All Traffic > Source: My IP
(If you're doing this for your organization, consult your Cloud, DevOps, or Security Engineer for best practices.)
Open a new terminal window and duplicate the session
cd kafka_2.12-3.3.1
bin/kafka-topics.sh --create --topic demo_test --bootstrap-server {Put the Public IP of your EC2 Instance:9092} --replication-factor 1 --partitions 1
bin/kafka-console-producer.sh --topic demo_test --bootstrap-server {Put the Public IP of your EC2 Instance:9092}
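The console producer above can also be approximated from Python with kafka-python. This is a sketch rather than the project's actual producer: the function names `serialize` and `produce` are hypothetical, and it assumes each message is a dictionary sent as UTF-8 JSON to the `demo_test` topic.

```python
import json


def serialize(record):
    """Encode a dictionary as UTF-8 JSON bytes for Kafka."""
    return json.dumps(record).encode("utf-8")


def produce(records, bootstrap_server, topic="demo_test"):
    """Send each record to the topic, then flush and close the producer."""
    # Imported lazily so serialize() works without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(
        bootstrap_servers=[bootstrap_server],
        value_serializer=serialize,
    )
    for record in records:
        producer.send(topic, value=record)
    producer.flush()  # block until buffered messages are delivered
    producer.close()
```

Calling `produce([{"id": 1}], "{Put the Public IP of your EC2 Instance:9092}")` with the placeholder replaced by your own address should send one message to the topic.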
Open a new terminal window and duplicate the session
cd kafka_2.12-3.3.1
bin/kafka-console-consumer.sh --topic demo_test --bootstrap-server {Put the Public IP of your EC2 Instance:9092}
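A Python counterpart to the console consumer could look like the sketch below, again using kafka-python. The names `deserialize` and `consume` are invented for this example. It assumes messages are UTF-8 text and falls back to the raw string when a message is not valid JSON, so plain text typed into the console producer is still readable.

```python
import json


def deserialize(raw):
    """Decode message bytes; return parsed JSON when possible, else text."""
    text = raw.decode("utf-8")
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return text


def consume(bootstrap_server, topic="demo_test", max_messages=10):
    """Read up to max_messages from the topic and return their values."""
    # Imported lazily so deserialize() works without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=[bootstrap_server],
        auto_offset_reset="earliest",  # start from the oldest message
        value_deserializer=deserialize,
        consumer_timeout_ms=10000,  # stop iterating after 10 s of silence
    )
    received = []
    for message in consumer:
        received.append(message.value)
        if len(received) >= max_messages:
            break
    consumer.close()
    return received
```

The timeout keeps the loop from blocking forever in a demo; a long-running consumer would typically omit it and process messages as they arrive.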