1. 概要
このラボでは、Vertex Pipelines を使った ML パイプラインの作成方法と実行方法について学びます。
学習内容
次の方法を学習します。
- Kubeflow Pipelines SDK を使用してスケーラブルな ML パイプラインを構築する
- 3 つのステップからなり、入力としてテキストを受け取る簡単なパイプラインを作成して実行する
- AutoML 分類モデルのトレーニング、評価、デプロイを行うパイプラインを作成して実行する
google_cloud_pipeline_components
ライブラリを通して提供される事前構築済みコンポーネントを使用して Vertex AI サービスとやり取りする- Cloud Scheduler でパイプライン ジョブのスケジュールを設定する
このラボを Google Cloud で実行するための総費用は約 $25 です。
2. Vertex AI の概要
このラボでは、Google Cloud で利用できる最新の AI プロダクトを使用します。Vertex AI は Google Cloud 全体の ML サービスを統合してシームレスな開発エクスペリエンスを提供します。以前は、AutoML でトレーニングしたモデルやカスタムモデルには、個別のサービスを介してアクセスする必要がありました。Vertex AI は、これらの個別のサービスを他の新しいプロダクトとともに 1 つの API へと結合します。既存のプロジェクトを Vertex AI に移行することもできます。
Vertex AI には、モデルのトレーニングとデプロイ サービスに加え、このラボで取り上げる Vertex Pipelines、Model Monitoring、Feature Store など、さまざまな MLOps プロダクトが含まれています。以下の図ですべての Vertex AI プロダクトを確認できます。
ご意見やご質問がありましたら、サポートページからお寄せください。
ML パイプラインはなぜ有用か?
本題に入る前に、なぜパイプラインを使用するのかについて理解しておきましょう。データの処理、モデルのトレーニング、ハイパーパラメータの調整、評価、モデルのデプロイを含む ML ワークフローを構築しているとします。これらのステップにはそれぞれ異なる依存関係があり、ワークフロー全体をモノリスとして扱うと、扱いづらくなる場合があります。また、ML プロセスを拡張する際は、チームの他のメンバーがワークフローを実行し、コーディングに参加できるように、ML ワークフローを共有したいところですが、信頼性と再現性のあるプロセスがなければ困難です。パイプラインでは、ML プロセスの各ステップがそれぞれのコンテナとなります。これにより、ステップを独立して開発し、各ステップからの入力と出力を再現可能な方法で追跡できます。また、新しいトレーニング データが利用可能になったらパイプラインの実行を開始するなど、クラウド環境内の他のイベントに基づいてパイプラインの実行をスケジュールまたはトリガーすることもできます。
要約: パイプラインを使用すると、ML ワークフローの自動化と再現が可能です。
3. クラウド環境の設定
この Codelab を実行するには、課金が有効になっている Google Cloud Platform プロジェクトが必要です。プロジェクトを作成するには、こちらの手順を行ってください。
ステップ 1: Cloud Shell を起動する
このラボでは、Cloud Shell セッションで作業します。Cloud Shell は、Google のクラウドで実行されている仮想マシンによってホストされるコマンド インタープリタです。このセクションは、パソコンでもローカルで簡単に実行できますが、Cloud Shell を使用することで、誰もが一貫した環境での再現可能な操作性を利用できるようになります。本ラボの後、このセクションをパソコン上で再度実行してみてください。
Cloud Shell をアクティブにする
Cloud コンソールの右上にある次のボタンをクリックして、Cloud Shell を有効にする。
Cloud Shell を初めて起動する場合は、その内容を説明する中間画面(スクロールしなければ見えない範囲)が表示されます。その場合は、[続行] をクリックします(二度と表示されません)。この中間画面は次のようになります。
Cloud Shell のプロビジョニングと接続に少し時間がかかる程度です。
この仮想マシンには、必要な開発ツールがすべて含まれています。仮想マシンは Google Cloud で稼働し、永続的なホーム ディレクトリが 5 GB 用意されているため、ネットワークのパフォーマンスと認証が大幅に向上しています。このコードラボでの作業のほとんどは、ブラウザまたは Chromebook から実行できます。
Cloud Shell に接続すると、すでに認証は完了しており、プロジェクトに各自のプロジェクト ID が設定されていることがわかります。
Cloud Shell で次のコマンドを実行して、認証されたことを確認します。
gcloud auth list
コマンド出力に次のように表示されます。
Cloud Shell で次のコマンドを実行して、gcloud コマンドがプロジェクトを認識していることを確認します。
gcloud config list project
コマンド出力
[core] project = <PROJECT_ID>
上記のようになっていない場合は、次のコマンドで設定できます。
gcloud config set project <PROJECT_ID>
コマンド出力
Updated property [core/project].
Cloud Shell には、現在の Cloud プロジェクトの名前が格納されている GOOGLE_CLOUD_PROJECT
など、いくつかの環境変数があります。ラボでは、さまざまな場所でこれを使用します。次を実行すると確認できます。
echo $GOOGLE_CLOUD_PROJECT
ステップ 2: API を有効にする
これらのサービスがどんな場面で(なぜ)必要になるのかは、後の手順でわかります。とりあえず、次のコマンドを実行して Compute Engine、Container Registry、Vertex AI の各サービスへのアクセス権を取得します。
gcloud services enable compute.googleapis.com \
containerregistry.googleapis.com \
aiplatform.googleapis.com \
cloudbuild.googleapis.com \
cloudfunctions.googleapis.com
成功すると次のようなメッセージが表示されます。
Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.
ステップ 3: Cloud Storage バケットを作成する
Vertex AI でトレーニング ジョブを実行するには、保存対象のモデルアセットを格納するストレージ バケットが必要です。これはリージョンのバケットである必要があります。ここでは us-central
を使用しますが、別のリージョンを使用することもできます(その場合はラボ内の該当箇所をすべて置き換えてください)。すでにバケットがある場合は、この手順を省略できます。
Cloud Shell で次のコマンドを実行して、バケットを作成します。
BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME
次に、このバケットへのアクセス権をコンピューティング サービス アカウントに付与します。これにより、このバケットにファイルを書き込むために必要な権限が Vertex Pipelines に付与されます。次のコマンドを実行してこの権限を付与します。
gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin
ステップ 4: Vertex AI Workbench インスタンスを作成する
Cloud Console の [Vertex AI] セクションで [ワークベンチ] をクリックします。
そこから、[ユーザー管理のノートブック] で [新しいノートブック] をクリックします。
次に、TensorFlow Enterprise 2.3(LTS 版)インスタンス タイプ(GPU なし)を選択します。
デフォルトのオプションを使用して、[作成] をクリックします。
ステップ 5: ノートブックを開く
インスタンスが作成されたら、[JupyterLab を開く] を選択します。
4. Vertex Pipelines の設定
Vertex Pipelines を使用するためには、いくつかのライブラリを追加でインストールする必要があります。
- Kubeflow Pipelines: パイプラインの構築に使用する SDK です。Vertex Pipelines は、Kubeflow Pipelines と TFX の両方で構築されたパイプラインの実行をサポートします。
- Google Cloud パイプライン コンポーネント: このライブラリは、パイプラインの各ステップで Vertex AI サービスとのやり取りを簡単にする事前構築済みコンポーネントを提供します。
ステップ 1: Python ノートブックを作成してライブラリをインストールする
まず、ノートブック インスタンスのランチャー メニューから [Python 3] を選択して、ノートブックを作成します。
ランチャー メニューにアクセスするには、ノートブック インスタンスの左上にある [+] 記号をクリックします。
このラボで使用する両方のサービスをインストールするために、最初にノートブック セルでユーザーフラグを設定します。
USER_FLAG = "--user"
続いて、ノートブックから次のコードを実行します。
!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0
これらのパッケージをインストールした後、カーネルを再起動する必要があります。
import os
if not os.getenv("IS_TESTING"):
# Automatically restart kernel after installs
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)
最後に、パッケージを正しくインストールしたことを確認します。KFP SDK のバージョンは 1.8 以上である必要があります。
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"
ステップ 2: プロジェクト ID とバケットを設定する
このラボでは、自分の Cloud プロジェクト ID と前に作成したバケットを参照します。次に、これらを格納する変数をそれぞれ作成します。
プロジェクト ID がわからない場合は、次のコードを実行して取得できる可能性があります。
import os
PROJECT_ID = ""
# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
PROJECT_ID = shell_output[0]
print("Project ID: ", PROJECT_ID)
それ以外の場合は、こちらで設定します。
if PROJECT_ID == "" or PROJECT_ID is None:
PROJECT_ID = "your-project-id" # @param {type:"string"}
次に、バケット名を格納する変数を作成します。このラボで作成した場合は、以下のようになります。作成していない場合は以下を手動で設定する必要があります。
BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"
ステップ 3: ライブラリをインポートする
この Codelab 全体で使用するライブラリをインポートするために、次のコードを追加します。
import kfp
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics
from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple
ステップ 4: 定数を定義する
パイプラインの構築前に最後に行うのが、いくつかの定数変数の定義です。PIPELINE_ROOT
は、パイプラインによって作成されるアーティファクトを書き込む Cloud Storage のパスです。ここではリージョンとして us-central1
を使用しますが、バケットの作成時に別のリージョンを使用した場合は、次のコード内の REGION
変数を更新します。
PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT
上のコードを実行すると、パイプラインのルート ディレクトリが表示されます。これは、パイプラインからのアーティファクトが書き込まれる Cloud Storage の場所です。形式は gs://YOUR-BUCKET-NAME/pipeline_root/
です。
5. 最初のパイプラインを作成する
Vertex Pipelines の仕組みを理解するために、最初に KFP SDK を使用して短いパイプラインを作成します。このパイプラインは ML 関連の処理は何も行いません(それについては後で学習します)。今回はこのパイプラインを使用して次の方法について学びます。
- KFP SDK でカスタム コンポーネントを作成する方法
- Vertex Pipelines でパイプラインを実行してモニタリングする方法
2 つの出力(商品名と絵文字の説明)を使用して 1 つの文を表示するパイプラインを作成します。このパイプラインは 3 つのコンポーネントで構成されます。
product_name
: このコンポーネントは商品名(または指定した名詞)を入力として受け取り、その文字列を出力として返します。emoji
: このコンポーネントは絵文字の説明テキストを受け取り、それを絵文字に変換します。たとえば、✨ のテキストコードは「sparkles」です。このコンポーネントでは絵文字ライブラリを使用しており、パイプラインで外部依存関係を管理する方法を見ることができます。build_sentence
: この最後のコンポーネントは、前の 2 つの出力を使用して、絵文字を含む文を作成します。たとえば、「Vertex Pipelines is ✨」のような出力が得られます。
コードの作成
ステップ 1: Python 関数ベースのコンポーネントを作成する
KFP SDK を使用すると、Python の関数に基づくコンポーネントを作成できます。最初のパイプラインの 3 つのコンポーネントで KFP SDK を使用します。まず product_name
コンポーネントを作成します。このコンポーネントは文字列を入力として受け取り、その文字列を返す単純なものです。ノートブックに次のコードを追加します。
@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
return text
構文を詳しく見ていきます。
@component
デコレータは、パイプラインの実行時にこの関数をコンポーネントにコンパイルします。カスタム コンポーネントを記述するときには、これを使用します。base_image
パラメータは、このコンポーネントが使用するコンテナ イメージを指定します。output_component_file
パラメータはオプションで、コンパイルしたコンポーネントを書き込む yaml ファイルを指定します。セルの実行後、そのファイルがノートブック インスタンスに書き込まれているのを確認できます。このコンポーネントを他のユーザーと共有したい場合は、生成した yaml ファイルをそのユーザーに送信し、次のコードでそれを読み込んでもらうことができます。
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')
- 関数定義の後の
-> str
は、このコンポーネントの出力タイプを指定します。
ステップ 2: 追加で 2 つのコンポーネントを作成する
パイプラインを完成させるために、コンポーネントをさらに 2 つ作成します。最初の定義では、文字列を入力として受け取り、対応する絵文字がある場合は、その文字列を絵文字に変換します。渡された入力テキストと結果の絵文字を含むタプルを返します。
@component(packages_to_install=["emoji"])
def emoji(
text: str,
) -> NamedTuple(
"Outputs",
[
("emoji_text", str), # Return parameters
("emoji", str),
],
):
import emoji
emoji_text = text
emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
return (emoji_text, emoji_str)
このコンポーネントは前のものよりも多少複雑です。ここで把握しておくべき新しい点は以下の通りです。
packages_to_install
パラメータは、このコンテナの外部ライブラリ依存関係をコンポーネントに示します。ここでは、emoji というライブラリを使用しています。- このコンポーネントは、
Outputs
というNamedTuple
を返します。このタプルに含まれる各文字列には、emoji_text
とemoji
というキーがあります。次のコンポーネントで、これらを使用して出力にアクセスします。
このパイプラインの最後のコンポーネントは、最初の 2 つのコンポーネントの出力を組み合わせて 1 つの文字列を返します。
@component
def build_sentence(
product: str,
emoji: str,
emojitext: str
) -> str:
print("We completed the pipeline, hooray!")
end_str = product + " is "
if len(emoji) > 0:
end_str += emoji
else:
end_str += emojitext
return(end_str)
このコンポーネントは、前のステップで定義した出力を使用することをどのように知るのかと疑問に思われるかもしれません。重要なポイントです。次のステップで全体をまとめます。
ステップ 3: コンポーネントをパイプラインへとまとめる
上で定義したコンポーネント定義により、パイプライン定義でステップの作成に使用できるファクトリ関数が作成されました。パイプラインを設定するには、@pipeline
デコレータを使用して、パイプラインの名前と説明を指定し、パイプラインのアーティファクトを書き込むルートパスを指定します。アーティファクトとは、パイプラインで生成される出力ファイルを意味します。これは、この簡単なパイプラインでは生成されませんが、次のパイプラインでは生成されます。
次のコードブロックで、intro_pipeline
関数を定義します。ここで最初のパイプライン ステップへの入力、およびステップ間の関係を指定します。
product_task
は商品名を入力として受け取ります。ここでは「Vertex Pipelines」を渡していますが、これは任意の名前に変更できます。emoji_task
は絵文字のテキストコードを入力として受け取ります。これも任意の文字列に変更できます。たとえば、「party_face」は 🥳 という絵文字を表します。このコンポーネントとproduct_task
コンポーネントはどちらも、それらに入力を与えるステップがないため、パイプラインを定義するときに手動で入力を指定します。- パイプラインの最後のステップである
consumer_task
には、3 つの入力パラメータがあります。product_task
の出力。このステップは 1 つの出力のみを生成するため、product_task.output
を介してそれを参照できます。emoji_task
ステップのemoji
出力。出力パラメータに名前を付けたemoji
コンポーネントの定義を参照してください。- 同様に、
emoji
コンポーネントからのemoji_text
名前付き出力。絵文字に対応しないテキストがパイプラインに渡された場合は、このテキストを使用して文を作成します。
@pipeline(
name="hello-world",
description="An intro pipeline",
pipeline_root=PIPELINE_ROOT,
)
# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
product_task = product_name(text)
emoji_task = emoji(emoji_str)
consumer_task = build_sentence(
product_task.output,
emoji_task.outputs["emoji"],
emoji_task.outputs["emoji_text"],
)
ステップ 4: パイプラインをコンパイルして実行する
パイプラインを定義したら、次はそれをコンパイルします。次のコードで、パイプラインの実行に使用する JSON ファイルを生成します。
compiler.Compiler().compile(
pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)
次に、TIMESTAMP
変数を作成します。これをジョブ ID に使用します。
from datetime import datetime
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
次に、パイプライン ジョブを定義します。
job = aiplatform.PipelineJob(
display_name="hello-world-pipeline",
template_path="intro_pipeline_job.json",
job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
enable_caching=True
)
最後に、ジョブを実行して新しいパイプライン実行を作成します。
job.submit()
このセルを実行すると、コンソールでパイプラインの実行結果を表示するためのリンクを含むログが表示されます。
そのリンクに移動します。完了すると、パイプラインは次のようになります。
このパイプラインは実行に 5 ~ 6 分かかります。完了したら、build-sentence
コンポーネントをクリックして最終出力を表示できます。
KFP SDK と Vertex Pipelines の機能に習熟したことで、他の Vertex AI サービスを使用して ML モデルの作成やデプロイを行うパイプラインを構築できるようになりました。それでは詳しく見ていきましょう。
6. エンドツーエンドの ML パイプラインを作成する
それでは、最初の ML パイプラインを構築しましょう。このパイプラインでは、UCI Machine Learning の Dry beans データセットを使用します(出典: KOKLU, M. および OZKAN, I.A.(2020 年)「Multiclass Classification of Dry Beans Using Computer Vision and Machine Learning Techniques」、Computers and Electronics in Agriculture、174、105507。DOI。
これは表形式のデータセットです。パイプラインでこのデータセットを使用し、豆をその特徴に基づいて 7 種類に分類する AutoML モデルのトレーニング、評価、デプロイを行います。
このパイプラインは次の処理を行います。
- にデータセットを作成する
- AutoML を使用して表形式の分類モデルをトレーニングする
- このモデルの評価指標を取得する
- 評価指標に基づき、Vertex Pipelines で条件付きロジックを使用して、モデルをデプロイするかどうかを決定する
- Vertex Prediction を使用してモデルをエンドポイントにデプロイする
これらの各ステップがコンポーネントとなります。パイプラインのほとんどのステップでは、この Codelab で前にインポートした google_cloud_pipeline_components
ライブラリを介して Vertex AI サービス用の事前構築済みコンポーネントを使用します。このセクションでは、最初に 1 つのカスタム コンポーネントを定義します。次に、事前構築済みコンポーネントを使用してパイプラインの残りのステップを定義します。事前構築済みコンポーネントにより、モデルのトレーニングやデプロイなど、Vertex AI サービスへのアクセスが簡単になります。
ステップ 1: モデル評価用のカスタム コンポーネント
ここで定義するカスタム コンポーネントは、モデルのトレーニングの完了後、パイプラインの終わりにかけて使用されます。このコンポーネントはいくつかの処理を実行します。
- トレーニング済みの AutoML 分類モデルから評価指標を取得する
- 指標を解析し、Vertex Pipelines UI にレンダリングする
- 指標をしきい値と比較して、モデルをデプロイするかどうかを決定する
コンポーネントを定義する前に、その入力パラメータと出力パラメータについて理解します。パイプラインは入力として Cloud プロジェクトのメタデータ、結果のトレーニング済みモデル(このコンポーネントは後で定義します)、モデルの評価指標、そして thresholds_dict_str
を受け取ります。thresholds_dict_str
は、パイプラインの実行時に定義します。この分類モデルの場合、これはモデルをデプロイする ROC 曲線値の下側の領域となります。たとえば、0.95 を渡した場合は、この指標が 95% を超える場合にのみパイプラインでモデルをデプロイすることを意味します。
評価コンポーネントは、モデルをデプロイするかどうかを示す文字列を返します。ノートブックのセルに次のコードを追加して、このカスタム コンポーネントを作成します。
@component(
base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
output_component_file="tabular_eval_component.yaml",
packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
project: str,
location: str, # "us-central1",
api_endpoint: str, # "us-central1-aiplatform.googleapis.com",
thresholds_dict_str: str,
model: Input[Artifact],
metrics: Output[Metrics],
metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]): # Return parameter.
import json
import logging
from google.cloud import aiplatform as aip
# Fetch model eval info
def get_eval_info(client, model_name):
from google.protobuf.json_format import MessageToDict
response = client.list_model_evaluations(parent=model_name)
metrics_list = []
metrics_string_list = []
for evaluation in response:
print("model_evaluation")
print(" name:", evaluation.name)
print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
metrics = MessageToDict(evaluation._pb.metrics)
for metric in metrics.keys():
logging.info("metric: %s, value: %s", metric, metrics[metric])
metrics_str = json.dumps(metrics)
metrics_list.append(metrics)
metrics_string_list.append(metrics_str)
return (
evaluation.name,
metrics_list,
metrics_string_list,
)
# Use the given metrics threshold(s) to determine whether the model is
# accurate enough to deploy.
def classification_thresholds_check(metrics_dict, thresholds_dict):
for k, v in thresholds_dict.items():
logging.info("k {}, v {}".format(k, v))
if k in ["auRoc", "auPrc"]: # higher is better
if metrics_dict[k] < v: # if under threshold, don't deploy
logging.info("{} < {}; returning False".format(metrics_dict[k], v))
return False
logging.info("threshold checks passed.")
return True
def log_metrics(metrics_list, metricsc):
test_confusion_matrix = metrics_list[0]["confusionMatrix"]
logging.info("rows: %s", test_confusion_matrix["rows"])
# log the ROC curve
fpr = []
tpr = []
thresholds = []
for item in metrics_list[0]["confidenceMetrics"]:
fpr.append(item.get("falsePositiveRate", 0.0))
tpr.append(item.get("recall", 0.0))
thresholds.append(item.get("confidenceThreshold", 0.0))
print(f"fpr: {fpr}")
print(f"tpr: {tpr}")
print(f"thresholds: {thresholds}")
metricsc.log_roc_curve(fpr, tpr, thresholds)
# log the confusion matrix
annotations = []
for item in test_confusion_matrix["annotationSpecs"]:
annotations.append(item["displayName"])
logging.info("confusion matrix annotations: %s", annotations)
metricsc.log_confusion_matrix(
annotations,
test_confusion_matrix["rows"],
)
# log textual metrics info as well
for metric in metrics_list[0].keys():
if metric != "confidenceMetrics":
val_string = json.dumps(metrics_list[0][metric])
metrics.log_metric(metric, val_string)
# metrics.metadata["model_type"] = "AutoML Tabular classification"
logging.getLogger().setLevel(logging.INFO)
aip.init(project=project)
# extract the model resource name from the input Model Artifact
model_resource_path = model.metadata["resourceName"]
logging.info("model path: %s", model_resource_path)
client_options = {"api_endpoint": api_endpoint}
# Initialize client that will be used to create and send requests.
client = aip.gapic.ModelServiceClient(client_options=client_options)
eval_name, metrics_list, metrics_str_list = get_eval_info(
client, model_resource_path
)
logging.info("got evaluation name: %s", eval_name)
logging.info("got metrics list: %s", metrics_list)
log_metrics(metrics_list, metricsc)
thresholds_dict = json.loads(thresholds_dict_str)
deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
if deploy:
dep_decision = "true"
else:
dep_decision = "false"
logging.info("deployment decision is %s", dep_decision)
return (dep_decision,)
ステップ 2: Google Cloud の事前構築済みコンポーネントを追加する
このステップでは、パイプラインの残りのコンポーネントを定義し、それらすべてがどのように機能するのかを確認します。最初に、タイムスタンプを使用してパイプライン実行の表示名を定義します。
import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)
次に、ノートブックの新しいセルに以下をコピーします。
@pipeline(name="automl-tab-beans-training-v2",
pipeline_root=PIPELINE_ROOT)
def pipeline(
bq_source: str = "bq://aju-dev-demos.beans.beans1",
display_name: str = DISPLAY_NAME,
project: str = PROJECT_ID,
gcp_region: str = "us-central1",
api_endpoint: str = "us-central1-aiplatform.googleapis.com",
thresholds_dict_str: str = '{"auRoc": 0.95}',
):
dataset_create_op = gcc_aip.TabularDatasetCreateOp(
project=project, display_name=display_name, bq_source=bq_source
)
training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
project=project,
display_name=display_name,
optimization_prediction_type="classification",
budget_milli_node_hours=1000,
column_transformations=[
{"numeric": {"column_name": "Area"}},
{"numeric": {"column_name": "Perimeter"}},
{"numeric": {"column_name": "MajorAxisLength"}},
{"numeric": {"column_name": "MinorAxisLength"}},
{"numeric": {"column_name": "AspectRation"}},
{"numeric": {"column_name": "Eccentricity"}},
{"numeric": {"column_name": "ConvexArea"}},
{"numeric": {"column_name": "EquivDiameter"}},
{"numeric": {"column_name": "Extent"}},
{"numeric": {"column_name": "Solidity"}},
{"numeric": {"column_name": "roundness"}},
{"numeric": {"column_name": "Compactness"}},
{"numeric": {"column_name": "ShapeFactor1"}},
{"numeric": {"column_name": "ShapeFactor2"}},
{"numeric": {"column_name": "ShapeFactor3"}},
{"numeric": {"column_name": "ShapeFactor4"}},
{"categorical": {"column_name": "Class"}},
],
dataset=dataset_create_op.outputs["dataset"],
target_column="Class",
)
model_eval_task = classification_model_eval_metrics(
project,
gcp_region,
api_endpoint,
thresholds_dict_str,
training_op.outputs["model"],
)
with dsl.Condition(
model_eval_task.outputs["dep_decision"] == "true",
name="deploy_decision",
):
endpoint_op = gcc_aip.EndpointCreateOp(
project=project,
location=gcp_region,
display_name="train-automl-beans",
)
gcc_aip.ModelDeployOp(
model=training_op.outputs["model"],
endpoint=endpoint_op.outputs["endpoint"],
dedicated_resources_min_replica_count=1,
dedicated_resources_max_replica_count=1,
dedicated_resources_machine_type="n1-standard-4",
)
このコードの内容は次のとおりです。
- 最初に、前のパイプラインと同様に、このパイプラインが受け取る入力パラメータを定義します。これらはパイプライン内の他のステップの出力には依存しないため、手動で設定する必要があります。
- パイプラインの残りの部分では、Vertex AI サービスとやり取りするために、いくつかの事前構築済みコンポーネントを使用します。
TabularDatasetCreateOp
は、Cloud Storage または BigQuery をデータセットのソースとして、Vertex AI に表形式のデータセットを作成します。このパイプラインでは、BigQuery テーブルの URL を介してデータを渡します。AutoMLTabularTrainingJobRunOp
は、表形式のデータセットに対する AutoML トレーニング ジョブを開始します。このコンポーネントには、モデルタイプ(ここでは分類)、列のデータ、トレーニングの実行時間、データセットへのポインタなど、いくつかの構成パラメータを渡します。このコンポーネントにデータセットを渡すために、dataset_create_op.outputs["dataset"]
で前のコンポーネントの出力を提供しています。EndpointCreateOp
は、Vertex AI にエンドポイントを作成します。このステップで作成されたエンドポイントは、次のコンポーネントに入力として渡されます。ModelDeployOp
は、指定されたモデルを Vertex AI のエンドポイントにデプロイします。この例では、前の手順で作成したエンドポイントを使用します。追加の構成オプションも使用できますが、ここでは、デプロイするエンドポイントのマシンタイプとモデルを指定します。パイプラインのトレーニング ステップの出力にアクセスして、モデルを渡します。
- このパイプラインでは、条件付きロジックも使用されています。これは、条件を定義し、その条件の結果に基づいて異なるブランチを実行できる Vertex Pipelines の機能です。パイプラインを定義したときに
thresholds_dict_str
パラメータを渡したことを思い出してください。これは、モデルをエンドポイントにデプロイするかどうかの決定に使用する精度のしきい値です。これを実装するために、KFP SDK のCondition
クラスを使用します。渡す条件は、この Codelab で前に定義したカスタム評価コンポーネントの出力です。この条件が true の場合、パイプラインは引き続きdeploy_op
コンポーネントを実行します。精度が事前定義されたしきい値を満たしていない場合、パイプラインはここで停止し、モデルをデプロイしません。
ステップ 3: エンドツーエンドの ML パイプラインをコンパイルして実行する
パイプライン全体が定義されたので、これをコンパイルします。
compiler.Compiler().compile(
pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)
次に、ジョブを定義します。
ml_pipeline_job = aiplatform.PipelineJob(
display_name="automl-tab-beans-training",
template_path="tab_classif_pipeline.json",
pipeline_root=PIPELINE_ROOT,
parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
enable_caching=True
)
最後に、ジョブを実行します。
ml_pipeline_job.submit()
上のセルの実行後にログに表示されるリンクに移動して、パイプラインをコンソールに表示します。このパイプラインの実行には 1 時間以上かかります。そのほとんどの時間は AutoML のトレーニング ステップに費やされます。完成したパイプラインは次のようになります。
上部の「アーティファクトを開く」ボタンを使用して、パイプラインで作成された各アーティファクトの詳細を表示できます。たとえば、dataset
アーティファクトをクリックすると、作成された Vertex AI データセットの詳細が表示されます。ここからリンクをクリックして、そのデータセットのページに移動できます。
同様に、カスタム評価コンポーネントの結果として得られた指標の可視化を見るには、metricsc というアーティファクトをクリックします。ダッシュボードの右側で、このモデルの混同行列を確認できます。
このパイプラインの実行によって作成されたモデルとエンドポイントを表示するには、モデル セクションに移動し、automl-beans
という名前のモデルをクリックします。このモデルがエンドポイントにデプロイされているのを確認できます。
パイプラインのグラフで endpoint アーティファクトをクリックしても、このページにアクセスできます。
コンソールでパイプラインのグラフを確認するほかに、Vertex Pipelines で リネージのトラッキングを使用することもできます。リネージのトラッキングとは、パイプライン全体で作成されたアーティファクトのトラッキングを意味します。これは、アーティファクトがどこで作成され、ML ワークフロー全体でどのように使用されているかを理解するのに役立ちます。たとえば、このパイプラインで作成されたデータセットについてリネージのトラッキングを見るには、dataset アーティファクトをクリックしてから、[リネージを表示] をクリックします。
このアーティファクトが使用されているすべての場所が表示されます。
ステップ 4: パイプラインの実行間で指標を比較する
このパイプラインを複数回実行すると、実行間で指標を比較したくなるかもしれません。aiplatform.get_pipeline_df()
メソッドを使用して、実行のメタデータにアクセスできます。ここでは、このパイプラインのすべての実行のメタデータを取得し、Pandas DataFrame に読み込みます。
pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df
これでラボは完了です。
お疲れさまでした
Vertex AI を使って次のことを行う方法を学びました。
- Kubeflow Pipelines SDK を使用してカスタム コンポーネントでエンドツーエンドのパイプラインを作成する
- Vertex Pipelines でパイプラインを実行し、SDK を使用してパイプラインの実行を開始する
- コンソールで Vertex Pipelines のグラフを確認、分析する
- 事前構築済みのパイプライン コンポーネントを使ってパイプラインに Vertex AI サービスを追加する
- パイプライン ジョブの定期的な実行をスケジュールする
Vertex のさまざまな部分の説明については、ドキュメントをご覧ください。
7. クリーンアップ
料金が発生しないようにするため、このラボで作成したリソースを削除することをおすすめします。
ステップ 1: Notebooks インスタンスを停止または削除する
このラボで作成したノートブックを引き続き使用する場合は、未使用時にオフにすることをおすすめします。Cloud コンソールの Notebooks UI で、ノートブックを選択して [停止] を選択します。インスタンスを完全に削除する場合は、[削除] を選択します。
ステップ 2: エンドポイントを削除する
デプロイしたエンドポイントを削除するには、Vertex AI コンソールの [エンドポイント] セクションに移動し、削除アイコンをクリックします。
次のプロンプトから [Undeploy] をクリックします。
最後に、コンソールの [モデル] セクションに移動して、そのモデルを見つけ、右側のその他メニューから [モデルを削除] をクリックします。
ステップ 3: Cloud Storage バケットを削除する
ストレージ バケットを削除するには、Cloud コンソールのナビゲーション メニューで [ストレージ] に移動してバケットを選択し、[削除] をクリックします。