テストを最大限に活用: Vertex AI による ML テストの管理

1. 概要

このラボでは、Vertex AI を使用して、TensorFlow でカスタム Keras モデルをトレーニングするパイプラインを構築します。次に、Vertex AI Experiments の新しい機能を使用して、モデルの実行を追跡して比較し、最適なパフォーマンスのハイパーパラメータの組み合わせを特定します。

学習内容

次の方法を学習します。

  • カスタム Keras モデルをトレーニングして、プレーヤーの評価を予測する(回帰など)
  • Kubeflow Pipelines SDK を使用してスケーラブルな ML パイプラインを構築する
  • Cloud Storage からのデータの取り込み、データのスケーリング、モデルのトレーニング、評価、Cloud Storage へのモデルの再保存を行う 5 ステップのパイプラインを作成して実行する
  • Vertex ML Metadata を利用して、モデルやモデルの指標などのモデル アーティファクトを保存する
  • Vertex AI Experiments を利用して、さまざまなパイプライン実行の結果を比較する

このラボを Google Cloud で実行するための総費用は約 $1 です。

2. Vertex AI の概要

このラボでは、Google Cloud で利用できる最新の AI プロダクトを使用します。Vertex AI は Google Cloud 全体の ML サービスを統合してシームレスな開発エクスペリエンスを提供します。以前は、AutoML でトレーニングしたモデルやカスタムモデルには、個別のサービスを介してアクセスする必要がありました。Vertex AI は、これらの個別のサービスを他の新しいプロダクトとともに 1 つの API へと結合します。既存のプロジェクトを Vertex AI に移行することもできます。

Vertex AI には、エンドツーエンドの ML ワークフローをサポートするさまざまなプロダクトが含まれています。このラボでは、以下でハイライト表示されているプロダクト(ExperimentsPipelinesML MetadataWorkbench)を中心に学習します。

Vertex プロダクトの概要

3. ユースケースの概要

ここでは、EA Sports'FIFA ビデオゲーム シリーズ。2008 ~ 2016 年シーズンの 25,000 以上のサッカーの試合と 10,000 人以上の選手が含まれています。すぐに使い始めることができるように、データは事前に前処理されています。このデータセットはラボ全体を通して使用します。このデータセットは Cloud Storage の公開バケットにあります。このデータセットへのアクセス方法については、この Codelab の後半で詳しく説明します。最終的な目標は、インターセプトやペナルティなど、さまざまなゲーム アクションに基づいてプレーヤーの総合評価を予測することです。

Vertex AI Experiments がデータ サイエンスに役立つ理由

データ サイエンスは本質的に実験的なものであり、結局のところ「サイエンティスト」と呼ばれます。優れたデータ サイエンティストは仮説に基づいて行動し、試行錯誤を繰り返しながらモデルのパフォーマンスが向上することを期待して、さまざまな仮説をテストします。

データ サイエンス チームはテストを取り入れていますが、自身の取り組みや「独自のノウハウ」を把握するのに苦労することがよくあります。新しいテクノロジーを導入しましたこれには、次のような理由が考えられます。

  • トレーニング ジョブの追跡が煩雑になり、何が効果的で何がそうではないかを見失いやすくなる
  • データ サイエンス チーム全体で見ると、すべてのメンバーが実験を追跡したり、結果を他の人と共有したりしていない可能性があるため、この問題はさらに悪化します。
  • データ キャプチャに時間がかかり、ほとんどのチームが手動(スプレッドシートやドキュメントなど)の方法を利用しているため、学習に使用する情報に一貫性がなく不完全である

要約: Vertex AI Experiments を利用すれば、テストの追跡や比較がより簡単になります。

ゲームに Vertex AI Experiments を使用する理由

ゲームは歴史的に、機械学習と ML のテストの場となってきました。ゲームは 1 日に何十億ものリアルタイム イベントを生成するだけでなく、そのデータをすべて活用するために、ML と ML のテストを活用してゲーム内エクスペリエンスを改善し、プレーヤーを維持し、プラットフォーム上のさまざまなプレーヤーを評価しています。そのため、ゲームのデータセットは今回のテスト全体の内容に適していると考えました。

4. 環境の設定

この Codelab を実行するには、課金が有効になっている Google Cloud Platform プロジェクトが必要です。プロジェクトを作成するには、こちらの手順を行ってください。

ステップ 1: Compute Engine API を有効にする

まだ有効になっていない場合は、[Compute Engine] に移動して [有効にする] を選択します。

ステップ 2: Vertex AI API を有効にする

Cloud コンソールの [Vertex AI] セクションに移動し、[Vertex AI API を有効にする] をクリックします。

Vertex AI ダッシュボード

ステップ 3: Vertex AI Workbench インスタンスを作成する

Cloud Console の [Vertex AI] セクションで [ワークベンチ] をクリックします。

Vertex AI メニュー

Notebooks API をまだ有効にしていない場合は、有効にします。

Notebook_api

有効にしたら、[マネージド ノートブック] をクリックします。

Notebooks_UI

[新しいノートブック] を選択します。

new_notebook

ノートブックに名前を付けて、[詳細設定] をクリックします。

create_notebook

[詳細設定] で、アイドル状態でのシャットダウンを有効にして、シャットダウンまでの時間(分)を 60 に設定します。これにより、使用されていないノートブックが自動的にシャットダウンされるため、不要なコストが発生しません。

idle_timeout

ステップ 4: ノートブックを開く

インスタンスが作成されたら、[JUPYTERLAB を開く] を選択します。

open_jupyterlab

ステップ 5: 認証する(初回のみ)

新しいインスタンスを初めて使用するときに、認証が求められます。その場合は、UI で手順を行います。

authenticate

ステップ 6: 適切なカーネルを選択する

マネージド ノートブックは、1 つの UI で複数のカーネルを提供します。TensorFlow 2(ローカル)のカーネルを選択します。

tensorflow_kernel

5. ノートブックの初期設定手順

パイプラインを構築する前に、ノートブック内で環境を設定する一連の追加手順を行う必要があります。このステップには、追加パッケージのインストール、変数の設定、Cloud Storage バケットの作成、一般公開ストレージ バケットからのゲーム データセットのコピー、ライブラリのインポートと追加の定数の定義が含まれます。

ステップ 1: 追加パッケージをインストールする

ノートブック環境に現在インストールされていない追加のパッケージ依存関係をインストールする必要があります。たとえば、KFP SDK などです。

!pip3 install --user --force-reinstall 'google-cloud-aiplatform>=1.15' -q --no-warn-conflicts
!pip3 install --user kfp -q --no-warn-conflicts

その後、ダウンロードしたパッケージをノートブックで使用できるように、ノートブック カーネルを再起動します。

# Automatically restart kernel after installs
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

ステップ 2: 変数を設定する

PROJECT_ID を定義します。Project_ID がわからない場合は、gcloud を使用して PROJECT_ID を取得できます。

import os

PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output = !gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

それ以外の場合は、PROJECT_ID をここで設定します。

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "[your-project-id]"  # @param {type:"string"}

また、このノートブックの残りの部分で使用する REGION 変数も設定します。Vertex AI でサポートされているリージョンは次のとおりです。最も近いリージョンを選択することをおすすめします。

  • 南北アメリカ: us-central1
  • ヨーロッパ: europe-west4
  • アジア太平洋: asia-east1

Vertex AI でのトレーニングには、マルチリージョン バケットを使用しないでください。すべてのリージョンがすべての Vertex AI サービスをサポートしているわけではありません。Vertex AI のリージョンの詳細を確認する。

#set your region 
REGION = "us-central1"  # @param {type: "string"}

最後に、TIMESTAMP 変数を設定します。この変数は、作成したリソースについてユーザー間での名前の競合を回避するために使用します。インスタンス セッションごとに TIMESTAMP を作成し、このチュートリアルで作成するリソースの名前に追加します。

#set timestamp to avoid collisions between multiple users

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

ステップ 3: Cloud Storage バケットを作成する

Cloud Storage ステージング バケットを指定して利用する必要があります。ステージング バケットでは、データセットとモデルリソースに関連付けられたすべてのデータがセッション間で保持されます。

Cloud Storage バケットの名前を設定します。バケット名は、組織外のものも含め、すべての Google Cloud プロジェクトでグローバルに一意である必要があります。

#set cloud storage bucket 
BUCKET_NAME = "[insert bucket name here]"  # @param {type:"string"}
BUCKET_URI = f"gs://{BUCKET_NAME}"

バケットがまだ存在しない場合は、次のセルを実行して Cloud Storage バケットを作成できます。

! gsutil mb -l $REGION -p $PROJECT_ID $BUCKET_URI

その後、次のセルを実行して、Cloud Storage バケットへのアクセスを確認します。

#verify access 
! gsutil ls -al $BUCKET_URI

ステップ 4: ゲーム データセットをコピーする

前述のように、EA Sports の人気ビデオゲームである FIFA の人気ゲーム データセットを利用します。Google が前処理の作業を行ったので、データセットを一般公開ストレージ バケットからコピーし、作成したデータセットに移動するだけです。

# copy the data over to your cloud storage bucket
DATASET_URI = "gs://cloud-samples-data/vertex-ai/structured_data/player_data" 

!gsutil cp -r $DATASET_URI $BUCKET_URI

ステップ 5: ライブラリをインポートし、追加の定数を定義する

次に、Vertex AI や KFP などのライブラリをインポートします。

import logging
import os
import time

logger = logging.getLogger("logger")
logging.basicConfig(level=logging.INFO)

import kfp.v2.compiler as compiler
# Pipeline Experiments
import kfp.v2.dsl as dsl
# Vertex AI
from google.cloud import aiplatform as vertex_ai
from kfp.v2.dsl import Artifact, Input, Metrics, Model, Output, component
from typing import NamedTuple

また、トレーニング データのファイルパスなど、ノートブックの残りの部分で参照する追加の定数も定義します。

#import libraries and define constants
# Experiments

TASK = "regression"
MODEL_TYPE = "tensorflow"
EXPERIMENT_NAME = f"{PROJECT_ID}-{TASK}-{MODEL_TYPE}-{TIMESTAMP}"

# Pipeline
PIPELINE_URI = f"{BUCKET_URI}/pipelines"
TRAIN_URI = f"{BUCKET_URI}/player_data/data.csv"
LABEL_URI = f"{BUCKET_URI}/player_data/labels.csv"
MODEL_URI = f"{BUCKET_URI}/model"
DISPLAY_NAME = "experiments-demo-gaming-data"
BQ_DATASET = "player_data"
BQ_LOCATION = "US"  
VIEW_NAME = 'dataset_test'
PIPELINE_JSON_PKG_PATH = "experiments_demo_gaming_data.json"
PIPELINE_ROOT = f"gs://{BUCKET_URI}/pipeline_root"

6. パイプラインを構築しましょう

それでは、Vertex AI を活用してトレーニング パイプラインを構築してみましょう。Vertex AI SDK を初期化し、トレーニング ジョブをパイプライン コンポーネントとして設定します。次に、パイプラインを構築してパイプライン実行を送信し、Vertex AI SDK を利用してテストを表示し、そのステータスをモニタリングします。

ステップ 1: Vertex AI SDK を初期化する

Vertex AI SDK を初期化して、PROJECT_IDBUCKET_URI を設定します。

#initialize vertex AI SDK 
vertex_ai.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)

ステップ 2: トレーニング ジョブをパイプライン コンポーネントとして設定する

