Vertex Pipelines の概要

1. 概要

このラボでは、Vertex Pipelines を使った ML パイプラインの作成方法と実行方法について学びます。

学習内容

次の方法を学習します。

  • Kubeflow Pipelines SDK を使用してスケーラブルな ML パイプラインを構築する
  • 3 つのステップからなり、入力としてテキストを受け取る簡単なパイプラインを作成して実行する
  • AutoML 分類モデルのトレーニング、評価、デプロイを行うパイプラインを作成して実行する
  • google_cloud_pipeline_components ライブラリを通して提供される事前構築済みコンポーネントを使用して Vertex AI サービスとやり取りする
  • Cloud Scheduler でパイプライン ジョブのスケジュールを設定する

このラボを Google Cloud で実行するための総費用は約 $25 です。

2. Vertex AI の概要

このラボでは、Google Cloud で利用できる最新の AI プロダクトを使用します。Vertex AI は Google Cloud 全体の ML サービスを統合してシームレスな開発エクスペリエンスを提供します。以前は、AutoML でトレーニングしたモデルやカスタムモデルには、個別のサービスを介してアクセスする必要がありました。Vertex AI は、これらの個別のサービスを他の新しいプロダクトとともに 1 つの API へと結合します。既存のプロジェクトを Vertex AI に移行することもできます。

Vertex AI には、モデルのトレーニングとデプロイ サービスに加え、このラボで取り上げる Vertex Pipelines、Model Monitoring、Feature Store など、さまざまな MLOps プロダクトが含まれています。以下の図ですべての Vertex AI プロダクトを確認できます。

Vertex プロダクトの概要

ご意見やご質問がありましたら、サポートページからお寄せください。

ML パイプラインはなぜ有用か?

本題に入る前に、なぜパイプラインを使用するのかについて理解しておきましょう。データの処理、モデルのトレーニング、ハイパーパラメータの調整、評価、モデルのデプロイを含む ML ワークフローを構築しているとします。これらのステップにはそれぞれ異なる依存関係があり、ワークフロー全体をモノリスとして扱うと、扱いづらくなる場合があります。また、ML プロセスを拡張する際は、チームの他のメンバーがワークフローを実行し、コーディングに参加できるように、ML ワークフローを共有したいところですが、信頼性と再現性のあるプロセスがなければ困難です。パイプラインでは、ML プロセスの各ステップがそれぞれのコンテナとなります。これにより、ステップを独立して開発し、各ステップからの入力と出力を再現可能な方法で追跡できます。また、新しいトレーニング データが利用可能になったらパイプラインの実行を開始するなど、クラウド環境内の他のイベントに基づいてパイプラインの実行をスケジュールまたはトリガーすることもできます。

要約: パイプラインを使用すると、ML ワークフローの自動化再現が可能です。

3. クラウド環境の設定

この Codelab を実行するには、課金が有効になっている Google Cloud Platform プロジェクトが必要です。プロジェクトを作成するには、こちらの手順を行ってください。

ステップ 1: Cloud Shell を起動する

このラボでは、Cloud Shell セッションで作業します。Cloud Shell は、Google のクラウドで実行されている仮想マシンによってホストされるコマンド インタープリタです。このセクションは、パソコンでもローカルで簡単に実行できますが、Cloud Shell を使用することで、誰もが一貫した環境での再現可能な操作性を利用できるようになります。本ラボの後、このセクションをパソコン上で再度実行してみてください。

Cloud Shell を承認する

Cloud Shell をアクティブにする

Cloud コンソールの右上にある次のボタンをクリックして、Cloud Shell を有効にする

Cloud Shell をアクティブにする

Cloud Shell を初めて起動する場合は、その内容を説明する中間画面(スクロールしなければ見えない範囲)が表示されます。その場合は、[続行] をクリックします(二度と表示されません)。この中間画面は次のようになります。

Cloud Shell の設定

Cloud Shell のプロビジョニングと接続に少し時間がかかる程度です。

Cloud Shell の初期化

この仮想マシンには、必要な開発ツールがすべて含まれています。仮想マシンは Google Cloud で稼働し、永続的なホーム ディレクトリが 5 GB 用意されているため、ネットワークのパフォーマンスと認証が大幅に向上しています。このコードラボでの作業のほとんどは、ブラウザまたは Chromebook から実行できます。

Cloud Shell に接続すると、すでに認証は完了しており、プロジェクトに各自のプロジェクト ID が設定されていることがわかります。

