テストを最大限に活用: Vertex AI による ML テストの管理

1. 概要

このラボでは、Vertex AI を使用して、TensorFlow でカスタム Keras モデルをトレーニングするパイプラインを作成します。次に、Vertex AI Experiments で利用可能な新しい機能を使用してモデルの実行を追跡し、比較して、最適なパフォーマンスのハイパーパラメータの組み合わせを特定します。

学習内容

次の方法を学習します。

  • カスタム Keras モデルをトレーニングして、プレーヤーの評価を予測する(回帰など)
  • Kubeflow Pipelines SDK を使用してスケーラブルな ML パイプラインを構築する
  • 5 ステップのパイプライン(Cloud Storage からのデータの取り込み、データのスケーリング、モデルのトレーニング、その評価、結果としてできたモデルの Cloud Storage への保存)を作成して実行する
  • Vertex ML Metadata を活用し、モデルやモデルの指標などのモデル アーティファクトを保存する
  • Vertex AI Experiments を使用して、さまざまなパイプライン実行の結果を比較する

このラボを Google Cloud で実行するための総費用は約 $1 です。

2. Vertex AI の概要

このラボでは、Google Cloud で利用できる最新の AI プロダクトを使用します。Vertex AI は Google Cloud 全体の ML サービスを統合してシームレスな開発エクスペリエンスを提供します。以前は、AutoML でトレーニングしたモデルやカスタムモデルには、個別のサービスを介してアクセスする必要がありました。Vertex AI は、これらの個別のサービスを他の新しいプロダクトとともに 1 つの API へと結合します。既存のプロジェクトを Vertex AI に移行することもできます。

Vertex AI には、エンドツーエンドの ML ワークフローをサポートするさまざまなプロダクトが含まれています。このラボでは、以下でハイライト表示されているプロダクト(ExperimentsPipelinesML MetadataWorkbench)を中心に学習します。

Vertex プロダクトの概要

3. ユースケースの概要

ここでは、EA Sports の FIFA ビデオゲーム シリーズから取得した人気のサッカー データセットを使用します。2008 ~ 2016 年のシーズンの 25,000 件以上のサッカーの試合と 10,000 人以上の選手が含まれています。すぐに使い始めることができるように、データは事前に前処理されています。このデータセットは、一般公開の Cloud Storage バケットに保存されており、ラボ全体で使用します。このデータセットへのアクセス方法については、この Codelab の後半で詳しく説明します。最終的な目標は、インターセプトやペナルティなどのさまざまなゲーム内アクションに基づいて、選手の総合評価を予測することです。

Vertex AI Experiments がデータ サイエンスに役立つ理由

データ サイエンスには実験的な性質があります。何しろ、データ サイエンティストと名乗るのですから。優れたデータ サイエンティストは仮説駆動型で、試行錯誤でさまざまな仮説をテストし、反復処理を重ねることでパフォーマンスの高いモデルを構築することを目標としています。

データ サイエンス チームはテストを導入していますが、多くの場合、作業とテストを通じて発見された「秘密のソース」を追跡するのに苦労しています。このような動作が行われることにはいくつかの理由があります。

  • トレーニング ジョブの追跡は煩雑になり、どのジョブが有効でどのジョブが有効でないかを見失いやすくなります。
  • データ サイエンス チーム全体で見ると、この問題はさらに複雑になります。すべてのメンバーがテストを追跡しているわけではなく、結果を他の人と共有していない場合もあります。
  • データの収集には時間がかかり、ほとんどのチームは手動の方法(スプレッドシートやドキュメントなど)を利用しているため、一貫性のない不完全な情報を分析することになります。

要約: Vertex AI Experiments が自動的にテストを実行し、テストの追跡と比較を簡単に行えます。

ゲームに Vertex AI Experiments を使用する理由

