1. 總覽
在本實驗室中,您將使用 Vertex AI 建構管道,在 TensorFlow 中訓練自訂 Keras 模型。接著,我們會使用 Vertex AI Experiments 提供的新功能追蹤及比較模型執行結果,找出哪些超參數組合可帶來最佳效能。
課程內容
學習重點:
- 訓練自訂 Keras Model 模型,預測玩家評分 (例如迴歸)
- 使用 Kubeflow Pipelines SDK 建構可擴充的機器學習管道
- 建立並執行 5 步驟的管道,從 Cloud Storage 擷取資料、調整資料、訓練模型、評估模型,並將產生的模型儲存回 Cloud Storage
- 利用 Vertex 機器學習中繼資料儲存模型構件,例如模型和模型指標
- 使用 Vertex AI 實驗比較不同管道執行作業的結果
在 Google Cloud 中執行這個研究室的總費用約為 $1 美元。
2. Vertex AI 簡介
這個實驗室使用 Google Cloud 最新推出的 AI 產品服務。Vertex AI 整合了 Google Cloud 中的機器學習產品,提供流暢的開發體驗。先前,使用 AutoML 訓練的模型和自訂模型必須透過不同的服務存取。新產品將這兩項功能與其他新產品整合至單一 API。您也可以將現有專案遷移至 Vertex AI。
Vertex AI 包含許多不同產品,可支援端對端機器學習工作流程。本研究室將著重於以下產品:實驗、管道、機器學習中繼資料和工作台
3. 用途總覽
我們將使用 EA Sports 的 FIFA 系列電玩遊戲中熱門的足球資料集。包括超過 25,000 場足球比賽和 2008 至 2016 年賽季的超過 10,000 名玩家。資料已預先處理,因此您可以更輕鬆地開始使用。您將在整個實驗室中使用這份資料集,目前可在公開的 Cloud Storage 值區中找到。本程式碼研究室後續章節會進一步說明如何存取資料集。我們的最終目標是根據各種遊戲內動作 (例如攔截和犯規),預測玩家的整體評分。
為什麼 Vertex AI Experiments 對數據科學有用?
數據資料學在本質上屬於實驗性質,畢竟這些研究在本質上就稱為「科學家」。優秀的數據資料學家會提出假設,使用試誤法測試各種假設,並期望連續的疊代作業能創造出更出色的模型。
雖然數據科學團隊已採用實驗方法,但他們經常難以追蹤自己的工作,以及透過實驗發現的「秘密配方」。有幾個原因:
- 追蹤訓練工作可能很麻煩,您很容易就無法掌握哪些工作有效,哪些沒用
- 當您查看資料科學團隊時,這個問題會更加複雜,因為並非所有成員都會追蹤實驗,甚至可能不會與他人分享結果
- 擷取資料相當耗時,而且大多數團隊都會使用手動方法 (例如試算表或文件),導致擷取的資訊不一致且不完整
重點摘要:Vertex AI 實驗會代勞,協助您更輕鬆地追蹤及比較實驗
為何選擇使用 Vertex AI Experiments for Gaming?
遊戲一直是機器學習和 ML 實驗的試驗場。遊戲不僅每天產生數十億個即時事件,還會利用機器學習和機器學習實驗改善遊戲體驗、留住玩家,以及評估平台上的不同玩家。因此,我們認為遊戲資料集非常適合我們的整體實驗練習。
4. 設定環境
您需要已啟用計費功能的 Google Cloud Platform 專案,才能執行這個程式碼研究室。如要建立專案,請按照這篇文章中的操作說明進行。
步驟 1:啟用 Compute Engine API
前往「Compute Engine」,然後選取「啟用」 (如果尚未啟用)。
步驟 2:啟用 Vertex AI API
前往 Cloud 控制台的 Vertex AI 專區,然後點選「啟用 Vertex AI API」。
步驟 3:建立 Vertex AI Workbench 執行個體
在 Cloud 控制台的 Vertex AI 專區中,按一下「Workbench」:
如果尚未啟用 Notebooks API,請先啟用。
啟用後,按一下「MANAGED NOTEBOOKS」:
然後選取「新增筆記本」。
為筆記本命名,然後按一下「進階設定」。
在「進階設定」下方,啟用閒置關機功能,並將分鐘數設為 60。也就是說,筆記型電腦會在閒置時自動關機,避免產生不必要的費用。
步驟 4:開啟 Notebook
建立執行個體後,選取「Open JupyterLab」。
步驟 5:驗證 (僅限第一次驗證)
首次使用新的執行個體時,系統會要求您進行驗證。請按照 UI 中的步驟操作。
步驟 6:選取適當的核心
受管理的 Notebook 會在單一 UI 中提供多個核心。選取 Tensorflow 2 (本機) 的核心。
5. 在筆記本中完成初始設定步驟
建構管道前,您需要先採取一系列額外的步驟,在筆記本中設定環境。這些步驟包括:安裝任何額外套件、設定變數、建立 Cloud Storage 儲存空間、從公開儲存空間複製遊戲資料集,以及匯入程式庫和定義其他常數。
步驟 1:安裝其他套件
我們需要安裝筆記本環境中目前未安裝的其他套件依附元件。例如 KFP SDK。
!pip3 install --user --force-reinstall 'google-cloud-aiplatform>=1.15' -q --no-warn-conflicts
!pip3 install --user kfp -q --no-warn-conflicts
接著,請重新啟動 Notebook Kernel,以便在筆記本中使用下載的套件。
# Automatically restart kernel after installs
import os
if not os.getenv("IS_TESTING"):
# Automatically restart kernel after installs
import IPython
app = IPython.Application.instance()
app.kernel.do_shutdown(True)
步驟 2:設定變數
我們想要定義 PROJECT_ID
。如果您不知道自己的 Project_ID
,可以使用 gcloud 取得 PROJECT_ID
。
import os
PROJECT_ID = ""
# Get your Google Cloud project ID from gcloud
if not os.getenv("IS_TESTING"):
shell_output = !gcloud config list --format 'value(core.project)' 2>/dev/null
PROJECT_ID = shell_output[0]
print("Project ID: ", PROJECT_ID)
否則,請在此設定 PROJECT_ID
。
if PROJECT_ID == "" or PROJECT_ID is None:
PROJECT_ID = "[your-project-id]" # @param {type:"string"}
我們也要設定 REGION
變數,在此筆記本的其餘部分都會用到。以下是 Vertex AI 支援的區域。建議您選擇離您最近的區域。
- 美洲:us-central1
- 歐洲:europe-west4
- 亞太地區:asia-east1
請勿使用多區域值區進行 Vertex AI 訓練。並非所有地區都支援所有 Vertex AI 服務。進一步瞭解 Vertex AI 區域。
#set your region
REGION = "us-central1" # @param {type: "string"}
最後,我們會設定 TIMESTAMP
變數。這個變數可避免建立的資源在使用者之間發生名稱衝突,您可以為每個執行個體工作階段建立 TIMESTAMP
,並將其附加至您在本教學課程中建立的資源名稱。
#set timestamp to avoid collisions between multiple users
from datetime import datetime
TIMESTAMP = datetime.now().strftime("%Y%m%d%H%M%S")
步驟 3:建立 Cloud Storage 值區
您必須指定並運用 Cloud Storage 暫存值區。在測試階段值區中,系統會保留與資料集和模型資源相關聯的所有資料。
請在下方設定 Cloud Storage 值區名稱。值區名稱在所有 Google Cloud 專案 (包括貴機構以外的專案) 中不得重複。
#set cloud storage bucket
BUCKET_NAME = "[insert bucket name here]" # @param {type:"string"}
BUCKET_URI = f"gs://{BUCKET_NAME}"
如果值區「不存在」,可以執行下列儲存格來建立 Cloud Storage 值區。
! gsutil mb -l $REGION -p $PROJECT_ID $BUCKET_URI
接著,請執行以下儲存格來驗證 Cloud Storage 值區的存取權。
#verify access
! gsutil ls -al $BUCKET_URI
步驟 4:複製我們的遊戲資料集
如前所述,您將運用 EA Sports 熱門電玩遊戲《FIFA》的熱門遊戲資料集。我們已為您完成預先處理作業,因此您只需從公開儲存體值區複製資料集,然後移至您建立的值區即可。
# copy the data over to your cloud storage bucket
DATASET_URI = "gs://cloud-samples-data/vertex-ai/structured_data/player_data"
!gsutil cp -r $DATASET_URI $BUCKET_URI
步驟 5:匯入程式庫並定義其他常數
接下來,我們要匯入 Vertex AI、KFP 等程式庫。
import logging
import os
import time
logger = logging.getLogger("logger")
logging.basicConfig(level=logging.INFO)
import kfp.v2.compiler as compiler
# Pipeline Experiments
import kfp.v2.dsl as dsl
# Vertex AI
from google.cloud import aiplatform as vertex_ai
from kfp.v2.dsl import Artifact, Input, Metrics, Model, Output, component
from typing import NamedTuple
我們也會定義其他常數,以便在筆記本的其他部分參照,例如訓練資料的檔案路徑。
#import libraries and define constants
# Experiments
TASK = "regression"
MODEL_TYPE = "tensorflow"
EXPERIMENT_NAME = f"{PROJECT_ID}-{TASK}-{MODEL_TYPE}-{TIMESTAMP}"
# Pipeline
PIPELINE_URI = f"{BUCKET_URI}/pipelines"
TRAIN_URI = f"{BUCKET_URI}/player_data/data.csv"
LABEL_URI = f"{BUCKET_URI}/player_data/labels.csv"
MODEL_URI = f"{BUCKET_URI}/model"
DISPLAY_NAME = "experiments-demo-gaming-data"
BQ_DATASET = "player_data"
BQ_LOCATION = "US"
VIEW_NAME = 'dataset_test'
PIPELINE_JSON_PKG_PATH = "experiments_demo_gaming_data.json"
PIPELINE_ROOT = f"gs://{BUCKET_URI}/pipeline_root"
6. 開始建構管道
接下來,我們將開始使用 Vertex AI 建立訓練管道。我們會初始化 Vertex AI SDK、將訓練工作設為管道元件、建構管道、提交管道執行作業,並利用 Vertex AI SDK 查看實驗並監控實驗狀態。
步驟 1:初始化 Vertex AI SDK
初始化 Vertex AI SDK,設定 PROJECT_ID
和 BUCKET_URI
。
#initialize vertex AI SDK
vertex_ai.init(project=PROJECT_ID, staging_bucket=BUCKET_URI)
步驟 2:將訓練工作設為管道元件
為了開始執行實驗,我們需要將訓練工作定義為管道元件。我們的管道會接收訓練資料和超參數 (例如DROPOUT_RATE、LEARNING_RATE、EPOCHS) 做為輸入內容,並輸出模型指標 (例如MAE 和 RMSE) 以及模型構件。
@component(
packages_to_install=[
"numpy==1.21.0",
"pandas==1.3.5",
"scikit-learn==1.0.2",
"tensorflow==2.9.0",
]
)
def custom_trainer(
train_uri: str,
label_uri: str,
dropout_rate: float,
learning_rate: float,
epochs: int,
model_uri: str,
metrics: Output[Metrics],
model_metadata: Output[Model],
):
# import libraries
import logging
import uuid
from pathlib import Path as path
import pandas as pd
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Dropout
from tensorflow.keras.metrics import Metric
from sklearn.metrics import accuracy_score
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import mean_absolute_error
import numpy as np
from math import sqrt
import os
import tempfile
# set variables and use gcsfuse to update prefixes
gs_prefix = "gs://"
gcsfuse_prefix = "/gcs/"
train_path = train_uri.replace(gs_prefix, gcsfuse_prefix)
label_path = label_uri.replace(gs_prefix, gcsfuse_prefix)
model_path = model_uri.replace(gs_prefix, gcsfuse_prefix)
def get_logger():
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.StreamHandler()
handler.setFormatter(
logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
)
logger.addHandler(handler)
return logger
def get_data(
train_path: str,
label_path: str
) -> (pd.DataFrame):
#load data into pandas dataframe
data_0 = pd.read_csv(train_path)
labels_0 = pd.read_csv(label_path)
#drop unnecessary leading columns
data = data_0.drop('Unnamed: 0', axis=1)
labels = labels_0.drop('Unnamed: 0', axis=1)
#save as numpy array for reshaping of data
labels = labels.values
data = data.values
# Split the data
labels = labels.reshape((labels.size,))
train_data, test_data, train_labels, test_labels = train_test_split(data, labels, test_size=0.2, shuffle=True, random_state=7)
#Convert data back to pandas dataframe for scaling
train_data = pd.DataFrame(train_data)
test_data = pd.DataFrame(test_data)
train_labels = pd.DataFrame(train_labels)
test_labels = pd.DataFrame(test_labels)
#Scale and normalize the training dataset
scaler = StandardScaler()
scaler.fit(train_data)
train_data = pd.DataFrame(scaler.transform(train_data), index=train_data.index, columns=train_data.columns)
test_data = pd.DataFrame(scaler.transform(test_data), index=test_data.index, columns=test_data.columns)
return train_data,train_labels, test_data, test_labels
""" Train your Keras model passing in the training data and values for learning rate, dropout rate,and the number of epochs """
def train_model(
learning_rate: float,
dropout_rate: float,
epochs: float,
train_data: pd.DataFrame,
train_labels: pd.DataFrame):
# Train tensorflow model
param = {"learning_rate": learning_rate, "dropout_rate": dropout_rate, "epochs": epochs}
model = Sequential()
model.add(Dense(500, input_dim=train_data.shape[1], activation= "relu"))
model.add(Dropout(param['dropout_rate']))
model.add(Dense(100, activation= "relu"))
model.add(Dense(50, activation= "relu"))
model.add(Dense(1))
model.compile(
tf.keras.optimizers.Adam(learning_rate= param['learning_rate']),
loss='mse',
metrics=[tf.keras.metrics.RootMeanSquaredError(),tf.keras.metrics.MeanAbsoluteError()])
model.fit(train_data, train_labels, epochs= param['epochs'])
return model
# Get Predictions
def get_predictions(model, test_data):
dtest = pd.DataFrame(test_data)
pred = model.predict(dtest)
return pred
# Evaluate predictions with MAE
def evaluate_model_mae(pred, test_labels):
mae = mean_absolute_error(test_labels, pred)
return mae
# Evaluate predictions with RMSE
def evaluate_model_rmse(pred, test_labels):
rmse = np.sqrt(np.mean((test_labels - pred)**2))
return rmse
#Save your trained model in GCS
def save_model(model, model_path):
model_id = str(uuid.uuid1())
model_path = f"{model_path}/{model_id}"
path(model_path).parent.mkdir(parents=True, exist_ok=True)
model.save(model_path + '/model_tensorflow')
# Main ----------------------------------------------
train_data, train_labels, test_data, test_labels = get_data(train_path, label_path)
model = train_model(learning_rate, dropout_rate, epochs, train_data,train_labels )
pred = get_predictions(model, test_data)
mae = evaluate_model_mae(pred, test_labels)
rmse = evaluate_model_rmse(pred, test_labels)
save_model(model, model_path)
# Metadata ------------------------------------------
#convert numpy array to pandas series
mae = pd.Series(mae)
rmse = pd.Series(rmse)
#log metrics and model artifacts with ML Metadata. Save metrics as a list.
metrics.log_metric("mae", mae.to_list())
metrics.log_metric("rmse", rmse.to_list())
model_metadata.uri = model_uri
步驟 3:建立管道
我們現在將使用 KFP 提供的 Domain Specific Language (DSL)
設定工作流程,並將管道編譯為 JSON
檔案。
# define our workflow
@dsl.pipeline(name="gaming-custom-training-pipeline")
def pipeline(
train_uri: str,
label_uri: str,
dropout_rate: float,
learning_rate: float,
epochs: int,
model_uri: str,
):
custom_trainer(
train_uri,label_uri, dropout_rate,learning_rate,epochs, model_uri
)
#compile our pipeline
compiler.Compiler().compile(pipeline_func=pipeline, package_path="gaming_pipeline.json")
步驟 4:提交管道執行作業
我們已完成設定元件和定義管道的繁重工作。我們已準備就緒,可提交上述管道的各種執行作業。為此,我們需要定義不同超參數的值,如下所示:
runs = [
{"dropout_rate": 0.001, "learning_rate": 0.001,"epochs": 20},
{"dropout_rate": 0.002, "learning_rate": 0.002,"epochs": 25},
{"dropout_rate": 0.003, "learning_rate": 0.003,"epochs": 30},
{"dropout_rate": 0.004, "learning_rate": 0.004,"epochs": 35},
{"dropout_rate": 0.005, "learning_rate": 0.005,"epochs": 40},
]
定義超參數後,我們可以利用 for loop
成功在管道中執行不同的執行作業:
for i, run in enumerate(runs):
job = vertex_ai.PipelineJob(
display_name=f"{EXPERIMENT_NAME}-pipeline-run-{i}",
template_path="gaming_pipeline.json",
pipeline_root=PIPELINE_URI,
parameter_values={
"train_uri": TRAIN_URI,
"label_uri": LABEL_URI,
"model_uri": MODEL_URI,
**run,
},
)
job.submit(experiment=EXPERIMENT_NAME)
步驟 5:使用 Vertex AI SDK 查看實驗
Vertex AI SDK 可讓您監控管道執行作業的狀態。另外,也能在 Vertex AI 實驗中,傳回管道執行作業的參數和指標。使用下列程式碼查看與執行作業相關聯的參數,以及目前狀態。
# see state/status of all the pipeline runs
vertex_ai.get_experiment_df(EXPERIMENT_NAME)
您可以利用下列程式碼,取得管道執行作業的最新狀態。
#check on current status
while True:
pipeline_experiments_df = vertex_ai.get_experiment_df(EXPERIMENT_NAME)
if all(
pipeline_state != "COMPLETE" for pipeline_state in pipeline_experiments_df.state
):
print("Pipeline runs are still running...")
if any(
pipeline_state == "FAILED"
for pipeline_state in pipeline_experiments_df.state
):
print("At least one Pipeline run failed")
break
else:
print("Pipeline experiment runs have completed")
break
time.sleep(60)
您也可以使用 run_name
呼叫特定管道工作。
# Call the pipeline runs based on the experiment run name
pipeline_experiments_df = vertex_ai.get_experiment_df(EXPERIMENT_NAME)
job = vertex_ai.PipelineJob.get(pipeline_experiments_df.run_name[0])
print(job.resource_name)
print(job._dashboard_uri())
最後,您可以按照設定的間隔 (例如每 60 秒) 重新整理執行作業的狀態,查看狀態從 RUNNING
變更為 FAILED
或 COMPLETE
。
# wait 60 seconds and view state again
import time
time.sleep(60)
vertex_ai.get_experiment_df(EXPERIMENT_NAME)
7. 找出成效最佳的執行作業
很好,現在我們已取得管道執行結果。你可能會想知道,我從結果中獲得什麼資訊?實驗的輸出結果應包含五列,管道每次執行各有一個。如下所示:
MAE 和 RMSE 都是模型預測平均誤差的測量結果,因此在大多數情況下,兩個指標的值都會較低。根據 Vertex AI 實驗的輸出內容,我們發現在兩項指標中,最成功的執行結果是最終執行結果,其中 dropout_rate
為 0.001、learning_rate
為 0.001,而 epochs
的總數為 20。根據這項實驗,這些模型參數最終會用於實際工作環境,因為這可帶來最佳模型效能。
這樣就完成實驗室了!
🎉 恭喜!🎉
您已瞭解如何使用 Vertex AI 執行下列作業:
- 訓練自訂 Keras Model 模型,預測玩家評分 (例如迴歸)
- 使用 Kubeflow Pipelines SDK 建構可擴充的機器學習管道
- 建立並執行包含 5 步驟的管道。這個管道會從 GCS 擷取資料、調度資料、訓練模型、評估模型,並將結果模型儲存回 GCS
- 利用 Vertex 機器學習中繼資料儲存模型構件,例如模型和模型指標
- 使用 Vertex AI 實驗比較不同管道執行作業的結果
如要進一步瞭解 Vertex 的其他部分,請參閱說明文件。
8. 清理
為避免產生費用,建議您刪除在本研究室中建立的資源。
步驟 1:停止或刪除 Notebooks 執行個體
如果您想繼續使用在本實驗室中建立的 Notebook,建議您在未使用時關閉 Notebook。在 Cloud 控制台的 Notebook 使用者介面中,選取 Notebook,然後選取「Stop」。如要將執行個體完全刪除,請選取 [Delete] (刪除):
步驟 2:刪除 Cloud Storage 值區
如要刪除儲存體值區,請使用 Cloud 控制台的「導覽」選單,前往「儲存體」頁面,選取所需值區,然後按一下「刪除」: