Vertex AI:将自定义预测例程与 Sklearn 结合使用,对数据进行预处理和后处理,以便进行预测

1. 概览

在本实验中,您将学习如何在 Vertex AI 上使用自定义预测例程编写自定义预处理和后处理逻辑。虽然此示例使用的是 Scikit-learn,但自定义预测例程也可以与 XGBoost、PyTorch 和 TensorFlow 等其他 Python 机器学习框架搭配使用。

学习内容

您将了解如何:

  • 使用自定义预测例程编写自定义预测逻辑
  • 在本地测试自定义服务容器和模型
  • 在 Vertex AI Predictions 上测试自定义服务容器

在 Google Cloud 上运行此实验的总费用约为 1 美元。

2. Vertex AI 简介

本实验使用的是 Google Cloud 上提供的最新 AI 产品。Vertex AI 将整个 Google Cloud 的机器学习产品集成到无缝的开发体验中。以前,使用 AutoML 训练的模型和自定义模型是通过不同的服务访问的。现在,该新产品与其他新产品一起将这两种模型合并到一个 API 中。您还可以将现有项目迁移到 Vertex AI。

Vertex AI 包含许多不同的产品,可支持端到端机器学习工作流。本实验将重点介绍 PredictionsWorkbench

Vertex 产品概览

3. 用例概览

使用场景

在本实验中,您将构建一个随机森林回归模型,以根据切割、清晰度和尺寸等属性预测钻石的价格。

您将编写自定义预处理逻辑,以检查服务时的数据是否采用模型所需的格式。您还将编写自定义后处理逻辑,以对预测进行四舍五入并将其转换为字符串。如需编写此逻辑,您需要使用自定义预测例程。

自定义预测例程简介

Vertex AI 预构建容器通过执行机器学习框架的预测操作来处理预测请求。在自定义预测例程之前,如果您想在执行预测之前对输入进行预处理,或者在返回结果之前对模型的预测进行后处理,则需要构建自定义容器。

构建自定义服务容器需要编写一个 HTTP 服务器,用于封装训练好的模型、将 HTTP 请求转换为模型输入,以及将模型输出转换为响应。

借助自定义预测例程,Vertex AI 可为您提供与服务相关的组件,以便您专注于模型和数据转换。

4. 设置您的环境

您需要一个启用了结算功能的 Google Cloud Platform 项目才能运行此 Codelab。如需创建项目,请按照此处的说明操作。

第 1 步:启用 Compute Engine API

前往 Compute Engine,然后选择启用(如果尚未启用)。您需要此信息才能创建笔记本实例。

第 2 步:启用 Artifact Registry API

前往 Artifact Registry,然后选择启用(如果尚未启用)。您将使用此产品创建自定义服务容器。

第 3 步:启用 Vertex AI API

前往 Cloud Console 的 Vertex AI 部分,然后点击启用 Vertex AI API

Vertex AI 信息中心

第 4 步:创建 Vertex AI Workbench 实例

在 Cloud Console 的 Vertex AI 部分中,点击“Workbench”:

Vertex AI 菜单

启用 Notebooks API(如果尚未启用)。

Notebook_api

启用后,点击实例,然后选择新建

接受默认选项,然后点击创建

实例准备就绪后,点击打开 JUPYTERLAB 即可打开实例。

5. 编写训练代码

第 1 步:创建 Cloud Storage 存储分区

您将模型和预处理工件存储到 Cloud Storage 存储分区中。如果项目中已有要使用的存储分区,则可以跳过此步骤。

在启动器中打开一个新的终端会话。

Open_terminal

在终端中,运行以下命令为项目定义一个环境变量,务必注意将 your-cloud-project 替换为您的项目 ID:

PROJECT_ID='your-cloud-project'

接下来,在终端中运行以下命令,以便在项目中创建一个新的存储桶。

BUCKET="gs://${PROJECT_ID}-cpr-bucket"
gsutil mb -l us-central1 $BUCKET

第 2 步:训练模型

在终端中,创建一个名为 cpr-codelab 的新目录并通过 cd 命令进入该目录。

mkdir cpr-codelab
cd cpr-codelab

在文件浏览器中,前往新的 cpr-codelab 目录,然后使用启动器创建一个名为 task.ipynb 的新 Python 3 笔记本。