ゲームは、これまで機械学習と ML のテスト環境として使用されてきました。ゲームでは、1 日に数十億件ものリアルタイム イベントが発生するだけでなく、ML と ML テストを活用して、ゲーム内エクスペリエンスの改善、プレーヤーの維持、プラットフォーム上のさまざまなプレーヤーの評価に、そのすべてのデータを使用しています。そのため、ゲーム データセットが全体的なテスト演習に適していると考えました。

4. 環境の設定

この Codelab を実行するには、課金が有効になっている Google Cloud Platform プロジェクトが必要です。プロジェクトを作成するには、こちらの手順を行ってください。

ステップ 1: Compute Engine API を有効にする

まだ有効になっていない場合は、[Compute Engine] に移動して [有効にする] を選択します。

ステップ 2: Vertex AI API を有効にする

Cloud コンソールの [Vertex AI] セクションに移動し、[Vertex AI API を有効にする] をクリックします。

Vertex AI ダッシュボード

ステップ 3: Vertex AI Workbench インスタンスを作成する

Cloud Console の [Vertex AI] セクションで [ワークベンチ] をクリックします。

Vertex AI メニュー

Notebooks API をまだ有効にしていない場合は、有効にします。

Notebook_api

有効にしたら、[マネージド ノートブック] をクリックします。

Notebooks_UI

[新しいノートブック] を選択します。

new_notebook

ノートブックに名前を付けて、[詳細設定] をクリックします。

create_notebook

[詳細設定] で、アイドル状態でのシャットダウンを有効にして、シャットダウンまでの時間(分)を 60 に設定します。これにより、使用されていないノートブックが自動的にシャットダウンされるため、不要なコストが発生しません。

idle_timeout

ステップ 4: ノートブックを開く

インスタンスが作成されたら、[JUPYTERLAB を開く] を選択します。

open_jupyterlab

ステップ 5: 認証する(初回のみ)

新しいインスタンスを初めて使用するときに、認証が求められます。その場合は、UI で手順を行います。

authenticate

ステップ 6: 適切なカーネルを選択する

マネージド ノートブックは、1 つの UI で複数のカーネルを提供します。TensorFlow 2(ローカル)のカーネルを選択します。

tensorflow_kernel

5. ノートブックの初期設定手順

パイプラインを構築する前に、ノートブック内で環境を設定するための一連の追加手順を行う必要があります。これらの手順には、追加のパッケージのインストール、変数の設定、Cloud Storage バケットの作成、一般公開ストレージ バケットからゲーム データセットのコピー、ライブラリのインポート、追加定数の定義が含まれます。

ステップ 1: 追加のパッケージをインストールする

ノートブック環境に現在インストールされていない追加のパッケージ依存関係をインストールする必要があります。たとえば、KFP SDK があります。

!pip3 install --user --force-reinstall 'google-cloud-aiplatform>=1.15' -q --no-warn-conflicts
!pip3 install --user kfp -q --no-warn-conflicts

次に、ダウンロードしたパッケージをノートブックで使用できるように、ノートブック カーネルを再起動します。

# Automatically restart kernel after installs
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

ステップ 2: 変数を設定する

PROJECT_ID を定義します。Project_ID がわからない場合は、gcloud を使用して PROJECT_ID を取得できる可能性があります。

import os

PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output = !gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

それ以外の場合は、ここで PROJECT_ID を設定します。

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "[your-project-id]"  # @param {type:"string"}

また、このノートブックの残りの部分で使用される REGION 変数も設定します。Vertex AI でサポートされているリージョンは次のとおりです。最も近いリージョンを選択することをおすすめします。

  • 南北アメリカ: us-central1
  • ヨーロッパ: europe-west4
  • アジア太平洋: asia-east1

Vertex AI でのトレーニングには、マルチリージョン バケットを使用しないでください。すべてのリージョンですべての Vertex AI サービスがサポートされているわけではありません。Vertex AI のリージョンの詳細を確認する。

#set your region 
REGION = "us-central1"  # @param {type: "string"}