テストを開始するには、トレーニング ジョブをパイプライン コンポーネントとして定義して指定する必要があります。パイプラインはトレーニング データとハイパーパラメータ(例:DROPOUT_RATELEARNING_RATEEPOCHS など)を入力として使用し、出力モデルの指標(例:MAERMSE など)とモデル アーティファクトです。

@component(
    packages_to_install=[
        "numpy==1.21.0",
        "pandas==1.3.5", 
        "scikit-learn==1.0.2",
        "tensorflow==2.9.0",
    ]
)
def custom_trainer(
    train_uri: str,
    label_uri: str,
    dropout_rate: float,
    learning_rate: float,
    epochs: int,
    model_uri: str,
    metrics: Output[Metrics], 
    model_metadata: Output[Model], 
    

):

    # import libraries
    import logging
    import uuid
    from pathlib import Path as path

    import pandas as pd
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Dense
    from tensorflow.keras.layers import Dropout
    from tensorflow.keras.metrics import Metric 
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler
    from sklearn.metrics import mean_absolute_error
    import numpy as np
    from math import sqrt
    import os
    import tempfile

    # set variables and use gcsfuse to update prefixes
    gs_prefix = "gs://"
    gcsfuse_prefix = "/gcs/"
    train_path = train_uri.replace(gs_prefix, gcsfuse_prefix)
    label_path = label_uri.replace(gs_prefix, gcsfuse_prefix)
    model_path = model_uri.replace(gs_prefix, gcsfuse_prefix)

    def get_logger():

        logger = logging.getLogger(__name__)
        logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        )
        logger.addHandler(handler)
        return logger

    def get_data(
        train_path: str, 
        label_path: str
    ) -> (pd.DataFrame): 
        
        
        #load data into pandas dataframe
        data_0 = pd.read_csv(train_path)
        labels_0 = pd.read_csv(label_path)
        
        #drop unnecessary leading columns
        
        data = data_0.drop('Unnamed: 0', axis=1)
        labels = labels_0.drop('Unnamed: 0', axis=1)
        
        #save as numpy array for reshaping of data 
        
        labels = labels.values
        data = data.values
    
        # Split the data
        labels = labels.reshape((labels.size,))
        train_data, test_data, train_labels, test_labels = train_test_split(data, labels, test_size=0.2, shuffle=True, random_state=7)
    
        #Convert data back to pandas dataframe for scaling
        
        train_data = pd.DataFrame(train_data)
        test_data = pd.DataFrame(test_data)
        train_labels = pd.DataFrame(train_labels)
        test_labels = pd.DataFrame(test_labels)
        
        #Scale and normalize the training dataset
        
        scaler = StandardScaler()
        scaler.fit(train_data)
        train_data = pd.DataFrame(scaler.transform(train_data), index=train_data.index, columns=train_data.columns)
        test_data = pd.DataFrame(scaler.transform(test_data), index=test_data.index, columns=test_data.columns)
        
        return train_data,train_labels, test_data, test_labels 
    
        """ Train your Keras model passing in the training data and values for learning rate, dropout rate,and the number of epochs """

    def train_model(
        learning_rate: float, 
        dropout_rate: float,
        epochs: float,
        train_data: pd.DataFrame,
        train_labels: pd.DataFrame):
 
        # Train tensorflow model
        param = {"learning_rate": learning_rate, "dropout_rate": dropout_rate, "epochs": epochs}
        model = Sequential()
        model.add(Dense(500, input_dim=train_data.shape[1], activation= "relu"))
        model.add(Dropout(param['dropout_rate']))
        model.add(Dense(100, activation= "relu"))
        model.add(Dense(50, activation= "relu"))
        model.add(Dense(1))
            
        model.compile(
        tf.keras.optimizers.Adam(learning_rate= param['learning_rate']),
        loss='mse',
        metrics=[tf.keras.metrics.RootMeanSquaredError(),tf.keras.metrics.MeanAbsoluteError()])
        
        model.fit(train_data, train_labels, epochs= param['epochs'])
        
        return model

    # Get Predictions
    def get_predictions(model, test_data):

        dtest = pd.DataFrame(test_data)
        pred = model.predict(dtest)
        return pred

    # Evaluate predictions with MAE
    def evaluate_model_mae(pred, test_labels):
        
        mae = mean_absolute_error(test_labels, pred)
        return mae
    
    # Evaluate predictions with RMSE
    def evaluate_model_rmse(pred, test_labels):

        rmse = np.sqrt(np.mean((test_labels - pred)**2))
        return rmse    
 
    
    #Save your trained model in GCS     
    def save_model(model, model_path):

        model_id = str(uuid.uuid1())
        model_path = f"{model_path}/{model_id}"        
        path(model_path).parent.mkdir(parents=True, exist_ok=True)
        model.save(model_path + '/model_tensorflow')

        
    # Main ----------------------------------------------
    
    train_data, train_labels, test_data, test_labels = get_data(train_path, label_path)
    model = train_model(learning_rate, dropout_rate, epochs, train_data,train_labels )
    pred = get_predictions(model, test_data)
    mae = evaluate_model_mae(pred, test_labels)
    rmse = evaluate_model_rmse(pred, test_labels)
    save_model(model, model_path)

    # Metadata ------------------------------------------

    #convert numpy array to pandas series
    mae = pd.Series(mae)
    rmse = pd.Series(rmse)

    #log metrics and model artifacts with ML Metadata. Save metrics as a list. 
    metrics.log_metric("mae", mae.to_list()) 
    metrics.log_metric("rmse", rmse.to_list()) 
    model_metadata.uri = model_uri

