充分發揮實驗的效益:運用 Vertex AI 管理機器學習實驗

1. 總覽

在這個研究室中,您將使用 Vertex AI 建構管道,在 TensorFlow 中訓練自訂 Keras 模型。之後,我們會使用 Vertex AI 實驗中的新功能追蹤及比較模型執行作業,找出哪些超參數組合能帶來最佳成效。

課程內容

您將學習下列內容:

  • 訓練自訂 Keras Model 模型,預測玩家評分 (例如迴歸)
  • 使用 Kubeflow Pipelines SDK 建構可擴充的機器學習管道
  • 建立並執行 5 步驟管道。這個管道會從 Cloud Storage 擷取資料、調度資料、訓練模型、評估模型,以及將產生的模型存回 Cloud Storage。
  • 運用 Vertex 機器學習中繼資料儲存模型構件,例如模型和模型指標
  • 透過 Vertex AI 實驗,比較各種管道執行作業的結果

在 Google Cloud 中執行這個研究室的總費用約為 $1 美元。

2. Vertex AI 簡介

這個研究室使用 Google Cloud 最新的 AI 產品服務。Vertex AI 將 Google Cloud 中的機器學習產品整合到流暢的開發體驗中。先前使用 AutoML 訓練的模型和自訂模型,都能透過不同的服務存取。這項新產品會與其他新產品一起合併為一個 API。您也可以將現有專案遷移至 Vertex AI。

Vertex AI 提供許多不同的產品,可支援端對端機器學習工作流程。本研究室將著重介紹下列產品:實驗管道機器學習中繼資料Workbench

Vertex 產品總覽

3. 用途總覽

我們會使用 EA Sports 這個熱門的足球資料集FIFA 電玩遊戲系列。包括超過 25,000 場足球比賽和 2008 至 2016 年賽季的超過 10,000 名玩家。資料已預先處理,讓您更容易上手。您現在可以在研究室中運用這個資料集,只要前往公開的 Cloud Storage 值區即可找到。稍後我們會在程式碼研究室中詳細說明如何存取資料集。我們的最終目標是根據各種遊戲行為 (例如攔截次數和懲罰),預測玩家的整體評分。

為什麼 Vertex AI 實驗對數據資料學來說相當實用?

數據資料學在本質上屬於實驗性質,畢竟這些研究在本質上就稱為「科學家」。優秀的數據資料學家會提出假設,使用試誤法測試各種假設,並期望連續的疊代作業能創造出更出色的模型。

雖然數據資料學團隊已勇於實驗,但往往難以追蹤工作進度和「秘密武器」這個現象很明顯有幾個原因:

  • 追蹤訓練工作可能很麻煩,您很容易就無法掌握哪些工作有效,哪些沒用
  • 探究數據資料學團隊時,這個問題會變得更加棘手,因為有些成員會追蹤實驗,甚至與他人分享實驗結果
  • 資料擷取相當耗時,多數團隊都採用手動方法 (例如工作表或文件),導致資訊不一致且不完整,導致資訊無法學習

重點摘要:Vertex AI 實驗會代為處理這些工作,讓您輕鬆追蹤及比較實驗

選用 Vertex AI 遊戲實驗的理由

遊戲過去一直是機器學習和機器學習實驗的遊樂場。遊戲不僅每天產生數十億個即時事件,還會利用機器學習和機器學習實驗改善遊戲體驗、留住玩家,以及評估平台上的不同玩家。因此,我們設計出遊戲資料集,很適合搭配整體實驗練習。

4. 設定環境

您需要已啟用計費功能的 Google Cloud Platform 專案,才能執行這個程式碼研究室。如要建立專案,請按照這裡的操作說明進行。

步驟 1:啟用 Compute Engine API

前往「Compute Engine」,並選取「啟用」 (如果尚未啟用)。

步驟 2:啟用 Vertex AI API

前往 Cloud 控制台的 Vertex AI 專區,然後按一下「啟用 Vertex AI API」

Vertex AI 資訊主頁

步驟 3:建立 Vertex AI Workbench 執行個體

在 Cloud 控制台的 Vertex AI 專區中,按一下 Workbench:

Vertex AI 選單

啟用 Notebooks API (如果尚未啟用)。

Notebook_api

啟用後,按一下「代管的筆記本」

Notebooks_UI

然後選取「新增筆記本」

new_notebook

為筆記本命名,然後按一下「進階設定」

create_notebook

