Vertex Pipelines でのカスタムモデル トレーニングの実行

1. 概要

このラボでは、Vertex Pipelines で Kubeflow Pipelines SDK を使ってモデルのトレーニング用のカスタムジョブを実行する方法について学びます。

学習内容

次の方法を学習します。

  • Kubeflow Pipelines SDK を使用してスケーラブルな ML パイプラインを構築する
  • Vertex AI マネージド データセットを使用し、パイプライン内の Vertex AI Training で実行されるカスタム Scikit-learn モデル トレーニング ジョブを作成してコンテナ化する
  • Vertex Pipelines 内でバッチ予測ジョブを実行する
  • google_cloud_pipeline_components ライブラリを通じて提供される事前構築済みコンポーネントを使用して Vertex AI サービスとやり取りする

このラボを Google Cloud で実行するための総費用は約 $5 です。

2. Vertex AI の概要

このラボでは、Google Cloud 上のエンドツーエンドのマネージド ML プラットフォームである Vertex AI を使用します。Vertex AI は、Google Cloud 全体で Google の ML サービスを統合して、シームレスな開発エクスペリエンスを提供します。Vertex AI には、モデルのトレーニングとデプロイ サービスに加え、このラボで取り上げる Vertex Pipelines、Model Monitoring、Feature Store など、さまざまな MLOps プロダクトが含まれています。以下の図ですべての Vertex AI プロダクトを確認できます。

Vertex プロダクトの概要

ご意見やご質問がありましたら、サポートページからお寄せください。

ML パイプラインはなぜ有用か?

本題に入る前に、なぜパイプラインを使用するのかについて理解しておきましょう。データの処理、モデルのトレーニング、ハイパーパラメータの調整、評価、モデルのデプロイを含む ML ワークフローを構築しているとします。これらのステップにはそれぞれ異なる依存関係があり、ワークフロー全体をモノリスとして扱うと、扱いづらくなる場合があります。また、ML プロセスを拡張する際は、チームの他のメンバーがワークフローを実行し、コーディングに参加できるように、ML ワークフローを共有したいところですが、信頼性と再現性のあるプロセスがなければ困難です。パイプラインでは、ML プロセスの各ステップがそれぞれのコンテナとなります。これにより、ステップを独立して開発し、各ステップからの入力と出力を再現可能な方法で追跡できます。また、新しいトレーニング データが利用可能になったらパイプラインの実行を開始するなど、クラウド環境内の他のイベントに基づいてパイプラインの実行をスケジュールまたはトリガーすることもできます。

要約: パイプラインは、ML ワークフローの合理化と再現に役立ちます。

3. クラウド環境の設定

この Codelab を実行するには、課金が有効になっている Google Cloud Platform プロジェクトが必要です。プロジェクトを作成するには、こちらの手順を行ってください。

ステップ 1: Cloud Shell を起動する

このラボでは、Cloud Shell セッションで作業します。Cloud Shell は、Google のクラウドで実行されている仮想マシンによってホストされるコマンド インタープリタです。このセクションは、パソコンでもローカルで簡単に実行できますが、Cloud Shell を使用することで、誰もが一貫した環境での再現可能な操作性を利用できるようになります。本ラボの後、このセクションをパソコン上で再度実行してみてください。

Cloud Shell を承認する

Cloud Shell をアクティブにする

Cloud コンソールの右上で、下のボタンをクリックして [Cloud Shell をアクティブにする] をクリックします。

Cloud Shell をアクティブにする

Cloud Shell を初めて起動する場合は、その内容を説明する中間画面(スクロールしなければ見えない範囲)が表示されます。その場合は、[続行] をクリックします(今後表示されなくなります)。この中間画面は次のようになります。

Cloud Shell の設定

Cloud Shell のプロビジョニングと接続に少し時間がかかる程度です。

Cloud Shell の初期化

この仮想マシンには、必要な開発ツールがすべて含まれています。永続的なホーム ディレクトリが 5 GB 用意されており、Google Cloud で稼働するため、ネットワークのパフォーマンスと認証が大幅に向上しています。このコードラボでの作業のほとんどは、ブラウザまたは Chromebook から実行できます。

Cloud Shell に接続すると、すでに認証は完了しており、プロジェクトに各自のプロジェクト ID が設定されていることがわかります。

