Running custom model training on Vertex Pipelines

1. Overview

In this lab, you will learn how to run a custom model training job using the Kubeflow Pipelines SDK on Vertex Pipelines.

What you'll learn

You'll learn how to:

  • Use the Kubeflow Pipelines SDK to build scalable ML pipelines
  • Create and containerize a custom Scikit-learn model training job that uses Vertex AI managed datasets and runs on Vertex AI Training within a pipeline
  • Run a batch prediction job within Vertex Pipelines
  • Use pre-built components for interacting with Vertex AI services, provided through the google_cloud_pipeline_components library

The total cost to run this lab on Google Cloud is about $5.

2. Intro to Vertex AI

This lab uses Vertex AI, our end-to-end managed ML platform on Google Cloud. Vertex AI integrates Google's ML offerings across Google Cloud into a seamless development experience. In addition to model training and deployment services, Vertex AI also includes a variety of MLOps products, including Vertex Pipelines (the focus of this lab), Model Monitoring, Feature Store, and more. You can see all Vertex AI product offerings in the diagram below.

Vertex product overview

If you have any feedback, please see the support page.

Why are ML pipelines useful?

Before we dive in, let's first understand why you would want to use a pipeline. Imagine you're building out an ML workflow that includes processing data, training a model, hyperparameter tuning, evaluation, and model deployment. Each of these steps may have different dependencies, which may become unwieldy if you treat the entire workflow as a monolith. As you begin to scale your ML process, you might want to share your ML workflow with others on your team so they can run it and contribute code. Without a reliable, reproducible process, this can become difficult. With pipelines, each step in your ML process is its own container. This lets you develop steps independently and track the input and output from each step in a reproducible way. You can also schedule or trigger runs of your pipeline based on other events in your Cloud environment, like kicking off a pipeline run when new training data is available.

The tl;dr: pipelines help you streamline and reproduce your ML workflows.

3. Cloud environment setup

You'll need a Google Cloud Platform project with billing enabled to run this codelab. To create a project, follow the instructions here.

Step 1: Start Cloud Shell

In this lab you're going to work in a Cloud Shell session, which is a command interpreter hosted by a virtual machine running in Google's cloud. You could just as easily run this section locally on your own computer, but using Cloud Shell gives everyone access to a reproducible experience in a consistent environment. After the lab, you're welcome to retry this section on your own computer.

Authorize cloud shell

Activate Cloud Shell

From the top right of the Cloud Console, click the button below to Activate Cloud Shell:

Activate Cloud Shell

If you've never started Cloud Shell before, you're presented with an intermediate screen (below the fold) describing what it is. If that's the case, click Continue (and you won't ever see it again). Here's what that one-time screen looks like:

Cloud Shell setup

It should only take a few moments to provision and connect to Cloud Shell.

Cloud Shell init

This virtual machine is loaded with all the development tools you need. It offers a persistent 5GB home directory and runs in Google Cloud, greatly enhancing network performance and authentication. Much, if not all, of your work in this codelab can be done with simply a browser or your Chromebook.

Once connected to Cloud Shell, you should see that you are already authenticated and that the project is already set to your project ID.

Run the following command in Cloud Shell to confirm that you are authenticated:

gcloud auth list

You should see something like this in the command output:

Cloud Shell output

Run the following command in Cloud Shell to confirm that the gcloud command knows about your project:

gcloud config list project

Command output

project = <PROJECT_ID>

If it is not, you can set it with this command:

gcloud config set project <PROJECT_ID>

Command output

Updated property [core/project].

Cloud Shell has a few environment variables, including GOOGLE_CLOUD_PROJECT, which contains the name of our current Cloud project. We'll use this in various places throughout this lab. You can see it by running:

echo $GOOGLE_CLOUD_PROJECT


Step 2: Enable APIs

In later steps, you'll see where these services are needed (and why), but for now, run this command to give your project access to the Compute Engine, Container Registry, and Vertex AI services:

gcloud services enable compute.googleapis.com         \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com

This should produce a successful message similar to this one:

Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.

Step 3: Create a Cloud Storage Bucket

To run a training job on Vertex AI, we'll need a storage bucket to store our saved model assets. The bucket needs to be regional. We're using us-central1 here, but you are welcome to use another region (just replace it throughout this lab). If you already have a bucket you can skip this step.

Run the following commands in your Cloud Shell terminal to create a bucket:

BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME

Next we'll give our compute service account access to this bucket. This will ensure that Vertex Pipelines has the necessary permissions to write files to this bucket. Run the following command to add this permission:

gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin
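
If the sed expression above looks opaque, the following minimal Python sketch spells out the same idea: find the projectNumber field in the text printed by gcloud projects describe and turn it into the default Compute Engine service account address. The function name and the sample text are illustrative assumptions, not part of the lab.

```python
import re


def compute_service_account(describe_output: str) -> str:
    """Build the default Compute Engine service account email from the
    text printed by `gcloud projects describe`."""
    # The project number may or may not be wrapped in single quotes.
    match = re.search(
        r"^projectNumber:\s*'?(\d+)'?\s*$", describe_output, re.MULTILINE
    )
    if match is None:
        raise ValueError("projectNumber not found in describe output")
    return f"{match.group(1)}-compute@developer.gserviceaccount.com"


# Hypothetical sample of the YAML-style output, for illustration only.
sample = "name: My Project\nprojectId: my-project\nprojectNumber: '123456789012'\n"
print(compute_service_account(sample))
```

You don't need this to complete the lab; the shell commands above do the same work in Cloud Shell.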

Step 4: Create a Vertex AI Workbench instance

From the Vertex AI section of your Cloud Console, click on Workbench:

Vertex AI menu

From there, within user-managed Notebooks, click New Notebook:

Create new notebook

Then select the TensorFlow Enterprise 2.3 (with LTS) instance type without GPUs:

TFE instance

Use the default options and then click Create.

Step 5: Open your Notebook

Once the instance has been created, select Open JupyterLab:

Open Notebook

4. Vertex Pipelines setup

There are a few additional libraries we'll need to install in order to use Vertex Pipelines:

  • Kubeflow Pipelines: This is the SDK we'll be using to build our pipeline. Vertex Pipelines supports running pipelines built with either Kubeflow Pipelines or TFX.
  • Google Cloud Pipeline Components: This library provides pre-built components that make it easier to interact with Vertex AI services from your pipeline steps.

Step 1: Create Python notebook and install libraries

First, from the Launcher menu in your Notebook instance (which you can access by clicking on the + icon in the top left of your notebook), create a notebook by selecting Python 3:

Create Python3 notebook

To install the libraries we'll be using in this lab, first set the user flag in a notebook cell:

USER_FLAG = "--user"

Then run the following from your notebook:

!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0

After installing these packages you'll need to restart the kernel:

import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

Finally, check that you have correctly installed the packages. The KFP SDK version should be >=1.8:

!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"
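
If you'd rather check the ">=1.8" requirement programmatically than by eye, here is a small sketch of one way to do it. The helper name version_at_least is hypothetical. It compares the leading numeric parts of the version as integers, which avoids the trap of comparing version strings directly (as text, "1.10" sorts before "1.8").

```python
import re


def version_at_least(version: str, minimum: tuple) -> bool:
    """Return True when the leading numeric parts of `version` are >= `minimum`."""
    parts = []
    for piece in version.split(".")[: len(minimum)]:
        # Keep only the leading digits, so "0b1" is read as 0.
        match = re.match(r"\d+", piece)
        parts.append(int(match.group()) if match else 0)
    # Pad short versions such as "2" so that they compare as (2, 0).
    while len(parts) < len(minimum):
        parts.append(0)
    return tuple(parts) >= minimum


print(version_at_least("1.8.9", (1, 8)))
print(version_at_least("1.7.2", (1, 8)))
```

In the notebook you could pass kfp.__version__ as the first argument.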

Step 2: Set your project ID and bucket

Throughout this lab you'll reference your Cloud project ID and the bucket you created earlier. Next we'll create variables for each of those.

If you don't know your project ID you may be able to get it by running the following:

import os

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Otherwise, set it here:

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

Then create a variable to store your bucket name. If you created it in this lab, the following will work. Otherwise, you'll need to set this manually:

BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"

Step 3: Import libraries

Add the following to import the libraries we'll be using throughout this codelab:

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import pipeline

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip

Step 4: Define constants

The last thing we need to do before building our pipeline is define some constant variables. PIPELINE_ROOT is the Cloud Storage path where the artifacts created by our pipeline will be written. We're using us-central1 as the region here, but if you used a different region when you created your bucket, update the REGION variable in the code below:

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

After running the code above, you should see the root directory for your pipeline printed. This is the Cloud Storage location where the artifacts from your pipeline will be written. It will be in the format of gs://YOUR-BUCKET-NAME/pipeline_root/
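
To make the path format concrete, here is a minimal sketch of how the bucket URI and pipeline root fit together. The helper names bucket_uri and pipeline_root are hypothetical, and the bucket naming follows the PROJECT_ID + "-bucket" convention used earlier in this lab.

```python
def bucket_uri(project_id: str) -> str:
    """Bucket URI following this lab's '<project-id>-bucket' naming convention."""
    return f"gs://{project_id}-bucket"


def pipeline_root(bucket_name: str) -> str:
    """Cloud Storage path under which pipeline artifacts are written."""
    if not bucket_name.startswith("gs://"):
        raise ValueError("bucket name must start with gs://")
    # Strip any trailing slash so the result never contains '//' after the bucket.
    return bucket_name.rstrip("/") + "/pipeline_root/"


print(pipeline_root(bucket_uri("my-project")))
```

The check on the gs:// prefix is there because a bare bucket name is an easy mistake to make when you set BUCKET_NAME by hand.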

5. Configuring a custom model training job

Before we set up our pipeline, we need to write the code for our custom model training job. To train the model, we'll use the UCI Machine Learning Dry Beans dataset, from: KOKLU, M. and OZKAN, I.A., (2020), "Multiclass Classification of Dry Beans Using Computer Vision and Machine Learning Techniques." In Computers and Electronics in Agriculture, 174, 105507. DOI.

Our first pipeline step will create a managed dataset in Vertex AI using a BigQuery table that contains a version of this beans data. The dataset will be passed as input to our training job. In our training code, we'll have access to environment variables that point to this managed dataset.
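
As a rough sketch of what reading one of those environment variables can look like, the snippet below reads AIP_TRAINING_DATA_URI and splits a bq:// URI into its parts. The helper name parse_bq_uri and the fallback URI are hypothetical, and the sketch assumes the URI has the form bq://project.dataset.table.

```python
import os


def parse_bq_uri(uri: str) -> tuple:
    """Split 'bq://project.dataset.table' into (project, dataset, table)."""
    prefix = "bq://"
    if uri.startswith(prefix):
        uri = uri[len(prefix):]
    parts = uri.split(".")
    if len(parts) != 3:
        raise ValueError(f"expected project.dataset.table, got: {uri}")
    return tuple(parts)


# Inside a Vertex AI training job the variable is set for you; the fallback
# below is a placeholder so the sketch also runs on a local machine.
training_uri = os.environ.get(
    "AIP_TRAINING_DATA_URI", "bq://my-project.my_dataset.training"
)
print(parse_bq_uri(training_uri))
```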

Here's how we'll set up our custom training job:

  • Write a Scikit-learn DecisionTreeClassifier model to classify bean types in our data
  • Package the training code in a Docker container and push it to Container Registry

From there, we'll be able to start a Vertex AI Training job directly from our pipeline. Let's get started!

Step 1: Define our training code in a Docker container

From your Notebooks instance, open the Launcher and select Terminal:

Open terminal

Then run the following to set up a directory where you'll add your containerized code:

mkdir traincontainer
cd traincontainer
touch Dockerfile

mkdir trainer
touch trainer/train.py

After running those commands, you should see a directory called traincontainer/ created on the left (you may need to click the refresh icon to see it). You'll see the following in your traincontainer/ directory:

+ Dockerfile
+ trainer/
    + train.py

Our first step in containerizing our code is to create a Dockerfile. In our Dockerfile we'll include all the commands needed to run our image. It'll install all the libraries we're using and set up the entry point for our training code. Open the Dockerfile you just created, and add the following:


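# Base image: a prebuilt scikit-learn Deep Learning Container (assumed here; any Python image with pip works).
FROM gcr.io/deeplearning-platform-release/sklearn-cpu.0-23
WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer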

RUN pip install scikit-learn google-cloud-bigquery joblib pandas google-cloud-storage

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.train"]

To save files as you're editing them in your notebook instance, you can use ctrl+s.

Next, open the train.py file. This is where we'll add our training code. Copy the following into train.py. This retrieves the data from our managed dataset, puts it into a Pandas DataFrame, trains a Scikit-learn model, and uploads the trained model to Cloud Storage:

from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_curve
from sklearn.model_selection import train_test_split
from google.cloud import bigquery
from google.cloud import storage
from joblib import dump

import os
import pandas as pd

bqclient = bigquery.Client()
storage_client = storage.Client()

def download_table(bq_table_uri: str):
    prefix = "bq://"
    if bq_table_uri.startswith(prefix):
        bq_table_uri = bq_table_uri[len(prefix):]

    table = bigquery.TableReference.from_string(bq_table_uri)
    rows = bqclient.list_rows(table)
    return rows.to_dataframe(create_bqstorage_client=False)

# These environment variables are from Vertex AI managed datasets
training_data_uri = os.environ["AIP_TRAINING_DATA_URI"]
test_data_uri = os.environ["AIP_TEST_DATA_URI"]

# Download data into Pandas DataFrames, split into train / test
df = download_table(training_data_uri)
test_df = download_table(test_data_uri)
labels = df.pop("Class").tolist()
data = df.values.tolist()
test_labels = test_df.pop("Class").tolist()
test_data = test_df.values.tolist()

# Define and train the Scikit model
skmodel = DecisionTreeClassifier()
skmodel.fit(data, labels)
score = skmodel.score(test_data, test_labels)
print('accuracy is:',score)

# Save the model to a local file
dump(skmodel, "model.joblib")

# Upload the saved model file to GCS
bucket = storage_client.get_bucket("YOUR_GCS_BUCKET")
model_directory = os.environ["AIP_MODEL_DIR"]
storage_path = os.path.join(model_directory, "model.joblib")
blob = storage.blob.Blob.from_string(storage_path, client=storage_client)
blob.upload_from_filename("model.joblib")

Then, navigate back to your notebook and run the following to replace YOUR_GCS_BUCKET from the script above with the name of your Cloud Storage bucket:

BUCKET = BUCKET_NAME[5:] # Trim the 'gs://' before adding to train script
!sed -i -r 's@YOUR_GCS_BUCKET@'"$BUCKET"'@' traincontainer/trainer/train.py

You can also do this manually if you'd prefer. If you do, make sure not to include the gs:// in your bucket name when you update the script.
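
Since the gs:// prefix is easy to get wrong, here is a small sketch of splitting a Cloud Storage URI into its bucket name and object path. The helper name split_gcs_uri and the example URI are hypothetical. It uses only string operations, so it runs without any Cloud libraries installed.

```python
def split_gcs_uri(uri: str) -> tuple:
    """Split 'gs://bucket/path/to/object' into (bucket, object_path)."""
    prefix = "gs://"
    if not uri.startswith(prefix):
        raise ValueError("expected a URI that starts with gs://")
    # Everything up to the first '/' is the bucket; the rest is the object path.
    bucket, _, object_path = uri[len(prefix):].partition("/")
    if not bucket:
        raise ValueError("bucket name is empty")
    return bucket, object_path


print(split_gcs_uri("gs://my-project-bucket/model/model.joblib"))
```

The first element of the result is the bare bucket name, which is the form the training script expects in place of YOUR_GCS_BUCKET.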

Now our training code is in a Docker container and we're ready to run training in the Cloud.

Step 2: Push container to Container Registry

With our training code complete, we're ready to push this to Google Container Registry. Later when we configure the training component of our pipeline, we'll point Vertex Pipelines at this container.

Navigate back to your Terminal, and from the root of your traincontainer/ directory, define a variable with the URI for your container image on Container Registry.

PROJECT_ID=$(gcloud config get-value project)
IMAGE_URI="gcr.io/$PROJECT_ID/scikit:v1"

Then, build your container by running the following:

docker build ./ -t $IMAGE_URI

Finally, push the container to Container Registry:

docker push $IMAGE_URI

Navigate to the Container Registry section of your Cloud console to verify your container is there. It will look something like this:

Container Registry

6. Configuring a batch prediction job

The last step of our pipeline will run a batch prediction job. For this to work, we need to provide a CSV file in Cloud Storage that contains the examples we want to get predictions on. We'll create this CSV file in our notebook and copy it to Cloud Storage using the gsutil command line tool.

Copying batch prediction examples to Cloud Storage

The following file contains 3 examples from each class in our beans dataset. The example below doesn't include the Class column since that is what our model will be predicting. Run the following to create this CSV file locally in your notebook:

%%writefile batch_examples.csv

Then, copy the file to your Cloud Storage bucket:

!gsutil cp batch_examples.csv $BUCKET_NAME

We'll reference this file in the next step when we define our pipeline.

7. Building a pipeline with pre-built components

Now that our training code is in the cloud, we're ready to call it from our pipeline. The pipeline we'll define will use three pre-built components from the google_cloud_pipeline_components library we installed earlier. These pre-defined components simplify the code we need to write to set up our pipeline, and will allow us to use Vertex AI services like model training and batch prediction.

Here's what our three-step pipeline will do:

  • Create a managed dataset in Vertex AI
  • Run a training job on Vertex AI using the custom container we set up
  • Run a batch prediction job on our trained Scikit-learn classification model

Step 1: Define our pipeline

Because we're using pre-built components, we can set up our entire pipeline in the pipeline definition. Add the following to a notebook cell:

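@pipeline(name="automl-beans-custom", pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://sara-vertex-demos.beans_demo.large_dataset",
    bucket: str = BUCKET_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = REGION,
    bq_dest: str = "",
    container_uri: str = "",
    batch_destination: str = ""
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        display_name="tabular-beans-dataset", bq_source=bq_source, project=project, location=gcp_region  # display names are example values
    )
    training_op = gcc_aip.CustomContainerTrainingJobRunOp(
        display_name="pipeline-beans-custom-train", container_uri=container_uri, project=project,
        location=gcp_region, dataset=dataset_create_op.outputs["dataset"], staging_bucket=bucket,
        bigquery_destination=bq_dest, model_display_name="scikit-beans-model-pipeline", machine_type="n1-standard-4",
        model_serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest",
    )
    batch_predict_op = gcc_aip.ModelBatchPredictOp(
        project=project, location=gcp_region, job_display_name="beans-batch-predict",
        model=training_op.outputs["model"], gcs_source_uris=["{0}/batch_examples.csv".format(BUCKET_NAME)],
        instances_format="csv", gcs_destination_output_uri_prefix=batch_destination, machine_type="n1-standard-4",
    )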

Step 2: Compile and run the pipeline

With your pipeline defined, you're ready to compile it. The following will generate a JSON file that you'll use to run the pipeline:

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="custom_train_pipeline.json"
)

Next, create a TIMESTAMP variable. We'll use this in our job ID:

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
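
To show how a timestamp like this ends up in a job ID, here is a minimal sketch. The helper name make_job_id and the "custom-train-pipeline" prefix are assumptions for illustration. Passing the time in as an argument keeps the function easy to test.

```python
from datetime import datetime


def make_job_id(prefix: str, now: datetime) -> str:
    """Append a sortable YYYYMMDDHHMMSS timestamp to a job name prefix."""
    return f"{prefix}-{now.strftime('%Y%m%d%H%M%S')}"


# A fixed time is used here so the output is reproducible.
print(make_job_id("custom-train-pipeline", datetime(2022, 1, 31, 9, 5, 0)))
```

Because the timestamp changes every second, each run of the notebook cell gets a distinct job ID.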

Then define your pipeline job, passing in a few project-specific parameters:

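pipeline_job = aiplatform.PipelineJob(
    display_name="custom-train-pipeline",  # example display name
    template_path="custom_train_pipeline.json",
    job_id="custom-train-pipeline-{0}".format(TIMESTAMP),
    parameter_values={
        "project": PROJECT_ID,
        "bucket": BUCKET_NAME,
        "bq_dest": "bq://{0}".format(PROJECT_ID),
        "container_uri": "gcr.io/{0}/scikit:v1".format(PROJECT_ID),
        "batch_destination": "{0}/batchpredresults".format(BUCKET_NAME)
    },
)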

Finally, run the job to create a new pipeline execution:

pipeline_job.submit()


After running this cell, you should see logs with a link to view the pipeline run in your console:

Pipeline job logs

Navigate to that link. You can also access it by opening your Pipelines dashboard. Your pipeline should look like this when complete:

Completed intro pipeline

This pipeline will take 5-10 minutes to run, but you can continue to the next step before it completes. Next you'll learn more about what's happening in each of these pipeline steps.

8. Understanding your pipeline execution

Let's take a deeper dive into each of our three pipeline steps.

Pipeline Step 1: Creating a managed dataset

The first step in our pipeline creates a managed dataset in Vertex AI. If you click on the following dataset link in the Pipelines section of your console:

Link to dataset from pipeline

You'll see your dataset in Vertex AI, which includes a link to the data source in BigQuery along with information on the different columns in your dataset. Once you've uploaded a managed dataset to Vertex AI, it can be used to train either an AutoML or custom model.

For custom model jobs that use managed datasets, Vertex AI passes special environment variables to your training jobs and handles splitting your data into train and test sets. We'll make use of this in our next pipeline step.
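
To build some intuition for what a fraction-based split does, here is a small illustrative sketch. It is not Vertex AI's implementation. The function name, the 80/10/10 fractions, and the fixed seed are all assumptions chosen for the example.

```python
import random


def fraction_split(rows, train=0.8, validation=0.1, seed=0):
    """Shuffle rows and partition them into train / validation / test lists."""
    rows = list(rows)
    # A seeded generator makes the shuffle repeatable from run to run.
    random.Random(seed).shuffle(rows)
    n_train = int(len(rows) * train)
    n_validation = int(len(rows) * validation)
    train_rows = rows[:n_train]
    validation_rows = rows[n_train:n_train + n_validation]
    # Whatever is left over becomes the test set.
    test_rows = rows[n_train + n_validation:]
    return train_rows, validation_rows, test_rows


train_rows, validation_rows, test_rows = fraction_split(range(100))
print(len(train_rows), len(validation_rows), len(test_rows))
```

In the lab itself you don't write this logic: the training script simply reads the already-split tables through the environment variables.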

Pipeline Step 2: Training a model in Vertex AI Training

While your custom training job is running, you can click to view logs directly in the Vertex Pipelines console:

Custom training logs

You can also see details on the custom training job in your Vertex AI Training dashboard. When the training job completes, a Model resource will be created in Vertex AI. We can then deploy this model to an endpoint for online predictions or create a batch prediction job, which we'll do in our next pipeline step.

Pipeline Step 3: Running a batch prediction job on our model

Finally, our pipeline will get predictions on the examples we passed in via a CSV file. When the batch prediction job completes, Vertex AI will write a CSV file to the location we specified in Cloud Storage. When this pipeline step starts running, you can navigate to the Batch Predictions section of your Vertex AI console to see the job created.

Click on the job when it completes to see the Cloud Storage URL of your batch predictions:

Batch prediction job

Click on that link to go to the Cloud Storage directory where you can find the prediction results, and then click to download one of the prediction.results files. In the file, you should see rows that look like the following:

{"instance": [33954.0, 716.75, 277.3684803, 156.3563259, 1.773951126, 0.825970469, 34420.0, 207.9220419, 0.7994819873, 0.9864613597, 0.8305492781, 0.7496238998, 0.008168948587, 0.001591181142, 0.5619359911, 0.996846984], "prediction": "HOROZ"}

This includes the feature values for a particular instance, along with the class our model predicted. For this example, our model thinks this was a "HOROZ" bean.
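
If you'd like a quick summary of a results file instead of reading it row by row, the following sketch tallies the predicted classes. It assumes each line is a JSON object with a "prediction" key, as in the row above. The function name and the sample lines (with shortened feature lists and a made-up second class) are for illustration only.

```python
import json
from collections import Counter


def count_predictions(lines):
    """Count how many times each class appears in JSON-lines prediction output."""
    counts = Counter()
    for line in lines:
        line = line.strip()
        if not line:
            continue  # skip blank lines
        record = json.loads(line)
        counts[record["prediction"]] += 1
    return counts


# Abbreviated, made-up sample rows in the same shape as the real output.
sample_lines = [
    '{"instance": [33954.0, 716.75], "prediction": "HOROZ"}',
    '{"instance": [41000.0, 750.10], "prediction": "SIRA"}',
    '{"instance": [35000.0, 720.00], "prediction": "HOROZ"}',
]
print(count_predictions(sample_lines))
```

With a downloaded results file, you could pass the open file object in place of sample_lines.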

With that, you've finished the lab!

🎉 Congratulations! 🎉

You've learned how to use Vertex AI to:

  • Use the Kubeflow Pipelines SDK to build end-to-end pipelines that run a custom training job
  • Run your pipelines on Vertex Pipelines and kick off pipeline runs with the SDK
  • View and analyze your Vertex Pipelines graph in the console
  • Use pre-built pipeline components to add Vertex AI services to your pipeline
  • Run a batch prediction job on a trained model from within a pipeline

To learn more about different parts of Vertex, check out the documentation.

9. Cleanup

So that you're not charged, it is recommended that you delete the resources created throughout this lab.

Step 1: Stop or delete your Notebooks instance

If you'd like to continue using the notebook you created in this lab, it is recommended that you turn it off when not in use. From the Notebooks UI in your Cloud Console, select the notebook and then select Stop. If you'd like to delete the instance entirely, select Delete:

Stop instance

Step 2: Delete your Cloud Storage bucket

To delete the Storage Bucket, using the Navigation menu in your Cloud Console, browse to Storage, select your bucket, and click Delete:

Delete storage