Vertex Pipelines 简介

1. 概览

在本实验中,您将学习如何使用 Vertex Pipelines 创建和运行机器学习流水线。

学习内容

您将了解如何:

  • 使用 Kubeflow Pipelines SDK 构建可扩缩的机器学习流水线
  • 创建并运行一个接受文本输入的 3 步引导流水线
  • 创建并运行用于训练、评估和部署 AutoML 分类模型的流水线
  • 使用通过 google_cloud_pipeline_components 库提供的预构建组件与 Vertex AI 服务交互
  • 使用 Cloud Scheduler 安排流水线作业

在 Google Cloud 上运行本实验的总费用约为 25 美元

2. Vertex AI 简介

本实验使用的是 Google Cloud 上提供的最新 AI 产品。Vertex AI 将整个 Google Cloud 的机器学习产品集成到无缝的开发体验中。以前,使用 AutoML 训练的模型和自定义模型是通过不同的服务访问的。现在,该新产品与其他新产品一起将这两种模型合并到一个 API 中。您还可以将现有项目迁移到 Vertex AI。

除了模型训练和部署服务之外,Vertex AI 还包括各种 MLOps 产品,包括 Vertex Pipelines(本实验的重点)、模型监控、Feature Store 等。您可以在下图中查看所有 Vertex AI 产品。

Vertex 产品概览

如果您有任何反馈,请参阅支持页面

机器学习流水线有何用途?

在深入了解之前,我们先来了解一下为什么要使用流水线。假设您要构建一个机器学习工作流,其中包括处理数据、训练模型、超参数调优、评估和模型部署。其中每个步骤都可能具有不同的依赖项,如果您将整个工作流视为单体式应用,这些依赖项可能会很难处理。当您开始扩展机器学习流程时,可能需要与团队中的其他人分享您的机器学习工作流,以便他们运行该工作流并贡献代码。如果没有可靠且可重现的流程,这可能会很困难。借助流水线,机器学习流程中的每个步骤都有自己的容器。这样,您就可以独立开发步骤,并以可重现的方式跟踪每个步骤的输入和输出。您还可以根据 Cloud 环境中的其他事件安排或触发流水线运行,例如在有新训练数据可用时启动流水线运行。

要点:流水线可帮助您自动化重现机器学习工作流。

3. 云环境设置

您需要一个启用了结算功能的 Google Cloud Platform 项目才能运行此 Codelab。如需创建项目,请按照此处的说明操作。

第 1 步:启动 Cloud Shell

在本实验中,您将在 Cloud Shell 会话中工作,该会话是由在 Google 云端运行的虚拟机托管的命令解释器。您也可以在自己的计算机上本地运行本部分内容,但使用 Cloud Shell 可让所有人都能在一致的环境中获得可重现的体验。实验结束后,您可以随时在自己的计算机上重试本部分。

为 Cloud Shell 授权

激活 Cloud Shell

在 Cloud 控制台的右上角,点击下方按钮以激活 Cloud Shell

激活 Cloud Shell

如果您以前从未启动过 Cloud Shell,将看到一个中间屏幕(在折叠下面),描述它是什么。如果是这种情况,请点击继续(您将永远不会再看到它)。一次性屏幕如下所示:

Cloud Shell 设置

预配和连接到 Cloud Shell 只需花几分钟时间。

Cloud Shell init

这个虚拟机已加载了您需要的所有开发工具。它提供了一个持久的 5GB 主目录,并且在 Google Cloud 中运行,大大增强了网络性能和身份验证。只需使用一个浏览器或 Google Chromebook 即可完成本 Codelab 中的大部分(甚至全部)工作。

在连接到 Cloud Shell 后,您应该会看到自己已通过身份验证,并且相关项目已设置为您的项目 ID:

在 Cloud Shell 中运行以下命令以确认您已通过身份验证:

gcloud auth list

您应该会在命令输出中看到类似以下内容:

Cloud Shell 输出

在 Cloud Shell 中运行以下命令,以确认 gcloud 命令了解您的项目:

gcloud config list project

命令输出

[core]
project = <PROJECT_ID>

如果不是上述结果,您可以使用以下命令进行设置:

gcloud config set project <PROJECT_ID>

命令输出

Updated property [core/project].

Cloud Shell 有几个环境变量,其中包括 GOOGLE_CLOUD_PROJECT,其中包含当前 Cloud 项目的名称。我们将在本实验的各个部分使用它。您可以通过运行以下命令来查看此信息:

echo $GOOGLE_CLOUD_PROJECT

第 2 步:启用 API

在后续步骤中,您将了解需要在哪些位置使用这些服务(以及原因),但目前,请运行以下命令,为您的项目授予对 Compute Engine、Container Registry 和 Vertex AI 服务的访问权限:

gcloud services enable compute.googleapis.com         \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com  \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com

这应该会生成类似以下内容的成功消息:

Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.

第 3 步:创建 Cloud Storage 存储分区

如需在 Vertex AI 上运行训练作业,我们需要一个存储分区来存储我们保存的模型资源。存储分区必须是区域级存储分区。我们在此处使用的是 us-central,但您也可以使用其他区域(只需在本实验中将其替换即可)。如果您已有存储分区,则可以跳过此步骤。

在 Cloud Shell 终端运行以下命令以创建存储分区:

BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME

接下来,我们将向计算服务账号授予对此存储分区的访问权限。这将确保 Vertex Pipelines 具有将文件写入此存储分区所需的权限。运行以下命令以添加此权限:

gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin

第 4 步:创建 Vertex AI Workbench 实例

在 Cloud Console 的 Vertex AI 部分中,点击“Workbench”:

Vertex AI 菜单

然后,在用户管理的笔记本中,点击新建笔记本

新建笔记本

然后,选择不带 GPU 的 TensorFlow 企业版 2.3(提供长期支持)实例类型:

TFE 实例

使用默认选项,然后点击创建

第 5 步:打开您的记事本

创建实例后,选择打开 JupyterLab

打开笔记本

4. Vertex Pipelines 设置

我们还需要安装一些其他库才能使用 Vertex Pipelines:

  • Kubeflow Pipelines:这是我们将用于构建流水线的 SDK。Vertex Pipelines 支持运行使用 Kubeflow 流水线或 TFX 构建的流水线。
  • Google Cloud 流水线组件:此库提供预构建的组件,让您可以轻松地在流水线步骤中与 Vertex AI 服务进行交互。

第 1 步:创建 Python 笔记本并安装库

首先,在笔记本实例的“启动器”菜单中,选择 Python 3 以创建笔记本:

创建 Python3 笔记本

点击笔记本实例左上角的 + 号,即可访问启动器菜单。

如需安装我们将在本实验中使用的这两项服务,请先在笔记本单元格中设置用户标志:

USER_FLAG = "--user"

然后,从笔记本中运行以下命令:

!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0

安装这些软件包后,您需要重启内核:

import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

最后,检查您是否已正确安装软件包。KFP SDK 版本应高于或等于 1.8:

!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

第 2 步:设置项目 ID 和存储分区

在本实验中,您将引用您的 Cloud 项目 ID 和您之前创建的存储分区。接下来,我们将为每个元素创建变量。

如果您不知道项目 ID,可以通过运行以下命令获取:

import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

否则,请在此处进行设置:

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

然后,创建一个用于存储存储分区名称的变量。如果您是在本实验中创建的,则可以使用以下命令。否则,您需要手动设置:

BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"

第 3 步:导入库

添加以下代码以导入我们将在整个 Codelab 中使用的库:

import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

第 4 步:定义常量

在构建流水线之前,我们需要完成最后一步,即定义一些常量变量。PIPELINE_ROOT 是将要写入流水线创建的工件的 Cloud Storage 路径。我们在此处使用 us-central1 作为区域,但如果您在创建存储分区时使用了其他区域,请更新以下代码中的 REGION 变量:

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

运行上述代码后,您应该会看到系统输出了流水线的根目录。这是将写入流水线工件的 Cloud Storage 位置。其格式为:gs://YOUR-BUCKET-NAME/pipeline_root/

5. 创建您的第一个流水线

为了熟悉 Vertex Pipelines 的运作方式,我们先使用 KFP SDK 创建一个简短的流水线。此流水线不会执行任何与机器学习相关的操作(别担心,我们马上就能实现!),我们将使用它来教您:

  • 如何在 KFP SDK 中创建自定义组件
  • 如何在 Vertex Pipelines 中运行和监控流水线