Cloud Shell で次のコマンドを実行して、認証されたことを確認します。

gcloud auth list

コマンド出力に次のように表示されます。

Cloud Shell の出力

Cloud Shell で次のコマンドを実行して、gcloud コマンドがプロジェクトを認識していることを確認します。

gcloud config list project

コマンド出力

[core]
project = <PROJECT_ID>

上記のようになっていない場合は、次のコマンドで設定できます。

gcloud config set project <PROJECT_ID>

コマンド出力

Updated property [core/project].

Cloud Shell には、現在の Cloud プロジェクトの名前が格納されている GOOGLE_CLOUD_PROJECT など、いくつかの環境変数があります。ラボでは、さまざまな場所でこれを使用します。次を実行すると確認できます。

echo $GOOGLE_CLOUD_PROJECT

ステップ 2: API を有効にする

これらのサービスがどんな場面で(なぜ)必要になるのかは、後の手順でわかります。とりあえず、次のコマンドを実行して Compute Engine、Container Registry、Vertex AI の各サービスへのアクセス権を取得します。

gcloud services enable compute.googleapis.com         \
                       containerregistry.googleapis.com  \
                       aiplatform.googleapis.com  \
                       cloudbuild.googleapis.com \
                       cloudfunctions.googleapis.com

成功すると次のようなメッセージが表示されます。

Operation "operations/acf.cc11852d-40af-47ad-9d59-477a12847c9e" finished successfully.

ステップ 3: Cloud Storage バケットを作成する

Vertex AI でトレーニング ジョブを実行するには、保存対象のモデルアセットを格納するストレージ バケットが必要です。これはリージョンのバケットである必要があります。ここでは us-central を使用しますが、別のリージョンを使用することもできます(その場合はラボ内の該当箇所をすべて置き換えてください)。すでにバケットがある場合は、この手順を省略できます。

Cloud Shell で次のコマンドを実行して、バケットを作成します。

BUCKET_NAME=gs://$GOOGLE_CLOUD_PROJECT-bucket
gsutil mb -l us-central1 $BUCKET_NAME

次に、このバケットへのアクセス権をコンピューティング サービス アカウントに付与します。これにより、このバケットにファイルを書き込むために必要な権限が Vertex Pipelines に付与されます。次のコマンドを実行してこの権限を付与します。

gcloud projects describe $GOOGLE_CLOUD_PROJECT > project-info.txt
PROJECT_NUM=$(cat project-info.txt | sed -nre 's:.*projectNumber\: (.*):\1:p')
SVC_ACCOUNT="${PROJECT_NUM//\'/}-compute@developer.gserviceaccount.com"
gcloud projects add-iam-policy-binding $GOOGLE_CLOUD_PROJECT --member serviceAccount:$SVC_ACCOUNT --role roles/storage.objectAdmin

ステップ 4: Vertex AI Workbench インスタンスを作成する

Cloud Console の [Vertex AI] セクションで [ワークベンチ] をクリックします。

Vertex AI メニュー

そこから、[ユーザー管理のノートブック] で [新しいノートブック] をクリックします。

新しいノートブックを作成

次に、TensorFlow Enterprise 2.3(LTS 版)インスタンス タイプ(GPU なし)を選択します。

TFE インスタンス

デフォルトのオプションを使用して、[作成] をクリックします。

ステップ 5: ノートブックを開く

インスタンスが作成されたら、[JupyterLab を開く] を選択します。

ノートブックを開く

4. Vertex Pipelines の設定

Vertex Pipelines を使用するためには、いくつかのライブラリを追加でインストールする必要があります。

  • Kubeflow Pipelines: パイプラインの構築に使用する SDK です。Vertex Pipelines は、Kubeflow Pipelines と TFX の両方で構築されたパイプラインの実行をサポートします。
  • Google Cloud パイプライン コンポーネント: このライブラリは、パイプラインの各ステップで Vertex AI サービスとのやり取りを簡単にする事前構築済みコンポーネントを提供します。

ステップ 1: Python ノートブックを作成してライブラリをインストールする

まず、ノートブック インスタンスのランチャー メニューから [Python 3] を選択して、ノートブックを作成します。

Python3 ノートブックを作成する

ランチャー メニューにアクセスするには、ノートブック インスタンスの左上にある [+] 記号をクリックします。

このラボで使用する両方のサービスをインストールするために、最初にノートブック セルでユーザーフラグを設定します。

USER_FLAG = "--user"

