Vertex Pipelines の概要

1. 概要

このラボでは、Vertex Pipelines を使用した ML パイプラインの作成方法と実行方法について学びます。

学習内容

次の方法を学習します。

  • Kubeflow Pipelines SDK を使用してスケーラブルな ML パイプラインを構築する
  • 3 つのステップからなり、入力としてテキストを受け取る簡単なパイプラインを作成して実行する
  • AutoML 分類モデルのトレーニング、評価、デプロイを行うパイプラインを作成して実行する
  • google_cloud_pipeline_components ライブラリを通じて提供される事前構築済みコンポーネントを使用して Vertex AI サービスとやり取りする
  • Cloud Scheduler でパイプライン ジョブのスケジュールを設定する

このラボを Google Cloud で実行するための総費用は約 $25 です。

2. Vertex AI の概要

このラボでは、Google Cloud で利用できる最新の AI プロダクトを使用します。Vertex AI は Google Cloud 全体の ML サービスを統合してシームレスな開発エクスペリエンスを提供します。以前は、AutoML でトレーニングしたモデルやカスタムモデルには、個別のサービスを介してアクセスする必要がありました。Vertex AI は、これらの個別のサービスを他の新しいプロダクトとともに 1 つの API へと結合します。既存のプロジェクトを Vertex AI に移行することもできます。

Vertex AI には、モデルのトレーニングとデプロイ サービスに加え、このラボで取り上げる Vertex Pipelines、Model Monitoring、Feature Store など、さまざまな MLOps プロダクトが含まれています。以下の図ですべての Vertex AI プロダクトを確認できます。

Vertex プロダクトの概要

ご意見やご質問がありましたら、サポートページからお寄せください。

ML パイプラインはなぜ有用か?

本題に入る前に、なぜパイプラインを使用するのかについて理解しておきましょう。データの処理、モデルのトレーニング、ハイパーパラメータの調整、評価、モデルのデプロイを含む ML ワークフローを構築しているとします。これらのステップにはそれぞれ異なる依存関係があり、ワークフロー全体をモノリスとして扱うと、扱いづらくなる場合があります。また、ML プロセスを拡張する際は、チームの他のメンバーがワークフローを実行し、コーディングに参加できるように、ML ワークフローを共有したいところですが、信頼性と再現性のあるプロセスがなければ困難です。パイプラインでは、ML プロセスの各ステップがそれぞれのコンテナとなります。これにより、ステップを独立して開発し、各ステップからの入力と出力を再現可能な方法で追跡できます。また、新しいトレーニング データが利用可能になったらパイプラインの実行を開始するなど、クラウド環境内の他のイベントに基づいてパイプラインの実行をスケジュールまたはトリガーすることもできます。

要約: パイプラインは、ML ワークフローの自動化再現に役立ちます。

3. クラウド環境の設定

この Codelab を実行するには、課金が有効になっている Google Cloud Platform プロジェクトが必要です。プロジェクトを作成するには、こちらの手順を行ってください。

ステップ 1: Cloud Shell を起動する

このラボでは、Cloud Shell セッションで作業します。Cloud Shell は、Google のクラウドで実行されている仮想マシンによってホストされるコマンド インタープリタです。このセクションは、パソコンでもローカルで簡単に実行できますが、Cloud Shell を使用することで、誰もが一貫した環境での再現可能な操作性を利用できるようになります。本ラボの後、このセクションをパソコン上で再度実行してみてください。

Cloud Shell を承認する

Cloud Shell をアクティブにする

Cloud コンソールの右上で、下のボタンをクリックして [Cloud Shell をアクティブにする] をクリックします。

Cloud Shell をアクティブにする

Cloud Shell を初めて起動する場合は、その内容を説明する中間画面(スクロールしなければ見えない範囲)が表示されます。その場合は、[続行] をクリックします(今後表示されなくなります)。この中間画面は次のようになります。

Cloud Shell の設定

Cloud Shell のプロビジョニングと接続に少し時間がかかる程度です。

Cloud Shell の初期化

この仮想マシンには、必要な開発ツールがすべて含まれています。永続的なホーム ディレクトリが 5 GB 用意されており、Google Cloud で稼働するため、ネットワークのパフォーマンスと認証が大幅に向上しています。このコードラボでの作業のほとんどは、ブラウザまたは Chromebook から実行できます。