最後に、TIMESTAMP 変数を設定します。この変数は、作成したリソースについてユーザー間での名前の競合を回避するために使用します。インスタンス セッションごとに TIMESTAMP を作成し、このチュートリアルで作成するリソースの名前に追加します。

#set timestamp to avoid collisions between multiple users

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

ステップ 3: Cloud Storage バケットを作成する

Cloud Storage ステージング バケットを指定して使用する必要があります。ステージング バケットは、データセットとモデルのリソースに関連付けられているすべてのデータが、セッションをまたいで保持される場所です。

Cloud Storage バケットの名前を設定します。バケット名は、組織外のものも含め、すべての Google Cloud プロジェクトでグローバルに一意である必要があります。

#set cloud storage bucket 
BUCKET_NAME = "[insert bucket name here]"  # @param {type:"string"}
BUCKET_URI = f"gs://{BUCKET_NAME}"

バケットがまだ存在しない場合は、次のセルを実行して Cloud Storage バケットを作成できます。

! gsutil mb -l $REGION -p $PROJECT_ID $BUCKET_URI

その後、次のセルを実行して、Cloud Storage バケットへのアクセスを確認します。

#verify access 
! gsutil ls -al $BUCKET_URI

ステップ 4: ゲーム データセットをコピーする

前述のとおり、EA Sports の人気ビデオゲーム FIFA の人気ゲーム データセットを活用します。前処理はすでに完了しているため、パブリック ストレージ バケットからデータセットをコピーして、作成したバケットに移動するだけです。

# copy the data over to your cloud storage bucket
DATASET_URI = "gs://cloud-samples-data/vertex-ai/structured_data/player_data" 

!gsutil cp -r $DATASET_URI $BUCKET_URI

ステップ 5: ライブラリをインポートして追加の定数を定義する

次に、Vertex AI、KFP などのライブラリをインポートします。

import logging
import os
import time

logger = logging.getLogger("logger")
logging.basicConfig(level=logging.INFO)

import kfp.v2.compiler as compiler
# Pipeline Experiments
import kfp.v2.dsl as dsl
# Vertex AI
from google.cloud import aiplatform as vertex_ai
from kfp.v2.dsl import Artifact, Input, Metrics, Model, Output, component
from typing import NamedTuple

また、トレーニング データのファイルパスなど、ノートブックの残りの部分で参照する追加の定数も定義します。

#import libraries and define constants
# Experiments

TASK = "regression"
MODEL_TYPE = "tensorflow"
EXPERIMENT_NAME = f"{PROJECT_ID}-{TASK}-{MODEL_TYPE}-{TIMESTAMP}"

# Pipeline
PIPELINE_URI = f"{BUCKET_URI}/pipelines"
TRAIN_URI = f"{BUCKET_URI}/player_data/data.csv"
LABEL_URI = f"{BUCKET_URI}/player_data/labels.csv"
MODEL_URI = f"{BUCKET_URI}/model"
DISPLAY_NAME = "experiments-demo-gaming-data"
BQ_DATASET = "player_data"
BQ_LOCATION = "US"  
VIEW_NAME = 'dataset_test'
PIPELINE_JSON_PKG_PATH = "experiments_demo_gaming_data.json"
PIPELINE_ROOT = f"gs://{BUCKET_URI}/pipeline_root"

6. パイプラインを構築しましょう

それでは、Vertex AI を活用してトレーニング パイプラインを構築してみましょう。Vertex AI SDK を初期化し、トレーニング ジョブをパイプライン コンポーネントとして設定し、パイプラインを構築してパイプラインの実行を送信します。また、Vertex AI SDK を使用して、テストを表示し、ステータスをモニタリングします。

ステップ 1: Vertex AI SDK を初期化する

Vertex AI SDK を初期化して、PROJECT_IDBUCKET_URI を設定します。

#initialize vertex AI SDK 
vertex_ai.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)

ステップ 2: トレーニング ジョブをパイプライン コンポーネントとして設定する

