Vertex Pipelines 简介

1. 概览

在本实验中,您将学习如何使用 Vertex Pipelines 创建和运行机器学习流水线。

学习内容

您将了解如何:

  • 使用 Kubeflow Pipelines SDK 构建可扩缩的机器学习流水线
  • 创建并运行包含文本输入的 3 步入门流水线
  • 创建并运行用于训练、评估和部署 AutoML 分类模型的流水线
  • 使用预构建的组件与通过 google_cloud_pipeline_components 库提供的 Vertex AI 服务进行交互
  • 使用 Cloud Scheduler 安排流水线作业

在 Google Cloud 上运行本实验的总费用约为 25 美元

2. Vertex AI 简介

本实验使用的是 Google Cloud 上提供的最新 AI 产品。Vertex AI 将整个 Google Cloud 的机器学习产品集成到无缝的开发体验中。以前,使用 AutoML 训练的模型和自定义模型是通过不同的服务访问的。现在,该新产品与其他新产品一起将这两种模型合并到一个 API 中。您还可以将现有项目迁移到 Vertex AI。

除了模型训练和部署服务之外,Vertex AI 还包括各种 MLOps 产品,包括 Vertex Pipelines(本实验的重点)、模型监控、Feature Store 等。您可以在下图中查看所有 Vertex AI 产品。

Vertex 产品概览

如果您有任何反馈,请参阅支持页面

机器学习流水线有何作用?

在深入了解之前,我们先来了解一下为什么要使用流水线。假设您要构建一个机器学习工作流,其中包含处理数据、训练模型、超参数调节、评估和模型部署。其中每个步骤都可能具有不同的依赖项,如果您将整个工作流视为单体式应用,这些依赖项可能会很难处理。开始扩展机器学习流程时,您可能希望与团队中的其他人共享机器学习工作流,以便他们运行该工作流并贡献代码。如果没有可靠、可重现的过程,这可能会变得非常困难。借助流水线,机器学习流程中的每个步骤都有自己的容器。这样,您就可以独立开发步骤,并以可重现的方式跟踪每个步骤的输入和输出。您还可以根据云环境中的其他事件安排或触发流水线的运行,例如在有新的训练数据时启动流水线运行。

要点:流水线可帮助您自动化重现机器学习工作流。

3. Cloud 环境设置

您需要一个启用了结算功能的 Google Cloud Platform 项目才能运行此 Codelab。如需创建项目,请按照此处的说明操作。

第 1 步:启动 Cloud Shell

在本实验中,您将使用 Cloud Shell 会话,这是一个由在 Google 云中运行的虚拟机托管的命令解释器。您可以在自己的计算机本地轻松运行此部分,但借助 Cloud Shell,每个人都可以在一致的环境中获得可重现的体验。完成实验后,您可以在自己的计算机上重试本部分。

为 Cloud Shell 授权

激活 Cloud Shell

在 Cloud 控制台的右上角,点击下方按钮以激活 Cloud Shell

激活 Cloud Shell

如果您以前从未启动过 Cloud Shell,系统会显示一个中间屏幕(非首屏)来介绍 Cloud Shell。如果是这种情况,请点击继续(此后您将不会再看到此通知)。一次性屏幕如下所示:

Cloud Shell 设置

预配和连接到 Cloud Shell 只需花几分钟时间。

Cloud Shell 初始化

这个虚拟机装有您需要的所有开发工具。它提供了一个持久的 5GB 主目录,并且在 Google Cloud 中运行,大大增强了网络性能和身份验证。只需使用一个浏览器或 Google Chromebook 即可完成本 Codelab 中的大部分(甚至全部)工作。

在连接到 Cloud Shell 后,您应该会看到自己已通过身份验证,并且相关项目已设置为您的项目 ID:

在 Cloud Shell 中运行以下命令以确认您已通过身份验证:

gcloud auth list

您应该会在命令输出中看到如下内容:

Cloud Shell 输出

在 Cloud Shell 中运行以下命令,以确认 gcloud 命令了解您的项目:

gcloud config list project

命令输出

[core]
project = <PROJECT_ID>

如果不是上述结果,您可以使用以下命令进行设置:

gcloud config set project <PROJECT_ID>

命令输出

Updated property [core/project].

Cloud Shell 有一些环境变量,其中 GOOGLE_CLOUD_PROJECT 包含我们当前 Cloud 项目的名称。在本实验的各个位置,都会用到这个 ID。您可以通过运行以下命令来查看此信息:

echo $GOOGLE_CLOUD_PROJECT

第 2 步:启用 API

在后续步骤中,您将看到需要这些服务的位置(以及原因),但现在,请运行此命令,为您的项目授予对 Compute Engine、Container Registry 和 Vertex AI 服务的访问权限:

gcloud services enable compute.googleapis.com         \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com  \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com

这将生成类似于以下内容的成功消息:

Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.

第 3 步:创建 Cloud Storage 存储分区

如需在 Vertex AI 上运行训练作业,我们需要一个存储分区来存储我们保存的模型资源。存储分区必须是区域级的。我们在这里使用的是 us-central,但欢迎您使用其他区域(只需在整个实验中替换它即可)。如果您已有存储分区,可以跳过此步骤。

在 Cloud Shell 终端运行以下命令以创建存储分区:

BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME

接下来,我们将为计算服务账号授予对此存储分区的访问权限。这将确保 Vertex Pipelines 具有将文件写入此存储分区所需的权限。运行以下命令以添加此权限:

gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin

第 4 步:创建 Vertex AI Workbench 实例

在 Cloud Console 的 Vertex AI 部分中,点击“Workbench”:

Vertex AI 菜单

然后,在用户管理的笔记本中,点击新建笔记本

创建新的笔记本

然后选择 TensorFlow 企业版 2.3(带 LTS)实例类型,不带 GPU

TFE 实例

使用默认选项,然后点击创建

第 5 步:打开笔记本

创建实例后,选择打开 JupyterLab

打开笔记本

4. Vertex Pipelines 设置

我们还需要安装几个额外的库,才能使用 Vertex Pipelines:

  • Kubeflow 流水线:这是我们将用于构建流水线的 SDK。Vertex Pipelines 支持运行使用 Kubeflow 流水线或 TFX 构建的流水线。
  • Google Cloud 流水线组件:此库提供预构建的组件,让您可以轻松地在流水线步骤中与 Vertex AI 服务进行交互。

第 1 步:创建 Python 笔记本并安装库

首先,在笔记本实例的“启动器”菜单中,选择 Python 3 创建一个笔记本:

创建 Python3 笔记本

您可以通过点击笔记本实例左上角的 + 号来访问启动器菜单。

如需安装我们将在本实验中使用的两项服务,请先在笔记本单元中设置用户标志:

USER_FLAG = "--user"

然后从笔记本运行以下命令:

!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0

安装这些软件包后,您需要重启内核:

import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

最后,检查您是否已正确安装软件包。KFP SDK 版本应不低于 1.8:

!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

第 2 步:设置项目 ID 和存储分区

在本实验中,您需要参考您的 Cloud 项目 ID 和之前创建的存储分区。接下来,我们将为每个元素创建变量。

如果您不知道项目 ID,可以通过运行以下命令来获取:

import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

否则,请在此处进行设置:

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

然后创建一个变量来存储存储分区名称。如果您在本实验中创建了该 API,则以下配置将会起作用。否则,您需要手动进行设置:

BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"

第 3 步:导入库

添加以下代码,以导入我们将在此 Codelab 中使用的库:

import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

第 4 步:定义常量

在构建流水线之前,我们需要做的最后一件事是定义一些常量变量。PIPELINE_ROOT 是 Cloud Storage 路径,我们的流水线创建的制品将写入其中。我们在此处使用 us-central1 作为区域,但如果您在创建存储分区时使用了其他区域,请更新以下代码中的 REGION 变量:

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

运行上述代码后,您应该会看到输出的流水线根目录。这是将写入流水线中的制品的 Cloud Storage 位置。其格式为 gs://YOUR-BUCKET-NAME/pipeline_root/

5. 创建您的第一个流水线

为了熟悉 Vertex Pipelines 的工作原理,我们先使用 KFP SDK 创建一个简短的流水线。此流水线不会执行任何与机器学习相关的操作(别担心,我们马上就能实现!),我们将使用它来教您:

  • 如何在 KFP SDK 中创建自定义组件
  • 如何在 Vertex Pipelines 中运行和监控流水线