我们将创建一个流水线,用于输出包含以下两个输出的句子:商品名称和表情符号说明。此流水线由三个部分组成:

  • product_name:此组件将以商品名称(或您真正想要的任何名词)作为输入,并以字符串的形式返回该输入
  • emoji:此组件会获取表情符号的文本说明,并将其转换为表情符号。例如,✨ 的文本代码是“sparkles”。此组件使用表情符号库向您展示如何管理流水线中的外部依赖项
  • build_sentence:此最后一个组件将使用前两个组件的输出来构建使用表情符号的句子。例如,生成的输出可能是“Vertex Pipelines is ✨”。

开始编码吧!

第 1 步:创建基于 Python 函数的组件

使用 KFP SDK,我们可以基于 Python 函数创建组件。我们将在第一个流水线中的 3 个组件中使用它。我们先构建 product_name 组件,该组件只接受字符串作为输入,并返回该字符串。将以下内容添加到笔记本:

@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    return text

我们来详细了解一下此处的语法:

  • @component 修饰器会在运行流水线时将此函数编译为组件。每当您编写自定义组件时,都需要使用此方法。
  • base_image 参数指定此组件将使用的容器映像。
  • output_component_file 参数是可选参数,用于指定要将编译后的组件写入到的 YAML 文件。运行该单元格后,您应该会看到该文件已写入到您的笔记本实例。如果您想与他人共享此组件,可以向对方发送生成的 YAML 文件,并让对方使用以下命令加载该文件:
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')
  • 函数定义后的 -> str 用于指定此组件的输出类型。

第 2 步:创建另外两个组件

为了完成我们的流水线,我们还需要再创建两个组件。我们将首先定义一个函数,该函数接受字符串作为输入,并将此字符串转换为相应的表情符号(如果有)。它会返回一个元组,其中包含传入的文本和生成的表情符号:

@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    import emoji

    emoji_text = text
    emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
    print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
    return (emoji_text, emoji_str)

这个组件比上一个组件要复杂一些。我们来详细了解一下新变化:

  • packages_to_install 参数会告知组件此容器的所有外部库依赖项。在本例中,我们使用的是名为 emoji 的库。
  • 此组件会返回一个名为 OutputsNamedTuple。请注意,此元组中的每个字符串都有键:emoji_textemoji。我们将在下一个组件中使用这些方法来访问输出。

此流水线中的最后一个组件将使用前两个部分的输出,并将它们合并以返回一个字符串:

@component
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    print("We completed the pipeline, hooray!")
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji
    else:
        end_str += emojitext
    return(end_str)

您可能会想知道:此组件如何知道要使用您在前面步骤中定义的输出?问的好!我们将在下一步中将所有内容整合到一起。

第 3 步:将组件组合到流水线中

我们上面定义的组件定义创建了工厂函数,这些函数可在流水线定义中用于创建步骤。如需设置流水线,请使用 @pipeline 修饰器,为流水线指定名称和说明,并提供应写入流水线工件的根路径。工件是指流水线生成的任何输出文件。此片头流水线不会生成任何数据,但我们的下一个流水线会生成。

在下一个代码块中,我们定义了 intro_pipeline 函数。我们在这里指定初始流水线步骤的输入,以及步骤如何相互连接:

  • product_task 将产品名称作为输入。这里我们传递的是“Vertex Pipelines”,但您可以将其更改为任何您喜欢的名称。
  • emoji_task 将表情符号的文本代码作为输入。您也可以将其更改为您喜欢的任何名称。例如,“party_face”是指 🥳? 表情符号。请注意,由于此组件和 product_task 组件都没有向其提供输入的任何步骤,因此我们在定义流水线时会手动为它们指定输入。
  • 流水线中的最后一步 - consumer_task 有三个输入参数:
    • product_task 的输出。由于此步骤只会生成一个输出,因此我们可以通过 product_task.output 引用它。
    • emoji_task 步骤的 emoji 输出。请参阅上面定义的 emoji 组件,其中我们为输出参数命名。
    • 同样,emoji 组件中的 emoji_text 命名输出。如果管道中传递的文本与表情符号不对应,它将使用此文本来构建句子。
@pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)

# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    product_task = product_name(text)
    emoji_task = emoji(emoji_str)
    consumer_task = build_sentence(
        product_task.output,
        emoji_task.outputs["emoji"],
        emoji_task.outputs["emoji_text"],
    )

第 4 步:编译并运行流水线

定义流水线后,您就可以编译它了。以下命令将生成用于运行流水线的 JSON 文件:

compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)

接下来,创建一个 TIMESTAMP 变量。我们将在作业 ID 中使用此值:

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

然后定义您的流水线作业:

job = aiplatform.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
    enable_caching=True
)