Cloud Shell に接続すると、すでに認証は完了しており、プロジェクトに各自のプロジェクト ID が設定されていることがわかります。

Cloud Shell で次のコマンドを実行して、認証されたことを確認します。

gcloud auth list

コマンド出力に次のように表示されます。

Cloud Shell の出力

Cloud Shell で次のコマンドを実行して、gcloud コマンドがプロジェクトを認識していることを確認します。

gcloud config list project

コマンド出力

[core]
project = <PROJECT_ID>

上記のようになっていない場合は、次のコマンドで設定できます。

gcloud config set project <PROJECT_ID>

コマンド出力

Updated property [core/project].

Cloud Shell には、現在の Cloud プロジェクトの名前が格納されている GOOGLE_CLOUD_PROJECT など、いくつかの環境変数があります。ラボでは、さまざまな場所でこれを使用します。次を実行すると確認できます。

echo $GOOGLE_CLOUD_PROJECT

ステップ 2: API を有効にする

これらのサービスがどこで(なぜ)必要になるかは、後の手順でわかります。とりあえず、次のコマンドを実行して Compute Engine、Container Registry、Vertex AI の各サービスへのアクセス権をプロジェクトに付与します。

gcloud services enable compute.googleapis.com         \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com  \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com

成功すると次のようなメッセージが表示されます。

Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.

ステップ 3: Cloud Storage バケットを作成する

Vertex AI でトレーニング ジョブを実行するには、保存したモデルアセットを保存するストレージ バケットが必要です。これはリージョンのバケットである必要があります。ここでは us-central を使用していますが、別のリージョンを使用することもできます(その場合はラボ内の該当箇所をすべて置き換えてください)。すでにバケットがある場合は、この手順を省略できます。

Cloud Shell で次のコマンドを実行して、バケットを作成します。

BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME

次に、このバケットへのアクセス権をコンピューティング サービス アカウントに付与します。これにより、Vertex Pipelines にこのバケットへのファイルの書き込みに必要な権限が付与されます。次のコマンドを実行してこの権限を付与します。

gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin

ステップ 4: Vertex AI Workbench インスタンスを作成する

Cloud Console の [Vertex AI] セクションで [ワークベンチ] をクリックします。

Vertex AI メニュー

その後、[ユーザー管理のノートブック] で [新しいノートブック] をクリックします。

新しいノートブックを作成

次に、[TensorFlow Enterprise 2.3 (with LTS)] インスタンス タイプを選択します([GPU なし])。

TFE インスタンス

デフォルトのオプションを使用して、[作成] をクリックします。

ステップ 5: ノートブックを開く

インスタンスが作成されたら、[JupyterLab を開く] を選択します。

ノートブックを開く

4. Vertex Pipelines の設定

Vertex Pipelines を使用するためには、いくつかのライブラリを追加でインストールする必要があります。

  • Kubeflow Pipelines: これはパイプラインの構築に使用する SDK です。Vertex Pipelines は、Kubeflow Pipelines と TFX の両方で構築されたパイプラインの実行をサポートします。
  • Google Cloud パイプライン コンポーネント: このライブラリは、パイプラインの各ステップで Vertex AI サービスとのやり取りを簡単にする事前構築済みコンポーネントを提供します。

ステップ 1: Python ノートブックを作成してライブラリをインストールする

まず、ノートブック インスタンスの Launcher メニューから [Python 3] を選択してノートブックを作成します。

Python3 ノートブックを作成する

ランチャー メニューにアクセスするには、ノートブック インスタンスの左上にある [+] 記号をクリックします。

このラボで使用する両方のサービスをインストールするために、最初にノートブック セルでユーザーフラグを設定します。

USER_FLAG = "--user"

続いて、ノートブックから次のコードを実行します。

!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0

これらのパッケージをインストールした後、カーネルを再起動する必要があります。

import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

最後に、パッケージを正しくインストールしたことを確認します。KFP SDK のバージョンは 1.8 以上である必要があります。

!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

ステップ 2: プロジェクト ID とバケットを設定する

このラボでは、自分の Cloud プロジェクト ID と前に作成したバケットを参照します。次に、これらごとに変数を作成します。

プロジェクト ID がわからない場合は、次のコードを実行して取得できる可能性があります。

import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

それ以外の場合は、こちらで設定します。

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

次に、バケット名を格納する変数を作成します。このラボで作成した場合は、以下のようになります。作成していない場合は以下を手動で設定する必要があります。

BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"

ステップ 3: ライブラリをインポートする

以下を追加して、この Codelab 全体を通して使用するライブラリをインポートします。

import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

ステップ 4: 定数を定義する

パイプラインを構築する前に行う必要がある最後の作業は、いくつかの定数を定義することです。PIPELINE_ROOT は、パイプラインによって作成されたアーティファクトが書き込まれる Cloud Storage パスです。ここではリージョンとして us-central1 を使用しますが、バケットの作成時に別のリージョンを使用した場合は、次のコードの REGION 変数を更新します。

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

上のコードを実行すると、パイプラインのルート ディレクトリが表示されます。これは、パイプラインからのアーティファクトが書き込まれる Cloud Storage の場所です。gs://YOUR-BUCKET-NAME/pipeline_root/ の形式になります。

5. 最初のパイプラインの作成

Vertex Pipelines の仕組みを理解するために、まず KFP SDK を使用して短いパイプラインを作成します。このパイプラインは ML 関連の処理は何も行いません(それについては後で学習します)。今回はこのパイプラインを使用して次の方法について学びます。

  • KFP SDK でカスタム コンポーネントを作成する方法
  • Vertex Pipelines でパイプラインを実行してモニタリングする方法

2 つの出力(商品名と絵文字の説明)を使用して 1 つの文を出力するパイプラインを作成します。このパイプラインは 3 つのコンポーネントで構成されます。

  • product_name: このコンポーネントは商品名(または任意の名詞)を入力として受け取り、その文字列を出力として返します。
  • emoji: このコンポーネントは絵文字の説明テキストを受け取り、それを絵文字に変換します。たとえば、✨ のテキストコードは「sparkles」です。このコンポーネントでは絵文字ライブラリを使用して、パイプラインで外部依存関係を管理する方法を示す
  • build_sentence: この最後のコンポーネントは、前の 2 つの出力を使用して、絵文字を使用する文を作成します。たとえば、「Vertex Pipelines is ✨」のような出力が得られます。

コーディングを始めましょう。

ステップ 1: Python の関数ベースのコンポーネントを作成する

KFP SDK を使用すると、Python の関数に基づくコンポーネントを作成できます。最初のパイプラインの 3 つのコンポーネントにこれを使用します。まず product_name コンポーネントを作成します。このコンポーネントは文字列を入力として受け取り、その文字列を返す単純なものです。ノートブックに次のコードを追加します。

@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    return text

構文を詳しく見ていきます。

  • @component デコレータは、パイプラインの実行時にこの関数をコンポーネントにコンパイルします。カスタム コンポーネントを記述するときには、これを使用します。
  • base_image パラメータは、このコンポーネントが使用するコンテナ イメージを指定します。
  • output_component_file パラメータはオプションで、コンパイルされたコンポーネントを書き込む yaml ファイルを指定します。セルの実行後、そのファイルがノートブック インスタンスに書き込まれているのを確認できます。このコンポーネントを他のユーザーと共有したい場合は、生成した yaml ファイルをそのユーザーに送信し、次のコードでそれを読み込んでもらうことができます。
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')
  • 関数定義の後の -> str は、このコンポーネントの出力タイプを指定します。

ステップ 2: 追加で 2 つのコンポーネントを作成する

パイプラインを完成させるために、さらに 2 つのコンポーネントを作成します。最初の定義は、文字列を入力として受け取り、対応する絵文字がある場合は、その文字列を絵文字に変換します。渡された入力テキストと結果の絵文字を含むタプルを返します。

@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    import emoji

    emoji_text = text
    emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
    print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
    return (emoji_text, emoji_str)

このコンポーネントは前のものよりも少し複雑です。ここで把握しておくべき新しい点は以下の通りです。

  • packages_to_install パラメータは、このコンテナの外部ライブラリ依存関係をコンポーネントに伝えます。ここでは「emoji」というライブラリを使用します。
  • このコンポーネントは Outputs という NamedTuple を返します。このタプルの各文字列には、emoji_textemoji というキーがあります。次のコンポーネントで、これらを使用して出力にアクセスします。

このパイプラインの最後のコンポーネントは、最初の 2 つのコンポーネントの出力を組み合わせて 1 つの文字列を返します。

@component
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    print("We completed the pipeline, hooray!")
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji
    else:
        end_str += emojitext
    return(end_str)

