Intro to Vertex Pipelines

In this lab, you will learn how to create and run ML pipelines with Vertex Pipelines.

What you learn

You'll learn how to:

  • Use the Kubeflow Pipelines SDK to build scalable ML pipelines
  • Create and run a 3-step intro pipeline that takes text input
  • Create and run a pipeline that trains, evaluates, and deploys an AutoML classification model
  • Use pre-built components for interacting with Vertex AI services, provided through the google_cloud_pipeline_components library
  • Schedule a pipeline job with Cloud Scheduler

The total cost to run this lab on Google Cloud is about $25.

This lab uses the newest AI product offering available on Google Cloud. Vertex AI integrates the ML offerings across Google Cloud into a seamless development experience. Previously, models trained with AutoML and custom models were accessible via separate services. The new offering combines both into a single API, along with other new products. You can also migrate existing projects to Vertex AI.

In addition to model training and deployment services, Vertex AI also includes a variety of MLOps products, including Vertex Pipelines (the focus of this lab), Model Monitoring, Feature Store, and more. You can see all Vertex AI product offerings in the diagram below.

Vertex product overview

If you have any feedback, please see the support page.

Why are ML pipelines useful?

Before we dive in, let's first understand why you would want to use a pipeline. Imagine you're building out a ML workflow that includes processing data, training a model, hyperparameter tuning, evaluation, and model deployment. Each of these steps may have different dependencies, which may become unwieldy if you treat the entire workflow as a monolith. As you begin to scale your ML process, you might want to share your ML workflow with others on your team so they can run it and contribute code. Without a reliable, reproducible process, this can become difficult. With pipelines, each step in your ML process is its own container. This lets you develop steps independently and track the input and output from each step in a reproducible way. You can also schedule or trigger runs of your pipeline based on other events in your Cloud environment, like kicking off a pipeline run when new training data is available.

The tl;dr: pipelines help you automate and reproduce your ML workflow.

You'll need a Google Cloud Platform project with billing enabled to run this codelab. To create a project, follow the instructions here.

Step 1: Start Cloud Shell

In this lab you're going to work in a Cloud Shell session, which is a command interpreter hosted by a virtual machine running in Google's cloud. You could just as easily run this section locally on your own computer, but using Cloud Shell gives everyone access to a reproducible experience in a consistent environment. After the lab, you're welcome to retry this section on your own computer.

Authorize cloud shell

Activate Cloud Shell

From the top right of the Cloud Console, click the button below to Activate Cloud Shell:

Activate Cloud Shell

If you've never started Cloud Shell before, you're presented with an intermediate screen (below the fold) describing what it is. If that's the case, click Continue (and you won't ever see it again). Here's what that one-time screen looks like:

Cloud Shell setup

It should only take a few moments to provision and connect to Cloud Shell.

Cloud Shell init

This virtual machine is loaded with all the development tools you need. It offers a persistent 5GB home directory and runs in Google Cloud, greatly enhancing network performance and authentication. Much, if not all, of your work in this codelab can be done with simply a browser or your Chromebook.

Once connected to Cloud Shell, you should see that you are already authenticated and that the project is already set to your project ID.

Run the following command in Cloud Shell to confirm that you are authenticated:

gcloud auth list

You should see something like this in the command output:

Cloud Shell output

Run the following command in Cloud Shell to confirm that the gcloud command knows about your project:

gcloud config list project

Command output

[core]
project = <PROJECT_ID>

If it is not, you can set it with this command:

gcloud config set project <PROJECT_ID>

Command output

Updated property [core/project].

Cloud Shell has a few environment variables, including GOOGLE_CLOUD_PROJECT which contains the name of our current Cloud project. We'll use this in various places throughout this lab. You can see it by running:

echo $GOOGLE_CLOUD_PROJECT

Step 2: Enable APIs

In later steps, you'll see where these services are needed (and why), but for now, run this command to give your project access to the Compute Engine, Container Registry, and Vertex AI services:

gcloud services enable compute.googleapis.com         \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com  \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com

This should produce a successful message similar to this one:

Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.

Step 3: Create a Cloud Storage Bucket