最后,运行作业以创建新的流水线作业:

job.submit()

运行此单元格后,您应该会看到包含用于在控制台中查看流水线运行情况的链接的日志:

流水线作业日志

前往该链接。完成后,您的流水线应如下所示:

已完成的简介流水线

此流水线将需要5-6 分钟才能运行完毕。完成后,您可以点击 build-sentence 组件以查看最终输出:

简介流水线输出

现在,您已经熟悉了 KFP SDK 和 Vertex Pipelines 的运作方式,接下来可以构建一个流水线,使用其他 Vertex AI 服务创建和部署机器学习模型了。我们开始吧!

6. 创建端到端机器学习流水线

现在,您可以构建自己的首个机器学习流水线了。在此流水线中,我们将使用 UCI 机器学习干豆数据集,该数据集来自:KOKLU, M. and OZKAN, I.A., (2020 年),“Multiclass Classification of Dry Beans using Computer Vision and Machine Learning Techniques”。DOI

这是一个表格数据集,在我们的流水线中,我们将使用该数据集训练、评估和部署一个 AutoML 模型,该模型会根据豆子的特性将其分为 7 种类型之一。

此流水线将:

  • 在 中创建数据集
  • 使用 AutoML 训练表格分类模型
  • 获取此模型的评估指标
  • 根据评估指标,决定是否使用 Vertex Pipelines 中的条件逻辑部署模型
  • 使用 Vertex Prediction 将模型部署到端点

上述每个步骤都将成为一个组件。大多数流水线步骤都将通过我们在本 Codelab 中稍早导入的 google_cloud_pipeline_components 库使用 Vertex AI 服务的预构建组件。在本部分中,我们将先定义一个自定义组件。然后,我们将使用预构建的组件定义其余的流水线步骤。预构建的组件让您可以更轻松地访问 Vertex AI 服务,例如模型训练和部署。

第 1 步:用于模型评估的自定义组件

我们将在模型训练完成后,在流水线的最后使用我们定义的自定义组件。此组件将执行以下操作:

  • 从经过训练的 AutoML 分类模型中获取评估指标
  • 解析指标并在 Vertex Pipelines 界面中呈现它们
  • 将指标与阈值进行比较,以确定是否应部署模型

在定义组件之前,我们先来了解一下它的输入和输出参数。此流水线将 Cloud 项目的一些元数据、生成的训练模型(稍后我们将定义此组件)、模型的评估指标和 thresholds_dict_str 作为输入。我们将在运行流水线时定义 thresholds_dict_str。对于此分类模型,这将是 ROC 曲线下方的面积,我们应根据该面积来部署模型。例如,如果我们传入 0.95,则表示我们希望只有当此指标高于 95% 时,流水线才会部署模型。

我们的评估组件会返回一个字符串,指明是否要部署模型。在一个笔记单元格中添加以下代码,以创建此自定义组件:

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    import json
    import logging

    from google.cloud import aiplatform as aip

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

第 2 步:添加 Google Cloud 预构建组件

在此步骤中,我们将定义流水线的其余组件,并了解它们如何协同工作。首先,使用时间戳定义流水线运行的显示名称:

import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)

然后,将以下内容复制到新的笔记本单元中:

@pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classification_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name="train-automl-beans",
        )

        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type="n1-standard-4",
        )

我们来看看此代码发生了什么:

  • 首先,与前面的流水线一样,我们定义此流水线采用的输入参数。我们需要手动设置这些参数,因为它们不依赖于流水线中其他步骤的输出。
  • 流水线的其余部分使用一些预构建组件与 Vertex AI 服务进行交互:
    • TabularDatasetCreateOp 会根据 Cloud Storage 或 BigQuery 中的数据集来源,在 Vertex AI 中创建表格数据集。在此流水线中,我们通过 BigQuery 表网址传递数据
    • AutoMLTabularTrainingJobRunOp 会为表格数据集启动 AutoML 训练作业。我们向此组件传递一些配置参数,包括模型类型(在本例中为分类)、列上的一些数据、训练运行时长,以及指向数据集的指针。请注意,为了将数据集传入此组件,我们将通过 dataset_create_op.outputs["dataset"] 提供上一个组件的输出
    • EndpointCreateOp 用于在 Vertex AI 中创建端点。此步骤中创建的端点将作为输入传递给下一个组件
    • ModelDeployOp 会将给定模型部署到 Vertex AI 中的端点。在本例中,我们将使用上一步中创建的端点。还有其他配置选项可用,但在这里,我们提供的是想要部署的端点机器类型和型号。我们通过访问流水线中训练步骤的输出来传入模型
  • 此流水线还使用了条件逻辑,这是 Vertex Pipelines 的一项功能,可让您定义条件以及根据该条件的结果创建不同的分支。请记得,在定义流水线时,我们传递了一个 thresholds_dict_str 参数。这是我们用来确定是否将模型部署到端点的准确性阈值。为了实现这一点,我们使用 KFP SDK 中的 Condition 类。我们传入的条件是我们在此 Codelab 中前面定义的自定义 eval 组件的输出。如果此条件为 true,流水线将继续执行 deploy_op 组件。如果准确性未达到我们预定义的阈值,流水线将在此处停止,并且不会部署模型。