Cloud Shell で次のコマンドを実行して、認証されたことを確認します。

gcloud auth list

コマンド出力に次のように表示されます。

Cloud Shell の出力

Cloud Shell で次のコマンドを実行して、gcloud コマンドがプロジェクトを認識していることを確認します。

gcloud config list project

コマンド出力

[core]
project = <PROJECT_ID>

上記のようになっていない場合は、次のコマンドで設定できます。

gcloud config set project <PROJECT_ID>

コマンド出力

Updated property [core/project].

Cloud Shell には、現在の Cloud プロジェクトの名前が格納されている GOOGLE_CLOUD_PROJECT など、いくつかの環境変数があります。ラボでは、さまざまな場所でこれを使用します。次を実行すると確認できます。

echo $GOOGLE_CLOUD_PROJECT

ステップ 2: API を有効にする

これらのサービスがどこで(なぜ)必要になるかは、後の手順でわかります。とりあえず、次のコマンドを実行して Compute Engine、Container Registry、Vertex AI の各サービスへのアクセス権をプロジェクトに付与します。

gcloud services enable compute.googleapis.com         \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com

成功すると次のようなメッセージが表示されます。

Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.

ステップ 3: Cloud Storage バケットを作成する

Vertex AI でトレーニング ジョブを実行するには、保存したモデルアセットを保存するストレージ バケットが必要です。これはリージョンのバケットである必要があります。ここでは us-central を使用していますが、別のリージョンを使用することもできます(その場合はラボ内の該当箇所をすべて置き換えてください)。すでにバケットがある場合は、この手順を省略できます。

Cloud Shell で次のコマンドを実行して、バケットを作成します。

BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME

次に、このバケットへのアクセス権をコンピューティング サービス アカウントに付与します。これにより、このバケットにファイルを書き込むために必要な権限が Vertex Pipelines に付与されます。次のコマンドを実行してこの権限を付与します。

gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin

ステップ 4: Vertex AI Workbench インスタンスを作成する

Cloud Console の [Vertex AI] セクションで [ワークベンチ] をクリックします。

Vertex AI メニュー

その後、[ユーザー管理のノートブック] で [新しいノートブック] をクリックします。

新しいノートブックを作成

次に、[TensorFlow Enterprise 2.3 (with LTS)] インスタンス タイプを選択します([GPU なし])。

TFE インスタンス

デフォルトのオプションを使用して、[作成] をクリックします。

ステップ 5: ノートブックを開く

インスタンスが作成されたら、[JupyterLab を開く] を選択します。

ノートブックを開く

4. Vertex Pipelines の設定

Vertex Pipelines を使用するためには、いくつかのライブラリを追加でインストールする必要があります。

  • Kubeflow Pipelines: これはパイプラインの構築に使用する SDK です。Vertex Pipelines は、Kubeflow Pipelines と TFX の両方で構築されたパイプラインの実行をサポートします。
  • Google Cloud パイプライン コンポーネント: このライブラリは、パイプラインの各ステップで Vertex AI サービスとのやり取りを簡単にする事前構築済みコンポーネントを提供します。

ステップ 1: Python ノートブックを作成してライブラリをインストールする

まず、ノートブック インスタンスの Launcher メニュー(ノートブックの左上にある [+] アイコンをクリックしてアクセスできます)で、[Python 3] を選択してノートブックを作成します。

Python3 ノートブックを作成する

ランチャー メニューにアクセスするには、ノートブック インスタンスの左上にある [+] 記号をクリックします。

このラボで使用する両方のサービスをインストールするために、最初にノートブック セルでユーザーフラグを設定します。

USER_FLAG = "--user"

続いて、ノートブックから次のコードを実行します。

!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0

これらのパッケージをインストールした後、カーネルを再起動する必要があります。

import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

最後に、パッケージを正しくインストールしたことを確認します。KFP SDK のバージョンは 1.8 以上である必要があります。

!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

ステップ 2: プロジェクト ID とバケットを設定する

このラボでは、自分の Cloud プロジェクト ID と前に作成したバケットを参照します。次に、これらごとに変数を作成します。

プロジェクト ID がわからない場合は、次のコードを実行して取得できる可能性があります。

import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