続いて、ノートブックから次のコードを実行します。

!pip3 install {USER_FLAG} google-cloud-aiplatform==1.7.0 --upgrade
!pip3 install {USER_FLAG} kfp==1.8.9 google-cloud-pipeline-components==0.2.0

これらのパッケージをインストールした後、カーネルを再起動する必要があります。

import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

最後に、パッケージを正しくインストールしたことを確認します。KFP SDK のバージョンは 1.8 以上である必要があります。

!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

ステップ 2: プロジェクト ID とバケットを設定する

このラボでは、自分の Cloud プロジェクト ID と前に作成したバケットを参照します。次に、これらを格納する変数をそれぞれ作成します。

プロジェクト ID がわからない場合は、次のコードを実行して取得できる可能性があります。

import os
PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output=!gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

それ以外の場合は、こちらで設定します。

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "your-project-id"  # @param {type:"string"}

次に、バケット名を格納する変数を作成します。このラボで作成した場合は、以下のようになります。作成していない場合は以下を手動で設定する必要があります。

BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"

ステップ 3: ライブラリをインポートする

この Codelab 全体で使用するライブラリをインポートするために、次のコードを追加します。

import kfp

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import component, pipeline, Artifact, ClassificationMetrics, Input, Output, Model, Metrics

from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from typing import NamedTuple

ステップ 4: 定数を定義する

パイプラインの構築前に最後に行うのが、いくつかの定数変数の定義です。PIPELINE_ROOT は、パイプラインによって作成されるアーティファクトを書き込む Cloud Storage のパスです。ここではリージョンとして us-central1 を使用しますが、バケットの作成時に別のリージョンを使用した場合は、次のコード内の REGION 変数を更新します。

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

上のコードを実行すると、パイプラインのルート ディレクトリが表示されます。これは、パイプラインからのアーティファクトが書き込まれる Cloud Storage の場所です。形式は gs://YOUR-BUCKET-NAME/pipeline_root/ です。

5. 最初のパイプラインを作成する

Vertex Pipelines の仕組みを理解するために、最初に KFP SDK を使用して短いパイプラインを作成します。このパイプラインは ML 関連の処理は何も行いません(それについては後で学習します)。今回はこのパイプラインを使用して次の方法について学びます。

  • KFP SDK でカスタム コンポーネントを作成する方法
  • Vertex Pipelines でパイプラインを実行してモニタリングする方法

2 つの出力(商品名と絵文字の説明)を使用して 1 つの文を表示するパイプラインを作成します。このパイプラインは 3 つのコンポーネントで構成されます。

  • product_name: このコンポーネントは商品名(または指定した名詞)を入力として受け取り、その文字列を出力として返します。
  • emoji: このコンポーネントは絵文字の説明テキストを受け取り、それを絵文字に変換します。たとえば、✨ のテキストコードは「sparkles」です。このコンポーネントでは絵文字ライブラリを使用しており、パイプラインで外部依存関係を管理する方法を見ることができます。
  • build_sentence: この最後のコンポーネントは、前の 2 つの出力を使用して、絵文字を含む文を作成します。たとえば、「Vertex Pipelines is ✨」のような出力が得られます。

コードの作成

ステップ 1: Python 関数ベースのコンポーネントを作成する

KFP SDK を使用すると、Python の関数に基づくコンポーネントを作成できます。最初のパイプラインの 3 つのコンポーネントで KFP SDK を使用します。まず product_name コンポーネントを作成します。このコンポーネントは文字列を入力として受け取り、その文字列を返す単純なものです。ノートブックに次のコードを追加します。

@component(base_image="python:3.9", output_component_file="first-component.yaml")
def product_name(text: str) -> str:
    return text

構文を詳しく見ていきます。

  • @component デコレータは、パイプラインの実行時にこの関数をコンポーネントにコンパイルします。カスタム コンポーネントを記述するときには、これを使用します。
  • base_image パラメータは、このコンポーネントが使用するコンテナ イメージを指定します。
  • output_component_file パラメータはオプションで、コンパイルしたコンポーネントを書き込む yaml ファイルを指定します。セルの実行後、そのファイルがノートブック インスタンスに書き込まれているのを確認できます。このコンポーネントを他のユーザーと共有したい場合は、生成した yaml ファイルをそのユーザーに送信し、次のコードでそれを読み込んでもらうことができます。
product_name_component = kfp.components.load_component_from_file('./first-component.yaml')
  • 関数定義の後の -> str は、このコンポーネントの出力タイプを指定します。

ステップ 2: 追加で 2 つのコンポーネントを作成する

パイプラインを完成させるために、コンポーネントをさらに 2 つ作成します。最初の定義では、文字列を入力として受け取り、対応する絵文字がある場合は、その文字列を絵文字に変換します。渡された入力テキストと結果の絵文字を含むタプルを返します。

@component(packages_to_install=["emoji"])
def emoji(
    text: str,
) -> NamedTuple(
    "Outputs",
    [
        ("emoji_text", str),  # Return parameters
        ("emoji", str),
    ],
):
    import emoji

    emoji_text = text
    emoji_str = emoji.emojize(':' + emoji_text + ':', language='alias')
    print("output one: {}; output_two: {}".format(emoji_text, emoji_str))
    return (emoji_text, emoji_str)

このコンポーネントは前のものよりも多少複雑です。ここで把握しておくべき新しい点は以下の通りです。

  • packages_to_install パラメータは、このコンテナの外部ライブラリ依存関係をコンポーネントに示します。ここでは、emoji というライブラリを使用しています。
  • このコンポーネントは、Outputs という NamedTuple を返します。このタプルに含まれる各文字列には、emoji_textemoji というキーがあります。次のコンポーネントで、これらを使用して出力にアクセスします。

このパイプラインの最後のコンポーネントは、最初の 2 つのコンポーネントの出力を組み合わせて 1 つの文字列を返します。

@component
def build_sentence(
    product: str,
    emoji: str,
    emojitext: str
) -> str:
    print("We completed the pipeline, hooray!")
    end_str = product + " is "
    if len(emoji) > 0:
        end_str += emoji
    else:
        end_str += emojitext
    return(end_str)

このコンポーネントは、前のステップで定義した出力を使用することをどのように知るのかと疑問に思われるかもしれません。重要なポイントです。次のステップで全体をまとめます。

ステップ 3: コンポーネントをパイプラインへとまとめる

上で定義したコンポーネント定義により、パイプライン定義でステップの作成に使用できるファクトリ関数が作成されました。パイプラインを設定するには、@pipeline デコレータを使用して、パイプラインの名前と説明を指定し、パイプラインのアーティファクトを書き込むルートパスを指定します。アーティファクトとは、パイプラインで生成される出力ファイルを意味します。これは、この簡単なパイプラインでは生成されませんが、次のパイプラインでは生成されます。

次のコードブロックで、intro_pipeline 関数を定義します。ここで最初のパイプライン ステップへの入力、およびステップ間の関係を指定します。

  • product_task は商品名を入力として受け取ります。ここでは「Vertex Pipelines」を渡していますが、これは任意の名前に変更できます。
  • emoji_task は絵文字のテキストコードを入力として受け取ります。これも任意の文字列に変更できます。たとえば、「party_face」は 🥳 という絵文字を表します。このコンポーネントと product_task コンポーネントはどちらも、それらに入力を与えるステップがないため、パイプラインを定義するときに手動で入力を指定します。
  • パイプラインの最後のステップである consumer_task には、3 つの入力パラメータがあります。
    • product_task の出力。このステップは 1 つの出力のみを生成するため、product_task.output を介してそれを参照できます。
    • emoji_task ステップの emoji 出力。出力パラメータに名前を付けた emoji コンポーネントの定義を参照してください。
    • 同様に、emoji コンポーネントからの emoji_text 名前付き出力。絵文字に対応しないテキストがパイプラインに渡された場合は、このテキストを使用して文を作成します。
@pipeline(
    name="hello-world",
    description="An intro pipeline",
    pipeline_root=PIPELINE_ROOT,
)

# You can change the `text` and `emoji_str` parameters here to update the pipeline output
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    product_task = product_name(text)
    emoji_task = emoji(emoji_str)
    consumer_task = build_sentence(
        product_task.output,
        emoji_task.outputs["emoji"],
        emoji_task.outputs["emoji_text"],
    )

ステップ 4: パイプラインをコンパイルして実行する

パイプラインを定義したら、次はそれをコンパイルします。次のコードで、パイプラインの実行に使用する JSON ファイルを生成します。

compiler.Compiler().compile(
    pipeline_func=intro_pipeline, package_path="intro_pipeline_job.json"
)

次に、TIMESTAMP 変数を作成します。これをジョブ ID に使用します。

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

次に、パイプライン ジョブを定義します。