このコンポーネントは、前のステップで定義した出力を使用することをどのように知るのかと疑問に思われるかもしれません。重要なポイントです。次のステップで全体をまとめます。

ステップ 3: コンポーネントをパイプラインへとまとめる

上で定義したコンポーネント定義により、パイプライン定義でステップの作成に使用できるファクトリ関数が作成されました。パイプラインを設定するには、@pipeline デコレータを使用して、パイプラインの名前と説明を指定し、パイプラインのアーティファクトを書き込むルートパスを指定します。アーティファクトとは、パイプラインによって生成された出力ファイルを意味します。これは、この簡単なパイプラインでは生成されませんが、次のパイプラインでは生成されます。

次のコードブロックでは、intro_pipeline 関数を定義します。ここで最初のパイプライン ステップへの入力、およびステップ間の関係を指定します。

  • product_task は商品名を入力として受け取ります。ここでは「Vertex Pipelines」をこれは任意の名前に変更できます
  • emoji_task は絵文字のテキストコードを入力として受け取ります。これも任意の文字列に変更できます。たとえば、「party_face」は 🥳 という絵文字を表します。このコンポーネントと product_task コンポーネントにはどちらも、入力をフィードするステップがないため、パイプラインを定義するときに手動で入力を指定します。
  • パイプラインの最後のステップ consumer_task には、次の 3 つの入力パラメータがあります。
    • product_task の出力。このステップでは 1 つの出力しか生成されないため、product_task.output を介して参照できます。
    • emoji_task ステップの emoji 出力。出力パラメータに名前を付けた emoji コンポーネントの定義をご覧ください。
    • 同様に、emoji コンポーネントからの emoji_text 名前付き出力。絵文字に対応しないテキストがパイプラインに渡された場合は、このテキストを使用して文を作成します。
@pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)

# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    product_task = product_name(text)
    emoji_task = emoji(emoji_str)
    consumer_task = build_sentence(
        product_task.output,
        emoji_task.outputs["emoji"],
        emoji_task.outputs["emoji_text"],
    )

ステップ 4: パイプラインをコンパイルして実行する

パイプラインを定義したら、次はそれをコンパイルします。次のコードで、パイプラインの実行に使用する JSON ファイルを生成します。

compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)

次に、TIMESTAMP 変数を作成します。これをジョブ ID に使用します。

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

次に、パイプライン ジョブを定義します。

job = aiplatform.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
    enable_caching=True
)

最後に、ジョブを実行して新しいパイプライン実行を作成します。

job.submit()

このセルを実行すると、コンソールにパイプライン実行を表示するリンクを含むログが表示されます。

パイプライン ジョブのログ

そのリンクに移動します。完了すると、パイプラインは次のようになります。

完了した簡単なパイプライン

このパイプラインは実行に 5 ~ 6 分かかります。完了したら、build-sentence コンポーネントをクリックして最終出力を表示できます。

簡単なパイプラインの出力

KFP SDK と Vertex Pipelines の仕組みがわかったので、他の Vertex AI サービスを使用して ML モデルを作成してデプロイするパイプラインを構築できるようになりました。それでは詳しく見ていきましょう。

6. エンドツーエンドの ML パイプラインを作成する

それでは、最初の ML パイプラインを構築しましょう。このパイプラインでは、UCI ML の Dry beans データセットを使用します。このデータセットは、(2020 年)「Multiclass Classification of Dry Beans Using Computer Vision and Machine Learning Techniques」、Computers and Electronics in Agriculture、174、105507。DOI

これは表形式のデータセットです。パイプラインでは、このデータセットを使用して、豆をその特徴に基づいて 7 つのタイプに分類する AutoML モデルのトレーニング、評価、デプロイを行います。

このパイプラインは次の処理を行います。

  • データセットを作成する
  • AutoML を使用して表形式の分類モデルをトレーニングする
  • このモデルの評価指標を取得する
  • 評価指標に基づき、Vertex Pipelines で条件付きロジックを使用して、モデルをデプロイするかどうかを決定する
  • Vertex Prediction を使用してモデルをエンドポイントにデプロイする

