Last Updated: 2019-08-01

It can be useful to leverage real-time data in a machine learning model when making a prediction. However, doing so requires setting up a streaming data pipeline, which is non-trivial.

Typically you will have the following:

  1. A series of IoT devices generating and sending data from the field in real time (in our case these are the taxis)
  2. A messaging bus that receives and temporarily stores the IoT data (in our case this is Cloud Pub/Sub)
  3. A stream processing service that subscribes to the messaging bus, windows the messages, and performs data transformations on each window (in our case this is Cloud Dataflow)
  4. A persistent store to keep the processed data (in our case this is BigQuery)

These steps happen continuously and in real time, and are illustrated by the blue arrows in the diagram above.

Once this streaming data pipeline is established, we need to modify our model serving to leverage it. This simply means adding a call to the persistent store (BigQuery) to fetch the latest real-time data when a prediction request comes in. This flow is illustrated by the red arrows in the diagram above.
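For intuition, here is a minimal sketch of what that lookup might look like in the serving code (the table and column names match the ones created later in this lab; the helper itself is illustrative, not the lab's exact code):

# Sketch: fetch the most recent traffic value from BigQuery at prediction time.
# Assumes the taxifare.traffic_realtime table created later in this lab.
from google.cloud import bigquery

def get_latest_traffic(project_id):
    client = bigquery.Client(project=project_id)
    query = """
        SELECT trips_last_5min
        FROM `taxifare.traffic_realtime`
        ORDER BY time DESC
        LIMIT 1
    """
    rows = list(client.query(query).result())
    # Fall back to 0 if the table is still empty.
    return rows[0].trips_last_5min if rows else 0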

This codelab will implement the architecture above to leverage real-time traffic data as a feature in our TensorFlow model to predict taxi fare. This will allow the model to predict higher fares during peak traffic times and vice versa.

To get started open a new terminal in your AI Platform JupyterLab instance:

Then git clone the repository (skip this step if you've already cloned this in a prior lab):

git clone https://github.com/GoogleCloudPlatform/training-data-analyst.git

First we'll train a model that includes 'trips_last_5min' as a feature. This represents the number of taxi rides across the whole NYC taxi fleet taken in the last five minutes. This is our proxy for traffic.
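From the model's perspective this is just another numeric input. A minimal sketch of the feature definition (the other feature names here are illustrative; see the notebook for the actual list):

# Sketch: trips_last_5min is wired in like any other numeric feature.
import tensorflow as tf

feature_columns = [
    tf.feature_column.numeric_column('pickup_longitude'),   # illustrative
    tf.feature_column.numeric_column('pickup_latitude'),    # illustrative
    tf.feature_column.numeric_column('trips_last_5min'),    # real-time traffic proxy
]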

In JupyterLab, navigate to the following directory:

training-data-analyst/courses/machine_learning/deepdive/04_advanced_preprocessing/labs/taxicab_traffic

Open up and run through train.ipynb.

There are no TODOs in this notebook.

After running this notebook, verify you have a trained model in the output directory:

In the notebook we took the traffic proxy feature as given, but if you're interested in how this feature was created see here.

Since we don't actually have real-time taxi data, we will synthesize it using a simple Python script. The script publishes events to Google Cloud Pub/Sub.

In the same folder, inspect the iot_devices.py script. It is configured to send about 2000 trip messages every five minutes, with some randomness in the frequency to mimic traffic fluctuations. These numbers come from looking at the historical average of taxi ride frequency in BigQuery.

In production this script would be replaced with actual taxis with IoT devices sending trip data to Cloud Pub/Sub.
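At its core the script is just a Pub/Sub publish loop. Roughly, it looks like this (a sketch using the google-cloud-pubsub client, not the script verbatim; the project ID is a placeholder):

# Sketch: publish one synthetic taxi-ride event to the taxi_rides topic.
import datetime
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path('your-project-id', 'taxi_rides')  # placeholder project ID

event_time = datetime.datetime.utcnow().strftime('%Y-%m-%d %H:%M:%S')
publisher.publish(topic_path, data=event_time.encode('utf-8'))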

In a terminal, navigate to the taxicab_traffic directory, then execute the iot_devices.py script.

PROJECT_ID=$(gcloud config list project --format "value(core.project)")
python iot_devices.py --project=$PROJECT_ID

You will see new messages being published every 5 seconds. Keep this terminal open.

Now that we have our taxi data being pushed to Pub/Sub, let's consume it using a streaming Dataflow pipeline.

The pipeline is defined in streaming_count.py. Open and inspect it.

There are 5 transformations being applied:

  1. Read from PubSub
  2. Window the messages (TODO)
  3. Count number of messages in the window
  4. Format the count for BigQuery
  5. Write to BigQuery

The second transform is left as a #TODO for you: Specify a sliding window that is 5 minutes long, and gets recalculated every 15 seconds.

Reference the Beam programming guide for guidance. To check your answer, reference the solution.
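If you get stuck, the shape of the answer is a standard Beam windowing transform. A minimal sketch (units are seconds, and messages stands in for the upstream PCollection):

# Sketch: a 5-minute sliding window that is recalculated every 15 seconds.
import apache_beam as beam
from apache_beam.transforms import window

windowed = (messages
            | 'window' >> beam.WindowInto(window.SlidingWindows(size=300, period=15)))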

In a new terminal, launch the pipeline. Change the BUCKET if necessary (it is assumed to be your PROJECT_ID):

PROJECT_ID=$(gcloud config list project --format "value(core.project)")
BUCKET=$PROJECT_ID # CHANGE AS NECESSARY 
python streaming_count.py \
        --input_topic taxi_rides \
        --runner=DataflowRunner \
        --project=$PROJECT_ID \
        --temp_location=gs://$BUCKET/dataflow_streaming

You should now see your pipeline running under Dataflow jobs in the Cloud Console:

The Dataflow pipeline we just created writes to a BigQuery table; however, we've yet to actually create that table! Let's remedy that now. Execute the following in a terminal:

1) Create a dataset named taxifare

bq mk --dataset taxifare

2) Create a table named traffic_realtime

bq mk --table taxifare.traffic_realtime trips_last_5min:INTEGER,time:TIMESTAMP
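If you prefer Python over the bq CLI, a rough equivalent using the google-cloud-bigquery client library looks like this (a sketch, not part of the lab):

# Sketch: create the taxifare dataset and traffic_realtime table programmatically.
from google.cloud import bigquery

client = bigquery.Client()
client.create_dataset('taxifare', exists_ok=True)

schema = [
    bigquery.SchemaField('trips_last_5min', 'INTEGER'),
    bigquery.SchemaField('time', 'TIMESTAMP'),
]
table_ref = client.dataset('taxifare').table('traffic_realtime')
client.create_table(bigquery.Table(table_ref, schema=schema))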

3) Explore Data in Table

Since your Dataflow pipeline is already running, you should start seeing data in your table within one minute. Open the BigQuery web UI and run the following query.

SELECT
  *
FROM
  `taxifare.traffic_realtime`
ORDER BY
  time DESC

Re-run the above query periodically to observe new data streaming in! You should see a new row every 15 seconds.

To keep the table from growing indefinitely, you can periodically delete rows that are more than 12 hours old:

DELETE
FROM
  `taxifare.traffic_realtime`
WHERE
  TIMESTAMP_DIFF(CURRENT_TIMESTAMP,time,HOUR) > 12

Now that our BigQuery table is being updated every 15 seconds with the latest traffic data, we're ready to deploy the model.

Open deploy.ipynb in JupyterLab and complete the TODOs.

Once the model is deployed, note how the prediction changes every 15 seconds even with the same client inputs. It's responding to new traffic data!

Congratulations! You now have a streaming data pipeline and a TensorFlow model that leverages it.