それ以外の場合は、こちらで設定します。

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

次に、バケット名を格納する変数を作成します。このラボで作成した場合は、以下のようになります。作成していない場合は以下を手動で設定する必要があります。

BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"

ステップ 3: ライブラリをインポートする

以下を追加して、この Codelab 全体を通して使用するライブラリをインポートします。

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import pipeline

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip

ステップ 4: 定数を定義する

パイプラインを構築する前に行う必要がある最後の作業は、いくつかの定数を定義することです。PIPELINE_ROOT は、パイプラインによって作成されたアーティファクトが書き込まれる Cloud Storage パスです。ここではリージョンとして us-central1 を使用しますが、バケットの作成時に別のリージョンを使用した場合は、次のコードの REGION 変数を更新します。

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

上のコードを実行すると、パイプラインのルート ディレクトリが表示されます。これは、パイプラインからのアーティファクトが書き込まれる Cloud Storage の場所です。gs://YOUR-BUCKET-NAME/pipeline_root/ の形式になります。

5. カスタムモデルのトレーニング ジョブの構成

パイプラインを設定する前に、カスタムモデルのトレーニング ジョブのコードを記述する必要があります。モデルをトレーニングするために、UCI ML の Dry Beans データセットを使用します。このデータセットは KOKLU, M. および OZKAN, I.A.、(2020 年)「Multiclass Classification of Dry Beans Using Computer Vision and Machine Learning Techniques」、Computers and Electronics in Agriculture、174、105507。DOI

パイプラインの最初のステップでは、この豆データのバージョンを含む BigQuery テーブルを使用して、Vertex AI でマネージド データセットを作成します。データセットはトレーニング ジョブへの入力として渡されます。このトレーニング コードでは、このマネージド データセットにアクセスするための環境変数へのアクセス権を取得します。

カスタム トレーニング ジョブの設定方法は次のとおりです。

  • データ内の豆の種類を分類する Scikit-learn DecisionTreeClassifier モデルを作成する
  • トレーニング コードを Docker コンテナにパッケージ化して Container Registry に push する

そこから、パイプラインから直接 Vertex AI Training ジョブを開始できます。では始めましょう。

ステップ 1: Docker コンテナでトレーニング コードを定義する

Notebooks インスタンスからランチャーを開き、[ターミナル] を選択します。

ターミナルを開く

次に、次のコマンドを実行して、コンテナ化されたコードを追加するディレクトリを設定します。

mkdir traincontainer
cd traincontainer
touch Dockerfile

mkdir trainer
touch trainer/train.py

これらのコマンドを実行すると、左側に traincontainer/ というディレクトリが表示されます(表示するには更新アイコンのクリックが必要な場合があります)。traincontainer/ ディレクトリには、次のことがわかります。

+ Dockerfile
+ trainer/
    + train.py

コードをコンテナ化する最初のステップは、Dockerfile の作成です。Dockerfile には、イメージの実行に必要なすべてのコマンドを含めます。使用しているすべてのライブラリがインストールされ、トレーニング コードのエントリ ポイントが設定されます。作成した Dockerfile を開き、次の内容を追加します。

FROM gcr.io/deeplearning-platform-release/sklearn-cpu.0-23
WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer

RUN pip install sklearn google-cloud-bigquery joblib pandas google-cloud-storage

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.train"]

ノートブック インスタンスで編集中のファイルを保存するには、ctrl+s を使用します。

次に、train.py ファイルを開きます。ここにトレーニング コードを追加します。次の内容を train.py にコピーします。これにより、マネージド データセットからデータが取得され、Pandas DataFrame に配置されます。さらに、Scikit-learn モデルのトレーニングが行われ、トレーニング済みのモデルが Cloud Storage にアップロードされます。

from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import roc_curve
from sklearn.model_selection import train_test_split
from google.cloud import bigquery
from google.cloud import storage
from joblib import dump

import os
import pandas as pd

bqclient = bigquery.Client()
storage_client = storage.Client()

def download_table(bq_table_uri: str):
    prefix = "bq://"
    if bq_table_uri.startswith(prefix):
        bq_table_uri = bq_table_uri[len(prefix):]

    table = bigquery.TableReference.from_string(bq_table_uri)
    rows = bqclient.list_rows(
        table,
    )
    return rows.to_dataframe(create_bqstorage_client=False)