これらの各ステップがコンポーネントとなります。パイプラインのほとんどのステップでは、この Codelab で前にインポートした google_cloud_pipeline_components ライブラリを介して、Vertex AI サービス用の事前構築済みコンポーネントを使用します。このセクションでは、最初に 1 つのカスタム コンポーネントを定義します。次に、事前構築済みコンポーネントを使用してパイプラインの残りのステップを定義します。事前構築済みコンポーネントにより、モデルのトレーニングやデプロイなど、Vertex AI サービスへのアクセスが簡単になります。

ステップ 1: モデル評価用のカスタム コンポーネント

ここで定義するカスタム コンポーネントは、モデルのトレーニングの完了後、パイプラインの終わりにかけて使用されます。このコンポーネントはいくつかの処理を実行します。

  • トレーニング済みの AutoML 分類モデルから評価指標を取得する
  • 指標を解析し、Vertex Pipelines UI にレンダリングする
  • 指標をしきい値と比較して、モデルをデプロイするかどうかを決定する

コンポーネントを定義する前に、その入力パラメータと出力パラメータについて理解しましょう。このパイプラインは入力として、Cloud プロジェクトのメタデータ、結果として得られるトレーニング済みモデル(このコンポーネントは後で定義します)、モデルの評価指標、thresholds_dict_str を受け取ります。thresholds_dict_str は、パイプラインの実行時に定義します。この分類モデルの場合、これはモデルをデプロイする ROC 曲線値の下の領域になります。たとえば、0.95 を渡した場合は、この指標が 95% を超える場合にのみパイプラインでモデルをデプロイすることを意味します。

評価コンポーネントは、モデルをデプロイするかどうかを示す文字列を返します。ノートブックのセルに次のコードを追加して、このカスタム コンポーネントを作成します。

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    import json
    import logging

    from google.cloud import aiplatform as aip

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

ステップ 2: Google Cloud の事前構築済みコンポーネントを追加する

このステップでは、パイプラインの残りのコンポーネントを定義し、それらすべてがどのように連携するかを確認します。最初に、タイムスタンプを使用してパイプライン実行の表示名を定義します。

import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)

次に、ノートブックの新しいセルに以下をコピーします。

@pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classification_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name="train-automl-beans",
        )

        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type="n1-standard-4",
        )

このコードの内容を見てみましょう。

  • まず、前のパイプラインと同様に、このパイプラインが受け取る入力パラメータを定義します。これらはパイプライン内の他のステップの出力には依存しないため、手動で設定する必要があります。
  • パイプラインの残りの部分では、Vertex AI サービスとやり取りするために、いくつかの事前構築済みコンポーネントを使用します。
    • TabularDatasetCreateOp は、Cloud Storage または BigQuery にデータセットのソースを指定して、Vertex AI に表形式のデータセットを作成します。このパイプラインでは、BigQuery テーブルの URL を介してデータを渡します。
    • AutoMLTabularTrainingJobRunOp は、表形式のデータセットに対する AutoML のトレーニング ジョブを開始します。このコンポーネントには、モデルタイプ(この場合は分類)、列のデータ、トレーニングの実行期間、データセットへのポインタなど、いくつかの構成パラメータを渡します。このコンポーネントにデータセットを渡すために、dataset_create_op.outputs["dataset"]前のコンポーネントの出力を提供しています。
    • EndpointCreateOp は、Vertex AI にエンドポイントを作成します。このステップで作成されたエンドポイントは、次のコンポーネントに入力として渡されます。
    • ModelDeployOp は、指定されたモデルを Vertex AI のエンドポイントにデプロイします。この例では、前のステップで作成したエンドポイントを使用します。追加の構成オプションも使用できますが、ここでは、デプロイするエンドポイントのマシンタイプとモデルを指定します。パイプラインのトレーニング ステップの出力にアクセスして、モデルを渡します。
  • このパイプラインでは、条件付きロジックも使用されています。これは、条件を定義し、その条件の結果に基づいて異なるブランチを実行できる Vertex Pipelines の機能です。パイプラインを定義したときに thresholds_dict_str パラメータを渡したことを思い出してください。これは、モデルをエンドポイントにデプロイするかどうかの決定に使用する精度のしきい値です。これを実装するために、KFP SDK の Condition クラスを使用します。渡す条件は、この Codelab で前に定義したカスタム評価コンポーネントの出力です。この条件が true の場合、パイプラインは引き続き deploy_op コンポーネントを実行します。精度が事前定義のしきい値を満たしていない場合、パイプラインはここで停止し、モデルをデプロイしません。