在進階設定下方啟用閒置關閉,並將時間長度設為 60。也就是說,您的筆記本在未使用時自動關閉,因此不會產生不必要的費用。

idle_timeout

步驟 4:開啟筆記本

執行個體建立完成後,請選取「Open JupyterLab」

open_jupyterlab

步驟 5:驗證 (僅限第一次驗證)

首次使用新的執行個體時,系統會要求您進行驗證。請按照 UI 中的步驟操作。

驗證

步驟 6:選取適當的核心

代管的筆記本會在單一 UI 中提供多個核心。選取 TensorFlow 2 (本機) 的核心。

tensorflow_kernel

5. 筆記本的初始設定步驟

建構管道前,您需要先採取一系列額外的步驟,在筆記本中設定環境。這些步驟包括安裝任何其他套件、設定變數、建立 Cloud Storage 值區、從公開儲存空間值區中複製遊戲資料集,以及匯入程式庫以及定義其他常數。

步驟 1:安裝其他套件

我們必須安裝筆記本環境中目前未安裝的其他套件依附元件。例如 KFP SDK。

!pip3 install --user --force-reinstall 'google-cloud-aiplatform>=1.15' -q --no-warn-conflicts
!pip3 install --user kfp -q --no-warn-conflicts

接著重新啟動筆記本核心,以便在筆記本中使用已下載的套件。

# Automatically restart kernel after installs
import os

if not os.getenv("IS_TESTING"):
    # Automatically restart kernel after installs
    import IPython

    app = IPython.Application.instance()
    app.kernel.do_shutdown(True)

步驟 2:設定變數

我們想要定義 PROJECT_ID。如果您不知道自己的 Project_ID,可以透過 gcloud 取得 PROJECT_ID

import os

PROJECT_ID = ""

# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
    shell_output = !gcloud config list --format 'value(core.project)' 2>/dev/null
    PROJECT_ID = shell_output[0]
    print("Project ID: ", PROJECT_ID)

否則,請在此設定您的 PROJECT_ID

if PROJECT_ID == "" or PROJECT_ID is None:
    PROJECT_ID = "[your-project-id]"  # @param {type:"string"}

我們也要設定 REGION 變數,在此筆記本的其餘部分都會用到。以下是 Vertex AI 支援的區域。建議您選擇離您最近的地區。

  • 美洲:us-central1
  • 歐洲:europe-west4
  • 亞太地區:asia-east1

請勿使用多區域值區進行 Vertex AI 訓練。並非所有區域都支援所有 Vertex AI 服務。進一步瞭解 Vertex AI 區域

#set your region 
REGION = "us-central1"  # @param {type: "string"}

最後,我們要設定 TIMESTAMP 變數。這個變數可用來避免使用者在所建立資源上發生名稱衝突,您可為每個執行個體工作階段建立 TIMESTAMP,並附加至您在本教學課程內建立的資源名稱上。

#set timestamp to avoid collisions between multiple users

from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")

步驟 3:建立 Cloud Storage 值區

您必須指定並使用 Cloud Storage 暫存值區。暫存值區會在各個工作階段中保留與資料集和模型資源相關的所有資料。

請在下方設定 Cloud Storage 值區名稱。值區名稱在所有 Google Cloud 專案 (包括貴機構外部的專案) 中均不得重複。

#set cloud storage bucket 
BUCKET_NAME = "[insert bucket name here]"  # @param {type:"string"}
BUCKET_URI = f"gs://{BUCKET_NAME}"

如果值區「不存在」,可以執行下列儲存格來建立 Cloud Storage 值區。

! gsutil mb -l $REGION -p $PROJECT_ID $BUCKET_URI

接著,請執行以下儲存格來驗證 Cloud Storage 值區的存取權。

#verify access 
! gsutil ls -al $BUCKET_URI

步驟 4:複製遊戲資料集

如前所述,您將使用 EA Sports 熱門電玩遊戲《FIFA》的熱門遊戲資料集。我們已為您完成預先處理作業,因此您只要複製公開儲存空間值區中的資料集,並移至您建立的資料集即可。

# copy the data over to your cloud storage bucket
DATASET_URI = "gs://cloud-samples-data/vertex-ai/structured_data/player_data" 

!gsutil cp -r $DATASET_URI $BUCKET_URI

步驟 5:匯入程式庫並定義其他常數

接下來,我們會匯入 Vertex AI、KFP 等程式庫。

import logging
import os
import time

logger = logging.getLogger("logger")
logging.basicConfig(level=logging.INFO)