To run a training job on Vertex AI, we'll need a storage bucket to store our saved model assets. The bucket needs to be regional. We're using us-central here, but you are welcome to use another region (just replace it throughout this lab). If you already have a bucket you can skip this step.

Run the following commands in your Cloud Shell terminal to create a bucket:

BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME

Next we'll give our compute service account access to this bucket. This will ensure that Vertex Pipelines has the necessary permissions to write files to this bucket. Run the following command to add this permission:

gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin

Step 4: Create an Vertex Notebooks instance

From the Vertex AI section of your Cloud Console, click on Notebooks:

Vertex AI menu

From there, select New Instance. Then select the TensorFlow Enterprise 2.3 (with LTS) instance type without GPUs:

TFE instance

Use the default options and then click Create.

Step 5: Open your Notebook

Once the instance has been created, select Open JupyterLab:

Open CAIP Notebook

There are a few additional libraries we'll need to install in order to use Vertex Pipelines:

  • Kubeflow Pipelines: This is the SDK we'll be using to build our pipeline. Vertex Pipelines supports running pipelines built with both Kubeflow Pipelines or TFX.
  • Google Cloud Pipeline Components: This library provides pre-built components that make it easier to interact with Vertex AI services from your pipeline steps.

Step 1: Create Python notebook and install libraries

First, from the Launcher menu in your Notebook instance, create a notebook by selecting Python 3:

Create Python3 notebook

You can access the Launcher menu by clicking on the + sign in the top left of your notebook instance.

To install both services we'll be using in this lab, first set the user flag in a notebook cell:

USER_FLAG = "--user"

Then run the following from your notebook:

!pip3 install {USER_FLAG} google-cloud-aiplatform==1.4.3 --upgrade
!pip3 install {USER_FLAG} kfp google-cloud-pipeline-components==0.1.6 --upgrade

After installing these packages you'll need to restart the kernel:

import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

Finally, check that you have correctly installed the packages. The KFP SDK version should be >=1.8:

!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

Step 2: Set your project ID and bucket

Throughout this lab you'll reference your Cloud project ID and the bucket you created earlier. Next we'll create variables for each of those.

If you don't know your project ID you may be able to get it by running the following:

import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

Otherwise, set it here:

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

Then create a variable to store your bucket name. If you created it in this lab, the following will work. Otherwise, you'll need to set this manually:

BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"

Step 3: Import libraries

Add the following to import the libraries we'll be using throughout this codelab:

from typing import NamedTuple

import kfp
from kfp import dsl
from kfp.v2 import compiler
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, ClassificationMetrics, Metrics, component)
from kfp.v2.google.client import AIPlatformClient

from google.cloud import aiplatform
from google.cloud.aiplatform import pipeline_jobs
from google_cloud_pipeline_components import aiplatform as gcc_aip

Step 4: Define constants

The last thing we need to do before building our pipeline is define some constant variables. PIPELINE_ROOT is the Cloud Storage path where the artifacts created by our pipeline will be written. We're using us-central1 as the region here, but if you used a different region when you created your bucket, update the REGION variable in the code below:

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

After running the code above, you should see the root directory for your pipeline printed. This is the Cloud Storage location where the artifacts from your pipeline will be written. It will be in the format of gs://YOUR-BUCKET-NAME/pipeline_root/

To get familiar with how Vertex Pipelines works, we'll first create a short pipeline using the KFP SDK. This pipeline doesn't do anything ML related (don't worry, we'll get there!), we're using it to teach you:

  • How to create custom components in the KFP SDK
  • How to run and monitor a pipeline in Vertex Pipelines

We'll create a pipeline that prints out a sentence using two outputs: a product name and an emoji description. This pipeline will consist of three components:

  • product_name: This component will take a product name (or any noun you want really) as input, and return that string as output
  • emoji: This component will take the text description of an emoji and convert it to an emoji. For example, the text code for ✨ is "sparkles". This component uses an emoji library to show you how to manage external dependencies in your pipeline
  • build_sentence: This final component will consume the output of the previous two to build a sentence that uses the emoji. For example, the resulting output might be "Vertex Pipelines is ✨".