我们将创建一个流水线,用于输出包含以下两个输出的句子:商品名称和表情符号说明。此流水线由三个部分组成:

  • product_name:此组件会将商品名称(或您真正需要的任何名词)作为输入,并返回该字符串作为输出
  • emoji:此组件将获取表情符号的文字说明,并将其转换为表情符号。例如,✨ 的文本代码是“sparkles”。此组件使用表情符号库向您展示如何管理流水线中的外部依赖项
  • build_sentence:最后这个组件将使用前两个部分的输出,以构建一个使用该表情符号的句子。例如,生成的输出可能是“Vertex Pipelines is ✨”。

我们开始编码吧!

第 1 步:创建基于 Python 函数的组件

使用 KFP SDK,我们可以根据 Python 函数创建组件。我们将将其用于第一个流水线中的 3 个组件。我们首先构建 product_name 组件,该组件仅接受一个字符串作为输入并返回该字符串。将以下内容添加到笔记本:

@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    return text

我们来详细了解一下语法:

  • 运行流水线时,@component 修饰器会将此函数编译为组件。每次编写自定义组件时,都要用到这个 ID。
  • base_image 参数指定此组件将使用的容器映像。
  • output_component_file 参数是可选的,用于指定要将已编译组件写入到的 yaml 文件。运行该单元后,您应该会看到该文件写入了您的笔记本实例。如果您希望与他人共享此组件,可以将生成的 yaml 文件发送给对方,让对方通过以下代码加载该组件:
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')
  • 函数定义后面的 -> str 指定了此组件的输出类型。

第 2 步:创建两个额外的组件

为了完成流水线,我们将再创建两个组件。我们要定义的第一个代码接受一个字符串作为输入,并将此字符串转换为对应的表情符号(如果有)。它会返回一个包含传递的输入文本和生成的表情符号的元组:

@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    import emoji

    emoji_text = text
    emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
    print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
    return (emoji_text, emoji_str)

该组件比之前的组件稍微复杂一些。我们来详细了解一下新变化:

  • packages_to_install 参数会告知组件此容器的任何外部库依赖项。在本例中,我们将使用一个名为 emoji 的库。
  • 此组件会返回一个名为 OutputsNamedTuple。请注意,此元组中的每个字符串都有键:emoji_textemoji。我们将在下一个组件中使用它们来访问输出。

此流水线中的最后一个组件将使用前两个部分的输出,并将它们合并以返回一个字符串:

@component
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    print("We completed the pipeline, hooray!")
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji
    else:
        end_str += emojitext
    return(end_str)

您可能想知道:该组件如何知道要使用您前面定义的步骤的输出?问的好!我们会在下一步中将其整合在一起。

第 3 步:将组件放在流水线中

我们在上面定义的组件定义创建了工厂函数,这些函数可以在流水线定义中用于创建步骤。如需设置流水线,请使用 @pipeline 修饰器,为流水线指定名称和说明,并提供应写入流水线工件的根路径。我们所说的工件是指流水线生成的所有输出文件。此入门流水线不会生成任何代码,但我们的下一个流水线将会生成。

在下一个代码块中,我们定义一个 intro_pipeline 函数。我们在这里指定初始流水线步骤的输入,以及各个步骤如何相互连接:

  • product_task 接受商品名称作为输入。这里我们将“Vertex Pipelines”但您可以根据需要进行更改
  • emoji_task 接受表情符号的文本代码作为输入。您也可以根据需要将其更改为任何值。例如:“party_face”是指 🥳? 表情符号。请注意,由于此组件和 product_task 组件都没有向其提供输入的任何步骤,因此我们在定义流水线时手动为其指定输入。
  • 流水线中的最后一步 - consumer_task 有三个输入参数:
    • product_task 的输出。由于此步骤仅生成一个输出,因此我们可以通过 product_task.output 引用该输出。
    • emoji_task 步骤的 emoji 输出。请参阅上面定义的 emoji 组件,其中我们为输出参数命名。
    • 同样,emoji 组件中的 emoji_text 具名输出。如果管道中传递的文本与表情符号不对应,它将使用此文本来构建句子。
@pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)

# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    product_task = product_name(text)
    emoji_task = emoji(emoji_str)
    consumer_task = build_sentence(
        product_task.output,
        emoji_task.outputs["emoji"],
        emoji_task.outputs["emoji_text"],
    )

第 4 步:编译并运行流水线

定义流水线后,您就可以对其进行编译了。以下命令将生成一个用于运行流水线的 JSON 文件:

compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)

接下来,创建一个 TIMESTAMP 变量。我们将在作业 ID 中使用此名称:

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

然后定义您的流水线作业:

job = aiplatform.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
    enable_caching=True
)

最后,运行该作业以创建新的流水线执行作业:

job.submit()

运行此单元后,您应该会看到日志,其中包含可在控制台中查看流水线运行的链接:

流水线作业日志

前往该链接。完成后,您的流水线应如下所示:

已完成的简介流水线

此流水线将运行 5-6 分钟。完成后,您可以点击 build-sentence 组件以查看最终输出:

简介流水线输出

现在,您已熟悉 KFP SDK 和 Vertex Pipelines 的工作原理,可以开始构建流水线,使用其他 Vertex AI 服务创建和部署机器学习模型。现在就开始吧!

6. 创建端到端机器学习流水线

现在该构建您的第一个机器学习流水线了。在本流水线中,我们将使用 UCI 机器学习干豆数据集,数据集为:KOKLU, M. 和 OZKAN, I.A.,(2020),“Multiclass Classification of Dry Beans Using Computer Vision and Machine Learning Techniques”。”。DOI

这是一个表格数据集。在我们的流水线中,我们将使用该数据集来训练、评估和部署一个 AutoML 模型,该模型会根据豆子的特征将豆子分为 7 种类型之一。

此流水线将执行以下操作:

  • 在 中创建数据集
  • 使用 AutoML 训练表格分类模型
  • 获取此模型的评估指标
  • 根据评估指标,决定是否使用 Vertex Pipelines 中的条件逻辑部署模型
  • 使用 Vertex Prediction 将模型部署到端点

概述的每个步骤都是一个组件。大多数流水线步骤将通过我们之前在此 Codelab 中导入的 google_cloud_pipeline_components 库,使用适用于 Vertex AI 服务的预构建组件。在本部分中,我们先定义一个自定义组件。然后,我们将使用预构建的组件定义其余的流水线步骤。预构建的组件让您可以更轻松地访问 Vertex AI 服务,例如模型训练和部署。

第 1 步:用于模型评估的自定义组件

模型训练完成后,我们要定义的自定义组件将在流水线即将结束时使用。此组件将执行以下任务:

  • 从经过训练的 AutoML 分类模型中获取评估指标
  • 解析指标并在 Vertex Pipelines 界面中呈现它们
  • 将指标与阈值进行比较,以确定是否应部署模型

在定义组件之前,我们先了解一下其输入和输出参数。此流水线将 Cloud 项目的一些元数据、生成的训练模型(稍后我们将定义此组件)、模型的评估指标和 thresholds_dict_str 作为输入。我们将在运行流水线时定义 thresholds_dict_str。对于此分类模型,这就是我们应部署模型的 ROC 曲线下面积。例如,如果我们传入 0.95,则表示我们只希望在该指标高于 95% 时部署模型。

我们的评估组件会返回一个字符串,指示是否部署模型。在笔记本单元中添加以下代码,以创建此自定义组件:

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    import json
    import logging

    from google.cloud import aiplatform as aip

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

第 2 步:添加 Google Cloud 预构建组件

在此步骤中,我们将定义其余流水线组件,并了解它们如何协同工作。首先,使用时间戳定义流水线运行的显示名:

import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)

然后,将以下内容复制到新的笔记本单元中:

@pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classification_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name="train-automl-beans",
        )

        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type="n1-standard-4",
        )

我们来看看此代码发生了什么:

  • 首先,与前面的流水线一样,我们定义此流水线采用的输入参数。我们需要手动设置这些变量,因为它们不依赖于流水线中其他步骤的输出。
  • 流水线的其余部分使用一些预构建的组件来与 Vertex AI 服务交互:
    • TabularDatasetCreateOp 在给定 Cloud Storage 或 BigQuery 中的数据集来源的情况下,在 Vertex AI 中创建表格数据集。在此流水线中,我们通过 BigQuery 表网址传递数据
    • AutoMLTabularTrainingJobRunOp 用于针对表格数据集启动 AutoML 训练作业。我们向此组件传递一些配置参数,包括模型类型(在本例中为分类)、列上的一些数据、训练运行时长,以及指向数据集的指针。请注意,为了将数据集传递给此组件,我们需要通过 dataset_create_op.outputs["dataset"] 提供前一个组件的输出
    • EndpointCreateOp 用于在 Vertex AI 中创建端点。此步骤中创建的端点将作为输入传递给下一个组件
    • ModelDeployOp 将给定模型部署到 Vertex AI 中的端点。在本例中,我们将使用上一步中创建的端点。还有其他可用的配置选项,但我们在这里提供我们想要部署的端点机器类型和模型。我们通过访问流水线中训练步骤的输出来传入模型
  • 此流水线还利用了条件逻辑,这是 Vertex Pipelines 的一项功能,可让您根据该条件的结果定义一个条件以及不同的分支。请注意,我们在定义流水线时传递了 thresholds_dict_str 参数。这是我们用于确定是否将模型部署到端点的准确率阈值。为了实现这一点,我们使用 KFP SDK 中的 Condition 类。我们传入的条件是我们之前在此 Codelab 中定义的自定义评估组件的输出。如果此条件为 true,流水线将继续执行 deploy_op 组件。如果准确率没有达到我们预定义的阈值,流水线将在此处停止,并且不会部署模型。