# These environment variables are from Vertex AI managed datasets
training_data_uri = os.environ["AIP_TRAINING_DATA_URI"]
test_data_uri = os.environ["AIP_TEST_DATA_URI"]

# Download data into Pandas DataFrames, split into train / test
df = download_table(training_data_uri)
test_df = download_table(test_data_uri)
labels = df.pop("Class").tolist()
data = df.values.tolist()
test_labels = test_df.pop("Class").tolist()
test_data = test_df.values.tolist()

# Define and train the Scikit model
skmodel = DecisionTreeClassifier()
skmodel.fit(data, labels)
score = skmodel.score(test_data, test_labels)
print('accuracy is:',score)

# Save the model to a local file
dump(skmodel, "model.joblib")

# Upload the saved model file to GCS
bucket = storage_client.get_bucket("YOUR_GCS_BUCKET")
model_directory = os.environ["AIP_MODEL_DIR"]
storage_path = os.path.join(model_directory, "model.joblib")
blob = storage.blob.Blob.from_string(storage_path, client=storage_client)
blob.upload_from_filename("model.joblib")

次に、ノートブックに戻り、次のコマンドを実行して、上記のスクリプトの YOUR_GCS_BUCKET を Cloud Storage バケットの名前に置き換えます。

BUCKET = BUCKET_NAME[5:] # Trim the 'gs://' before adding to train script
!sed -i -r 's@YOUR_GCS_BUCKET@'"$BUCKET"'@' traincontainer/trainer/train.py

この設定は、必要に応じて手動で行うこともできます。その場合は、スクリプトを更新するときにバケット名に gs:// を含めないでください。

これで、トレーニング コードが Docker コンテナに格納され、クラウドでトレーニングを実行する準備が整いました。

ステップ 2: コンテナを Container Registry に push する

トレーニング コードが完成したので、これを Google Container Registry に push する準備ができました。後でパイプラインのトレーニング コンポーネントを構成するときに、このコンテナを Vertex Pipelines にポイントします。

ターミナルに戻り、traincontainer/ ディレクトリのルートから Container Registry のコンテナ イメージの URI を示す変数を定義します。

PROJECT_ID=$(gcloud config get-value project)
IMAGE_URI="gcr.io/$PROJECT_ID/scikit:v1"

次に、以下のコマンドを実行してコンテナをビルドします。

docker build ./ -t $IMAGE_URI

最後に、コンテナを Container Registry に push します。

docker push $IMAGE_URI

Cloud コンソールの [Container Registry] セクションに移動して、コンテナが存在することを確認します。内容は次のようになります。

Container Registry

6. バッチ予測ジョブの構成

パイプラインの最後のステップでは、バッチ予測ジョブを実行します。そのためには、予測を取得するサンプルを含む CSV ファイルを Cloud Storage に提供する必要があります。この CSV ファイルをノートブックに作成し、gsutil コマンドライン ツールを使用して Cloud Storage にコピーします。

バッチ予測の例を Cloud Storage にコピーする

次のファイルには、beans データセットの各クラスから 3 つの例が含まれています。以下の例には Class 列は含まれていません。これがモデルの予測対象であるためです。次のコマンドを実行して、この CSV ファイルをノートブックにローカルに作成します。