Let's start coding!

Step 1: Create a Python function based component

Using the KFP SDK, we can create components based on Python functions. We'll use that for the 3 components in our first pipeline. We'll first build the product_name component, which simply takes a string as input and returns that string. Add the following to your notebook:

@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    return text

Let's take a closer look at the syntax here:

  • The @component decorator compiles this function to a component when the pipeline is run. You'll use this anytime you write a custom component.
  • The base_image parameter specifies the container image this component will use.
  • The output_component_file parameter is optional, and specifies the yaml file to write the compiled component to. After running the cell you should see that file written to your notebook instance. If you wanted to share this component with someone, you could send them the generated yaml file and have them load it with the following:
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')
  • The -> str after the function definition specifies the output type for this component.

Step 2: Create two additional components

To complete our pipeline, we'll create two more components. The first one we'll define takes a string as input, and converts this string to its corresponding emoji if there is one. It returns a tuple with the input text passed, and the resulting emoji:

@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    import emoji

    emoji_text = text
    emoji_str = emoji.emojize(':' + emoji_text + ':', use_aliases=True)
    print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
    return (emoji_text, emoji_str)

This component is a bit more complex than our previous one. Let's break down what's new:

  • The packages_to_install paramater tells the component any external library dependencies for this container. In this case, we're using a library called emoji.
  • This component returns a NamedTuple called Outputs. Notice that each of the strings in this tuple have keys: emoji_text and emoji. We'll use these in our next component to access the output.

The final component in this pipeline will consume the output of the first two and combine them to return a string:

@component
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    print("We completed the pipeline, hooray!")
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji
    else:
        end_str += emojitext
    return(end_str)

You might be wondering: how does this component know to use the output from the previous steps you defined? Good question! We will tie it all together in the next step.

Step 3: Putting the components together into a pipeline

The component definitions we defined above created factory functions that can be used in a pipeline definition to create steps. To set up a pipeline, use the @dsl.pipeline decorator, give the pipeline a name and description, and provide the root path where your pipeline's artifacts should be written. By artifacts, we mean any output files generated by your pipeline. This intro pipeline doesn't generate any, but our next pipeline will.

In the next block of code we define an intro_pipeline function. This is where we specify the inputs to our initial pipeline steps, and how steps connect to each other:

  • product_task takes a product name as input. Here we're passing "Vertex Pipelines" but you can change this to whatever you'd like.
  • emoji_task takes the text code for an emoji as input. You can also change this to whatever you'd like. For example, "party_face" refers to the 🥳 emoji. Note that since both this and the product_task component don't have any steps that feed input into them, we manually specify the input for these when we define our pipeline.
  • The last step in our pipeline - consumer_task has three input parameters:
    • The output of product_task. Since this step only produces one output, we can reference it via product_task.output.
    • The emoji output of our emoji_task step. See the emoji component defined above where we named the output parameters.
    • Similarly, the emoji_text named output from the emoji component. In case our pipeline is passed text that doesn't correspond with an emoji, it'll use this text to construct a sentence.
@dsl.pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)

# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    product_task = product_name(text)
    emoji_task = emoji(emoji_str)
    consumer_task = build_sentence(
        product_task.output,
        emoji_task.outputs["emoji"],
        emoji_task.outputs["emoji_text"],
    )

Step 4: Compile and run the pipeline

With your pipeline defined, you're ready to compile it. The following will generate a JSON file that you'll use to run the pipeline:

compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)

Next, create a TIMESTAMP variable. We'll use this in our job ID:

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

Then define your pipeline job:

job = pipeline_jobs.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
    enable_caching=True
)

Finally, run the job to create a new pipeline execution:

job.run()

After running this cell, you should see logs with a link to view the pipeline run in your console:

Pipeline job logs

Navigate to that link. Your pipeline should look like this when complete:

Completed intro pipeline

This pipeline will take 5-6 minutes to run. When complete, you can click on the build-sentence component to see the final output:

Intro pipeline output

Now that you're familiar with how the KFP SDK and Vertex Pipelines works, you're ready to build a pipeline that creates and deploys an ML model using other Vertex AI services. Let's dive in!