file_browser

您的 cpr-codelab 目录现在应如下所示:

+ cpr-codelab/
    + task.ipynb

在笔记本中,粘贴以下代码。

首先,编写一个 requirements.txt 文件。

%%writefile requirements.txt
fastapi
uvicorn==0.17.6
joblib~=1.0
numpy~=1.20
scikit-learn>=1.2.2
pandas
google-cloud-storage>=1.26.0,<2.0.0dev
google-cloud-aiplatform[prediction]>=1.16.0

您部署的模型将预安装一组与您的笔记本环境不同的依赖项。因此,您需要在 requirements.txt 中列出模型的所有依赖项,然后使用 pip 在记事本中安装完全相同的依赖项。稍后,您将在本地测试模型,然后再部署到 Vertex AI,以确认环境是否匹配。

在笔记本中使用 pip 安装依赖项。

!pip install -U --user -r requirements.txt

请注意,在 pip 安装完成后,你需要重启内核。

restart_kernel

接下来,创建用于存储模型和预处理工件的目录。

USER_SRC_DIR = "src_dir"
!mkdir $USER_SRC_DIR
!mkdir model_artifacts

# copy the requirements to the source dir
!cp requirements.txt $USER_SRC_DIR/requirements.txt

您的 cpr-codelab 目录现在应如下所示:

+ cpr-codelab/
    + model_artifacts/
    + scr_dir/
        + requirements.txt
    + task.ipynb
    + requirements.txt

现在目录结构已设置完毕,可以开始训练模型了!

首先,导入库。

import seaborn as sns
import numpy as np
import pandas as pd

from sklearn import preprocessing
from sklearn.ensemble import RandomForestRegressor
from sklearn.pipeline import make_pipeline
from sklearn.compose import make_column_transformer

import joblib
import logging

# set logging to see the docker container logs
logging.basicConfig(level=logging.INFO)

然后,定义以下变量。请务必将 PROJECT_ID 替换为您的项目 ID,并将 BUCKET_NAME 替换为您在上一步中创建的存储分区。

REGION = "us-central1"
MODEL_ARTIFACT_DIR = "sklearn-model-artifacts"
REPOSITORY = "diamonds"
IMAGE = "sklearn-image"
MODEL_DISPLAY_NAME = "diamonds-cpr"

# Replace with your project
PROJECT_ID = "{PROJECT_ID}"

# Replace with your bucket
BUCKET_NAME = "gs://{BUCKET_NAME}"

从 seaborn 库加载数据,然后创建两个数据框,一个包含特征,另一个包含标签。

data = sns.load_dataset('diamonds', cache=True, data_home=None)

label = 'price'

y_train = data['price']
x_train = data.drop(columns=['price'])

我们来看看训练数据。您可以看到每一行代表一个菱形。

x_train.head()

标签,也就是相应的价格。

y_train.head()

现在,定义 sklearn 列转换以对分类特征进行独热编码并缩放数值特征

column_transform = make_column_transformer(
    (preprocessing.OneHotEncoder(), [1,2,3]),
    (preprocessing.StandardScaler(), [0,4,5,6,7,8]))

定义随机森林模型

regr = RandomForestRegressor(max_depth=10, random_state=0)

接下来,创建 sklearn 流水线。这意味着,馈送到此流水线的数据将先进行编码/缩放,然后再传递给模型。

my_pipeline = make_pipeline(column_transform, regr)

在训练数据上拟合流水线

my_pipeline.fit(x_train, y_train)

我们来试用一下该模型,确保其能按预期运行。对该模型调用 predict 方法,并传入测试样本。

my_pipeline.predict([[0.23, 'Ideal', 'E', 'SI2', 61.5, 55.0, 3.95, 3.98, 2.43]])

现在,我们可以将流水线保存到 model_artifacts 目录,并将其复制到 Cloud Storage 存储分区。

joblib.dump(my_pipeline, 'model_artifacts/model.joblib')

!gsutil cp model_artifacts/model.joblib {BUCKET_NAME}/{MODEL_ARTIFACT_DIR}/

第 3 步:保存预处理工件