import kfp.v2.compiler as compiler
# Pipeline Experiments
import kfp.v2.dsl as dsl
# Vertex AI
from google.cloud import aiplatform as vertex_ai
from kfp.v2.dsl import Artifact, Input, Metrics, Model, Output, component
from typing import NamedTuple

我們也會定義其他常數,我們會在筆記本的其餘部分參照這些常數,例如訓練資料的檔案路徑。

#import libraries and define constants
# Experiments

TASK = "regression"
MODEL_TYPE = "tensorflow"
EXPERIMENT_NAME = f"{PROJECT_ID}-{TASK}-{MODEL_TYPE}-{TIMESTAMP}"

# Pipeline
PIPELINE_URI = f"{BUCKET_URI}/pipelines"
TRAIN_URI = f"{BUCKET_URI}/player_data/data.csv"
LABEL_URI = f"{BUCKET_URI}/player_data/labels.csv"
MODEL_URI = f"{BUCKET_URI}/model"
DISPLAY_NAME = "experiments-demo-gaming-data"
BQ_DATASET = "player_data"
BQ_LOCATION = "US"  
VIEW_NAME = 'dataset_test'
PIPELINE_JSON_PKG_PATH = "experiments_demo_gaming_data.json"
PIPELINE_ROOT = f"gs://{BUCKET_URI}/pipeline_root"

6. 開始建構管道

現在我們可以開始享受您的樂趣了,接著就能開始使用 Vertex AI 建構訓練管線。我們會初始化 Vertex AI SDK、將訓練工作設為管線元件、建構管線、提交管線執行作業,以及透過 Vertex AI SDK 查看實驗及監控狀態。

步驟 1:初始化 Vertex AI SDK

初始化 Vertex AI SDK,設定 PROJECT_IDBUCKET_URI

#initialize vertex AI SDK 
vertex_ai.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)

步驟 2:將訓練工作設為管道元件

為了開始執行實驗,我們必須將訓練工作定義為管線元件。我們的管道會接收訓練資料和超參數 (例如DROPOUT_RATELEARNING_RATEEPOCHS) 做為輸入內容和輸出模型指標 (例如MAERMSE) 以及模型構件。