It's time to build your first ML pipeline. In this pipeline, we'll use the UCI Machine Learning Dry beans dataset, from: KOKLU, M. and OZKAN, I.A., (2020), "Multiclass Classification of Dry Beans Using Computer Vision and Machine Learning Techniques."In Computers and Electronics in Agriculture, 174, 105507. DOI.

This is a tabular dataset, and in our pipeline we'll use the dataset to train, evaluate, and deploy an AutoML model that classifies beans into one of 7 types based on their characteristics.

This pipeline will:

  • Create a Dataset in Vertex AI
  • Train a tabular classification model with AutoML
  • Get evaluation metrics on this model
  • Based on the evaluation metrics, decide whether to deploy the model using conditional logic in Vertex Pipelines
  • Deploy the model to an endpoint using Vertex Prediction

Each of the steps outlined will be a component. Most of the pipeline steps will use pre-built components for Vertex AI services via the google_cloud_pipeline_components library we imported earlier in this codelab. In this section, we'll define one custom component first. Then, we'll define the rest of the pipeline steps using pre-built components. Pre-built components make it easier to access Vertex AI services, like model training and deployment.

Step 1: A custom component for model evaluation

The custom component we'll define will be used towards the end of our pipeline once model training has completed. This component will do a few things:

  • Get the evaluation metrics from the trained AutoML classification model
  • Parse the metrics and render them in the Vertex Pipelines UI
  • Compare the metrics to a threshold to determine whether the model should be deployed

Before we define the component, let's understand its input and output parameters. As input, this pipeline takes some metadata on our Cloud project, the resulting trained model (we'll define this component later), the model's evaluation metrics, and a thresholds_dict_str. The thresholds_dict_str is something we'll define when we run our pipeline. In the case of this classification model, this will be the area under the ROC curve value for which we should deploy the model. For example, if we pass in 0.95, that means we'd only like our pipeline to deploy the model if this metric is above 95%.

Our evaluation component returns a string indicating whether or not to deploy the model. Add the following in a notebook cell to create this custom component:

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tables_eval_component.yaml", # Optional: you can use this to load the component later
    packages_to_install=["google-cloud-aiplatform"],
)
def classif_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Model],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    """This function renders evaluation metrics for an AutoML Tabular classification model.
    It retrieves the classification model evaluation generated by the AutoML Tabular training
    process, does some parsing, and uses that info to render the ROC curve and confusion matrix
    for the model. It also uses given metrics threshold information and compares that to the
    evaluation results to determine whether the model is sufficiently accurate to deploy.
    """
    import json
    import logging

    from google.cloud import aiplatform

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is 
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info(
                        "{} < {}; returning False".format(metrics_dict[k], v)
                    )
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aiplatform.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.uri.replace("aiplatform://v1/", "")
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aiplatform.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

Step 2: Adding Google Cloud pre-built components

In this step we'll define the rest of our pipeline components and see how they all fit together. First, define the display name for your pipeline run using a timestamp:

import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)

Then copy the following into a new notebook cell:

@kfp.dsl.pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classif_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        deploy_op = gcc_aip.ModelDeployOp(  # noqa: F841
            model=training_op.outputs["model"],
            project=project,
            machine_type="n1-standard-4",
        )

Let's see what's happening in this code:

  • First, just as in our previous pipeline, we define the input parameters this pipeline takes. We need to set these manually since they don't depend on the output of other steps in the pipeline.
  • The rest of the pipeline uses a few pre-built components for interacting with Vertex AI services:
    • TabularDatasetCreateOp creates a tabular dataset in Vertex AI given a dataset source either in Cloud Storage or BigQuery. In this pipeline, we're passing the data via a BigQuery table URL
    • AutoMLTabularTrainingJobRunOp kicks off an AutoML training job for a tabular dataset. We pass a few configuration parameters to this component, including the model type (in this case, classification), some data on the columns, how long we'd like to run training for, and a pointer to the dataset. Notice that to pass in the dataset to this component, we're providing the output of the previous component via dataset_create_op.outputs["dataset"]
    • ModelDeployOp deploys a given model to an endpoint in Vertex AI. There are additional configuration options available, but here we're providing the endpoint machine type, project, and model we'd like to deploy. We're passing in the model by accessing the outputs of the training step in our pipeline
  • This pipeline also makes use of conditional logic, a feature of Vertex Pipelines that lets you define a condition, along with different branches based on the result of that condition. Remember that when we defined our pipeline we passed a thresholds_dict_str parameter. This is the accuracy threshold we're using to determine whether to deploy our model to an endpoint. To implement this, we make use of the Condition class from the KFP SDK. The condition we pass in is the output of the custom eval component we defined earlier in this codelab. If this condition is true, the pipeline will continue to execute the deploy_op component. If accuracy doesn't meet our predefined threshold, the pipeline will stop here and won't deploy a model.

