Vertex Pipelines 简介

1. 概览

在本实验中,您将学习如何使用 Vertex Pipelines 创建和运行机器学习流水线。

学习内容

您将了解如何:

  • 使用 Kubeflow Pipelines SDK 构建可扩缩的机器学习流水线
  • 创建并运行一个三步式入门级流水线来接收文本输入
  • 创建并运行一个流水线来训练、评估和部署 AutoML 分类模型
  • 使用通过 google_cloud_pipeline_components 库提供的预构建组件与 Vertex AI 服务交互
  • 使用 Cloud Scheduler 安排流水线作业

在 Google Cloud 上运行此实验的总费用约为 25 美元

2. Vertex AI 简介

本实验使用的是 Google Cloud 上提供的最新 AI 产品。Vertex AI 将整个 Google Cloud 的机器学习产品集成到无缝的开发体验中。以前,使用 AutoML 训练的模型和自定义模型是通过不同的服务访问的。现在,该新产品与其他新产品一起将这两种模型合并到一个 API 中。您还可以将现有项目迁移到 Vertex AI。

除了模型训练和部署服务之外,Vertex AI 还包含各种 MLOps 产品,包括 Vertex Pipelines(本实验的重点)、Model Monitoring、Feature Store 等。您可以在下图中查看所有 Vertex AI 产品。

Vertex 产品概览

如果您有任何反馈,请参阅支持页面

机器学习流水线有哪些优势?

在深入探讨之前,先来了解一下为什么要使用流水线。假设您正在构建一个机器学习工作流,该工作流中包含数据处理、模型训练、超参数调优、评估和模型部署步骤。每个步骤可能有不同的依赖项,如果将整个工作流作为一个单体式应用来处理,可能会变得难以管理。在开始扩展机器学习流程时,您可能需要与团队中的其他成员共享您的机器学习工作流,以便其运行该工作流并贡献代码。但如果没有可靠且可重复的流程,将很难做到这一点。有了流水线,机器学习流程中的每个步骤都是其各自的容器。这样您就能独立开发各个步骤,并以可重复的方式跟踪每个步骤的输入和输出。您还可以基于云环境中的其他事件(比如有新训练数据可用时)来安排或触发流水线的运行。

总结:流水线有助于自动执行重复执行机器学习工作流。

3. 云环境设置

您需要一个启用了结算功能的 Google Cloud Platform 项目才能运行此 Codelab。如需创建项目,请按照此处的说明操作。

第 1 步:启动 Cloud Shell

在本实验中,您将使用 Cloud Shell 会话,这是一个由在 Google 云中运行的虚拟机托管的命令解释器。您也可以在自己的计算机上本地运行本部分,但使用 Cloud Shell 可让每个人在一致的环境中获得可重现的体验。完成本实验后,欢迎您在自己的计算机上重试本部分。

为 Cloud Shell 提供授权

激活 Cloud Shell

在 Cloud 控制台的右上角,点击下方按钮以激活 Cloud Shell

激活 Cloud Shell

如果您以前从未启动过 Cloud Shell,将看到一个中间屏幕(非首屏),描述它是什么。如果是这种情况,请点击继续(您将永远不会再看到它)。一次性屏幕如下所示:

Cloud Shell 设置

预配和连接到 Cloud Shell 只需花几分钟时间。

Cloud Shell 初始化

这个虚拟机已加载了您需要的所有开发工具。它提供了一个持久的 5GB 主目录,并且在 Google Cloud 中运行,大幅提高了网络性能和身份验证效率。只需使用一个浏览器或 Google Chromebook 即可完成本 Codelab 中的大部分(甚至全部)工作。

在连接到 Cloud Shell 后,您应该会看到自己已通过身份验证,并且相关项目已设置为您的项目 ID:

在 Cloud Shell 中运行以下命令以确认您已通过身份验证:

gcloud auth list

您应该会在命令输出中看到类似以下内容:

Cloud Shell 输出

在 Cloud Shell 中运行以下命令,以确认 gcloud 命令了解您的项目:

gcloud config list project

命令输出

[core]
project = <PROJECT_ID>

如果不是上述结果,您可以使用以下命令进行设置:

gcloud config set project <PROJECT_ID>

命令输出

Updated property [core/project].

Cloud Shell 有几个环境变量,包括 GOOGLE_CLOUD_PROJECT,其中包含当前云项目的名称。在本实验中,我们将在多个地方使用此变量。您可以通过运行以下命令查看:

echo $GOOGLE_CLOUD_PROJECT