接下来,您将创建一个预处理工件。当模型服务器启动时,此工件将加载到自定义容器中。您的预处理工件几乎可以是任何形式(例如 Pickle 文件),但在这种情况下,您将写出一个字典到 JSON 文件。

clarity_dict={"Flawless": "FL",
              "Internally Flawless": "IF",
              "Very Very Slightly Included": "VVS1",
              "Very Slightly Included": "VS2",
              "Slightly Included": "S12",
              "Included": "I3"}

训练数据中的 clarity 特征始终采用缩写形式(即“FL”而非“Flawless”)。在传送时,我们希望检查此特征的数据是否也采用缩写。这是因为我们的模型知道如何对“FL”进行独热编码,但不知道如何对“Flawless”进行独热编码。稍后您将编写此自定义预处理逻辑。但现在,只需将此查询表保存为 JSON 文件,然后将其写入 Cloud Storage 存储分区。

import json
with open("model_artifacts/preprocessor.json", "w") as f:
    json.dump(clarity_dict, f)

!gsutil cp model_artifacts/preprocessor.json {BUCKET_NAME}/{MODEL_ARTIFACT_DIR}/

您的本地 cpr-codelab 目录现在应如下所示:

+ cpr-codelab/
    + model_artifacts/
        + model.joblib
        + preprocessor.json
    + scr_dir/
        + requirements.txt
    + task.ipynb
    + requirements.txt

6. 使用 CPR 模型服务器构建自定义传送容器

现在,模型已训练完毕,预处理工件已保存,接下来可以构建自定义服务容器了。通常,构建传送容器需要编写模型服务器代码。但是,借助自定义预测例程,Vertex AI Predictions 会生成模型服务器并为您构建自定义容器映像。

自定义分发容器包含以下 3 段代码:

  1. 模型服务器(此服务器将由 SDK 自动生成并存储在 scr_dir/ 中)
    • 托管模型的 HTTP 服务器
    • 负责设置路线/端口等
  2. 请求处理脚本
    • 负责处理请求的网络服务器方面,例如对请求正文进行反序列化、序列化响应、设置响应标头等。
    • 在此示例中,您将使用 SDK 中提供的默认处理程序 google.cloud.aiplatform.prediction.handler.PredictionHandler
  3. 预测器
    • 负责处理预测请求的机器学习逻辑。

其中每个组件都可以根据您的应用场景要求进行自定义。在此示例中,您只需实现预测器。

预测器负责处理预测请求的机器学习逻辑,例如自定义预处理和后处理。如需编写自定义预测逻辑,您需要继承 Vertex AI Predictor 接口

此版本的自定义预测例程附带可重复使用的 XGBoost 和 Sklearn 预测器,但如果您需要使用其他框架,可以通过对基准预测器进行子类化来创建自己的预测器。

您可以查看下面的 Sklearn 预测器示例。以上就是构建此自定义模型服务器所需编写的所有代码。

sklearn_predictor

在笔记本中,粘贴以下代码,以设置 SklearnPredictor 的子类,并将其写入 src_dir/ 中的 Python 文件。请注意,在此示例中,我们仅自定义 loadpreprocesspostprocess 方法,而不会自定义 predict 方法。

%%writefile $USER_SRC_DIR/predictor.py

import joblib
import numpy as np
import json

from google.cloud import storage
from google.cloud.aiplatform.prediction.sklearn.predictor import SklearnPredictor


class CprPredictor(SklearnPredictor):

    def __init__(self):
        return

    def load(self, artifacts_uri: str) -> None:
        """Loads the sklearn pipeline and preprocessing artifact."""

        super().load(artifacts_uri)

        # open preprocessing artifact
        with open("preprocessor.json", "rb") as f:
            self._preprocessor = json.load(f)


    def preprocess(self, prediction_input: np.ndarray) -> np.ndarray:
        """Performs preprocessing by checking if clarity feature is in abbreviated form."""

        inputs = super().preprocess(prediction_input)

        for sample in inputs:
            if sample[3] not in self._preprocessor.values():
                sample[3] = self._preprocessor[sample[3]]
        return inputs

    def postprocess(self, prediction_results: np.ndarray) -> dict:
        """Performs postprocessing by rounding predictions and converting to str."""

        return {"predictions": [f"${value}" for value in np.round(prediction_results)]}