Step 3: Compile and run the end-to-end ML pipeline

With our full pipeline defined, it's time to compile it:

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)

Next, define the job:

ml_pipeline_job = pipeline_jobs.PipelineJob(
    display_name="automl-tab-beans-training",
    template_path="tab_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
    enable_caching=True
)

And finally, run the job:

ml_pipeline_job.run()

Navigate to the link shown in the logs after running the cell above to see your pipeline in the console. This pipeline will take a little over an hour to run. Most of the time is spent in the AutoML training step. The completed pipeline will look something like this:

Completed AutoML pipeline

If you toggle the "Expand artifacts" button at the top, you'll be able to see details for the different artifacts created from your pipeline. For example, if you click on the dataset artifact, you'll see details on the Vertex AI dataset that was created. You can click the link here to go to the page for that dataset:

Pipeline dataset

Similarly, to see the resulting metric visualizations from our custom evaluation component, click on the artifact called metricsc. On the right side of your dashboard, you'll be able to see the confusion matrix for this model:

Metrics visualization

To see the model and endpoint created from this pipeline run, go to the models section and click on the model named automl-beans. There you should see this model deployed to an endpoint:

Model-endpoint

You can also access this page by clicking on the endpoint artifact in your pipeline graph.

In addition to looking at the pipeline graph in the console, you can also use Vertex Pipelines for Lineage Tracking. By lineage tracking, we mean tracking artifacts created throughout your pipeline. This can help us understand where artifacts were created and how they are being used throughout an ML workflow. For example, to see the lineage tracking for the dataset created in this pipeline, click on the dataset artifact and then View Lineage:

View lineage

This shows us all the places this artifact is being used:

Lineage details

Step 4: Comparing metrics across pipeline runs

If you run this pipeline multiple times, you may want to compare metrics across runs. You can use the aiplatform.get_pipeline_df() method to access run metadata. Here, we'll get metadata for all runs of this pipeline and load it into a Pandas DataFrame:

pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df

With that, you've finished the lab!

🎉 Congratulations! 🎉

You've learned how to use Vertex AI to:

  • Use the Kubeflow Pipelines SDK to build end-to-end pipelines with custom components
  • Run your pipelines on Vertex Pipelines and kick off pipeline runs with the SDK
  • View and analyze your Vertex Pipelines graph in the console
  • Use pre-built pipeline components to add Vertex AI services to your pipeline
  • Schedule recurring pipeline jobs

To learn more about different parts of Vertex, check out the documentation.

So that you're not charged, it is recommended that you delete the resources created throughout this lab.

Step 1: Stop or delete your Notebooks instance

If you'd like to continue using the notebook you created in this lab, it is recommended that you turn it off when not in use. From the Notebooks UI in your Cloud Console, select the notebook and then select Stop. If you'd like to delete the instance entirely, select Delete:

Stop instance

Step 2: Delete your endpoint

To delete the endpoint you deployed, navigate to the Endpoints section of your Vertex AI console and click the delete icon:

Delete endpoint

Then, click Undeploy from the following prompt:

Undeploy model

Finally, navigate to the Models section of your console, find that model, and from the three dot menu on the right, click Delete model:

Delete model

Step 3: Delete your Cloud Storage bucket

To delete the Storage Bucket, using the Navigation menu in your Cloud Console, browse to Storage, select your bucket, and click Delete:

Delete storage