job = aiplatform.PipelineJob(
    display_name="hello-world-pipeline",
    template_path="intro_pipeline_job.json",
    job_id="hello-world-pipeline-{0}".format(TIMESTAMP),
    enable_caching=True
)

最後に、ジョブを実行して新しいパイプライン実行を作成します。

job.submit()

このセルを実行すると、コンソールでパイプラインの実行結果を表示するためのリンクを含むログが表示されます。

パイプライン ジョブのログ

そのリンクに移動します。完了すると、パイプラインは次のようになります。

完了した簡単なパイプライン

このパイプラインは実行に 5 ~ 6 分かかります。完了したら、build-sentence コンポーネントをクリックして最終出力を表示できます。

簡単なパイプラインの出力

KFP SDK と Vertex Pipelines の機能に習熟したことで、他の Vertex AI サービスを使用して ML モデルの作成やデプロイを行うパイプラインを構築できるようになりました。それでは詳しく見ていきましょう。

6. エンドツーエンドの ML パイプラインを作成する

それでは、最初の ML パイプラインを構築しましょう。このパイプラインでは、UCI Machine Learning の Dry beans データセットを使用します(出典: KOKLU, M. および OZKAN, I.A.(2020 年)「Multiclass Classification of Dry Beans Using Computer Vision and Machine Learning Techniques」、Computers and Electronics in Agriculture、174、105507。DOI

これは表形式のデータセットです。パイプラインでこのデータセットを使用し、豆をその特徴に基づいて 7 種類に分類する AutoML モデルのトレーニング、評価、デプロイを行います。

このパイプラインは次の処理を行います。

  • データセットを作成する
  • AutoML を使用して表形式の分類モデルをトレーニングする
  • このモデルの評価指標を取得する
  • 評価指標に基づき、Vertex Pipelines で条件付きロジックを使用して、モデルをデプロイするかどうかを決定する
  • Vertex Prediction を使用してモデルをエンドポイントにデプロイする

これらの各ステップがコンポーネントとなります。パイプラインのほとんどのステップでは、この Codelab で前にインポートした google_cloud_pipeline_components ライブラリを介して Vertex AI サービス用の事前構築済みコンポーネントを使用します。このセクションでは、最初に 1 つのカスタム コンポーネントを定義します。次に、事前構築済みコンポーネントを使用してパイプラインの残りのステップを定義します。事前構築済みコンポーネントにより、モデルのトレーニングやデプロイなど、Vertex AI サービスへのアクセスが簡単になります。

ステップ 1: モデル評価用のカスタム コンポーネント

ここで定義するカスタム コンポーネントは、モデルのトレーニングの完了後、パイプラインの終わりにかけて使用されます。このコンポーネントはいくつかの処理を実行します。

  • トレーニング済みの AutoML 分類モデルから評価指標を取得する
  • 指標を解析し、Vertex Pipelines UI にレンダリングする
  • 指標をしきい値と比較して、モデルをデプロイするかどうかを決定する

コンポーネントを定義する前に、その入力パラメータと出力パラメータについて理解します。パイプラインは入力として Cloud プロジェクトのメタデータ、結果のトレーニング済みモデル(このコンポーネントは後で定義します)、モデルの評価指標、そして thresholds_dict_str を受け取ります。thresholds_dict_str は、パイプラインの実行時に定義します。この分類モデルの場合、これはモデルをデプロイする ROC 曲線値の下側の領域となります。たとえば、0.95 を渡した場合は、この指標が 95% を超える場合にのみパイプラインでモデルをデプロイすることを意味します。

評価コンポーネントは、モデルをデプロイするかどうかを示す文字列を返します。ノートブックのセルに次のコードを追加して、このカスタム コンポーネントを作成します。

@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="tabular_eval_component.yaml",
    packages_to_install=["google-cloud-aiplatform"],
)
def classification_model_eval_metrics(
    project: str,
    location: str,  # "us-central1",
    api_endpoint: str,  # "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str,
    model: Input[Artifact],
    metrics: Output[Metrics],
    metricsc: Output[ClassificationMetrics],
) -> NamedTuple("Outputs", [("dep_decision", str)]):  # Return parameter.

    import json
    import logging

    from google.cloud import aiplatform as aip

    # Fetch model eval info
    def get_eval_info(client, model_name):
        from google.protobuf.json_format import MessageToDict

        response = client.list_model_evaluations(parent=model_name)
        metrics_list = []
        metrics_string_list = []
        for evaluation in response:
            print("model_evaluation")
            print(" name:", evaluation.name)
            print(" metrics_schema_uri:", evaluation.metrics_schema_uri)
            metrics = MessageToDict(evaluation._pb.metrics)
            for metric in metrics.keys():
                logging.info("metric: %s, value: %s", metric, metrics[metric])
            metrics_str = json.dumps(metrics)
            metrics_list.append(metrics)
            metrics_string_list.append(metrics_str)

        return (
            evaluation.name,
            metrics_list,
            metrics_string_list,
        )

    # Use the given metrics threshold(s) to determine whether the model is
    # accurate enough to deploy.
    def classification_thresholds_check(metrics_dict, thresholds_dict):
        for k, v in thresholds_dict.items():
            logging.info("k {}, v {}".format(k, v))
            if k in ["auRoc", "auPrc"]:  # higher is better
                if metrics_dict[k] < v:  # if under threshold, don't deploy
                    logging.info("{} < {}; returning False".format(metrics_dict[k], v))
                    return False
        logging.info("threshold checks passed.")
        return True

    def log_metrics(metrics_list, metricsc):
        test_confusion_matrix = metrics_list[0]["confusionMatrix"]
        logging.info("rows: %s", test_confusion_matrix["rows"])

        # log the ROC curve
        fpr = []
        tpr = []
        thresholds = []
        for item in metrics_list[0]["confidenceMetrics"]:
            fpr.append(item.get("falsePositiveRate", 0.0))
            tpr.append(item.get("recall", 0.0))
            thresholds.append(item.get("confidenceThreshold", 0.0))
        print(f"fpr: {fpr}")
        print(f"tpr: {tpr}")
        print(f"thresholds: {thresholds}")
        metricsc.log_roc_curve(fpr, tpr, thresholds)

        # log the confusion matrix
        annotations = []
        for item in test_confusion_matrix["annotationSpecs"]:
            annotations.append(item["displayName"])
        logging.info("confusion matrix annotations: %s", annotations)
        metricsc.log_confusion_matrix(
            annotations,
            test_confusion_matrix["rows"],
        )

        # log textual metrics info as well
        for metric in metrics_list[0].keys():
            if metric != "confidenceMetrics":
                val_string = json.dumps(metrics_list[0][metric])
                metrics.log_metric(metric, val_string)
        # metrics.metadata["model_type"] = "AutoML Tabular classification"

    logging.getLogger().setLevel(logging.INFO)
    aip.init(project=project)
    # extract the model resource name from the input Model Artifact
    model_resource_path = model.metadata["resourceName"]
    logging.info("model path: %s", model_resource_path)

    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    client = aip.gapic.ModelServiceClient(client_options=client_options)
    eval_name, metrics_list, metrics_str_list = get_eval_info(
        client, model_resource_path
    )
    logging.info("got evaluation name: %s", eval_name)
    logging.info("got metrics list: %s", metrics_list)
    log_metrics(metrics_list, metricsc)

    thresholds_dict = json.loads(thresholds_dict_str)
    deploy = classification_thresholds_check(metrics_list[0], thresholds_dict)
    if deploy:
        dep_decision = "true"
    else:
        dep_decision = "false"
    logging.info("deployment decision is %s", dep_decision)

    return (dep_decision,)