ステップ 3: パイプラインを構築する

次に、KFP で利用可能な Domain Specific Language (DSL) を使用してワークフローを設定し、パイプラインを JSON ファイルにコンパイルします。

# define our workflow

@dsl.pipeline(name="gaming-custom-training-pipeline")
def pipeline(
    train_uri: str,
    label_uri: str,
    dropout_rate: float,
    learning_rate: float,
    epochs: int,
    model_uri: str,
):

    custom_trainer(
        train_uri,label_uri, dropout_rate,learning_rate,epochs, model_uri
    )
#compile our pipeline
compiler.Compiler().compile(pipeline_func=pipeline, package_path="gaming_pipeline.json")

ステップ 4: パイプライン実行を送信する

コンポーネントのセットアップとパイプラインの定義は、大変な作業です。上記で指定したパイプラインのさまざまな実行を送信する準備が整いました。そのためには、次のようにさまざまなハイパーパラメータの値を定義する必要があります。

runs = [
    {"dropout_rate": 0.001, "learning_rate": 0.001,"epochs": 20},
    {"dropout_rate": 0.002, "learning_rate": 0.002,"epochs": 25},
    {"dropout_rate": 0.003, "learning_rate": 0.003,"epochs": 30},
    {"dropout_rate": 0.004, "learning_rate": 0.004,"epochs": 35},
    {"dropout_rate": 0.005, "learning_rate": 0.005,"epochs": 40},
]

ハイパーパラメータを定義したら、for loop を利用してパイプラインのさまざまな実行を正常にフィードできます。

for i, run in enumerate(runs):

    job = vertex_ai.PipelineJob(
        display_name=f"{EXPERIMENT_NAME}-pipeline-run-{i}",
        template_path="gaming_pipeline.json",
        pipeline_root=PIPELINE_URI,
        parameter_values={
            "train_uri": TRAIN_URI,
            "label_uri": LABEL_URI,
            "model_uri": MODEL_URI,
            **run,
        },
    )
    job.submit(experiment=EXPERIMENT_NAME)

ステップ 5: Vertex AI SDK を利用してテストを表示する

