Merge pull request #16 from flightaware/pre-webinar
NasaGeek authored Feb 2, 2021
2 parents 3bce260 + 43ca87e commit 3fddea1
Showing 385 changed files with 611 additions and 112 deletions.
4 changes: 2 additions & 2 deletions .env-sample
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FH_USERNAME=user
FH_APIKEY=key
INIT_CMD_ARGS=events "flightplan departure arrival cancellation position"
GOOGLE_MAPS_API_KEY=key
INIT_CMD_ARGS=events "flifo departure arrival cancellation position"
GOOGLE_MAPS_API_KEY=key
3 changes: 1 addition & 2 deletions .github/workflows/dockerimage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,7 @@ jobs:
env:
INIT_CMD_TIME: "pitr 1584126630"
INIT_CMD_ARGS: "events \"flightplan departure arrival cancellation\""
KAFKA_FLIFO_TOPIC_NAME: feed1
KAFKA_POSITION_TOPIC_NAME: position_feed1
KAFKA_TOPIC_NAME: feed1
run: |
cd connector
make pip-sync-ci
Expand Down
109 changes: 60 additions & 49 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ Firestarter is a small collection of services and sample applications to help
you get started using [FlightAware's Firehose Flight Data Feed](https://flightaware.com/commercial/firehose/documentation).

Firestarter is structured as a group of Docker containers managed by
Docker Compose. Currently, 2 core services and 1 sample application are
Docker Compose. Currently, 3 core services and 2 sample applications are
included, with more being developed (see [the roadmap](./ROADMAP.md) for
details).

Expand All @@ -17,8 +17,8 @@ before you can start using Firestarter.
available at https://flightaware.com/commercial/firehose/documentation/commands
and in the env section of [docker-compose.yml](./docker-compose.yml). Its value
will vary based on your account configuration, but a very basic example that
should work for most users is `events "flightplan departure arrival
cancellation"`.
should work for most users is `events "flifo departure arrival cancellation
position"`.

There are a number of other environment variables that can be set to tune the
behavior of Firestarter. They are documented in
Expand All @@ -30,17 +30,22 @@ Details available at Docker's site: https://docs.docker.com/get-docker/
The usual Docker Compose incantation run in the root of this repo will get you
up and running:
```
docker-compose up --build
docker-compose pull && docker-compose up
```

After running the above command and letting the 3 containers build, you should
be greeted with log output from each container. The services will log
periodically as Firehose messages are received, while the sample webapp will
produce some initial log output and then only log as requests are made to it.
`docker-compose pull` pulls prebuilt images from the Github Container Registry,
and `docker-compose up` creates containers from those images and launches them.
If you'd like to build the images yourself, you can instead run
`docker-compose up --build`.

You can test out the sample application by visiting http://localhost:5000 in
your web browser (if not running Docker locally, use the Docker host's
address).
After running the above command, you should be greeted with log output from
each container. The services will log periodically as Firehose messages are
received, while the sample webapps will produce some initial log output and
then only log as requests are made to them.

You can test out the FIDS sample application by visiting http://localhost:5000
in your web browser (if not running Docker locally, use the Docker host's
address). The map sample application can be accessed at http://localhost:5001.


## Firestarter Components
Expand All @@ -53,8 +58,30 @@ involves building and sending the initiation command, handling compression, and
reconnecting to Firehose without data loss if the connection is interrupted.
The connector then forwards Firehose messages to its own clients.
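
The initiation command and pitr-based resume described above can be sketched as follows. This is an illustrative helper, not the connector's actual code, and the exact Firehose grammar is defined by FlightAware's documentation; the username, key, and argument values are placeholders:

```python
from typing import Optional


def build_initiation_command(
    username: str, apikey: str, init_args: str, last_pitr: Optional[str] = None
) -> str:
    """Build a Firehose initiation command string.

    On a fresh start we ask for "live" data; after a disconnect we resume
    from the last pitr (point-in-time recovery) value we saw, which is how
    the connector can reconnect without data loss.
    """
    start = f"pitr {last_pitr}" if last_pitr else "live"
    return f"{start} username {username} password {apikey} {init_args}"


fresh = build_initiation_command("user", "key", 'events "flifo position"')
resume = build_initiation_command(
    "user", "key", 'events "flifo position"', last_pitr="1584126630"
)
print(fresh)   # live username user password key events "flifo position"
print(resume)  # pitr 1584126630 username user password key events "flifo position"
```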

### kafka/zookeeper
We use Kafka as a message queue between the connector and the db-updater.
Kafka depends on ZooKeeper to coordinate some important tasks, so that is
included as well. Both run from prebuilt Docker images; their documentation
can be found here:
https://hub.docker.com/r/bitnami/kafka/
https://hub.docker.com/r/bitnami/zookeeper/

In this code, the connector is the Kafka "producer" and the db-updater is the
Kafka "consumer". If the db-updater stops and restarts, Kafka ensures that it
resumes reading from the queue where it left off; we recommend letting Kafka
handle this offset-tracking logic.

We ensure that the Kafka consumer will start where it left off with the
"enable_auto_commit" and "auto_commit_interval_ms" parameters. We also need to
provide a group name so Kafka can store the last offset. Consumers with
different group names each consume all messages in a given topic, while
consumers with the same group name split a topic's messages between them. A
single Kafka topic is used in Firestarter to stream all messages published by
the connector to all subscribers.
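
The underscore-style parameter names above match the keyword arguments of kafka-python's `KafkaConsumer`. A minimal sketch of such a configuration; the broker address, group name, and topic are placeholders, and the real db-updater code may differ:

```python
# Keyword arguments for kafka-python's KafkaConsumer, mirroring the
# parameters described above. The group_id is what lets Kafka store this
# consumer's last committed offset; the auto-commit settings make the
# client commit that offset roughly once per second.
consumer_kwargs = {
    "bootstrap_servers": "kafka:9092",  # placeholder broker address
    "group_id": "db-updater",           # offsets are stored per group name
    "enable_auto_commit": True,
    "auto_commit_interval_ms": 1000,
}

# Creating the consumer needs a reachable broker, so the client calls are
# shown without being executed:
#
#   from kafka import KafkaConsumer
#   consumer = KafkaConsumer("feed1", **consumer_kwargs)
#   for record in consumer:        # resumes at the group's stored offset
#       handle(record.value)
```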

### db-updater
The db-updater service receives Firehose messages from the connector and
The db-updater service receives Firehose messages from the queue and
maintains a database table based on their contents. The service is capable of
handling so-called "flifo" (flight info) messages and airborne position messages.
Two db-updater containers are configured by default - one handles flight info and
Expand All @@ -66,54 +93,38 @@ effort. To prevent bloat, flights and positions older than 48 hours are
automatically dropped from the table.
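
The 48-hour expiry amounts to a single timestamp-bounded DELETE run on a schedule. A self-contained sketch of the idea using stdlib sqlite3; the actual service uses its own schema and database, and the table and column names here are hypothetical:

```python
import sqlite3
from datetime import datetime, timedelta

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE flights (id TEXT PRIMARY KEY, arrival_time TEXT)")

now = datetime(2021, 2, 2)
rows = [
    ("old", (now - timedelta(hours=72)).isoformat()),  # past the 48h window
    ("new", (now - timedelta(hours=1)).isoformat()),   # still fresh
]
conn.executemany("INSERT INTO flights VALUES (?, ?)", rows)

# Drop anything older than 48 hours, as the db-updater does periodically.
# ISO-8601 strings compare correctly as plain text.
cutoff = (now - timedelta(hours=48)).isoformat()
conn.execute("DELETE FROM flights WHERE arrival_time < ?", (cutoff,))

remaining = [r[0] for r in conn.execute("SELECT id FROM flights")]
print(remaining)  # ['new']
```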

### fids
The sample application is a webapp backed by the flights database. You can use
it to browse flight data by airport, presenting flights similarly to how you'd
see them on a flight information display system (FIDS). Detailed information
for individual flights can also be viewed. While the 2 services are intended to
be used in a production environment, this sample application should only be
considered a demonstration of what can be built using the data from Firehose.
It should *not* be used in a production environment.

### fids with Google Maps
Now, you can see positions displayed on a static Google Maps image on each
flight info page! In order to enable this feature, you need to configure your
own Google Maps API key.
The fids sample application is a webapp backed by the flights and positions
databases. You can use it to browse flight data by airport, presenting flights
similarly to how you'd see them on a flight information display system (FIDS).
Detailed information for individual flights can also be viewed.

If you've specified a Google Maps API key, a flight's actual route will be
displayed as a static image on its information page.
Instructions:
https://developers.google.com/maps/documentation/maps-static/get-api-key
Once you get your API key, just specify it in your .env file as
GOOGLE_MAPS_API_KEY. Then you will see static maps with a flight track on your
flight info pages. Note that you may need to enter your payment information.
The Google Maps API is a paid service with a limited free tier.
flight info pages. Note that the Google Maps API is a paid service with a free
tier, so you will need to provide payment information when signing up.
Pricing information:
https://developers.google.com/maps/documentation/maps-static/usage-and-billing
You can see that you currently get "a $200 USD Google Maps Platform credit"
each month, and queries 0-100,000 cost 0.002 USD each, so the credit covers
100,000 free queries per month. Since this is a demo and not meant for
production, that should be fine.
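
Generating that static image boils down to building a Maps Static API URL whose `path` parameter lists the flight's positions. A hedged sketch of that URL construction; consult Google's Maps Static API documentation for the authoritative parameter format, and note the coordinates and key below are made up:

```python
from urllib.parse import urlencode


def static_map_url(positions, api_key, size="640x400"):
    """Build a Maps Static API URL that draws a flight track.

    positions is a list of (lat, lon) tuples; the path parameter format
    (pipe-separated style options and coordinates) follows Google's
    Maps Static API documentation.
    """
    path = "color:0x0000ff|weight:3|" + "|".join(
        f"{lat},{lon}" for lat, lon in positions
    )
    query = urlencode({"size": size, "path": path, "key": api_key})
    return f"https://maps.googleapis.com/maps/api/staticmap?{query}"


url = static_map_url([(29.98, -95.34), (30.19, -97.67)], api_key="YOUR_KEY")
print(url)
```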

### kafka/zookeeper
We use Kafka as a message queue between the connector and the db-updater.
Kafka depends on ZooKeeper to coordinate some important tasks, so that is
included as well. Both run from prebuilt Docker images; their documentation
can be found here:
https://hub.docker.com/r/bitnami/kafka/
https://hub.docker.com/r/bitnami/zookeeper/

In this code, the connector is the Kafka "producer" and the db-updater is the
Kafka "consumer". If the db-updater stops and restarts, Kafka ensures that it
resumes reading from the queue where it left off; we recommend letting Kafka
handle this offset-tracking logic.

The relevant consumer code in the db-updater is here:
https://github.com/flightaware/firestarter/blob/master/db-updater/main.py#L334

We ensure that the Kafka consumer will start where it left off with the
"enable_auto_commit" and "auto_commit_interval_ms" parameters. We also need to
provide a group name so Kafka can store the last offset. Consumers with
different group names each consume all messages in a given topic, while
consumers with the same group name split a topic's messages between them.
While Firestarter's services are suited for use in a production environment,
this sample application should only be considered a demonstration of what can
be built using the data from Firehose. It should *not* be used in a production
environment.

### map
The map sample application demonstrates another potential application of
Firehose data by plotting live airborne flight positions directly onto a
dynamic Google Map. Rather than using Firestarter's databases, this web app's
backend connects directly to the queue service. This is a place where an API
like Firehose truly shines, as no polling is required by the client to update
the map. A Google Maps API key is required to run this application.
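
A sketch of the kind of per-message transformation such a backend performs before pushing an update to connected browsers. The Firehose field names used here (`ident`, `lat`, `lon`) are assumptions for illustration, not confirmed from this diff:

```python
import json


def position_to_marker(raw: bytes):
    """Turn a raw queue message into a map-marker dict, or None.

    Only "position" messages are plotted; everything else on the topic
    (flifo events, keepalives, ...) is ignored by the map backend.
    """
    msg = json.loads(raw)
    if msg.get("type") != "position":
        return None
    return {
        "ident": msg["ident"],
        "lat": float(msg["lat"]),
        "lon": float(msg["lon"]),
    }


marker = position_to_marker(
    b'{"type": "position", "ident": "SWA123", "lat": "30.1", "lon": "-97.6"}'
)
print(marker)  # {'ident': 'SWA123', 'lat': 30.1, 'lon': -97.6}
```

Because the queue pushes each position as it arrives, the backend can forward markers to browsers immediately instead of polling a database.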

Check out [the roadmap](./ROADMAP.md) to see what components are coming in the
future!
6 changes: 3 additions & 3 deletions ROADMAP.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,16 @@ Firestarter v1 contains 2 services and an example application. With these
components you can track flights at the granularity of departures and arrivals
(using flight info messages) and view them in a FIDS-like interface.

### v2 (current release)
### v2
Firestarter v2 will introduce a robust queueing component between the connector
and its clients, allowing for efficient data fan-out.

### v3
### v3 (current release)
Firestarter v3 adds support for processing and storage of airborne position data
from Firehose, along with a sample application for viewing such data.

### v4
Firestarter v4 will support processing and storage of surface position data from
Firehose. It will likely also include a sample application for viewing such
data.
data.
24 changes: 8 additions & 16 deletions connector/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,24 +232,16 @@ def delivery_report(err, _):
# All we can really do is report it
print(f"Error when delivering message: {err}")

# FIXME: This makes keepalives a bit useless if they won't be showing
# up in order with any other messages
key = message.get("id", "").encode() or None
try:
if message["type"] == "keepalive":
topics = [
os.environ["KAFKA_POSITION_TOPIC_NAME"],
os.environ["KAFKA_FLIFO_TOPIC_NAME"],
]
elif message["type"] == "position":
topics = [os.environ["KAFKA_POSITION_TOPIC_NAME"]]
else:
topics = [os.environ["KAFKA_FLIFO_TOPIC_NAME"]]
for topic in topics:
producer.produce(
topic,
key=key,
value=line,
callback=delivery_report,
)
producer.produce(
os.environ["KAFKA_TOPIC_NAME"],
key=key,
value=line,
callback=delivery_report,
)
except BufferError as e:
print(f"Encountered full outgoing buffer, should resolve itself: {e}")
time.sleep(1)
Expand Down
3 changes: 1 addition & 2 deletions connector/test/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,7 @@ def setUp(self):
"INIT_CMD_TIME": "live",
"SERVER": "testserver",
"PRINT_STATS_PERIOD": "0",
"KAFKA_FLIFO_TOPIC_NAME": "topic1",
"KAFKA_POSITION_TOPIC_NAME": "topic2",
"KAFKA_TOPIC_NAME": "topic1",
},
)
self.mock_reader = Mock()
Expand Down
26 changes: 18 additions & 8 deletions db-updater/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@
sa.Column("predicted_on", TIMESTAMP_TZ()),
sa.Column("predicted_in", TIMESTAMP_TZ()),
)
VALID_EVENTS = {"arrival", "cancellation", "departure", "flightplan", "onblock", "offblock", "extendedFlightInfo"}
elif TABLE == "positions":
table = sa.Table(
"positions",
Expand Down Expand Up @@ -144,6 +145,7 @@
sa.Column("wind_quality", sa.Integer),
sa.Column("wind_speed", sa.Integer),
)
VALID_EVENTS = {"position"}

engine_args: dict = {}
db_url: str = os.environ["DB_URL"]
Expand Down Expand Up @@ -219,7 +221,7 @@ def flush(self, conn) -> None:
if not self.cache:
return

print(f"Flushing {len(self.cache)} new positions to table")
print(f"Flushing {len(self.cache)} new positions to database")
assert engine.name in ["sqlite", "postgresql"], f"{engine.name} is unsupported"
if engine.name == "postgresql":
# pylint: disable=import-outside-toplevel
Expand Down Expand Up @@ -317,16 +319,21 @@ def convert_msg_fields(msg: dict) -> dict:
"""Remove unneeded keys from message JSON and convert value types.
Modifies msg in-place and returns it."""
pitr = msg["pitr"]
for key in msg.keys() - MSG_TABLE_KEYS:
del msg[key]
for key, val in msg.items():
for key, val in list(msg.items()):
column_type = str(table.c[key].type)
if column_type == "TIMESTAMP":
msg[key] = datetime.fromtimestamp(int(val), tz=UTC)
elif column_type == "INTEGER":
msg[key] = int(val)
elif column_type == "BOOLEAN":
msg[key] = bool(int(val))
try:
if column_type == "TIMESTAMP":
msg[key] = datetime.fromtimestamp(int(val), tz=UTC)
elif column_type == "INTEGER":
msg[key] = int(float(val))
elif column_type == "BOOLEAN":
msg[key] = bool(int(val))
except Exception as e:
print(f"Couldn't convert '{key}' field in message for flight_id '{msg['id']}' at '{pitr}'")
raise
return msg


Expand Down Expand Up @@ -565,6 +572,9 @@ def main():
# They continue in the examples, so let's do it as well
continue
message = json.loads(messagestr.value())
event_type = message["type"]
if event_type != "keepalive" and event_type not in VALID_EVENTS:
continue
processor_functions.get(message["type"], process_unknown_message)(message)


Expand Down