@component(
    packages_to_install=[
        "numpy==1.21.0",
        "pandas==1.3.5", 
        "scikit-learn==1.0.2",
        "tensorflow==2.9.0",
    ]
)
def custom_trainer(
    train_uri: str,
    label_uri: str,
    dropout_rate: float,
    learning_rate: float,
    epochs: int,
    model_uri: str,
    metrics: Output[Metrics], 
    model_metadata: Output[Model], 
    

):

    # import libraries
    import logging
    import uuid
    from pathlib import Path as path

    import pandas as pd
    import tensorflow as tf
    from tensorflow import keras
    from tensorflow.keras.models import Sequential
    from tensorflow.keras.layers import Dense
    from tensorflow.keras.layers import Dropout
    from tensorflow.keras.metrics import Metric 
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import train_test_split
    from sklearn.preprocessing import StandardScaler
    from sklearn.metrics import mean_absolute_error
    import numpy as np
    from math import sqrt
    import os
    import tempfile

    # set variables and use gcsfuse to update prefixes
    gs_prefix = "gs://"
    gcsfuse_prefix = "/gcs/"
    train_path = train_uri.replace(gs_prefix, gcsfuse_prefix)
    label_path = label_uri.replace(gs_prefix, gcsfuse_prefix)
    model_path = model_uri.replace(gs_prefix, gcsfuse_prefix)

    def get_logger():

        logger = logging.getLogger(__name__)
        logger.setLevel(logging.INFO)
        handler = logging.StreamHandler()
        handler.setFormatter(
            logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        )
        logger.addHandler(handler)
        return logger

    def get_data(
        train_path: str, 
        label_path: str
    ) -> (pd.DataFrame): 
        
        
        #load data into pandas dataframe
        data_0 = pd.read_csv(train_path)
        labels_0 = pd.read_csv(label_path)
        
        #drop unnecessary leading columns
        
        data = data_0.drop('Unnamed: 0', axis=1)
        labels = labels_0.drop('Unnamed: 0', axis=1)
        
        #save as numpy array for reshaping of data 
        
        labels = labels.values
        data = data.values
    
        # Split the data
        labels = labels.reshape((labels.size,))
        train_data, test_data, train_labels, test_labels = train_test_split(data, labels, test_size=0.2, shuffle=True, random_state=7)
    
        #Convert data back to pandas dataframe for scaling
        
        train_data = pd.DataFrame(train_data)
        test_data = pd.DataFrame(test_data)
        train_labels = pd.DataFrame(train_labels)
        test_labels = pd.DataFrame(test_labels)
        
        #Scale and normalize the training dataset
        
        scaler = StandardScaler()
        scaler.fit(train_data)
        train_data = pd.DataFrame(scaler.transform(train_data), index=train_data.index, columns=train_data.columns)
        test_data = pd.DataFrame(scaler.transform(test_data), index=test_data.index, columns=test_data.columns)
        
        return train_data,train_labels, test_data, test_labels 
    
        """ Train your Keras model passing in the training data and values for learning rate, dropout rate,and the number of epochs """

    def train_model(
        learning_rate: float, 
        dropout_rate: float,
        epochs: float,
        train_data: pd.DataFrame,
        train_labels: pd.DataFrame):
 
        # Train tensorflow model
        param = {"learning_rate": learning_rate, "dropout_rate": dropout_rate, "epochs": epochs}
        model = Sequential()
        model.add(Dense(500, input_dim=train_data.shape[1], activation= "relu"))
        model.add(Dropout(param['dropout_rate']))
        model.add(Dense(100, activation= "relu"))
        model.add(Dense(50, activation= "relu"))
        model.add(Dense(1))
            
        model.compile(
        tf.keras.optimizers.Adam(learning_rate= param['learning_rate']),
        loss='mse',
        metrics=[tf.keras.metrics.RootMeanSquaredError(),tf.keras.metrics.MeanAbsoluteError()])
        
        model.fit(train_data, train_labels, epochs= param['epochs'])
        
        return model

    # Get Predictions
    def get_predictions(model, test_data):

        dtest = pd.DataFrame(test_data)
        pred = model.predict(dtest)
        return pred

    # Evaluate predictions with MAE
    def evaluate_model_mae(pred, test_labels):
        
        mae = mean_absolute_error(test_labels, pred)
        return mae
    
    # Evaluate predictions with RMSE
    def evaluate_model_rmse(pred, test_labels):

        rmse = np.sqrt(np.mean((test_labels - pred)**2))
        return rmse    
 
    
    #Save your trained model in GCS     
    def save_model(model, model_path):

        model_id = str(uuid.uuid1())
        model_path = f"{model_path}/{model_id}"        
        path(model_path).parent.mkdir(parents=True, exist_ok=True)
        model.save(model_path + '/model_tensorflow')

        
    # Main ----------------------------------------------
    
    train_data, train_labels, test_data, test_labels = get_data(train_path, label_path)
    model = train_model(learning_rate, dropout_rate, epochs, train_data,train_labels )
    pred = get_predictions(model, test_data)
    mae = evaluate_model_mae(pred, test_labels)
    rmse = evaluate_model_rmse(pred, test_labels)
    save_model(model, model_path)

    # Metadata ------------------------------------------

    #convert numpy array to pandas series
    mae = pd.Series(mae)
    rmse = pd.Series(rmse)

    #log metrics and model artifacts with ML Metadata. Save metrics as a list. 
    metrics.log_metric("mae", mae.to_list()) 
    metrics.log_metric("rmse", rmse.to_list()) 
    model_metadata.uri = model_uri

步驟 3:建立管道

現在,我們要使用 KFP 中提供的 Domain Specific Language (DSL) 設定工作流程,並將管道編譯為 JSON 檔案。

# define our workflow

@dsl.pipeline(name="gaming-custom-training-pipeline")
def pipeline(
    train_uri: str,
    label_uri: str,
    dropout_rate: float,
    learning_rate: float,
    epochs: int,
    model_uri: str,
):

    custom_trainer(
        train_uri,label_uri, dropout_rate,learning_rate,epochs, model_uri
    )
#compile our pipeline
compiler.Compiler().compile(pipeline_func=pipeline, package_path="gaming_pipeline.json")

步驟 4:提交管道執行作業

困難的工作已經完成元件設定,以及定義管道。我們已準備就緒,可提交上述指定管道的各種執行作業。為此,我們需要定義不同超參數的值,如下所示:

runs = [
    {"dropout_rate": 0.001, "learning_rate": 0.001,"epochs": 20},
    {"dropout_rate": 0.002, "learning_rate": 0.002,"epochs": 25},
    {"dropout_rate": 0.003, "learning_rate": 0.003,"epochs": 30},
    {"dropout_rate": 0.004, "learning_rate": 0.004,"epochs": 35},
    {"dropout_rate": 0.005, "learning_rate": 0.005,"epochs": 40},
]