ステップ 2: Google Cloud の事前構築済みコンポーネントを追加する

このステップでは、パイプラインの残りのコンポーネントを定義し、それらすべてがどのように機能するのかを確認します。最初に、タイムスタンプを使用してパイプライン実行の表示名を定義します。

import time
DISPLAY_NAME = 'automl-beans{}'.format(str(int(time.time())))
print(DISPLAY_NAME)

次に、ノートブックの新しいセルに以下をコピーします。

@pipeline(name="automl-tab-beans-training-v2",
                  pipeline_root=PIPELINE_ROOT)
def pipeline(
    bq_source: str = "bq://aju-dev-demos.beans.beans1",
    display_name: str = DISPLAY_NAME,
    project: str = PROJECT_ID,
    gcp_region: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    thresholds_dict_str: str = '{"auRoc": 0.95}',
):
    dataset_create_op = gcc_aip.TabularDatasetCreateOp(
        project=project, display_name=display_name, bq_source=bq_source
    )

    training_op = gcc_aip.AutoMLTabularTrainingJobRunOp(
        project=project,
        display_name=display_name,
        optimization_prediction_type="classification",
        budget_milli_node_hours=1000,
        column_transformations=[
            {"numeric": {"column_name": "Area"}},
            {"numeric": {"column_name": "Perimeter"}},
            {"numeric": {"column_name": "MajorAxisLength"}},
            {"numeric": {"column_name": "MinorAxisLength"}},
            {"numeric": {"column_name": "AspectRation"}},
            {"numeric": {"column_name": "Eccentricity"}},
            {"numeric": {"column_name": "ConvexArea"}},
            {"numeric": {"column_name": "EquivDiameter"}},
            {"numeric": {"column_name": "Extent"}},
            {"numeric": {"column_name": "Solidity"}},
            {"numeric": {"column_name": "roundness"}},
            {"numeric": {"column_name": "Compactness"}},
            {"numeric": {"column_name": "ShapeFactor1"}},
            {"numeric": {"column_name": "ShapeFactor2"}},
            {"numeric": {"column_name": "ShapeFactor3"}},
            {"numeric": {"column_name": "ShapeFactor4"}},
            {"categorical": {"column_name": "Class"}},
        ],
        dataset=dataset_create_op.outputs["dataset"],
        target_column="Class",
    )
    model_eval_task = classification_model_eval_metrics(
        project,
        gcp_region,
        api_endpoint,
        thresholds_dict_str,
        training_op.outputs["model"],
    )

    with dsl.Condition(
        model_eval_task.outputs["dep_decision"] == "true",
        name="deploy_decision",
    ):

        endpoint_op = gcc_aip.EndpointCreateOp(
            project=project,
            location=gcp_region,
            display_name="train-automl-beans",
        )

        gcc_aip.ModelDeployOp(
            model=training_op.outputs["model"],
            endpoint=endpoint_op.outputs["endpoint"],
            dedicated_resources_min_replica_count=1,
            dedicated_resources_max_replica_count=1,
            dedicated_resources_machine_type="n1-standard-4",
        )

