1. 概览
在本实验中,您将学习如何使用 Vertex ML Metadata 分析 Vertex Pipelines 运行产生的元数据。
学习内容
您将了解如何:
- 使用 Kubeflow Pipelines SDK 构建机器学习流水线,以便在 Vertex AI 中创建数据集,并在此数据集上训练和部署自定义 Scikit-learn 模型
- 编写用于生成工件和元数据的自定义流水线组件
- 比较 Cloud 控制台和编程中的 Vertex Pipelines 运行
- 跟踪流水线生成的工件的沿袭
- 查询流水线运行的元数据
在 Google Cloud 上运行此实验的总费用约为 2 美元。
2. Vertex AI 简介
本实验使用的是 Google Cloud 上提供的最新 AI 产品。Vertex AI 将整个 Google Cloud 的机器学习产品集成到无缝的开发体验中。以前,使用 AutoML 训练的模型和自定义模型是通过不同的服务访问的。现在,该新产品与其他新产品一起将这两种模型合并到一个 API 中。您还可以将现有项目迁移到 Vertex AI。
除了模型训练和部署服务之外,Vertex AI 还包含各种 MLOps 产品,包括 Vertex Pipelines、机器学习元数据、模型监控、Feature Store 等。您可以在下图中查看所有 Vertex AI 产品。
本实验重点介绍 Vertex Pipelines 和 Vertex ML Metadata。
如果您有任何 Vertex AI 反馈,请参阅支持页面。
机器学习流水线有何用途?
在深入了解之前,我们先来了解一下为什么要使用流水线。假设您要构建一个机器学习工作流,其中包含处理数据、训练模型、超参数调节、评估和模型部署。这些步骤中的每一个都可能具有不同的依赖项,如果您将整个工作流视为一个单体,可能会变得难以管理。当您开始扩展机器学习流程时,可能需要与团队中的其他人分享您的机器学习工作流,以便他们运行该工作流并贡献代码。如果没有可靠且可重现的流程,这可能会很困难。借助流水线,机器学习流程中的每个步骤都有自己的容器。这样,您就可以独立开发步骤,并以可重现的方式跟踪每个步骤的输入和输出。您还可以根据 Cloud 环境中的其他事件安排或触发流水线运行,例如在有新训练数据可用时启动流水线运行。
要点:流水线可帮助您自动执行和重现机器学习工作流。
3. 云环境设置
您需要一个启用了结算功能的 Google Cloud Platform 项目才能运行此 Codelab。如需创建项目,请按照此处的说明操作。
启动 Cloud Shell
在本实验中,您将在 Cloud Shell 会话中工作,该会话是由在 Google 云端运行的虚拟机托管的命令解释器。您可以在自己的计算机本地轻松运行此部分,但借助 Cloud Shell,每个人都可以在一致的环境中获得可重现的体验。实验结束后,您可以随时在自己的计算机上重试本部分。
激活 Cloud Shell
在 Cloud 控制台的右上角,点击下方按钮以激活 Cloud Shell:
如果您以前从未启动过 Cloud Shell,将看到一个中间屏幕(在折叠下面),描述它是什么。如果是这种情况,请点击继续(您将永远不会再看到它)。一次性屏幕如下所示:
预配和连接到 Cloud Shell 只需花几分钟时间。
这个虚拟机已加载了您需要的所有开发工具。它提供了一个持久的 5GB 主目录,并且在 Google Cloud 中运行,大大增强了网络性能和身份验证。只需使用一个浏览器或 Google Chromebook 即可完成本 Codelab 中的大部分(甚至全部)工作。
在连接到 Cloud Shell 后,您应该会看到自己已通过身份验证,并且相关项目已设置为您的项目 ID:
在 Cloud Shell 中运行以下命令以确认您已通过身份验证:
gcloud auth list
命令输出
Credentialed Accounts ACTIVE ACCOUNT * <my_account>@<my_domain.com> To set the active account, run: $ gcloud config set account `ACCOUNT`
在 Cloud Shell 中运行以下命令,以确认 gcloud 命令了解您的项目:
gcloud config list project
命令输出
[core] project = <PROJECT_ID>
如果不是上述结果,您可以使用以下命令进行设置:
gcloud config set project <PROJECT_ID>
命令输出
Updated property [core/project].
Cloud Shell 有一些环境变量,其中 GOOGLE_CLOUD_PROJECT
包含我们当前 Cloud 项目的名称。我们将在本实验的各个部分使用它。您可以通过运行以下命令查看该文件:
echo $GOOGLE_CLOUD_PROJECT
启用 API
在后续步骤中,您将看到需要这些服务的位置(以及原因),但现在,请运行此命令,为您的项目授予对 Compute Engine、Container Registry 和 Vertex AI 服务的访问权限:
gcloud services enable compute.googleapis.com \
containerregistry.googleapis.com \
aiplatform.googleapis.com
这将生成类似于以下内容的成功消息:
Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.
创建 Cloud Storage 存储桶
如需在 Vertex AI 上运行训练作业,我们需要一个存储分区来存储已保存的模型资产。存储分区必须是区域级存储分区。我们在此处使用的是 us-central
,但您也可以使用其他区域(只需在本实验中将其替换即可)。如果您已有存储分区,则可以跳过此步骤。
在 Cloud Shell 终端中运行以下命令以创建存储分区:
BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME
接下来,我们将向计算服务账号授予对此存储分区的访问权限。这将确保 Vertex Pipelines 具有将文件写入此存储分区的必要权限。运行以下命令以添加此权限:
gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin
创建 Vertex AI Workbench 实例
在 Cloud Console 的 Vertex AI 部分中,点击“Workbench”:
然后,在用户管理的笔记本中,点击新建笔记本:
然后,选择不带 GPU 的 TensorFlow 企业版 2.3(提供长期支持)实例类型:
使用默认选项,然后点击创建。
打开您的记事本
创建实例后,选择打开 JupyterLab:
4. Vertex Pipelines 设置
我们还需要安装几个额外的库,才能使用 Vertex Pipelines:
- Kubeflow Pipelines:这是我们将用于构建流水线的 SDK。Vertex Pipelines 支持运行使用 Kubeflow 流水线或 TFX 构建的流水线。
- Vertex AI SDK:此 SDK 优化了调用 Vertex AI API 的体验。我们将使用它在 Vertex AI 上运行流水线。
创建 Python 笔记本并安装库
首先,在笔记本实例的“启动器”菜单中,选择 Python 3 以创建笔记本:
如需安装我们将在本实验中使用的两项服务,请先在笔记本单元中设置用户标志:
USER_FLAG = "--user"
然后从笔记本运行以下命令:
!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0
!pip3 install {USER_FLAG} kfp==1.8.9
安装这些软件包后,您需要重启内核:
import os
if not os.getenv("IS_TESTING"):
# Automatically restart kernel after installs
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)
接下来,检查您是否已正确安装 KFP SDK 版本。此值应该大于等于 1.8:
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
然后,确认您的 Vertex AI SDK 版本是否高于或等于 1.6.2:
!pip list | grep aiplatform
设置项目 ID 和存储分区
在本实验中,您将引用您的 Cloud 项目 ID 和您之前创建的存储分区。接下来,我们将为每个元素创建变量。
如果您不知道项目 ID,可以通过运行以下命令来获取:
import os
PROJECT_ID = ""
# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
PROJECT_ID = shell_output[0]
print("Project ID: ", PROJECT_ID)
否则,请在此处进行设置:
if PROJECT_ID == "" or PROJECT_ID is None:
PROJECT_ID = "your-project-id" # @param {type:"string"}
然后,创建一个用于存储存储分区名称的变量。如果您是在本实验中创建的,则可以使用以下命令。否则,您需要手动进行设置:
BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"
导入库
添加以下代码,以导入我们将在此 Codelab 中使用的库:
import matplotlib.pyplot as plt
import pandas as pd
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import pipeline, component, Artifact, Dataset, Input, Metrics, Model, Output, InputPath, OutputPath
from google.cloud import aiplatform
# We'll use this namespace for metadata querying
from google.cloud import aiplatform_v1
定义常量
在构建流水线之前,我们需要做的最后一件事是定义一些常量变量。PIPELINE_ROOT
是 Cloud Storage 路径,我们的流水线创建的制品将写入其中。我们在此处使用 us-central1
作为区域,但如果您在创建存储分区时使用了其他区域,请更新以下代码中的 REGION
变量:
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT
运行上述代码后,您应该会看到系统输出了流水线的根目录。这是将写入流水线工件的 Cloud Storage 位置。其格式为 gs://YOUR-BUCKET-NAME/pipeline_root/
5. 使用自定义组件创建 3 步流水线
本实验的重点是了解流水线运行产生的元数据。为此,我们需要一个在 Vertex Pipelines 上运行的流水线,我们首先从这里开始。在这里,我们将定义包含以下自定义组件的 3 步流水线:
get_dataframe
:从 BigQuery 表中检索数据并将其转换为 Pandas DataFrametrain_sklearn_model
:使用 Pandas DataFrame 训练和导出 Scikit Learn 模型以及一些指标deploy_model
:将导出的 Scikit Learn 模型部署到 Vertex AI 中的端点
在本流水线中,我们将使用 UCI 机器学习干豆数据集,数据集为:KOKLU, M. 和 OZKAN, I.A.,(2020 年),“Multiclass Classification of Dry Beans using Computer Vision and Machine Learning Techniques”。DOI。
这是一个表格数据集,在我们的流水线中,我们将使用该数据集训练、评估和部署 Scikit-learn 模型,该模型会根据豆类的特征将其分类为 7 种类型之一。我们开始编码吧!
创建基于 Python 函数的组件
使用 KFP SDK,我们可以根据 Python 函数创建组件。我们将对此流水线中的 3 个组件使用该方法。
下载 BigQuery 数据并转换为 CSV
首先,我们将构建 get_dataframe
组件:
@component(
packages_to_install=["google-cloud-bigquery", "pandas", "pyarrow", "db-dtypes"],
base_image="python:3.9",
output_component_file="create_dataset.yaml"
)
def get_dataframe(
bq_table: str,
output_data_path: OutputPath("Dataset")
):
from google.cloud import bigquery
import pandas as pd
import os
project_number = os.environ["CLOUD_ML_PROJECT_ID"]
bqclient = bigquery.Client(project=project_number)
table = bigquery.TableReference.from_string(
bq_table
)
rows = bqclient.list_rows(
table
)
dataframe = rows.to_dataframe(
create_bqstorage_client=True,
)
dataframe = dataframe.sample(frac=1, random_state=2)
dataframe.to_csv(output_data_path)
我们来详细了解一下此组件中发生的情况:
@component
修饰器会在运行流水线时将此函数编译为组件。每当您编写自定义组件时,都需要使用此方法。base_image
参数指定此组件将使用的容器映像。- 此组件将使用几个 Python 库,我们通过
packages_to_install
参数指定这些库。 output_component_file
参数是可选参数,用于指定要将编译后的组件写入到的 YAML 文件。运行该单元格后,您应该会看到该文件已写入您的笔记本实例。如果您想与他人共享此组件,可以向对方发送生成的 YAML 文件,并让对方使用以下命令加载该文件:
# This is optional, it shows how to load a component from a yaml file
# dataset_component = kfp.components.load_component_from_file('./create_dataset.yaml')
- 接下来,此组件使用 BigQuery Python 客户端库将数据从 BigQuery 下载到 Pandas DataFrame,然后将该数据的输出工件创建为 CSV 文件。这将作为输入传递给我们的下一个组件
创建用于训练 Scikit-learn 模型的组件
在此组件中,我们将使用之前生成的 CSV 文件训练 Scikit-learn 决策树模型。此组件会导出生成的 Scikit 模型,以及包含模型准确性、框架以及用于训练模型的数据集大小的 Metrics
工件:
@component(
packages_to_install=["sklearn", "pandas", "joblib", "db-dtypes"],
base_image="python:3.9",
output_component_file="beans_model_component.yaml",
)
def sklearn_train(
dataset: Input[Dataset],
metrics: Output[Metrics],
model: Output[Model]
):
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_curve
from sklearn.model_selection import train_test_split
from joblib import dump
import pandas as pd
df = pd.read_csv(dataset.path)
labels = df.pop("Class").tolist()
data = df.values.tolist()
x_train, x_test, y_train, y_test = train_test_split(data, labels)
skmodel = DecisionTreeClassifier()
skmodel.fit(x_train,y_train)
score = skmodel.score(x_test,y_test)
print('accuracy is:',score)
metrics.log_metric("accuracy",(score * 100.0))
metrics.log_metric("framework", "Scikit Learn")
metrics.log_metric("dataset_size", len(df))
dump(skmodel, model.path + ".joblib")
定义一个组件,用于将模型上传并部署到 Vertex AI
最后,我们的最后一项组件将获取上一步中训练的模型,将其上传到 Vertex AI,并将其部署到端点:
@component(
packages_to_install=["google-cloud-aiplatform"],
base_image="python:3.9",
output_component_file="beans_deploy_component.yaml",
)
def deploy_model(
model: Input[Model],
project: str,
region: str,
vertex_endpoint: Output[Artifact],
vertex_model: Output[Model]
):
from google.cloud import aiplatform
aiplatform.init(project=project, location=region)
deployed_model = aiplatform.Model.upload(
display_name="beans-model-pipeline",
artifact_uri = model.uri.replace("model", ""),
serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest"
)
endpoint = deployed_model.deploy(machine_type="n1-standard-4")
# Save data to the output params
vertex_endpoint.uri = endpoint.resource_name
vertex_model.uri = deployed_model.resource_name
在这里,我们将使用 Vertex AI SDK 通过用于预测的预构建容器上传模型。然后,它将模型部署到端点,并将 URI 返回给模型和端点资源。在本 Codelab 的后面,您将详细了解将此数据作为工件返回意味着什么。
定义和编译流水线
现在,我们已经定义了三个组件,接下来将创建流水线定义。此图展示了输入工件和输出工件如何在步骤之间流动:
@pipeline(
# Default pipeline root. You can override it when submitting the pipeline.
pipeline_root=PIPELINE_ROOT,
# A name for the pipeline.
name="mlmd-pipeline",
)
def pipeline(
bq_table: str = "",
output_data_path: str = "data.csv",
project: str = PROJECT_ID,
region: str = REGION
):
dataset_task = get_dataframe(bq_table)
model_task = sklearn_train(
dataset_task.output
)
deploy_task = deploy_model(
model=model_task.outputs["model"],
project=project,
region=region
)
以下命令将生成用于运行流水线的 JSON 文件:
compiler.Compiler().compile(
pipeline_func=pipeline, package_path="mlmd_pipeline.json"
)
启动两次流水线运行
接下来,我们将启动流水线的两次运行。首先,我们来定义一个要用于流水线作业 ID 的时间戳:
from datetime import datetime
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
请注意,我们的流水线在运行时需要一个参数:我们要用于训练数据的 bq_table
。此流水线运行将使用较小版本的 beans 数据集:
run1 = aiplatform.PipelineJob(
display_name="mlmd-pipeline",
template_path="mlmd_pipeline.json",
job_id="mlmd-pipeline-small-{0}".format(TIMESTAMP),
parameter_values={"bq_table": "sara-vertex-demos.beans_demo.small_dataset"},
enable_caching=True,
)
接下来,使用同一数据集的较大版本创建另一个流水线作业。
run2 = aiplatform.PipelineJob(
display_name="mlmd-pipeline",
template_path="mlmd_pipeline.json",
job_id="mlmd-pipeline-large-{0}".format(TIMESTAMP),
parameter_values={"bq_table": "sara-vertex-demos.beans_demo.large_dataset"},
enable_caching=True,
)
最后,为这两次运行启动流水线执行。最好在两个单独的笔记本单元中执行此操作,以便您可以查看每次运行的输出。
run1.submit()
然后,启动第二次运行:
run2.submit()
运行此单元格后,您会看到一个链接,用于在 Vertex AI 控制台中查看每个流水线。打开此链接以查看有关流水线的更多详细信息:
完成后(此流水线每次运行大约需要 10-15 分钟),您会看到如下内容:
现在,您已经完成了两次流水线运行,接下来可以仔细了解一下流水线工件、指标和沿袭了。
6. 了解流水线工件和沿袭
在流水线图中,您会在每个步骤后面看到小方框。这些是工件,即流水线步骤生成的输出。工件有多种类型。在此特定的流水线中,我们有数据集、指标、模型和端点制品。点击界面顶部的展开工件滑块,即可查看每个工件的更多详细信息:
点击某个工件将显示有关该工件的更多详细信息,包括其 URI。例如,点击 vertex_endpoint 工件后,您将在 Vertex AI 控制台中找到已部署的端点的 URI:
借助 Metrics
工件,您可以传递与特定流水线步骤相关联的自定义指标。在流水线的 sklearn_train
组件中,我们记录了模型的准确性、框架和数据集大小方面的指标。点击指标工件即可查看这些详细信息:
每个工件都具有沿袭,用于描述其连接的其他工件。再次点击流水线的 vertex_endpoint 工件,然后点击查看沿袭按钮:
此操作将会打开一个新标签页,您可以在其中查看与所选工件相关联的所有工件。您的沿袭图将如下所示:
这会显示与此端点关联的模型、指标和数据集。这项测试的价值何在?您可能将模型部署到多个端点,或者需要知道用于训练部署到您正在查看的端点的模型的具体数据集。沿袭图有助于您了解各个工件在机器学习系统的其他上下文中的含义。您还可以通过编程方式访问谱系,此 Codelab 稍后会介绍。
7. 比较流水线运行
单个流水线很可能会被运行多次,可能使用不同的输入参数、新数据,或者由团队中的人员运行。为了跟踪流水线运行情况,如果能够根据各种指标比较流水线运行情况,将会非常方便。在本部分中,我们将探索比较运行作业的两种方法。
在 Pipelines 界面中比较运行
在 Cloud 控制台中,前往“流水线”信息中心。此页面简要介绍了您已执行的每项流水线运行。选中最近两次运行,然后点击顶部的比较按钮:
这会打开一个页面,我们可以在其中比较所选各个运行作业的输入参数和指标。请注意这两次运行的 BigQuery 表、数据集大小和准确性值不同:
您可以使用此界面功能比较多于两次运行,甚至可以比较来自不同流水线的运行。
使用 Vertex AI SDK 比较运行作业
在执行许多流水线时,您可能需要一种方法来以编程方式获取这些比较指标,以便深入了解指标详情并创建可视化图表。
您可以使用 aiplatform.get_pipeline_df()
方法访问运行作业元数据。在这里,我们将获取同一流水线最近两次运行的元数据,并将其加载到 Pandas DataFrame 中。这里的 pipeline
参数是指我们在流水线定义中为流水线指定的名称:
df = aiplatform.get_pipeline_df(pipeline="mlmd-pipeline")
df
输出 DataFrame 时,您会看到如下内容:
我们在这里只执行了流水线两次,但您可以想象,如果执行次数更多,您将获得多少指标。接下来,我们将使用 Matplotlib 创建自定义可视化图表,以了解模型准确性与用于训练的数据量之间的关系。
在新的笔记本单元中运行以下命令:
plt.plot(df["metric.dataset_size"], df["metric.accuracy"],label="Accuracy")
plt.title("Accuracy and dataset size")
plt.legend(loc=4)
plt.show()
您应该会看到与以下类似的内容:
8. 查询流水线指标
除了获取包含所有流水线指标的 DataFrame 之外,您可能还希望以编程方式查询在机器学习系统中创建的工件。然后,您可以创建自定义信息中心,或让组织中的其他人获取特定工件详细信息。
获取所有模型工件
如需以这种方式查询工件,我们将创建一个 MetadataServiceClient
:
API_ENDPOINT = "{}-aiplatform.googleapis.com".format(REGION)
metadata_client = aiplatform_v1.MetadataServiceClient(
client_options={
"api_endpoint": API_ENDPOINT
}
)
接下来,我们将向该端点发出 list_artifacts
请求,并传递一个过滤条件,指明我们希望在响应中看到哪些工件。首先,让我们获取项目中所有属于模型的软件工件。为此,请在笔记本中运行以下代码:
MODEL_FILTER="schema_title = \"system.Model\""
artifact_request = aiplatform_v1.ListArtifactsRequest(
parent="projects/{0}/locations/{1}/metadataStores/default".format(PROJECT_ID, REGION),
filter=MODEL_FILTER
)
model_artifacts = metadata_client.list_artifacts(artifact_request)
生成的 model_artifacts
响应包含项目中每个模型工件的可迭代对象,以及每个模型的关联元数据。
过滤对象并在 DataFrame 中显示
如果我们能够更轻松地直观呈现生成的工件查询,将非常方便。接下来,我们获取 2021 年 8 月 10 日之后创建且状态为 LIVE
的所有工件。运行此请求后,我们将在 Pandas DataFrame 中显示结果。首先,执行请求:
LIVE_FILTER = "create_time > \"2021-08-10T00:00:00-00:00\" AND state = LIVE"
artifact_req = {
"parent": "projects/{0}/locations/{1}/metadataStores/default".format(PROJECT_ID, REGION),
"filter": LIVE_FILTER
}
live_artifacts = metadata_client.list_artifacts(artifact_req)
然后,在 DataFrame 中显示结果:
data = {'uri': [], 'createTime': [], 'type': []}
for i in live_artifacts:
data['uri'].append(i.uri)
data['createTime'].append(i.create_time)
data['type'].append(i.schema_title)
df = pd.DataFrame.from_dict(data)
df
您会看到如下内容:
除了在此处尝试的方法外,您还可以根据其他条件过滤软件制品。
至此,您已完成本实验!
🎉 恭喜!🎉
您学习了如何使用 Vertex AI 执行以下操作:
- 使用 Kubeflow Pipelines SDK 构建机器学习流水线,以便在 Vertex AI 中创建数据集,并在此数据集上训练和部署自定义 Scikit-learn 模型
- 编写用于生成工件和元数据的自定义流水线组件
- 比较 Cloud 控制台和编程中的 Vertex Pipelines 运行
- 跟踪流水线生成的工件的沿袭
- 查询流水线运行的元数据
如需详细了解 Vertex 的不同部分,请参阅相关文档。
9. 清理
为免产生费用,建议您删除在本实验中创建的资源。
停止或删除 Notebooks 实例
如果您想继续使用在本实验中创建的笔记本,建议您在不使用时将其关闭。在 Cloud 控制台的 Notebooks 界面中,选择相应笔记本,然后选择停止。如果您想完全删除该实例,请选择删除:
删除 Vertex AI 端点
如需删除您部署的端点,请前往 Vertex AI 控制台的端点部分,然后点击删除图标:
删除您的 Cloud Storage 存储分区
如需删除存储桶,请使用 Cloud Console 中的导航菜单,浏览到“存储空间”,选择您的存储桶,然后点击“删除”: