Vertex AI: Sklearn でカスタム予測ルーチンを使用して、予測用のデータの前処理と後処理を行う

1. 概要

このラボでは、Vertex AI でカスタム予測ルーチンを使用して、カスタム前処理ロジックとカスタム後処理ロジックを記述する方法について学習します。このサンプルでは Scikit-learn を使用していますが、カスタム予測ルーチンは XGBoost、PyTorch、TensorFlow などの他の Python ML フレームワークでも使用できます。

学習内容

次の方法を学習します。

  • カスタム予測ルーチンを使用してカスタム予測ロジックを記述する
  • カスタム サービング コンテナとモデルをローカルでテストする
  • Vertex AI Prediction でカスタム サービング コンテナをテストする

このラボを Google Cloud で実行するための総費用は約 $1 です。

2. Vertex AI の概要

このラボでは、Google Cloud で利用できる最新の AI プロダクトを使用します。Vertex AI は Google Cloud 全体の ML サービスを統合してシームレスな開発エクスペリエンスを提供します。以前は、AutoML でトレーニングしたモデルやカスタムモデルには、個別のサービスを介してアクセスする必要がありました。Vertex AI は、これらの個別のサービスを他の新しいプロダクトとともに 1 つの API へと結合します。既存のプロジェクトを Vertex AI に移行することもできます。

Vertex AI には、エンドツーエンドの ML ワークフローをサポートするさまざまなプロダクトが含まれています。このラボでは、PredictionsWorkbench について詳しく学習します。

Vertex プロダクトの概要

3. ユースケースの概要

ユースケース

このラボでは、ランダム フォレスト回帰モデルを構築し、カット、明瞭さ、サイズなどの属性に基づいてダイヤモンドの価格を予測します。

カスタム前処理ロジックを記述して、サービング時のデータをモデルで想定される形式にすることを確認します。また、予測を丸めて文字列に変換するカスタム後処理ロジックも記述します。このロジックを記述するには、カスタム予測ルーチンを使用します。

カスタム予測ルーチンの概要

Vertex AI の事前構築済みコンテナは、機械学習フレームワークの予測オペレーションを実行することで予測リクエストを処理します。予測の実行前に入力を処理したり、結果を返す前にモデルの予測を後処理したりする場合、カスタム予測ルーチンが登場する前であれば、カスタム コンテナを構築する必要がありました。

カスタム サービング コンテナを構築するには、トレーニング済みモデルのラップ、HTTP リクエストからモデル入力への変換、モデル出力から応答への変換を行う HTTP サーバーを記述する必要があります。

カスタム予測ルーチンでは、Vertex AI がサービング関連のコンポーネントを提供するため、ユーザーはモデルとデータ変換に集中できます。

4. 環境の設定

この Codelab を実行するには、課金が有効になっている Google Cloud Platform プロジェクトが必要です。プロジェクトを作成するには、こちらの手順を行ってください。

ステップ 1: Compute Engine API を有効にする

まだ有効になっていない場合は、[Compute Engine] に移動して [有効にする] を選択します。これはノートブック インスタンスを作成するために必要です。

ステップ 2: Artifact Registry API を有効にする

まだ有効になっていない場合は、[Artifact Registry] に移動して [有効にする] を選択します。これを使用して、カスタム サービング コンテナを作成します。

ステップ 3: Vertex AI API を有効にする

Cloud コンソールの [Vertex AI] セクションに移動し、[Vertex AI API を有効にする] をクリックします。

Vertex AI ダッシュボード

ステップ 4: Vertex AI Workbench インスタンスを作成する

Cloud Console の [Vertex AI] セクションで [ワークベンチ] をクリックします。

Vertex AI メニュー

Notebooks API をまだ有効にしていない場合は、有効にします。

Notebook_api

有効にしたら、[インスタンス] をクリックし、[新規作成] を選択します。

デフォルトのオプションをそのまま使用して、[作成] をクリックします。

インスタンスの準備ができたら、[JUPYTERLAB を開く] をクリックしてインスタンスを開きます。

5. トレーニング コードを作成する

ステップ 1: Cloud Storage バケットを作成する

モデルと前処理アーティファクトを Cloud Storage バケットに保存します。使用するバケットがプロジェクトにすでにある場合は、この手順をスキップできます。

ランチャーから新しいターミナル セッションを開きます。

Open_terminal

ターミナルで以下のように実行して、プロジェクトの環境変数を定義します。その際、your-cloud-project は実際のプロジェクト ID で置き換えてください。

PROJECT_ID='your-cloud-project'

次に、ターミナルで次のコマンドを実行して、プロジェクトに新しいバケットを作成します。

BUCKET="gs://${PROJECT_ID}-cpr-bucket"
gsutil mb -l us-central1 $BUCKET

ステップ 2: モデルのトレーニング

ターミナルで、cpr-codelab という新しいディレクトリを作成し、そのディレクトリに移動します。

mkdir cpr-codelab
cd cpr-codelab

ファイル ブラウザで新しい cpr-codelab ディレクトリに移動し、ランチャーを使用して task.ipynb という名前の新しい Python 3 ノートブックを作成します。

file_browser

cpr-codelab ディレクトリは次のようになります。

+ cpr-codelab/
    + task.ipynb

ノートブックに次のコードを貼り付けます。

まず、requirements.txt ファイルを作成します。

%%writefile requirements.txt
fastapi
uvicorn==0.17.6
joblib~=1.0
numpy~=1.20
scikit-learn>=1.2.2
pandas
google-cloud-storage>=1.26.0,<2.0.0dev
google-cloud-aiplatform[prediction]>=1.16.0

デプロイするモデルには、ノートブック環境とは異なる一連の依存関係がプリインストールされています。そのため、モデルのすべての依存関係を requirements.txt に一覧表示し、pip を使用してノートブックにまったく同じ依存関係をインストールする必要があります。後で、Vertex AI にデプロイする前にモデルをローカルでテストして、環境が一致することを再確認します。

Pip を使用してノートブックの依存関係をインストールします。

!pip install -U --user -r requirements.txt

pip install の完了後にカーネルを再起動する必要があります。

restart_kernel

次に、モデルと前処理アーティファクトを保存するディレクトリを作成します。

USER_SRC_DIR = "src_dir"
!mkdir $USER_SRC_DIR
!mkdir model_artifacts

# copy the requirements to the source dir
!cp requirements.txt $USER_SRC_DIR/requirements.txt

cpr-codelab ディレクトリは次のようになります。

+ cpr-codelab/
    + model_artifacts/
    + scr_dir/
        + requirements.txt
    + task.ipynb
    + requirements.txt

ディレクトリ構造が設定されたので、モデルをトレーニングしましょう。

まず、ライブラリをインポートします。

import seaborn as sns
import numpy as np
import pandas as pd

from sklearn import preprocessing
from sklearn.ensemble import RandomForestRegressor
from sklearn.pipeline import make_pipeline
from sklearn.compose import make_column_transformer

import joblib
import logging

# set logging to see the docker container logs
logging.basicConfig(level=logging.INFO)

次に、次の変数を定義します。PROJECT_ID はプロジェクト ID に、BUCKET_NAME は前の手順で作成したバケットに置き換えてください。

REGION = "us-central1"
MODEL_ARTIFACT_DIR = "sklearn-model-artifacts"
REPOSITORY = "diamonds"
IMAGE = "sklearn-image"
MODEL_DISPLAY_NAME = "diamonds-cpr"

# Replace with your project
PROJECT_ID = "{PROJECT_ID}"

# Replace with your bucket
BUCKET_NAME = "gs://{BUCKET_NAME}"

Seaborn ライブラリからデータを読み込み、2 つのデータフレーム(1 つは特徴付き、もう 1 つはラベル付き)を作成します。

data = sns.load_dataset('diamonds', cache=True, data_home=None)

label = 'price'

y_train = data['price']
x_train = data.drop(columns=['price'])

トレーニング データを見てみましょう。各行が菱形を表していることがわかります。

x_train.head()

ラベル(対応する価格)があります。

y_train.head()

次に、sklearn 列変換を定義して、カテゴリ特徴量をワンホット エンコードし、数値特徴量をスケーリングします。

column_transform = make_column_transformer(
    (preprocessing.OneHotEncoder(), [1,2,3]),
    (preprocessing.StandardScaler(), [0,4,5,6,7,8]))

ランダム フォレスト モデルを定義する

regr = RandomForestRegressor(max_depth=10, random_state=0)

次に、sklearn パイプラインを作成します。つまり、このパイプラインにフィードされるデータは、まずエンコードまたはスケーリングされてからモデルに渡されます。

my_pipeline = make_pipeline(column_transform, regr)

トレーニング データにパイプラインを適合させる

my_pipeline.fit(x_train, y_train)

モデルを試して、期待どおりに機能することを確認しましょう。モデルで predict メソッドを呼び出して、テストサンプルを渡します。

my_pipeline.predict([[0.23, 'Ideal', 'E', 'SI2', 61.5, 55.0, 3.95, 3.98, 2.43]])

これで、パイプラインを model_artifacts ディレクトリに保存し、Cloud Storage バケットにコピーできます。

joblib.dump(my_pipeline, 'model_artifacts/model.joblib')

!gsutil cp model_artifacts/model.joblib {BUCKET_NAME}/{MODEL_ARTIFACT_DIR}/