第 3 步:编译并运行端到端机器学习流水线

定义完整流水线后,就可以开始编译它了:

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)

接下来,定义作业:

ml_pipeline_job = aiplatform.PipelineJob(
    display_name="automl-tab-beans-training",
    template_path="tab_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
    enable_caching=True
)

最后,运行该作业:

ml_pipeline_job.submit()

运行上述单元后,前往日志中显示的链接,在控制台中查看您的流水线。运行此流水线需要一个多小时的时间。大部分时间都用在 AutoML 训练步骤上。完成后的流水线将如下所示:

已完成 AutoML 流水线

将“展开工件”按钮,您可以看到从流水线创建的不同工件的详细信息。例如,如果您点击 dataset 工件,则会看到已创建的 Vertex AI 数据集的详细信息。您可以点击此处的链接转到该数据集的页面:

流水线数据集

同样,要查看自定义评估组件生成的指标可视化效果,请点击名为 metricsc 的工件。在信息中心的右侧,您可以看到此模型的混淆矩阵:

指标可视化

如需查看通过此流水线运行创建的模型和端点,请转到模型部分,然后点击名为 automl-beans 的模型。此时,您应该会看到此模型已部署到端点:

模型端点

您还可以点击流水线图中的端点工件来访问此页面。

除了在控制台中查看流水线图之外,您还可以使用 Vertex Pipelines 进行沿袭跟踪。我们所说的沿袭跟踪是指跟踪整个流水线中创建的工件。这有助于我们了解工件的创建位置以及在整个机器学习工作流中的使用方式。例如,如需查看在此流水线中创建的数据集的沿袭跟踪,请点击数据集工件,然后点击查看沿袭

查看沿袭

这会显示使用此工件的所有位置:

沿袭详情

第 4 步:比较不同流水线运行作业的指标

如果您多次运行此流水线,则可能需要比较各次运行的指标。您可以使用 aiplatform.get_pipeline_df() 方法访问运行作业元数据。在这里,我们将获取此流水线所有运行的元数据,并将其加载到 Pandas DataFrame 中:

pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df

至此,您已完成本实验!

🎉 恭喜!🎉

您学习了如何使用 Vertex AI 执行以下操作:

  • 使用 Kubeflow Pipelines SDK 构建具有自定义组件的端到端流水线
  • 在 Vertex Pipelines 上运行流水线并使用 SDK 启动流水线运行
  • 在控制台中查看和分析 Vertex Pipelines 图
  • 使用预构建的流水线组件将 Vertex AI 服务添加到流水线
  • 安排周期性流水线作业

如需详细了解 Vertex 的不同部分,请参阅相关文档

7. 清理

为了不向您收费,因此建议您删除在整个实验过程中创建的资源。

第 1 步:停止或删除 Notebooks 实例

如果您想继续使用在本实验中创建的笔记本,建议您在不使用时将其关闭。在 Cloud 控制台的 Notebooks 界面中,选择相应笔记本,然后选择停止。如果您想完全删除实例,请选择删除

可停止实例

第 2 步:删除端点

如需删除已部署的端点,请前往 Vertex AI 控制台的端点部分,然后点击删除图标:

删除端点

然后,在看到以下提示时点击取消部署

取消部署模型

最后,导航到控制台的模型部分,找到该模型,然后点击右侧的三点状菜单中的删除模型

删除模型

第 3 步:删除 Cloud Storage 存储分区

如需删除存储桶,请使用 Cloud Console 中的导航菜单,浏览到“存储空间”,选择您的存储桶,然后点击“删除”:

删除存储空间