定義超參數後,我們就能利用 for loop,在管道的不同執行作業中成功饋送來源:

for i, run in enumerate(runs):

    job = vertex_ai.PipelineJob(
        display_name=f"{EXPERIMENT_NAME}-pipeline-run-{i}",
        template_path="gaming_pipeline.json",
        pipeline_root=PIPELINE_URI,
        parameter_values={
            "train_uri": TRAIN_URI,
            "label_uri": LABEL_URI,
            "model_uri": MODEL_URI,
            **run,
        },
    )
    job.submit(experiment=EXPERIMENT_NAME)

步驟 5:利用 Vertex AI SDK 查看實驗

Vertex AI SDK 可讓您監控管道執行作業的狀態。另外,也能在 Vertex AI 實驗中,傳回管道執行作業的參數和指標。請使用以下程式碼查看與執行作業相關聯的參數和目前狀態。

# see state/status of all the pipeline runs

vertex_ai.get_experiment_df(EXPERIMENT_NAME)

您可以使用以下程式碼,取得管道執行作業的最新消息。

#check on current status
while True:
    pipeline_experiments_df = vertex_ai.get_experiment_df(EXPERIMENT_NAME)
    if all(
        pipeline_state != "COMPLETE" for pipeline_state in pipeline_experiments_df.state
    ):
        print("Pipeline runs are still running...")
        if any(
            pipeline_state == "FAILED"
            for pipeline_state in pipeline_experiments_df.state
        ):
            print("At least one Pipeline run failed")
            break
    else:
        print("Pipeline experiment runs have completed")
        break
    time.sleep(60)

您也可以使用 run_name 呼叫特定管道工作。

# Call the pipeline runs based on the experiment run name
pipeline_experiments_df = vertex_ai.get_experiment_df(EXPERIMENT_NAME)
job = vertex_ai.PipelineJob.get(pipeline_experiments_df.run_name[0])
print(job.resource_name)
print(job._dashboard_uri())

最後,您可以按照設定的時間間隔 (例如每 60 秒) 重新整理執行作業狀態,查看狀態從 RUNNING 變更為 FAILEDCOMPLETE

# wait 60 seconds and view state again
import time
time.sleep(60)
vertex_ai.get_experiment_df(EXPERIMENT_NAME)

7. 找出最佳跑步表現

很好,現在已取得管道執行作業的結果。你可能會想知道,我從結果中獲得什麼資訊?實驗的輸出結果應包含五列,管道每次執行各有一個。如下所示:

Final-Results-Snapshot

MAE 和 RMSE 都是模型預測平均誤差的測量結果,因此在大多數情況下,兩個指標的值都會較低。根據 Vertex AI 實驗的輸出結果,我們最成功的兩個指標分別是 dropout_rate 為 0.001、learning_rate 為 0.001,以及 epochs 總數為 20,是最後的執行結果。根據這項實驗結果,這些模型參數最終會用於正式環境,因為能產生最佳模型成效。

這表示您已完成研究室!

🎉 恭喜!🎉

您已瞭解如何使用 Vertex AI 執行下列作業:

  • 訓練自訂 Keras Model 模型,預測玩家評分 (例如迴歸)
  • 使用 Kubeflow Pipelines SDK 建構可擴充的機器學習管道
  • 建立並執行包含 5 步驟的管道。這個管道會從 GCS 擷取資料、調度資料、訓練模型、評估模型,並將結果模型儲存回 GCS
  • 運用 Vertex 機器學習中繼資料儲存模型構件,例如模型和模型指標
  • 透過 Vertex AI 實驗,比較各種管道執行作業的結果

如要進一步瞭解 Vertex 的其他部分,請參閱說明文件

8. 清除

因此,建議您刪除在本研究室中建立的資源,如此一來,系統就不會向您收費。

步驟 1:停止或刪除 Notebooks 執行個體

如果想繼續使用您在這個研究室中建立的筆記本,建議在不使用時將其關閉。在 Cloud 控制台的「Notebooks」(筆記本) UI 中,依序選取筆記本和「Stop」(停止)。如要將執行個體完全刪除,請選取 [Delete] (刪除)

停止執行個體

步驟 2:刪除 Cloud Storage 值區

如要刪除 Storage 值區,請使用 Cloud 控制台中的導覽選單前往「Storage」(儲存空間)、選取值區,然後點選「Delete」(刪除):

刪除儲存空間