第 2 步:启用 API

在后续步骤中,您将了解在哪些情况下需要这些服务(以及原因),但现在,请运行以下命令,以授予您的项目对 Compute Engine、Container Registry 和 Vertex AI 服务的访问权限:

gcloud services enable compute.googleapis.com         \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com  \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com

这应该会生成类似如下内容的成功消息:

Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.

第 3 步:创建 Cloud Storage 存储分区

如需在 Vertex AI 上运行训练作业,我们需要一个存储分区来存储已保存的模型资产。存储分区必须是区域级存储分区。我们在此处使用的是 us-central,但您也可以使用其他区域(只需在本实验中将其替换掉即可)。如果您已有存储分区,则可以跳过此步骤。

在 Cloud Shell 终端中运行以下命令以创建存储分区:

BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME

接下来,我们将向计算服务账号授予对此存储分区的访问权限。这样可确保 Vertex Pipelines 拥有将文件写入此存储分区所需的权限。运行以下命令可添加此权限:

gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin

第 4 步:创建 Vertex AI Workbench 实例

在 Cloud Console 的 Vertex AI 部分中,点击“Workbench”:

Vertex AI 菜单

然后,在用户管理的笔记本中,点击新建笔记本

新建笔记本

然后选择不带 GPUTensorFlow 企业版 2.3(提供长期支持)实例类型:

TFE 实例

使用默认选项,然后点击创建

第 5 步:打开笔记本

创建实例后,选择打开 JupyterLab

打开笔记本

4. Vertex Pipelines 设置

若要使用 Vertex Pipelines,我们还需要额外安装几个库:

  • Kubeflow Pipelines:此 SDK 将用于构建流水线。Vertex Pipelines 支持运行使用 Kubeflow Pipelines 或 TFX 构建的流水线。
  • Google Cloud 流水线组件:此库可提供预构建的组件,可让您更轻松地在流水线步骤中与 Vertex AI 服务进行交互。

第 1 步:创建 Python 笔记本并安装库

首先,在笔记本实例页面中,从“启动器”菜单中选择 Python 3 以创建笔记本:

创建 Python3 笔记本

若要访问“启动器”菜单,请点击笔记本实例页面左上角的加号 (+)。

如需安装本实验中要使用的两项服务,请先在笔记本单元中设置用户标志:

USER_FLAG = "--user"

然后,从笔记本中运行以下命令:

!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0

安装这些软件包后,需要重启内核:

import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

最后,检查您是否正确安装了软件包。KFP SDK 应为 1.8 或更高版本:

!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

第 2 步:设置项目 ID 和存储桶

在本实验中,您将全程引用之前创建的云项目 ID 和存储分区。接下来,我们将为每个变量创建变量。

如果不知道项目 ID,可运行以下命令来获取:

import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

否则,请在此处进行设置:

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

然后,创建一个变量来存储您的存储分区名称。如果您是在本实验中创建的,则以下操作会奏效。否则,您需要手动设置此项:

BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"

第 3 步:导入库

添加以下命令,以导入我们将在整个 Codelab 中使用的库:

import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

第 4 步:定义常量

在构建流水线之前,我们需要做的最后一件事是定义一些常量变量。PIPELINE_ROOT 是一个 Cloud Storage 路径,流水线创建的制品将写入该路径。这里我们使用的区域是 us-central1,如果您在创建存储分区时使用了其他区域,请运行以下代码来更新 REGION 变量:

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

运行上述代码后,您应该会看到输出的流水线根目录。来自流水线的制品将写入此 Cloud Storage 位置。它将采用 gs://YOUR-BUCKET-NAME/pipeline_root/ 格式

5. 创建第一个流水线

为了熟悉 Vertex Pipelines 的运作方式,我们将首先使用 KFP SDK 创建一个较短的流水线。此流水线不涉及任何与机器学习相关的内容(别担心,我们很快就会学到相关内容!),我们使用此流水线是为了教您:

  • 如何在 KFP SDK 中创建自定义组件
  • 如何在 Vertex Pipelines 中运行和监控流水线

我们将创建一个流水线来输出由两个输出(商品名称和表情符号说明)组成的句子。此流水线由三个组件组成:

  • product_name:此组件将接收商品名称(或您想要的任何名词)作为输入,并返回该字符串作为输出
  • emoji:此组件将接收表情符号的文本说明,并将其转换为表情符号。例如,✨ 的文本代码为“sparkles”。此组件使用表情符号库来告诉您如何管理流水线中的外部依赖项
  • build_sentence:这是最后一个组件,它将使用前两个组件的输出生成一个使用表情符号的句子。例如,可能会生成“Vertex Pipelines is ✨”之类的输出。

让我们开始编码吧!

第 1 步:创建基于 Python 函数的组件

利用 KFP SDK,我们可以根据 Python 函数来创建组件。我们将使用该数据来创建第一个流水线中的 3 个组件。我们将首先构建 product_name 组件,该组件仅接收字符串作为输入,并返回该字符串。将以下代码添加到笔记本中:

@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    return text

我们来详细了解一下此处的语法:

  • 流水线运行时,@component 修饰器会将此函数编译为一个组件。您每次编写自定义组件时都会用到它。
  • base_image 参数用于指定此组件将使用的容器映像。
  • output_component_file 是可选参数,用于指定要在其中写入已编译组件的 yaml 文件。运行此单元后,您应该会看到该文件已写入您的笔记本实例中。如果您想与其他人共享此组件,可以将生成的 yaml 文件发送给相关人员,并让他们运行以下命令来加载该文件:
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')
  • 函数定义后面的 -> str 用于指定此组件的输出类型。

第 2 步:创建两个附加组件

若要构建完整的流水线,我们还需要创建两个组件。我们将定义的第一个组件接收字符串作为输入,并将该字符串转换为对应的表情符号(如有)。它将返回一个元组,其中包含所传递的输入文本以及生成的表情符号:

@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    import emoji

    emoji_text = text
    emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
    print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
    return (emoji_text, emoji_str)

此组件比上一个组件复杂一些。我们来详细了解一下新变化:

  • packages_to_install 参数用于向组件指明此容器的所有外部库依赖项。在本例中,我们使用的是名为 emoji 的库。
  • 此组件返回一个名为 OutputsNamedTuple。请注意,此元组中的每个字符串都有自己的键:emoji_textemoji。我们将在下一个组件中使用它们来获取输出。

此流水线中的最后一个组件将使用前两个组件的输出,并将它们合并后返回一个字符串:

@component
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    print("We completed the pipeline, hooray!")
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji
    else:
        end_str += emojitext
    return(end_str)

您可能会问:此组件如何知道要使用前几个步骤定义的输出呢?这个问题问得好!我们将在下一步中将所有这些组件相互关联。

第 3 步:将组件合并成一个流水线

前面定义的组件定义创建了工厂函数,这些函数可用于在流水线定义中创建步骤。如需设置流水线,请使用 @pipeline 修饰器,为流水线命名并提供相关的说明,同时提供应向其中写入流水线制品的根路径。制品是指流水线生成的任何输出文件。此入门级流水线不会生成任何制品,但下一个流水线会生成制品。

在下一个代码块中,我们将定义一个 intro_pipeline 函数。我们在此处指定初始流水线步骤的输入,以及如何关联各个步骤:

  • product_task 接收商品名称作为输入。此处我们传递的是“Vertex Pipelines”,您也可以将其更改为任何其他产品名称。
  • emoji_task 接收表情符号的文本代码作为输入。您也可以将其更改为其他值。例如,“party_face”是指 🥳 表情符号。请注意,此组件和 product_task 组件都没有任何可提供输入的步骤,因此在定义流水线时,我们需要手动为它们指定输入。
  • 此流水线的最后一个步骤是 consumer_task,它有三个输入参数:
    • product_task 的输出。由于该步骤仅生成一个输出,因此我们可以通过 product_task.output 来引用它。
    • emoji_task 步骤的 emoji 输出。请参阅前面定义的 emoji 组件,我们在其中命名了输出参数。
    • 同样,还有 emoji 组件的名为 emoji_text 的输出。如果传递给流水线的文本没有对应的表情符号,流水线将使用此文本来构造句子。
@pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)

# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    product_task = product_name(text)
    emoji_task = emoji(emoji_str)
    consumer_task = build_sentence(
        product_task.output,
        emoji_task.outputs["emoji"],
        emoji_task.outputs["emoji_text"],
    )

第 4 步:编译并运行流水线

流水线已经定义完毕,您可以开始编译了。以下命令将生成一个用于运行流水线的 JSON 文件:

compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)

接下来,创建一个 TIMESTAMP 变量。我们将在作业 ID 中使用此值:

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

然后定义流水线作业:

job = aiplatform.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
    enable_caching=True
)

最后,运行作业以创建新的流水线执行:

job.submit()

运行此单元后,您应该会看到日志,其中包含一个链接,供您在控制台中查看流水线运行作业:

流水线作业日志

前往该链接。完成后的流水线应如下所示:

已完成的简介流水线

此流水线需要 5-6 分钟时间才能运行完毕。完成后,您可以点击 build-sentence 组件查看最终输出:

简介流水线输出

至此,您已经熟悉了 KFP SDK 以及 Vertex Pipelines 的运作方式,接下来要构建一个流水线来使用其他 Vertex AI 服务创建和部署机器学习模型。下面我们就来深入探讨一下。

6. 创建端到端机器学习流水线

现在来构建您的第一个机器学习流水线。在此流水线中,我们将使用“UCI Machine Learning 干豆数据集”,该数据集来自 KOKLU, M. 和 OZKAN, I.A.于 2020 年在《Computers and Electronics in Agriculture》上发表的“Multiclass Classification of Dry Beans Using Computer Vision and Machine Learning Techniques.”(利用计算机视觉和机器学习技术对干豆进行多类别分类)一文 (174, 105507. DOI

这是一个表格式数据集。在此流水线中,我们将使用该数据集来训练、评估和部署一个 AutoML 模型。该模型会根据豆子特征将不同豆子分为 7 种类型之一。

此流水线将执行以下步骤:

  • 在 中创建数据集
  • 使用 AutoML 训练表格分类模型
  • 获取此模型的评估指标
  • 根据评估指标,决定是否在 Vertex Pipelines 中使用条件逻辑来部署此模型
  • 使用 Vertex Prediction 将模型部署到端点

上述每个步骤都是一个组件。大部分流水线步骤都将使用为 Vertex AI 服务预构建的组件,这些组件通过我们之前在本 Codelab 中导入的 google_cloud_pipeline_components 库提供。在本部分,我们先来定义一个自定义组件。然后再使用预构建的组件定义其余的流水线步骤。借助预构建的组件,您可以更轻松地访问模型训练和部署等 Vertex AI 服务。

第 1 步:用于评估模型的自定义组件

我们定义的自定义组件将在流水线的最后阶段,也就是模型训练完成之后使用。此组件将执行以下几个步骤:

  • 获取经过训练的 AutoML 分类模型的评估指标
  • 解析指标并将它们呈现在 Vertex Pipelines 界面中
  • 将指标与阈值进行比较,以确定是否应该部署该模型

在定义此组件之前,我们先了解一下它的输入和输出参数。此流水线将接收以下内容作为输入:我们的云项目的一些元数据、生成的训练模型(稍后定义这一组件)、模型的评估指标以及 thresholds_dict_strthresholds_dict_str 是我们将在运行流水线时指定的参数。在此分类模型中,它代表的是 ROC 曲线下面积的值,我们将根据此值来部署模型。例如,若我们输入 0.95,即表示我们希望流水线仅在该指标高于 95% 时才部署模型。

我们的评估组件将返回一个字符串,以指明是否部署该模型。将以下代码添加到笔记本单元中,以创建此自定义组件:

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    import json
    import logging

    from google.cloud import aiplatform as aip

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

第 2 步:添加 Google Cloud 预构建组件

在此步骤中,我们将定义其余的流水线组件并查看它们搭配使用的情况。首先,使用时间戳来定义流水线运行作业的显示名称:

import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)

然后,将以下代码复制到新的笔记本单元中:

@pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classification_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name="train-automl-beans",
        )

        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type="n1-standard-4",
        )

我们来看看这段代码中发生了什么:

  • 首先,与我们之前的流水线一样,我们定义了此流水线接收的输入参数。这些输入参数不依赖于流水线中其他步骤的输出,因此需要我们手动设置。
  • 此流水线的剩余部分将使用几个预构建的组件来与 Vertex AI 服务交互:
    • TabularDatasetCreateOp 会根据 Cloud Storage 或 BigQuery 中的数据集来源,在 Vertex AI 中创建表格式数据集。在此流水线中,我们通过 BigQuery 表的网址来传递数据
    • AutoMLTabularTrainingJobRunOp 会为表格式数据集启动 AutoML 训练作业。我们向此组件传递了几个配置参数,包括模型类型(本例中为分类)、一些列数据、我们希望训练运行的时长,以及指向数据集的指针。请注意,为了将数据集传递给此组件,我们通过 dataset_create_op.outputs["dataset"] 提供了上一个组件的输出
    • EndpointCreateOp 会在 Vertex AI 中创建一个端点。在此步骤中创建的端点将作为输入传递给下一个组件
    • ModelDeployOp 会将给定模型部署到 Vertex AI 中的端点上。在本示例中,我们将使用上一步中创建的端点。该产品还有一些其他配置选项可用,不过我们在本例中提供了端点机器类型以及要部署的模型。我们通过访问流水线中训练步骤的输出传入了模型
  • 此流水线还使用了条件逻辑。这是 Vertex Pipelines 的一个功能,可用于定义条件以及基于该条件的结果的不同分支。请注意,在定义流水线时,我们传递了一个 thresholds_dict_str 参数。此参数是准确率阈值,将用于确定是否将模型部署到端点。为了实现这一点,我们使用了 KFP SDK 中的 Condition 类。我们传入的条件是之前在本 Codelab 中定义的自定义评估组件的输出。如果此条件为 true,则流水线将会继续执行 deploy_op 组件。如果准确率未达到预定义的阈值,则流水线将在此处停止运行,并且不会部署模型。