%%writefile batch_examples.csv
Area,Perimeter,MajorAxisLength,MinorAxisLength,AspectRation,Eccentricity,ConvexArea,EquivDiameter,Extent,Solidity,roundness,Compactness,ShapeFactor1,ShapeFactor2,ShapeFactor3,ShapeFactor4
23288,558.113,207.567738,143.085693,1.450653336,0.7244336162,23545,172.1952453,0.8045881703,0.9890847314,0.9395021523,0.8295857874,0.008913077034,0.002604069884,0.6882125787,0.9983578734
23689,575.638,205.9678003,146.7475015,1.403552348,0.7016945718,24018,173.6714472,0.7652721693,0.9863019402,0.8983750474,0.8431970773,0.00869465998,0.002711119968,0.7109813112,0.9978994889
23727,559.503,189.7993849,159.3717704,1.190922235,0.5430731512,24021,173.8106863,0.8037601626,0.9877607094,0.952462433,0.9157600082,0.007999299741,0.003470231343,0.8386163926,0.9987269085
31158,641.105,212.0669751,187.1929601,1.132879009,0.4699241567,31474,199.1773023,0.7813134733,0.989959967,0.9526231013,0.9392188582,0.0068061806,0.003267009878,0.8821320637,0.9993488983
32514,649.012,221.4454899,187.1344232,1.183349841,0.5346736437,32843,203.4652564,0.7849831,0.9899826447,0.9700068737,0.9188051492,0.00681077351,0.002994124691,0.8442029022,0.9989873701
33078,659.456,235.5600775,178.9312328,1.316483846,0.6503915309,33333,205.2223615,0.7877214708,0.9923499235,0.9558229607,0.8712102818,0.007121351881,0.002530662194,0.7590073551,0.9992209221
33680,683.09,256.203255,167.9334938,1.525623324,0.7552213942,34019,207.081404,0.80680321,0.9900349805,0.9070392732,0.8082699962,0.007606985006,0.002002710402,0.6533003868,0.9966903078
33954,716.75,277.3684803,156.3563259,1.773951126,0.825970469,34420,207.9220419,0.7994819873,0.9864613597,0.8305492781,0.7496238998,0.008168948587,0.001591181142,0.5619359911,0.996846984
36322,719.437,272.0582306,170.8914975,1.591993952,0.7780978465,36717,215.0502424,0.7718560075,0.9892420405,0.8818487005,0.7904566678,0.007490177594,0.001803782407,0.6248217437,0.9947124371
36675,742.917,285.8908964,166.8819538,1.713132487,0.8119506999,37613,216.0927123,0.7788277766,0.9750618137,0.8350248381,0.7558572692,0.0077952528,0.001569528272,0.5713202115,0.9787472145
37454,772.679,297.6274753,162.1493177,1.835514817,0.8385619338,38113,218.3756257,0.8016695205,0.9827093118,0.7883332637,0.7337213257,0.007946480356,0.001420623993,0.5383469838,0.9881438654
37789,766.378,313.5680678,154.3409867,2.031657789,0.8704771226,38251,219.3500608,0.7805870567,0.9879218844,0.8085170916,0.6995293312,0.008297866252,0.001225659709,0.4893412853,0.9941740339
47883,873.536,327.9986493,186.5201272,1.758516115,0.822571799,48753,246.9140116,0.7584464543,0.9821549443,0.7885506623,0.7527897207,0.006850002074,0.00135695419,0.5666923636,0.9965376533
49777,861.277,300.7570338,211.6168613,1.42123379,0.7105823885,50590,251.7499649,0.8019106536,0.9839296304,0.843243269,0.8370542883,0.00604208839,0.001829706116,0.7006598815,0.9958014989
49882,891.505,357.1890036,179.8346914,1.986207449,0.8640114945,51042,252.0153467,0.7260210171,0.9772736178,0.7886896753,0.7055518063,0.007160679276,0.001094585314,0.4978033513,0.9887407248
53249,919.923,325.3866286,208.9174205,1.557489212,0.7666552108,54195,260.3818974,0.6966846347,0.9825445152,0.7907120655,0.8002231025,0.00611066177,0.001545654241,0.6403570138,0.9973491406
61129,964.969,369.3481688,210.9473449,1.750902193,0.8208567513,61796,278.9836198,0.7501135067,0.9892064211,0.8249553283,0.7553404711,0.006042110436,0.001213219664,0.5705392272,0.9989583843
61918,960.372,353.1381442,224.0962377,1.575832543,0.7728529173,62627,280.7782864,0.7539207091,0.9886790043,0.8436218213,0.7950947556,0.005703319619,0.00140599258,0.6321756704,0.9962029945
141953,1402.05,524.2311633,346.3974998,1.513380332,0.7505863011,143704,425.1354762,0.7147107987,0.9878152313,0.9074598849,0.8109694843,0.003692991084,0.0009853172185,0.6576715044,0.9953071199
145285,1440.991,524.9567463,353.0769977,1.486805285,0.7400216694,146709,430.0960442,0.7860466375,0.9902937107,0.8792413513,0.8192980608,0.003613289371,0.001004269363,0.6712493125,0.9980170255
146153,1476.383,526.1933264,356.528288,1.475881001,0.7354662103,149267,431.3789276,0.7319360978,0.9791380546,0.8425962592,0.8198107159,0.003600290972,0.001003163512,0.6720896099,0.991924286