このコードの内容は次のとおりです。

  • 最初に、前のパイプラインと同様に、このパイプラインが受け取る入力パラメータを定義します。これらはパイプライン内の他のステップの出力には依存しないため、手動で設定する必要があります。
  • パイプラインの残りの部分では、Vertex AI サービスとやり取りするために、いくつかの事前構築済みコンポーネントを使用します。
    • TabularDatasetCreateOp は、Cloud Storage または BigQuery をデータセットのソースとして、Vertex AI に表形式のデータセットを作成します。このパイプラインでは、BigQuery テーブルの URL を介してデータを渡します。
    • AutoMLTabularTrainingJobRunOp は、表形式のデータセットに対する AutoML トレーニング ジョブを開始します。このコンポーネントには、モデルタイプ(ここでは分類)、列のデータ、トレーニングの実行時間、データセットへのポインタなど、いくつかの構成パラメータを渡します。このコンポーネントにデータセットを渡すために、dataset_create_op.outputs["dataset"]前のコンポーネントの出力を提供しています。
    • EndpointCreateOp は、Vertex AI にエンドポイントを作成します。このステップで作成されたエンドポイントは、次のコンポーネントに入力として渡されます。
    • ModelDeployOp は、指定されたモデルを Vertex AI のエンドポイントにデプロイします。この例では、前の手順で作成したエンドポイントを使用します。追加の構成オプションも使用できますが、ここでは、デプロイするエンドポイントのマシンタイプとモデルを指定します。パイプラインのトレーニング ステップの出力にアクセスして、モデルを渡します。
  • このパイプラインでは、条件付きロジックも使用されています。これは、条件を定義し、その条件の結果に基づいて異なるブランチを実行できる Vertex Pipelines の機能です。パイプラインを定義したときに thresholds_dict_str パラメータを渡したことを思い出してください。これは、モデルをエンドポイントにデプロイするかどうかの決定に使用する精度のしきい値です。これを実装するために、KFP SDK の Condition クラスを使用します。渡す条件は、この Codelab で前に定義したカスタム評価コンポーネントの出力です。この条件が true の場合、パイプラインは引き続き deploy_op コンポーネントを実行します。精度が事前定義されたしきい値を満たしていない場合、パイプラインはここで停止し、モデルをデプロイしません。

ステップ 3: エンドツーエンドの ML パイプラインをコンパイルして実行する

パイプライン全体が定義されたので、これをコンパイルします。

compiler.Compiler().compile(
    pipeline_func=pipeline, package_path="tab_classif_pipeline.json"
)

次に、ジョブを定義します。