第 3 步:编译并运行端到端机器学习流水线

至此整个流水线定义完成,接下来对它进行编译:

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)

接下来,定义作业:

ml_pipeline_job = aiplatform.PipelineJob(
    display_name="automl-tab-beans-training",
    template_path="tab_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
    enable_caching=True
)

最后,运行作业:

ml_pipeline_job.submit()

运行上面的单元后,前往日志中显示的链接,在控制台中查看您的流水线。运行此流水线所需的时间为 1 个小时多一点。其中大部分时间都花在了 AutoML 训练步骤上。完成的流水线如下所示:

已完成的 AutoML 流水线

切换顶部的“展开制品”按钮,即可查看流水线中创建的各个制品的详细信息。例如,点击 dataset 制品,将会看到所创建的 Vertex AI 数据集的详细信息。您可以点击此处的链接,以跳转到该数据集的页面:

流水线数据集

同理,若想查看自定义评估组件生成的指标可视化图表,只需点击名为 metricsc 的制品。在信息中心右侧,您将看到此模型的混淆矩阵:

指标可视化

如需查看此流水线运行后创建的模型和端点,请前往模型部分,然后点击名为 automl-beans 的模型。之后,您应该会看到此模型已部署到端点上:

模型端点

您也可以点击流水线图表中的 endpoint 制品来访问此页面。

除了在控制台中查看流水线图表外,您还可以使用 Vertex Pipelines 来执行沿袭跟踪。沿袭跟踪是指跟踪在整个流水线中创建的制品。这有助于我们了解制品的创建位置,以及它们在整个机器学习工作流中的使用情况。例如,若想查看在此流水线中创建的数据集的沿袭跟踪,只需点击 dataset 制品,然后点击查看沿袭

查看沿袭

下图显示了使用此制品的所有位置:

沿袭详情

第 4 步:比较每次流水线运行的指标

如果您运行了此流水线多次,可能想要比较一下这几次运行的指标。您可以使用 aiplatform.get_pipeline_df() 方法来获取运行元数据。这里,我们将获取此流水线每次运行的元数据,并将其加载到 Pandas DataFrame 中:

pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df

至此,您已完成本实验!

🎉 恭喜!🎉

您学习了如何使用 Vertex AI 执行以下操作:

  • 使用 Kubeflow Pipelines SDK 构建具有自定义组件的端到端流水线
  • 在 Vertex Pipelines 上运行流水线,并使用 SDK 启动流水线运行
  • 在控制台中查看和分析 Vertex Pipelines 图
  • 使用预构建的流水线组件将 Vertex AI 服务添加到流水线中
  • 安排周期性流水线作业

如需详细了解 Vertex 的不同部分,请参阅相关文档

7. 清理

为避免产生费用,建议您删除在本实验中创建的资源。

第 1 步:停止或删除 Notebooks 实例

如果您想继续使用在本实验中创建的笔记本,建议您在不使用时将其关闭。在 Cloud 控制台的笔记本界面中,选择笔记本,然后选择停止。如果您想完全删除该实例,请选择删除

可停止实例

第 2 步:删除端点

如需删除已部署的端点,请前往 Vertex AI 控制台的端点部分,然后点击删除图标:

删除端点

然后,在以下提示中点击取消部署

取消部署模型

最后,前往控制台的模型部分,找到相应模型,然后从右侧的三点状菜单中点击删除模型

删除模型

第 3 步:删除 Cloud Storage 存储分区

如需删除存储桶,请使用 Cloud Console 中的导航菜单,浏览到“存储空间”,选择您的存储桶,然后点击“删除”:

删除存储空间