テストの実行を開始するには、パイプライン コンポーネントとして定義することでトレーニング ジョブを指定する必要があります。パイプラインは、トレーニング データとハイパーパラメータ(例:DROPOUT_RATELEARNING_RATEEPOCHS など)を入力として使用し、出力モデルの指標(例:MAERMSE)として取り込み、また、モデル アーティファクトを取り込みます。

@component(
    packages_to_install=[
        "numpy==1.21.0",
        "pandas==1.3.5", 
        "scikit-learn==1.0.2",
        "tensorflow==2.9.0",
    ]
)
def custom_trainer(
    train_uri: str,
    label_uri: str,
    dropout_rate: float,
    learning_rate: float,
    epochs: int,
    model_uri: str,
    metrics: Output[Metrics], 
    model_metadata: Output[Model], 
    

):

    # import libraries
    import logging
    import uuid
    from pathlib import Path as path

    import pandas as pd
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Dense
    from tensorflow.keras.layers import Dropout
    from tensorflow.keras.metrics import Metric 
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler
    from sklearn.metrics import mean_absolute_error
    import numpy as np
    from math import sqrt
    import os
    import tempfile

    # set variables and use gcsfuse to update prefixes
    gs_prefix = "gs://"
    gcsfuse_prefix = "/gcs/"
    train_path = train_uri.replace(gs_prefix, gcsfuse_prefix)
    label_path = label_uri.replace(gs_prefix, gcsfuse_prefix)
    model_path = model_uri.replace(gs_prefix, gcsfuse_prefix)

    def get_logger():

        logger = logging.getLogger(__name__)
        logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        )
        logger.addHandler(handler)
        return logger

    def get_data(
        train_path: str, 
        label_path: str
    ) -> (pd.DataFrame): 
        
        
        #load data into pandas dataframe
        data_0 = pd.read_csv(train_path)
        labels_0 = pd.read_csv(label_path)
        
        #drop unnecessary leading columns
        
        data = data_0.drop('Unnamed: 0', axis=1)
        labels = labels_0.drop('Unnamed: 0', axis=1)
        
        #save as numpy array for reshaping of data 
        
        labels = labels.values
        data = data.values
    
        # Split the data
        labels = labels.reshape((labels.size,))
        train_data, test_data, train_labels, test_labels = train_test_split(data, labels, test_size=0.2, shuffle=True, random_state=7)
    
        #Convert data back to pandas dataframe for scaling
        
        train_data = pd.DataFrame(train_data)
        test_data = pd.DataFrame(test_data)
        train_labels = pd.DataFrame(train_labels)
        test_labels = pd.DataFrame(test_labels)
        
        #Scale and normalize the training dataset
        
        scaler = StandardScaler()
        scaler.fit(train_data)
        train_data = pd.DataFrame(scaler.transform(train_data), index=train_data.index, columns=train_data.columns)
        test_data = pd.DataFrame(scaler.transform(test_data), index=test_data.index, columns=test_data.columns)
        
        return train_data,train_labels, test_data, test_labels 
    
        """ Train your Keras model passing in the training data and values for learning rate, dropout rate,and the number of epochs """

    def train_model(
        learning_rate: float, 
        dropout_rate: float,
        epochs: float,
        train_data: pd.DataFrame,
        train_labels: pd.DataFrame):
 
        # Train tensorflow model
        param = {"learning_rate": learning_rate, "dropout_rate": dropout_rate, "epochs": epochs}
        model = Sequential()
        model.add(Dense(500, input_dim=train_data.shape[1], activation= "relu"))
        model.add(Dropout(param['dropout_rate']))
        model.add(Dense(100, activation= "relu"))
        model.add(Dense(50, activation= "relu"))
        model.add(Dense(1))
            
        model.compile(
        tf.keras.optimizers.Adam(learning_rate= param['learning_rate']),
        loss='mse',
        metrics=[tf.keras.metrics.RootMeanSquaredError(),tf.keras.metrics.MeanAbsoluteError()])
        
        model.fit(train_data, train_labels, epochs= param['epochs'])
        
        return model

    # Get Predictions
    def get_predictions(model, test_data):

        dtest = pd.DataFrame(test_data)
        pred = model.predict(dtest)
        return pred

    # Evaluate predictions with MAE
    def evaluate_model_mae(pred, test_labels):
        
        mae = mean_absolute_error(test_labels, pred)
        return mae
    
    # Evaluate predictions with RMSE
    def evaluate_model_rmse(pred, test_labels):

        rmse = np.sqrt(np.mean((test_labels - pred)**2))
        return rmse    
 
    
    #Save your trained model in GCS     
    def save_model(model, model_path):

        model_id = str(uuid.uuid1())
        model_path = f"{model_path}/{model_id}"        
        path(model_path).parent.mkdir(parents=True, exist_ok=True)
        model.save(model_path + '/model_tensorflow')

        
    # Main ----------------------------------------------
    
    train_data, train_labels, test_data, test_labels = get_data(train_path, label_path)
    model = train_model(learning_rate, dropout_rate, epochs, train_data,train_labels )
    pred = get_predictions(model, test_data)
    mae = evaluate_model_mae(pred, test_labels)
    rmse = evaluate_model_rmse(pred, test_labels)
    save_model(model, model_path)

    # Metadata ------------------------------------------

    #convert numpy array to pandas series
    mae = pd.Series(mae)
    rmse = pd.Series(rmse)

    #log metrics and model artifacts with ML Metadata. Save metrics as a list. 
    metrics.log_metric("mae", mae.to_list()) 
    metrics.log_metric("rmse", rmse.to_list()) 
    model_metadata.uri = model_uri