ml_pipeline_job = aiplatform.PipelineJob(
    display_name="automl-tab-beans-training",
    template_path="tab_classif_pipeline.json",
    pipeline_root=PIPELINE_ROOT,
    parameter_values={"project": PROJECT_ID, "display_name": DISPLAY_NAME},
    enable_caching=True
)

最後に、ジョブを実行します。

ml_pipeline_job.submit()

上のセルの実行後にログに表示されるリンクに移動して、パイプラインをコンソールに表示します。このパイプラインの実行には 1 時間以上かかります。そのほとんどの時間は AutoML のトレーニング ステップに費やされます。完成したパイプラインは次のようになります。

完成した AutoML パイプライン

上部の「アーティファクトを開く」ボタンを使用して、パイプラインで作成された各アーティファクトの詳細を表示できます。たとえば、dataset アーティファクトをクリックすると、作成された Vertex AI データセットの詳細が表示されます。ここからリンクをクリックして、そのデータセットのページに移動できます。

パイプラインのデータセット

同様に、カスタム評価コンポーネントの結果として得られた指標の可視化を見るには、metricsc というアーティファクトをクリックします。ダッシュボードの右側で、このモデルの混同行列を確認できます。

指標の可視化

このパイプラインの実行によって作成されたモデルとエンドポイントを表示するには、モデル セクションに移動し、automl-beans という名前のモデルをクリックします。このモデルがエンドポイントにデプロイされているのを確認できます。

モデルのエンドポイント

パイプラインのグラフで endpoint アーティファクトをクリックしても、このページにアクセスできます。

コンソールでパイプラインのグラフを確認するほかに、Vertex Pipelines で リネージのトラッキングを使用することもできます。リネージのトラッキングとは、パイプライン全体で作成されたアーティファクトのトラッキングを意味します。これは、アーティファクトがどこで作成され、ML ワークフロー全体でどのように使用されているかを理解するのに役立ちます。たとえば、このパイプラインで作成されたデータセットについてリネージのトラッキングを見るには、dataset アーティファクトをクリックしてから、[リネージを表示] をクリックします。

リネージを表示

このアーティファクトが使用されているすべての場所が表示されます。

リネージの詳細

ステップ 4: パイプラインの実行間で指標を比較する

このパイプラインを複数回実行すると、実行間で指標を比較したくなるかもしれません。aiplatform.get_pipeline_df() メソッドを使用して、実行のメタデータにアクセスできます。ここでは、このパイプラインのすべての実行のメタデータを取得し、Pandas DataFrame に読み込みます。

pipeline_df = aiplatform.get_pipeline_df(pipeline="automl-tab-beans-training-v2")
small_pipeline_df = pipeline_df.head(2)
small_pipeline_df

これでラボは完了です。

お疲れさまでした

Vertex AI を使って次のことを行う方法を学びました。

  • Kubeflow Pipelines SDK を使用してカスタム コンポーネントでエンドツーエンドのパイプラインを作成する
  • Vertex Pipelines でパイプラインを実行し、SDK を使用してパイプラインの実行を開始する
  • コンソールで Vertex Pipelines のグラフを確認、分析する
  • 事前構築済みのパイプライン コンポーネントを使ってパイプラインに Vertex AI サービスを追加する
  • パイプライン ジョブの定期的な実行をスケジュールする

Vertex のさまざまな部分の説明については、ドキュメントをご覧ください。

7. クリーンアップ

料金が発生しないようにするため、このラボで作成したリソースを削除することをおすすめします。

ステップ 1: Notebooks インスタンスを停止または削除する

このラボで作成したノートブックを引き続き使用する場合は、未使用時にオフにすることをおすすめします。Cloud コンソールの Notebooks UI で、ノートブックを選択して [停止] を選択します。インスタンスを完全に削除する場合は、[削除] を選択します。

インスタンスの停止

ステップ 2: エンドポイントを削除する

デプロイしたエンドポイントを削除するには、Vertex AI コンソールの [エンドポイント] セクションに移動し、削除アイコンをクリックします。

エンドポイントの削除

次のプロンプトから [Undeploy] をクリックします。

モデルのデプロイ解除

最後に、コンソールの [モデル] セクションに移動して、そのモデルを見つけ、右側のその他メニューから [モデルを削除] をクリックします。

モデルの削除

ステップ 3: Cloud Storage バケットを削除する

ストレージ バケットを削除するには、Cloud コンソールのナビゲーション メニューで [ストレージ] に移動してバケットを選択し、[削除] をクリックします。

ストレージを削除