次に、ファイルを Cloud Storage バケットにコピーします。

!gsutil cp batch_examples.csv $BUCKET_NAME

次のステップでパイプラインを定義するときに、このファイルを参照します。

7. 事前構築済みコンポーネントを使用したパイプラインの構築

トレーニング コードがクラウドに保存されたので、パイプラインから呼び出す準備ができました。定義するパイプラインは、前にインストールした google_cloud_pipeline_components ライブラリの 3 つのビルド済みコンポーネントを使用します。これらの事前定義コンポーネントにより、パイプラインをセットアップするために記述する必要があるコードが簡素化され、モデルのトレーニングやバッチ予測などの Vertex AI サービスを使用できます。

この 3 ステップのパイプラインは次のように動作します。

  • Vertex AI でマネージド データセットを作成する
  • 設定したカスタム コンテナを使用して Vertx AI でトレーニング ジョブを実行する
  • トレーニング済みの scikit-learn 分類モデルでバッチ予測ジョブを実行する

ステップ 1: パイプラインを定義する

事前構築済みコンポーネントを使用しているため、パイプライン定義でパイプライン全体を設定できます。ノートブックのセルに以下を追加します。

@pipeline(name="automl-beans-custom",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://sara-vertex-demos.beans_demo.large_dataset",
    bucket: str = BUCKET_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = REGION,
    bq_dest: str = "",
    container_uri: str = "",
    batch_destination: str = ""
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        display_name="tabular-beans-dataset",
        bq_source=bq_source,
        project=project,
        location=gcp_region
    )

    training_op = gcc_aip.CustomContainerTrainingJobRunOp(
        display_name="pipeline-beans-custom-train",
        container_uri=container_uri,
        project=project,
        location=gcp_region,
        dataset=dataset_create_op.outputs["dataset"],
        staging_bucket=bucket,
        training_fraction_split=0.8,
        validation_fraction_split=0.1,
        test_fraction_split=0.1,
        bigquery_destination=bq_dest,
        model_serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-24:latest",
        model_display_name="scikit-beans-model-pipeline",
        machine_type="n1-standard-4",
    )
    batch_predict_op = gcc_aip.ModelBatchPredictOp(
        project=project,
        location=gcp_region,
        job_display_name="beans-batch-predict",
        model=training_op.outputs["model"],
        gcs_source_uris=["{0}/batch_examples.csv".format(BUCKET_NAME)],
        instances_format="csv",
        gcs_destination_output_uri_prefix=batch_destination,
        machine_type="n1-standard-4"
    )

ステップ 2: パイプラインをコンパイルして実行する

パイプラインを定義したら、次はそれをコンパイルします。次のコードで、パイプラインの実行に使用する JSON ファイルを生成します。

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="custom_train_pipeline.json"
)

次に、TIMESTAMP 変数を作成します。これをジョブ ID に使用します。

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

次に、パイプライン ジョブを定義し、プロジェクト固有のパラメータをいくつか渡します。

pipeline_job = aiplatform.PipelineJob(
    display_name="custom-train-pipeline",
    template_path="custom_train_pipeline.json",
    job_id="custom-train-pipeline-{0}".format(TIMESTAMP),
    parameter_values={
        "project": PROJECT_ID,
        "bucket": BUCKET_NAME,
        "bq_dest": "bq://{0}".format(PROJECT_ID),
        "container_uri": "gcr.io/{0}/scikit:v1".format(PROJECT_ID),
        "batch_destination": "{0}/batchpredresults".format(BUCKET_NAME)
    },
    enable_caching=True,
)

最後に、ジョブを実行して新しいパイプライン実行を作成します。

pipeline_job.submit()

このセルを実行すると、コンソールにパイプライン実行を表示するリンクを含むログが表示されます。

パイプライン ジョブのログ

そのリンクに移動します。パイプライン ダッシュボードを開いてアクセスすることもできます。完了すると、パイプラインは次のようになります。

完了した簡単なパイプライン