ステップ 3: パイプラインを構築する

次に、KFP で利用可能な Domain Specific Language (DSL) を使用してワークフローを設定し、パイプラインを JSON ファイルにコンパイルします。

# define our workflow

@dsl.pipeline(name="gaming-custom-training-pipeline")
def pipeline(
    train_uri: str,
    label_uri: str,
    dropout_rate: float,
    learning_rate: float,
    epochs: int,
    model_uri: str,
):

    custom_trainer(
        train_uri,label_uri, dropout_rate,learning_rate,epochs, model_uri
    )
#compile our pipeline
compiler.Compiler().compile(pipeline_func=pipeline, package_path="gaming_pipeline.json")

ステップ 4: パイプライン実行を送信する

コンポーネントの設定とパイプラインの定義は完了しました。これで、上記で指定したパイプラインのさまざまな実行を送信する準備が整いました。これを行うには、次のようにさまざまなハイパーパラメータの値を定義する必要があります。

runs = [
    {"dropout_rate": 0.001, "learning_rate": 0.001,"epochs": 20},
    {"dropout_rate": 0.002, "learning_rate": 0.002,"epochs": 25},
    {"dropout_rate": 0.003, "learning_rate": 0.003,"epochs": 30},
    {"dropout_rate": 0.004, "learning_rate": 0.004,"epochs": 35},
    {"dropout_rate": 0.005, "learning_rate": 0.005,"epochs": 40},
]

ハイパーパラメータを定義したら、for loop を利用してパイプラインのさまざまな実行を正常にフィードできます。

for i, run in enumerate(runs):

    job = vertex_ai.PipelineJob(
        display_name=f"{EXPERIMENT_NAME}-pipeline-run-{i}",
        template_path="gaming_pipeline.json",
        pipeline_root=PIPELINE_URI,
        parameter_values={
            "train_uri": TRAIN_URI,
            "label_uri": LABEL_URI,
            "model_uri": MODEL_URI,
            **run,
        },
    )
    job.submit(experiment=EXPERIMENT_NAME)

ステップ 5: Vertex AI SDK を使用してテストを表示する