Vertex AI SDK を使用すると、パイプライン実行のステータスをモニタリングできます。また、Vertex AI Experiments でパイプライン実行のパラメータと指標を返す場合にも使用できます。次のコードを使用して、実行に関連付けられたパラメータとその現在の状態を確認します。

# see state/status of all the pipeline runs

vertex_ai.get_experiment_df(EXPERIMENT_NAME)

以下のコードを使用すると、パイプライン実行のステータスに関する最新情報を取得できます。

#check on current status
while True:
    pipeline_experiments_df = vertex_ai.get_experiment_df(EXPERIMENT_NAME)
    if all(
        pipeline_state != "COMPLETE" for pipeline_state in pipeline_experiments_df.state
    ):
        print("Pipeline runs are still running...")
        if any(
            pipeline_state == "FAILED"
            for pipeline_state in pipeline_experiments_df.state
        ):
            print("At least one Pipeline run failed")
            break
    else:
        print("Pipeline experiment runs have completed")
        break
    time.sleep(60)

run_name を使用して特定のパイプライン ジョブを呼び出すこともできます。

# Call the pipeline runs based on the experiment run name
pipeline_experiments_df = vertex_ai.get_experiment_df(EXPERIMENT_NAME)
job = vertex_ai.PipelineJob.get(pipeline_experiments_df.run_name[0])
print(job.resource_name)
print(job._dashboard_uri())

最後に、設定した間隔(60 秒ごとなど)で実行の状態を更新して、状態が RUNNING から FAILED または COMPLETE に変わるのを確認できます。

# wait 60 seconds and view state again
import time
time.sleep(60)
vertex_ai.get_experiment_df(EXPERIMENT_NAME)

7. 最もパフォーマンスが高いランニングを特定する

これでパイプライン実行の結果が出ました。「結果から何がわかるか?」と疑問に思われるかもしれません。テストの出力には、パイプラインの実行ごとに 1 行、合計 5 行が表示されます。次のようになります。

Final-Results-Snapshot

MAE と RMSE はどちらも平均モデル予測誤差の尺度であるため、両方の指標の値を小さくするのが理想的です。Vertex AI Experiments の出力から、両方の指標で最も成功した実行は最終実行であり、dropout_rate が 0.001、0.001 の場合は learning_rateepochs の合計数が 20 であることがわかります。このテストに基づいて、モデルの最適なパフォーマンスが得られるため、これらのモデル パラメータは最終的に本番環境で使用されます。

これでラボは終了です。

お疲れさまでした

Vertex AI を使って次のことを行う方法を学びました。

  • カスタム Keras モデルをトレーニングして、プレーヤーの評価を予測する(回帰など)
  • Kubeflow Pipelines SDK を使用してスケーラブルな ML パイプラインを構築する
  • GCS からのデータの取り込み、データのスケーリング、モデルのトレーニング、評価、GCS へのモデルの再保存を行う 5 ステップのパイプラインを作成して実行する
  • Vertex ML Metadata を利用して、モデルやモデルの指標などのモデル アーティファクトを保存する
  • Vertex AI Experiments を利用して、さまざまなパイプライン実行の結果を比較する

Vertex のさまざまな部分の説明については、ドキュメントをご覧ください。

8. クリーンアップ

料金が発生しないようにするため、このラボで作成したリソースを削除することをおすすめします。

ステップ 1: Notebooks インスタンスを停止または削除する

このラボで作成したノートブックを引き続き使用する場合は、未使用時にオフにすることをおすすめします。Cloud コンソールの Notebooks UI で、ノートブックを選択して [停止] を選択します。インスタンスを完全に削除する場合は、[削除] を選択します。

インスタンスの停止

ステップ 2: Cloud Storage バケットを削除する

ストレージ バケットを削除するには、Cloud コンソールのナビゲーション メニューで [ストレージ] に移動してバケットを選択し、[削除] をクリックします。

ストレージを削除