Last Updated: 2019-08-01
It can be useful to leverage real-time data in a machine learning model when making a prediction. However, doing so requires setting up a streaming data pipeline, which is non-trivial.
Typically you will have the following:
These steps happen continuously and in real-time, and are illustrated by the blue arrows in the diagram above.
Once this streaming data pipeline is established, we need to modify our model serving to leverage it. This simply means adding a call to the persistent store (BigQuery) to fetch the latest real-time data when a prediction request comes in. This flow is illustrated by the red arrows in the diagram above.
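The serving-side lookup can be sketched in Python. Note this is an illustrative sketch, not the lab's actual serving code: `fetch_latest_traffic` is a hypothetical stand-in for a real BigQuery client call, and the fixed return value exists only so the sketch is runnable.

```python
def fetch_latest_traffic():
    """Stand-in for a BigQuery lookup such as:
    SELECT trips_last_5min FROM taxifare.traffic_realtime
    ORDER BY time DESC LIMIT 1
    A fixed value is returned here so the sketch runs without GCP access."""
    return 1843  # hypothetical latest reading

def augment_features(client_features):
    """Merge the latest traffic reading into the client's request
    before passing the features to the model for prediction."""
    features = dict(client_features)
    features["trips_last_5min"] = fetch_latest_traffic()
    return features

request = {"pickup_longitude": -73.99, "pickup_latitude": 40.75,
           "dropoff_longitude": -73.98, "dropoff_latitude": 40.74}
augmented = augment_features(request)
```

The client never supplies `trips_last_5min` itself; the serving layer injects the freshest value on every request, which is what makes the prediction traffic-aware.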
This codelab will implement the architecture above to leverage real-time traffic data as a feature in our TensorFlow model to predict taxi fare. This will allow the model to predict higher fares during peak traffic times and vice versa.
To get started open a new terminal in your AI Platform JupyterLab instance:
Then git clone the repository (skip this step if you've already cloned this in a prior lab):
git clone https://github.com/GoogleCloudPlatform/training-data-analyst.git
First we'll train a model that includes `trips_last_5min` as a feature. This represents the number of taxi rides across the whole NYC taxi fleet taken in the last five minutes. This is our proxy for traffic.
In JupyterLab navigate to the following directory:
Open up and run through the notebook. There are no TODOs in this notebook.
After running this notebook verify you have a trained model in the output directory:
In the notebook we took the traffic proxy feature as given, but if you're interested in how this feature was created see here.
Since we don't actually have real-time taxi data we will synthesize it using a simple python script. The script publishes events to Google Cloud Pub/Sub.
In the same folder, inspect the iot_devices.py script. It is configured to send about 2,000 trip messages every five minutes, with some randomness in the frequency to mimic traffic fluctuations. These numbers come from the historical average of taxi ride frequency in BigQuery.
In production this script would be replaced with actual taxis with IoT devices sending trip data to Cloud Pub/Sub.
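The publishing cadence can be sketched as follows. This is a hypothetical illustration of the idea, not the actual contents of iot_devices.py: `next_publish_interval` and its jitter scheme are stand-ins for whatever scheme the real script uses.

```python
import random

TARGET_MESSAGES = 2000   # messages per window (from the BigQuery historical average)
WINDOW_SECONDS = 300     # five minutes

def next_publish_interval(jitter=0.5):
    """Seconds to wait before publishing the next message.
    The average interval hits the target rate (300 / 2000 = 0.15 s),
    and the uniform jitter mimics traffic fluctuations."""
    base = WINDOW_SECONDS / TARGET_MESSAGES
    return base * random.uniform(1 - jitter, 1 + jitter)
```

In a real publishing loop, each iteration would sleep for `next_publish_interval()` seconds and then publish one trip message to the Pub/Sub topic.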
In a terminal, navigate to the taxicab_traffic directory, then execute the following:
PROJECT_ID=$(gcloud config list project --format "value(core.project)")
python iot_devices.py --project=$PROJECT_ID
You will see new messages being published every 5 seconds. Keep this terminal open.
Now that we have our taxi data being pushed to Pub/Sub, let's consume it using a streaming Dataflow pipeline.
The pipeline is defined in streaming_count.py; open and inspect it.
There are 5 transformations being applied:
The second transform is left as a #TODO for you: specify a sliding window that is 5 minutes long and is recalculated every 15 seconds.
Reference the beam programming guide for guidance. To check your answer reference the solution.
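Conceptually, a sliding window assigns each event to several overlapping windows. The function below is a pure-Python illustration of that assignment for a 5-minute window recalculated every 15 seconds; `windows_for` is a hypothetical helper, not Beam code, and the actual pipeline expresses this with a Beam windowing transform.

```python
def windows_for(event_time, size=300, period=15):
    """Return the [start, end) sliding windows containing event_time
    (all times in seconds). With size=300 and period=15, every event
    lands in size / period = 20 overlapping windows."""
    last_start = (event_time // period) * period  # latest window start <= event_time
    starts = range(last_start, event_time - size, -period)
    return [(s, s + size) for s in starts]
```

Because each event is counted in 20 overlapping windows, the pipeline can emit a fresh 5-minute count every 15 seconds, which is exactly the refresh rate you will observe in BigQuery later in the lab.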
In a new terminal, launch the pipeline. Change the BUCKET if necessary (it is assumed to be your PROJECT_ID):
PROJECT_ID=$(gcloud config list project --format "value(core.project)")
BUCKET=$PROJECT_ID # CHANGE AS NECESSARY
python streaming_count.py \
  --input_topic taxi_rides \
  --runner=DataflowRunner \
  --project=$PROJECT_ID \
  --temp_location=gs://$BUCKET/dataflow_streaming
You should now see your pipeline running in the Dataflow jobs in the Cloud Console:
The Dataflow pipeline we just created writes to a BigQuery table; however, we've yet to actually create that table! Let's remedy that now. Execute the following in a terminal:
bq mk --dataset taxifare
bq mk --table taxifare.traffic_realtime trips_last_5min:INTEGER,time:TIMESTAMP
Since your Dataflow Pipeline is already running, you should start seeing data in your table within one minute. Open the BigQuery Web UI and run the following query.
SELECT * FROM `taxifare.traffic_realtime` ORDER BY time DESC
Re-run the above query periodically to observe new data streaming in! You should see a new row every 15 seconds.
To keep the table from growing unbounded, you can periodically delete rows older than 12 hours:

DELETE FROM `taxifare.traffic_realtime` WHERE TIMESTAMP_DIFF(CURRENT_TIMESTAMP,time,HOUR) > 12
Now that we have our BQ table being updated every 15 seconds with the latest traffic data, we're ready to deploy the model.
Open deploy.ipynb in JupyterLab and complete the TODOs.
Once the model is deployed, note how the prediction changes every 15 seconds even with the same client inputs. It's responding to new traffic data!
Congratulations! You now have a streaming data pipeline and a TensorFlow model that leverages it.