Vertex AI SDK を使用すると、パイプラインの実行ステータスをモニタリングできます。また、Vertex AI Experiments でパイプライン実行のパラメータと指標を返す場合にも使用できます。次のコードを使用して、実行に関連付けられているパラメータとその現在の状態を確認します。

# see state/status of all the pipeline runs

vertex_ai.get_experiment_df(EXPERIMENT_NAME)

次のコードを使用して、パイプラインの実行ステータスの最新情報を取得できます。

#check on current status
while True:
    pipeline_experiments_df = vertex_ai.get_experiment_df(EXPERIMENT_NAME)
    if all(
        pipeline_state != "COMPLETE" for pipeline_state in pipeline_experiments_df.state
    ):
        print("Pipeline runs are still running...")
        if any(
            pipeline_state == "FAILED"
            for pipeline_state in pipeline_experiments_df.state
        ):
            print("At least one Pipeline run failed")
            break
    else:
        print("Pipeline experiment runs have completed")
        break
    time.sleep(60)

run_name を使用して特定のパイプライン ジョブを呼び出すこともできます。

# Call the pipeline runs based on the experiment run name
pipeline_experiments_df = vertex_ai.get_experiment_df(EXPERIMENT_NAME)
job = vertex_ai.PipelineJob.get(pipeline_experiments_df.run_name[0])
print(job.resource_name)
print(job._dashboard_uri())

最後に、設定した間隔(60 秒ごとなど)で実行の状態を更新して、状態が RUNNING から FAILED または COMPLETE に変化することを確認します。

# wait 60 seconds and view state again
import time
time.sleep(60)
vertex_ai.get_experiment_df(EXPERIMENT_NAME)

7. パフォーマンスが最も高い実行を特定する

これで、パイプラインの実行結果が得られました。結果から何がわかるのか、疑問に思われるかもしれません。テストの出力には、各パイプライン実行につき 1 行ずつ、合計 5 行の結果が含まれています。次のような形式になります。

Final-Results-Snapshot

MAE と RMSE はどちらも平均モデル予測誤差の尺度であるため、両方の指標の値を小さくするのが理想的です。Vertex AI Experiments の出力から、両方の指標で最もパフォーマンスの高い実行は最後の実行で、dropout_rate が 0.001、learning_rate が 0.001、epochs の合計数が 20 であることがわかります。このテストに基づくと、これらのモデル パラメータは、最終的に本番環境で使用されることになります。このパラメータで最も優れたモデル パフォーマンスが得られたからです。

これでラボは終了です。

お疲れさまでした

Vertex AI を使って次のことを行う方法を学びました。

  • プレーヤーの評価を予測するカスタム Keras モデルをトレーニングする(回帰など)
  • Kubeflow Pipelines SDK を使用してスケーラブルな ML パイプラインを構築する
  • 5 ステップのパイプライン(GCS からのデータの取り込み、データのスケーリング、モデルのトレーニング、その評価、結果としてできたモデルの GCS への保存)を作成して実行する
  • Vertex ML Metadata を活用し、モデルやモデルの指標などのモデル アーティファクトを保存する
  • Vertex AI Experiments を使用して、さまざまなパイプライン実行の結果を比較する

Vertex のさまざまな部分の説明については、ドキュメントをご覧ください。

8. クリーンアップ

料金が発生しないように、このラボで作成したリソースを削除することをおすすめします。

ステップ 1: Notebooks インスタンスを停止または削除する

このラボで作成したノートブックを引き続き使用する場合は、未使用時にオフにすることをおすすめします。Cloud コンソールの Notebooks UI で、ノートブックを選択して [停止] を選択します。インスタンスを完全に削除する場合は、[削除] を選択します。

インスタンスの停止

ステップ 2: Cloud Storage バケットを削除する

ストレージ バケットを削除するには、Cloud コンソールのナビゲーション メニューで [ストレージ] に移動してバケットを選択し、[削除] をクリックします。

ストレージを削除