ステップ 3: 前処理アーティファクトを保存する

次に、前処理アーティファクトを作成します。このアーティファクトは、モデルサーバーの起動時にカスタム コンテナに読み込まれます。前処理アーティファクトは、ほぼすべての形式(pickle ファイルなど)にできますが、この場合は、JSON ファイルにディクショナリを書き込みます。

clarity_dict={"Flawless": "FL",
              "Internally Flawless": "IF",
              "Very Very Slightly Included": "VVS1",
              "Very Slightly Included": "VS2",
              "Slightly Included": "S12",
              "Included": "I3"}

トレーニング データの clarity 特徴は、常に省略形式(「Flawless」ではなく「FL」など)でした。サービス提供時に、この機能のデータも省略されていることを確認します。モデルは「FL」をワンホット エンコードする方法を認識していますが、「Flawless」は認識していないためです。このカスタム前処理ロジックは後で記述します。ここでは、このルックアップ テーブルを JSON ファイルに保存して、Cloud Storage バケットに書き込みます。

import json
with open("model_artifacts/preprocessor.json", "w") as f:
    json.dump(clarity_dict, f)

!gsutil cp model_artifacts/preprocessor.json {BUCKET_NAME}/{MODEL_ARTIFACT_DIR}/

ローカルの cpr-codelab ディレクトリは次のようになります。

+ cpr-codelab/
    + model_artifacts/
        + model.joblib
        + preprocessor.json
    + scr_dir/
        + requirements.txt
    + task.ipynb
    + requirements.txt

6. CPR モデルサーバーを使用してカスタム サービング コンテナをビルドする

モデルがトレーニングされ、前処理アーティファクトが保存されたので、カスタム サービング コンテナをビルドします。通常、サービング コンテナを構築するには、モデルサーバー コードを記述する必要があります。一方、カスタム予測ルーチンを使用すると、Vertex AI Prediction がモデルサーバーを生成し、カスタム コンテナ イメージを構築します。

カスタム サービング コンテナには、次の 3 つのコードが含まれています。

  1. モデルサーバー(SDK によって自動的に生成され、scr_dir/ に保存されます)
    • モデルをホストする HTTP サーバー
    • ルートやポートなどの設定を担当します。
  2. リクエスト ハンドラ
    • リクエスト本文のシリアル化解除、レスポンスのシリアル化、レスポンス ヘッダーの設定など、リクエスト処理のウェブサーバー側の処理を担当します。
    • この例では、SDK で提供されているデフォルトのハンドラ google.cloud.aiplatform.prediction.handler.PredictionHandler を使用します。
  3. 予測子
    • 予測リクエストを処理する ML ロジックを担当します。

これらの各コンポーネントは、ユースケースの要件に基づいてカスタマイズできます。この例では、予測器のみを実装します。

予測器は、カスタム前処理や後処理など、予測リクエストを処理する ML ロジックを担います。カスタム予測ロジックを記述するには、Vertex AI Predictor インターフェースをサブクラス化します。

このリリースのカスタム予測ルーチンには、再利用可能な XGBoost 予測と Sklearn 予測子が含まれていますが、別のフレームワークを使用する必要がある場合は、基本予測子をサブクラス化することで独自の予測を作成できます。

Sklearn 予測子の例を以下に示します。このカスタム モデル サーバーを構築するために記述する必要があるコードはこれだけです。

sklearn_predictor

ノートブックに次のコードを貼り付けて、SklearnPredictor をサブクラス化し、src_dir/ の Python ファイルに書き込みます。この例では、predict メソッドではなく、loadpreprocesspostprocess メソッドのみをカスタマイズします。

%%writefile $USER_SRC_DIR/predictor.py

import joblib
import numpy as np
import json

from google.cloud import storage
from google.cloud.aiplatform.prediction.sklearn.predictor import SklearnPredictor


class CprPredictor(SklearnPredictor):

    def __init__(self):
        return

    def load(self, artifacts_uri: str) -> None:
        """Loads the sklearn pipeline and preprocessing artifact."""

        super().load(artifacts_uri)

        # open preprocessing artifact
        with open("preprocessor.json", "rb") as f:
            self._preprocessor = json.load(f)


    def preprocess(self, prediction_input: np.ndarray) -> np.ndarray:
        """Performs preprocessing by checking if clarity feature is in abbreviated form."""

        inputs = super().preprocess(prediction_input)

        for sample in inputs:
            if sample[3] not in self._preprocessor.values():
                sample[3] = self._preprocessor[sample[3]]
        return inputs

    def postprocess(self, prediction_results: np.ndarray) -> dict:
        """Performs postprocessing by rounding predictions and converting to str."""

        return {"predictions": [f"${value}" for value in np.round(prediction_results)]}

