In this lab, you run Dataflow pipelines to serve predictions for batch requests as well as for streaming data in real time.

What you need

You must have

What you learn

In this lab, you write code to:

Step 1

Launch Cloud Shell (NOT Datalab) from the GCP console and run the following commands to confirm that your user account and project are set correctly:

gcloud auth list
gcloud config list project

If the project is not set correctly, you can set it with this command:

gcloud config set project <PROJECT_ID>

Step 2

In Cloud Shell, navigate to the folder containing the starter code for this lab:

cd ~/training-data-analyst/courses/machine_learning/deepdive/06_structured/labs/serving

If this directory doesn't exist, you may need to git clone the repository first:

cd ~
git clone https://github.com/GoogleCloudPlatform/training-data-analyst.git

Step 3

Run the what_to_fix.sh script to see a list of items you need to add/modify to existing code to run your app:

./what_to_fix.sh

This prints a list of filenames and the lines within those files marked with #TODO; these are the lines where you need to add or modify code. For this lab, focus on the #TODO items in .java files only, specifically BabyweightMLService.java, which is your prediction service.

In this section, you fix the code in BabyweightMLService.java and test it with the provided run_once.sh script. If you need help with the code, look at the next section, which provides hints on how to fix the code in BabyweightMLService.java.
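
To give a feel for the shape of such a call, here is a minimal, hypothetical sketch of the idea behind the prediction service: POST a JSON payload of input instances to a deployed model's REST prediction endpoint and read back the predicted weights. The endpoint URL, payload format, and missing authentication are placeholders; the actual client calls to use are the ones indicated by the #TODOs in BabyweightMLService.java.

// Hypothetical sketch only: the endpoint URL and payload shape are placeholders.
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;

public class PredictionSketch {
  public static String predict(String jsonInstances) throws Exception {
    // Placeholder endpoint for a deployed babyweight model.
    URL url = new URL("https://example.com/v1/models/babyweight:predict");
    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("POST");
    conn.setRequestProperty("Content-Type", "application/json");
    conn.setDoOutput(true);
    try (OutputStream os = conn.getOutputStream()) {
      // e.g. {"instances": [{"is_male": "True", "mother_age": 26.0, ...}]}
      os.write(jsonInstances.getBytes(StandardCharsets.UTF_8));
    }
    try (InputStream is = conn.getInputStream();
         Scanner s = new Scanner(is, StandardCharsets.UTF_8.name()).useDelimiter("\\A")) {
      return s.hasNext() ? s.next() : "";  // JSON response containing the predictions
    }
  }
}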

Step 1

You may use the Cloud Shell code editor to view and edit the contents of these files.

Click on the icon at the top right of your Cloud Shell window to launch the Code Editor.

Step 2

After it is launched, navigate to the following directory: training-data-analyst/courses/machine_learning/deepdive/06_structured/labs/serving/pipeline/src/main/java/com/google/cloud/training/mlongcp

Step 3

Open the BabyweightMLService.java file and replace the #TODOs in the code.

Step 4

Once you have completed the changes, go back to Cloud Shell and run the run_once.sh script to test your ML service:

cd ~/training-data-analyst/courses/machine_learning/deepdive/06_structured/labs/serving
./run_once.sh

This section of the lab uses AddPrediction.java, which takes a batch input (one big CSV file), calls the prediction service to generate baby weight predictions, and writes them to local files (multiple CSV files).
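
As a rough guide to what that flow looks like in code, here is a simplified sketch, assuming Apache Beam's Java SDK: read one CSV file, attach a prediction to each row, and write the results out as sharded text files. The class name, file names, and hard-coded placeholder prediction are illustrative only; AddPrediction.java is the real implementation.

// Simplified batch sketch; names and the placeholder prediction are illustrative.
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

public class BatchPredictSketch {
  public static void main(String[] args) {
    Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
    p.apply("ReadCsv", TextIO.read().from("exampledata.csv"))
     .apply("AddPrediction", ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          String inputRow = c.element();
          double predictedWeight = 7.5;  // placeholder: call the prediction service here
          c.output(inputRow + "," + predictedWeight);
        }
     }))
     .apply("WriteCsv", TextIO.write().to("predictions"));  // writes sharded output files
    p.run();
  }
}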

Step 1

In your Cloud Shell code editor, open the AddPrediction.java file available in the following directory: training-data-analyst/courses/machine_learning/deepdive/06_structured/labs/serving/pipeline/src/main/java/com/google/cloud/training/mlongcp

Step 2

Look through the code and notice how, based on an input argument, it decides whether to set up a batch or a streaming pipeline, and creates the corresponding TextInputOutput or PubSubBigQuery I/O object to handle the reading and writing, as sketched below.
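
The branching can be pictured with the small, self-contained sketch below. The names TextInputOutput and PubSubBigQuery come from the lab code, but the interface, method, and flag shown here are simplified stand-ins rather than the actual signatures.

// Illustrative sketch of choosing an I/O helper from a command-line argument.
interface InputOutput {
  void readAndWrite();
}

class TextInputOutputSketch implements InputOutput {
  public void readAndWrite() { System.out.println("batch: read CSV, write text files"); }
}

class PubSubBigQuerySketch implements InputOutput {
  public void readAndWrite() { System.out.println("streaming: read Pub/Sub, write BigQuery"); }
}

public class AddPredictionSketch {
  public static void main(String[] args) {
    boolean realtime = args.length > 0 && args[0].equals("--realtime");  // hypothetical flag
    InputOutput io = realtime ? new PubSubBigQuerySketch() : new TextInputOutputSketch();
    io.readAndWrite();
  }
}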

Step 3

Test batch mode by running the run_ontext.sh script provided in the lab directory:

cd ~/training-data-analyst/courses/machine_learning/deepdive/06_structured/labs/serving
./run_ontext.sh

In this section of the lab, you launch a streaming pipeline on Dataflow that accepts incoming messages from Cloud Pub/Sub, calls the prediction service to get baby weight predictions, and writes the results to a BigQuery table.
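
Conceptually, the streaming pipeline looks something like the sketch below, again assuming Apache Beam's Java SDK. The project ID, table schema, and hard-coded placeholder prediction are illustrative; the real pipeline is built by AddPrediction.java when it is run in streaming mode.

// Condensed streaming sketch; project, schema, and prediction value are placeholders.
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class StreamingPredictSketch {
  public static void main(String[] args) {
    StreamingOptions options =
        PipelineOptionsFactory.fromArgs(args).as(StreamingOptions.class);
    options.setStreaming(true);
    Pipeline p = Pipeline.create(options);

    TableSchema schema = new TableSchema().setFields(Arrays.asList(
        new TableFieldSchema().setName("input").setType("STRING"),
        new TableFieldSchema().setName("predicted_weights_pounds").setType("FLOAT")));

    p.apply("ReadPubSub",
            PubsubIO.readStrings().fromTopic("projects/YOUR_PROJECT/topics/babies"))
     .apply("AddPrediction",
            MapElements.into(TypeDescriptor.of(TableRow.class))
                .via((String line) -> new TableRow()
                    .set("input", line)
                    .set("predicted_weights_pounds", 7.5)))  // placeholder: call the ML service
     .apply("WriteToBQ",
            BigQueryIO.writeTableRows()
                .to("YOUR_PROJECT:babyweight.predictions")
                .withSchema(schema)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
    p.run();
  }
}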

Step 1

On your GCP Console's left-side menu, go into Pub/Sub and click the "Create Topic" button on top. Create a topic called babies.

Step 2

Back in Cloud Shell, modify the run_dataflow.sh script to read the project ID (using --project) from the command-line arguments, and then run it as follows:

cd ~/training-data-analyst/courses/machine_learning/deepdive/06_structured/labs/serving
./run_dataflow.sh

This will create a streaming Dataflow pipeline.

Step 3

Back in your GCP Console, use the left-side menu to go into Dataflow and verify that the streaming job is created.

Step 4

Next, click on the job name to view the pipeline graph. Click on the pipeline steps (boxes) and look at the run details (like system lag, elements added, etc.) of that step on the right side.

These details indicate that your pipeline is running and waiting for input. Let's provide input through the Pub/Sub topic.

Step 5

Copy some lines from exampledata.csv.gz:

cd ~/training-data-analyst/courses/machine_learning/deepdive/06_structured/labs/serving
zcat exampledata.csv.gz

Step 6

In your GCP Console, go back into Pub/Sub, click on the babies topic, and then click the "Publish message" button at the top. In the message box, paste the lines you just copied from exampledata.csv.gz and click the Publish button.

Step 7

Go back to the Dataflow jobs page in your GCP Console, click on your job, and see how the run details have changed for the steps; for example, click on write_toBQ and look at Elements added.

Step 8

Let's verify that the predicted weights have been recorded in the BigQuery table. In your GCP Console, click on BigQuery. This typically opens a new tab and may ask for your Qwiklabs account password. Once you have entered it, you will be redirected to the BigQuery console. In the left-side menu you should see the babyweight dataset; click the blue down arrow to its left, and you should see your predictions table.

Step 9

Click the Compose Query button at the top left. Type the query below into the query box to retrieve rows from your predictions table. Click the Show Options button under the query box and uncheck "Use Legacy SQL".

SELECT * FROM babyweight.predictions LIMIT 1000

Step 10

Click the Run Query button. Notice the predicted_weights_pounds column in the result.

Step 11

Remember that your pipeline is still running. You can publish additional messages from exampledata.csv.gz and verify that new rows are added to your predictions table. Once you are satisfied, stop the Dataflow pipeline by going to your Dataflow Jobs page and clicking the Stop job button in the Job summary window on the right side.