第 3 步:编译并运行端到端机器学习流水线

定义完整流水线后,就可以开始编译它了:

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)

接下来,定义作业:

ml_pipeline_job = aiplatform.PipelineJob(
    display_name="automl-tab-beans-training",
    template_path="tab_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
    enable_caching=True
)

最后,运行作业:

ml_pipeline_job.submit()

运行上述单元格后,前往日志中显示的链接,即可在控制台中查看您的流水线。此流水线需要运行一小时多一点的时间。其中大部分时间都花在 AutoML 训练步骤上。完成后的流水线将如下所示:

已完成 AutoML 流水线

如果您切换顶部的“展开工件”按钮,则可以查看通过流水线创建的不同工件的详细信息。例如,如果您点击 dataset 工件,则会看到所创建的 Vertex AI 数据集的详细信息。您可以点击此处的链接,前往相应数据集的页面:

流水线数据集

同样,如需查看自定义评估组件生成的指标可视化结果,请点击名为 metricsc 的工件。在信息中心的右侧,您可以看到此模型的混淆矩阵:

指标可视化

如需查看此流水线运行创建的模型和端点,请前往“模型”部分,然后点击名为 automl-beans 的模型。您应该会在该页面上看到部署到端点的此模型:

模型端点

您还可以通过点击流水线图中的端点工件来访问此页面。

除了在控制台中查看流水线图表之外,您还可以使用 Vertex Pipelines 进行沿袭跟踪。所谓沿袭跟踪,是指跟踪整个流水线中创建的工件。这有助于我们了解工件的创建位置以及在整个机器学习工作流中的使用方式。例如,如需查看在此流水线中创建的数据集的沿袭跟踪信息,请点击数据集工件,然后点击查看沿袭

查看沿袭

这会显示使用此工件的所有位置:

谱系详情

第 4 步:比较不同流水线运行作业的指标

如果您多次运行此流水线,则可能需要比较各次运行的指标。您可以使用 aiplatform.get_pipeline_df() 方法访问运行元数据。在这里,我们将获取此流水线所有运行的元数据,并将其加载到 Pandas DataFrame 中:

pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df

至此,您已完成本实验!

🎉 恭喜!🎉

您学习了如何使用 Vertex AI 执行以下操作:

  • 使用 Kubeflow Pipelines SDK 构建具有自定义组件的端到端流水线
  • 在 Vertex Pipelines 上运行流水线,并使用 SDK 启动流水线运行
  • 在控制台中查看和分析 Vertex Pipelines 图
  • 使用预构建的流水线组件将 Vertex AI 服务添加到流水线
  • 安排周期性流水线作业

如需详细了解 Vertex 的不同部分,请参阅相关文档

7. 清理

为避免产生费用,建议您删除在本实验中创建的资源。

第 1 步:停止或删除 Notebooks 实例

如果您想继续使用在本实验中创建的笔记本,建议您在不用时将其关闭。在 Cloud 控制台中的“Notebooks”界面中,选择相应 Notebook,然后选择停止。如果您想完全删除该实例,请选择删除

可停止实例

第 2 步:删除端点

如需删除您部署的端点,请前往 Vertex AI 控制台的端点部分,然后点击删除图标:

删除端点

然后,在看到以下提示时点击取消部署

取消部署模型

最后,前往控制台的模型部分,找到该模型,然后点击右侧三点状菜单中的删除模型

删除模型

第 3 步:删除 Cloud Storage 存储分区

如需删除存储桶,请使用 Cloud Console 中的导航菜单,浏览到“存储空间”,选择您的存储桶,然后点击“删除”:

删除存储空间