それぞれの方法について詳しく見ていきましょう。

  • load メソッドは、前処理アーティファクトを読み込みます。この場合、ダイヤモンドの透明度値を略語にマッピングする辞書です。
  • preprocess メソッドは、そのアーティファクトを使用して、サービング時に鮮明度特徴が省略形式であることを確認します。一致しない場合は、文字列全体が省略形に変換されます。
  • postprocess メソッドは、予測値を $ 記号付きの文字列として返します。値は四捨五入されます。

次に、Vertex AI Python SDK を使用してイメージをビルドします。カスタム予測ルーチンを使用すると、Dockerfile が生成され、イメージがビルドされます。

from google.cloud import aiplatform

aiplatform.init(project=PROJECT_ID, location=REGION)

import os

from google.cloud.aiplatform.prediction import LocalModel

from src_dir.predictor import CprPredictor  # Should be path of variable $USER_SRC_DIR

local_model = LocalModel.build_cpr_model(
    USER_SRC_DIR,
    f"{REGION}-docker.pkg.dev/{PROJECT_ID}/{REPOSITORY}/{IMAGE}",
    predictor=CprPredictor,
    requirements_path=os.path.join(USER_SRC_DIR, "requirements.txt"),
)

予測用の 2 つのサンプルを含むテストファイルを作成します。一方のインスタンスには明瞭な略称が使われていますが、もう一方のインスタンスは最初に変換する必要があります。

import json

sample = {"instances": [
  [0.23, 'Ideal', 'E', 'VS2', 61.5, 55.0, 3.95, 3.98, 2.43],
  [0.29, 'Premium', 'J', 'Internally Flawless', 52.5, 49.0, 4.00, 2.13, 3.11]]}

with open('instances.json', 'w') as fp:
    json.dump(sample, fp)

ローカルモデルをデプロイして、コンテナをローカルでテストします。

with local_model.deploy_to_local_endpoint(
    artifact_uri = 'model_artifacts/', # local path to artifacts
) as local_endpoint:
    predict_response = local_endpoint.predict(
        request_file='instances.json',
        headers={"Content-Type": "application/json"},
    )

    health_check_response = local_endpoint.run_health_check()

予測結果は、次のコマンドで確認できます。

predict_response.content

7. モデルを Vertex AI にデプロイする

コンテナをローカルでテストしたので、次はイメージを Artifact Registry に push し、モデルを Vertex AI Model Registry にアップロードします。

まず、Artifact Registry にアクセスできるように Docker を構成します。

!gcloud artifacts repositories create {REPOSITORY} --repository-format=docker \
--location=us-central1 --description="Docker repository"


!gcloud auth configure-docker {REGION}-docker.pkg.dev --quiet

次に、イメージを push します。

local_model.push_image()

モデルをアップロードします

model = aiplatform.Model.upload(local_model = local_model,
                                display_name=MODEL_DISPLAY_NAME,
                                artifact_uri=f"{BUCKET_NAME}/{MODEL_ARTIFACT_DIR}",)

モデルがアップロードされると、コンソールに表示されます。

model_registry

次に、オンライン予測に使用できるようにモデルをデプロイします。カスタム予測ルーチンはバッチ予測でも機能するため、ユースケースでオンライン予測が不要な場合は、モデルをデプロイする必要はありません。

endpoint = model.deploy(machine_type="n1-standard-2")

最後に、予測を取得してデプロイされたモデルをテストします。

endpoint.predict(instances=[[0.23, 'Ideal', 'E', 'VS2', 61.5, 55.0, 3.95, 3.98, 2.43]])

お疲れさまでした

Vertex AI を使って次のことを行う方法を学びました。

  • カスタム予測ルーチンを使用してカスタム前処理ロジックとカスタム後処理ロジックを記述する

Vertex AI のさまざまな機能の詳細については、こちらのドキュメントをご覧ください。

8. クリーンアップ

このラボで作成したノートブックを引き続き使用する場合は、未使用時にオフにすることをおすすめします。Google Cloud コンソールの Workbench UI で、ノートブックを選択して [停止] を選択します。

ノートブックを完全に削除する場合は、右上にある [削除] ボタンをクリックします。

Stop_nb

デプロイしたエンドポイントを削除するには、コンソールの [エンドポイント] セクションに移動し、作成したエンドポイントをクリックして、[エンドポイントからモデルのデプロイを解除] を選択します。

delete_endpoint

コンテナ イメージを削除するには、Artifact Registry に移動し、作成したリポジトリを選択して、[削除] を選択します。

delete_image

ストレージ バケットを削除するには、Cloud コンソールのナビゲーション メニューで [ストレージ] に移動してバケットを選択し、[削除] をクリックします。

ストレージを削除