このパイプラインは実行に 5 ~ 10 分かかりますが、完了する前に次のステップに進むことができます。次に、これらの各パイプライン ステップで行われる処理について詳しく説明します。

8. パイプラインの実行について

パイプラインの 3 つのステップをそれぞれ詳しく見ていきましょう。

パイプライン ステップ 1: マネージド データセットを作成する

パイプラインの最初のステップでは、Vertex AI にマネージド データセットを作成します。コンソールの [パイプライン] セクションで、次のデータセット リンクをクリックすると表示されます。

パイプラインからデータセットへのリンク

Vertex AI にデータセットが表示されます。これには、BigQuery のデータソースへのリンクと、データセット内の各列に関する情報が含まれています。マネージド データセットを Vertex AI にアップロードしたら、AutoML やカスタムモデルのトレーニングに使用できます。

マネージド データセットを使用するカスタム モデル ジョブの場合、Vertex AI は特別な環境変数をトレーニング ジョブに渡し、データをトレーニング用データとテスト用データに分割します。これを次のパイプライン ステップで利用します。

パイプライン ステップ 2: Vertex AI Training でモデルをトレーニングする

カスタム トレーニング ジョブの実行中、クリックして Vertex Pipelines コンソールで直接ログを表示できます。

カスタム トレーニング ログ

カスタム トレーニング ジョブの詳細は、Vertex AI のトレーニング ダッシュボードでも確認できます。トレーニング ジョブが完了すると、Vertex AI にモデルリソースが作成されます。その後、このモデルをオンライン予測用のエンドポイントにデプロイするか、バッチ予測ジョブを作成します。これは次のパイプライン ステップで行います。

パイプライン ステップ 3: モデルでバッチ予測ジョブを実行する

最後に、パイプラインは CSV ファイルで渡したサンプルに対する予測を取得します。バッチ予測ジョブが完了すると、Vertex AI は Cloud Storage 内の指定した場所に CSV ファイルを書き込みます。このパイプライン ステップの実行が開始されたら、Vertex AI コンソールの [バッチ予測] セクションに移動して、作成されたジョブを確認できます。

ジョブが完了したら、ジョブをクリックしてバッチ予測の Cloud Storage URL を確認します。

バッチ予測ジョブ

そのリンクをクリックして、予測結果を確認できる Cloud Storage ディレクトリに移動し、いずれかの prediction.results ファイルをダウンロードします。このファイルには、次のような行が表示されます。

{"instance": [33954.0, 716.75, 277.3684803, 156.3563259, 1.773951126, 0.825970469, 34420.0, 207.9220419, 0.7994819873, 0.9864613597, 0.8305492781, 0.7496238998, 0.008168948587, 0.001591181142, 0.5619359911, 0.996846984], "prediction": "HOROZ"}

これには、特定のインスタンスの特徴値や、モデルが予測したクラスが含まれます。この例では、モデルはこれが「HOROZ」であると判断しましたあります。

これでラボは終了です。

お疲れさまでした

Vertex AI を使って次のことを行う方法を学びました。

  • Kubeflow Pipelines SDK を使用してカスタム コンポーネントでエンドツーエンドのパイプラインを構築する
  • Vertex Pipelines でパイプラインを実行し、SDK でパイプラインの実行を開始する
  • コンソールで Vertex Pipelines グラフを表示して分析する
  • 事前構築済みのパイプライン コンポーネントを使用して Vertex AI サービスをパイプラインに追加する
  • 繰り返し実行するパイプライン ジョブをスケジュールする

Vertex のさまざまな部分の説明については、ドキュメントをご覧ください。

9. クリーンアップ

料金が発生しないようにするため、このラボで作成したリソースを削除することをおすすめします。

ステップ 1: Notebooks インスタンスを停止または削除する

このラボで作成したノートブックを引き続き使用する場合は、未使用時にオフにすることをおすすめします。Cloud コンソールの Notebooks UI で、ノートブックを選択して [停止] を選択します。インスタンスを完全に削除する場合は、[削除] を選択します。

インスタンスの停止

ステップ 2: Cloud Storage バケットを削除する

ストレージ バケットを削除するには、Cloud コンソールのナビゲーション メニューで [ストレージ] に移動してバケットを選択し、[削除] をクリックします。

ストレージを削除