ステップ 3: エンドツーエンドの ML パイプラインをコンパイルして実行する

パイプライン全体が定義されたので、これをコンパイルします。

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)

次に、ジョブを定義します。

ml_pipeline_job = aiplatform.PipelineJob(
    display_name="automl-tab-beans-training",
    template_path="tab_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
    enable_caching=True
)

最後に、ジョブを実行します。

ml_pipeline_job.submit()

上のセルの実行後、ログに表示されるリンクに移動して、コンソールにパイプラインを表示します。このパイプラインの実行には 1 時間以上かかります。そのほとんどの時間は AutoML のトレーニング ステップに費やされます。完成したパイプラインは次のようになります。

完成した AutoML パイプライン

上部の「アーティファクトを開く」ボタンを使用して、パイプラインで作成された各アーティファクトの詳細を表示できます。たとえば、dataset アーティファクトをクリックすると、作成された Vertex AI データセットの詳細が表示されます。ここからリンクをクリックして、そのデータセットのページに移動できます。

パイプラインのデータセット

同様に、カスタム評価コンポーネントから得られた指標の可視化を表示するには、metricsc というアーティファクトをクリックします。ダッシュボードの右側で、このモデルの混同行列を確認できます。

指標の可視化

このパイプライン実行によって作成されたモデルとエンドポイントを表示するには、モデル セクションに移動し、automl-beans という名前のモデルをクリックします。このモデルがエンドポイントにデプロイされているのを確認できます。

モデルのエンドポイント

パイプラインのグラフで endpoint アーティファクトをクリックしても、このページにアクセスできます。

コンソールでパイプラインのグラフを確認するほかに、Vertex Pipelines で リネージのトラッキングを使用することもできます。リネージのトラッキングとは、パイプライン全体で作成されたアーティファクトのトラッキングを意味します。これは、アーティファクトがどこで作成され、ML ワークフロー全体でどのように使用されているかを理解するのに役立ちます。たとえば、このパイプラインで作成されたデータセットについてリネージのトラッキングを見るには、dataset アーティファクトをクリックしてから、[リネージを表示] をクリックします。

リネージを表示

このアーティファクトが使用されているすべての場所が表示されます。

リネージの詳細

ステップ 4: パイプラインの実行間で指標を比較する

このパイプラインを複数回実行すると、実行間で指標を比較したくなるかもしれません。aiplatform.get_pipeline_df() メソッドを使用して、実行メタデータにアクセスできます。ここでは、このパイプラインのすべての実行のメタデータを取得し、Pandas DataFrame に読み込みます。

pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df

これでラボは終了です。

お疲れさまでした

Vertex AI を使って次のことを行う方法を学びました。

  • Kubeflow Pipelines SDK を使用してカスタム コンポーネントでエンドツーエンドのパイプラインを構築する
  • Vertex Pipelines でパイプラインを実行し、SDK でパイプラインの実行を開始する
  • コンソールで Vertex Pipelines グラフを表示して分析する
  • 事前構築済みのパイプライン コンポーネントを使用して Vertex AI サービスをパイプラインに追加する
  • 繰り返し実行するパイプライン ジョブをスケジュールする

Vertex のさまざまな部分の説明については、ドキュメントをご覧ください。

7. クリーンアップ

料金が発生しないようにするため、このラボで作成したリソースを削除することをおすすめします。

ステップ 1: Notebooks インスタンスを停止または削除する

このラボで作成したノートブックを引き続き使用する場合は、未使用時にオフにすることをおすすめします。Cloud コンソールの Notebooks UI で、ノートブックを選択して [停止] を選択します。インスタンスを完全に削除する場合は、[削除] を選択します。

インスタンスの停止

ステップ 2: エンドポイントを削除する

デプロイしたエンドポイントを削除するには、Vertex AI コンソールの [エンドポイント] セクションに移動し、削除アイコンをクリックします。

エンドポイントの削除

次に、以下のプロンプトで [Undeploy] をクリックします。

モデルのデプロイ解除

最後に、コンソールの [モデル] セクションに移動して、そのモデルを見つけ、右側のその他メニューから [モデルを削除] をクリックします。

モデルの削除

ステップ 3: Cloud Storage バケットを削除する

ストレージ バケットを削除するには、Cloud コンソールのナビゲーション メニューで [ストレージ] に移動してバケットを選択し、[削除] をクリックします。

ストレージを削除