下面我们来详细了解一下这两种方法。

  • load 方法会加载预处理工件,在本例中,该工件是一个字典,用于将钻石净度值映射到其缩写。
  • preprocess 方法使用该工件来确保在投放时,清晰度功能采用缩写格式。如果不是,则会将完整字符串转换为其缩写。
  • postprocess 方法会以带有 $ 符号的字符串的形式返回预测值,并对值进行四舍五入。

接下来,使用 Vertex AI Python SDK 构建映像。系统会使用自定义预测例程生成 Dockerfile 并为您构建映像。

from google.cloud import aiplatform

aiplatform.init(project=PROJECT_ID, location=REGION)

import os

from google.cloud.aiplatform.prediction import LocalModel

from src_dir.predictor import CprPredictor  # Should be path of variable $USER_SRC_DIR

local_model = LocalModel.build_cpr_model(
    USER_SRC_DIR,
    f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{REPOSITORY}/{IMAGE}",
    predictor=CprPredictor,
    requirements_path=os.path.join(USER_SRC_DIR, "requirements.txt"),
)

编写包含两个预测样本的测试文件。其中一个实例具有清晰的缩写名称,但其他实例需要先进行转换。

import json

sample = {"instances": [
  [0.23, 'Ideal', 'E', 'VS2', 61.5, 55.0, 3.95, 3.98, 2.43],
  [0.29, 'Premium', 'J', 'Internally Flawless', 52.5, 49.0, 4.00, 2.13, 3.11]]}

with open('instances.json', 'w') as fp:
    json.dump(sample, fp)

通过部署本地模型在本地测试容器。

with local_model.deploy_to_local_endpoint(
    artifact_uri = 'model_artifacts/', # local path to artifacts
) as local_endpoint:
    predict_response = local_endpoint.predict(
        request_file='instances.json',
        headers={"Content-Type": "application/json"},
    )

    health_check_response = local_endpoint.run_health_check()

您可以通过以下方式查看预测结果:

predict_response.content

7. 将模型部署到 Vertex AI

现在您已在本地测试了容器,接下来可以将映像推送到 Artifact Registry 并将模型上传到 Vertex AI Model Registry。

首先,配置 Docker 以访问 Artifact Registry。

!gcloud artifacts repositories create {REPOSITORY} --repository-format=docker \
--location=us-central1 --description="Docker repository"


!gcloud auth configure-docker {REGION}-docker.pkg.dev --quiet

然后推送映像。

local_model.push_image()

然后上传模型。

model = aiplatform.Model.upload(local_model = local_model,
                                display_name=MODEL_DISPLAY_NAME,
                                artifact_uri=f"{BUCKET_NAME}/{MODEL_ARTIFACT_DIR}",)

上传模型后,您应该会在控制台中看到它:

model_registry

接下来,部署模型以便将其用于在线预测。自定义预测例程也适用于批量预测,因此,如果您的用例不需要在线预测,则无需部署模型。

endpoint = model.deploy(machine_type="n1-standard-2")

最后,通过获取预测结果来测试已部署的模型。

endpoint.predict(instances=[[0.23, 'Ideal', 'E', 'VS2', 61.5, 55.0, 3.95, 3.98, 2.43]])

🎉 恭喜!🎉

您学习了如何使用 Vertex AI 执行以下操作:

  • 使用自定义预测例程编写自定义预处理和后处理逻辑

如需详细了解 Vertex AI 的不同部分,请参阅相关文档

8. 清理

如果您想继续使用在本实验中创建的笔记本,建议您在闲置时将其关闭。在 Google Cloud 控制台的 Workbench 界面中,选择相应笔记本,然后选择停止

如果您想完全删除该笔记本,请点击右上角的删除按钮。

Stop_nb

如需删除您部署的端点,请前往控制台的“端点”部分,点击您创建的端点,然后选择从端点取消部署模型

delete_endpoint

如需删除容器映像,请前往 Artifact Registry,选择您创建的代码库,然后选择删除

delete_image

如需删除存储分区,请使用 Cloud 控制台中的导航菜单,浏览到“存储空间”,选择您的存储分区,然后点击删除

删除存储空间