Vertex AI:透過 TensorFlow 進行多工作站訓練和遷移學習

1. 總覽

在本實驗室中,您將使用 Vertex AI 為 TensorFlow 模型執行多工作者訓練工作。

課程內容

學習重點:

  • 修改訓練應用程式程式碼,以便進行多工作站訓練
  • 透過 Vertex AI 使用者介面設定及啟動多工作者訓練工作
  • 使用 Vertex SDK 設定及啟動多工作站訓練工作

在 Google Cloud 上執行本實驗室的總費用約為 $5 美元。

2. Vertex AI 簡介

這個實驗室使用 Google Cloud 最新推出的 AI 產品服務。Vertex AI 整合了 Google Cloud 中的機器學習產品,提供流暢的開發體驗。先前,使用 AutoML 訓練的模型和自訂模型必須透過不同的服務存取。新產品將這兩項功能與其他新產品整合至單一 API。您也可以將現有專案遷移至 Vertex AI。如有任何意見回饋,請前往支援頁面

Vertex AI 包含許多不同產品,可支援端對端機器學習工作流程。本研究室將著重於下列產品:訓練工作台

Vertex 產品總覽

3. 用途簡介

在本研究室中,您會使用遷移學習,在 TensorFlow 資料集中的 cassava 資料集訓練圖片分類模型。您要使用的架構是從 tf.keras.applications 程式庫預先訓練的 Imagenet 資料集

為什麼要採用分散式訓練?

如果您只有一個 GPU,TensorFlow 會使用這個加速器加快模型訓練速度,而您不必額外進行任何工作。不過,如果您想在單一機器或多台機器 (每台機器可能都有多個 GPU) 上使用多個 GPU 來提升效能,就必須使用 tf.distribute,這是 TensorFlow 的程式庫,可在多部裝置上執行運算。裝置是指在部分可執行 TensorFlow 作業的機器中,CPU 或加速器,例如 GPU 或 TPU。

要開始使用分散式訓練,最簡單的方法就是使用單一機器搭配多個 GPU 裝置。tf.distribute 模組的 TensorFlow 分發策略會管理所有 GPU 之間資料分發和梯度更新的協調作業。如果您已掌握單主機訓練,並希望進一步擴大規模,則可在叢集中加入多台機器,進一步提升效能。您可以使用一組僅有 CPU 的機器,或是每部機器都配備一或多個 GPU 的機器。本研究室涵蓋第二個案例,示範如何在 Vertex AI 中,使用 MultiWorkerMirroredStrategy 將 TensorFlow 模型的訓練發布在多部機器上。

MultiWorkerMirroredStrategy 是同步資料並行處理策略,只需變更幾行程式碼即可使用。系統會在叢集中的每部裝置上建立模型副本。後續的漸層更新會以同步方式進行。也就是說,每個 worker 裝置會在輸入資料的不同區塊中,透過模型計算前向和後向傳遞。從每個配量計算出的梯度後,系統會匯總單一機器上所有裝置和叢集內所有機器的梯度,然後透過「All-reduce」(全部減少) 程序降低 (通常為平均)。接著,最佳化工具會使用這些減少的漸層來更新參數,藉此讓裝置保持同步。如要進一步瞭解如何使用 TensorFlow 進行分散式訓練,請觀看以下影片:

4. 設定環境

您必須擁有啟用計費功能的 Google Cloud Platform 專案,才能執行這個程式碼研究室。如要建立專案,請按照這篇文章中的操作說明進行。

步驟 1:啟用 Compute Engine API

前往「Compute Engine」,並選取「啟用」 (如果尚未啟用)。您需要這項資訊才能建立 Notebook 執行個體。

步驟 2:啟用 Container Registry API

前往容器註冊服務,然後選取「啟用」(如果尚未啟用)。您將使用這個檔案為自訂訓練工作建立容器。

步驟 3:啟用 Vertex AI API

前往 Cloud 控制台的 Vertex AI 專區,然後點選「啟用 Vertex AI API」

Vertex AI 資訊主頁

步驟 4:建立 Vertex AI Workbench 執行個體

在 Cloud 控制台的 Vertex AI 專區中,按一下「Workbench」:

Vertex AI 選單

如果尚未啟用 Notebooks API,請先啟用。

Notebook_api

啟用後,按一下「MANAGED NOTEBOOKS」

Notebooks_UI

然後選取「新增筆記本」

new_notebook

為筆記本命名,然後按一下「進階設定」

create_notebook

在「進階設定」下方,啟用閒置關機功能,並將分鐘數設為 60。也就是說,您的筆記本在未使用時自動關閉,因此不會產生不必要的費用。

idle_timeout

在「安全性」下方,選取「啟用終端機」(如果尚未啟用)。

enable_terminal

其他進階設定皆可保持原樣。

接著,按一下「建立」。執行個體的佈建作業會在幾分鐘內完成。

建立執行個體後,選取「Open JupyterLab」

open_jupyterlab

首次使用新執行個體時,系統會要求您進行驗證。請按照 UI 中的步驟操作。

驗證

5. 將訓練應用程式程式碼容器化

您可以將訓練應用程式程式碼放入 Docker 容器,然後將該容器推送至 Google Container Registry,藉此將訓練工作提交至 Vertex。您可以使用這種方法訓練任何架構建立的模型。

首先,請在 Launcher 選單中開啟筆記本執行個體的終端機視窗:

在筆記本中開啟終端機

建立名為 cassava 的新目錄,然後切換至該目錄:

mkdir cassava
cd cassava

步驟 1:建立 Dockerfile

將程式碼容器化的首要步驟,就是建立 Dockerfile。您會在 Dockerfile 中加入執行映像檔所需的所有指令。其會安裝所有必要的程式庫,並設定訓練程式碼的進入點。

在終端機中建立空白的 Dockerfile:

touch Dockerfile

開啟 Dockerfile,並將以下內容複製到檔案中:

FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-7

WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]

這個 Dockerfile 會使用 深度學習容器 TensorFlow Enterprise 2.7 GPU Docker 映像檔。Google Cloud 的深度學習容器預先安裝許多常見的機器學習和數據資料學架構。下載該映像檔後,這個 Dockerfile 會設定訓練程式碼的進入點。您尚未建立這些檔案。在下一個步驟中,您將新增程式碼來訓練和調整模型。

步驟 2:建立 Cloud Storage 值區

在這個訓練工作中,您會將經過訓練的 TensorFlow 模型匯出至 Cloud Storage 值區。在終端機中執行以下指令,為專案定義 env 變數,請務必將 your-cloud-project 替換為專案 ID:

PROJECT_ID='your-cloud-project'

接著,在終端機中執行下列指令,在專案中建立新值區。

BUCKET="gs://${PROJECT_ID}-bucket"
gsutil mb -l us-central1 $BUCKET

步驟 3:新增模型訓練程式碼

在終端機中執行下列指令,為訓練程式碼建立目錄,並建立 Python 檔案以便新增程式碼:

mkdir trainer
touch trainer/task.py

cassava/ 目錄現在應該會包含以下內容:

+ Dockerfile
+ trainer/
    + task.py

接著,開啟剛剛建立的 task.py 檔案,並複製以下程式碼。請將 {your-gcs-bucket} 替換為剛建立的 Cloud Storage 值區名稱。

import tensorflow as tf
import tensorflow_datasets as tfds
import os


PER_REPLICA_BATCH_SIZE = 64
EPOCHS = 2

# TODO: replace {your-gcs-bucket} with the name of the Storage bucket you created earlier
BUCKET = 'gs://{your-gcs-bucket}/mwms'

def preprocess_data(image, label):
  '''Resizes and scales images.'''

  image = tf.image.resize(image, (300,300))
  return tf.cast(image, tf.float32) / 255., label


def create_dataset(batch_size):
  '''Loads Cassava dataset and preprocesses data.'''

  data, info = tfds.load(name='cassava', as_supervised=True, with_info=True)
  number_of_classes = info.features['label'].num_classes
  train_data = data['train'].map(preprocess_data,
                                 num_parallel_calls=tf.data.experimental.AUTOTUNE)
  train_data  = train_data.shuffle(1000)
  train_data  = train_data.batch(batch_size)
  train_data  = train_data.prefetch(tf.data.experimental.AUTOTUNE)

  # Set AutoShardPolicy
  options = tf.data.Options()
  options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
  train_data = train_data.with_options(options)

  return train_data, number_of_classes


def create_model(number_of_classes):
  '''Creates and compiles pretrained ResNet50 model.'''

  base_model = tf.keras.applications.ResNet50(weights='imagenet', include_top=False)
  x = base_model.output
  x = tf.keras.layers.GlobalAveragePooling2D()(x)
  x = tf.keras.layers.Dense(1016, activation='relu')(x)
  predictions = tf.keras.layers.Dense(number_of_classes, activation='softmax')(x)
  model = tf.keras.Model(inputs=base_model.input, outputs=predictions)

  model.compile(
      loss='sparse_categorical_crossentropy',
      optimizer=tf.keras.optimizers.Adam(0.0001),
      metrics=['accuracy'])

  return model


def _is_chief(task_type, task_id):
  '''Helper function. Determines if machine is chief.'''

  return task_type == 'chief'


def _get_temp_dir(dirpath, task_id):
  '''Helper function. Gets temporary directory for saving model.'''

  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir


def write_filepath(filepath, task_type, task_id):
  '''Helper function. Gets filepath to save model.'''

  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)


def main():
  # Create strategy
  strategy = tf.distribute.MultiWorkerMirroredStrategy()

  # Get data
  global_batch_size = PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync
  train_data, number_of_classes = create_dataset(global_batch_size)

  # Wrap variable creation within strategy scope
  with strategy.scope():
    model = create_model(number_of_classes)

  model.fit(train_data, epochs=EPOCHS)

  # Determine type and task of the machine from
  # the strategy cluster resolver
  task_type, task_id = (strategy.cluster_resolver.task_type,
                        strategy.cluster_resolver.task_id)

  # Based on the type and task, write to the desired model path
  write_model_path = write_filepath(BUCKET, task_type, task_id)
  model.save(write_model_path)

if __name__ == "__main__":
    main()

在建構容器之前,讓我們深入瞭解程式碼,這個程式碼會使用 tf.distribute.Strategy API 中的 MultiWorkerMirroredStrategy

程式碼中有一些元件是程式碼與 MultiWorkerMirroredStrategy 搭配運作時所需的。

  1. 資料需要進行資料分割,也就是說,系統會將整個資料集的一部分指派給每個工作站。因此,在每個步驟中,每個工作站都會處理全域的非重疊資料集元素的全域批次大小。這個資料分割會自動以 tf.data.experimental.AutoShardPolicy 執行,可設為 FILEDATA。在本例中,create_dataset() 函式會將 AutoShardPolicy 設為 DATA,因為 cassava 資料集並未以多個檔案下載。不過,如果您未將政策設為 DATA,系統會啟用預設的 AUTO 政策,最終結果也會相同。如要進一步瞭解如何使用 MultiWorkerMirroredStrategy 進行資料集區塊處理,請參閱這篇文章
  2. main() 函式中建立 MultiWorkerMirroredStrategy 物件。接下來,您需要在策略的範圍內包裝模型變數建立作業。這個重要的步驟會告訴 TensorFlow 應在備援機制中鏡像哪些變數。
  3. 批次大小會由 num_replicas_in_sync 向上調整。這可確保每個備用資源在每個步驟中處理的範例數量相同。在 TensorFlow 中使用同步資料平行處理策略時,調整批次大小是最佳做法。
  4. 在多工作站的情況下,儲存模型會稍微複雜一些,因為每個 worker 的儲存位置都必須不同。主工作站會儲存至所需的模型目錄,其他工作站則會將模型儲存至暫時目錄。這些暫存目錄必須是唯一的,才能避免多個 worker 寫入相同位置。儲存作業可包含集體作業,也就是說,所有工作站都必須儲存,而非只有主工作站。_is_chief()_get_temp_dir()write_filepath()main() 函式全都包含有助於儲存模型的樣板程式碼。

請注意,如果您在其他環境中使用 MultiWorkerMirroredStrategy,可能已設定 TF_CONFIG 環境變數。Vertex AI 會自動為您設定 TF_CONFIG,因此您不必在叢集內的每部機器上定義這個變數。

步驟 4:建構容器

在終端機執行下列指令,定義專案的環境變數;請務必將 your-cloud-project 替換為您的專案 ID:

PROJECT_ID='your-cloud-project'

使用 Google Container Registry 中容器映像檔的 URI 定義變數:

IMAGE_URI="gcr.io/$PROJECT_ID/multiworker:cassava"

設定 Docker

gcloud auth configure-docker

接著,從 cassava 目錄根層級執行下列指令,建構容器:

docker build ./ -t $IMAGE_URI

最後,將映像檔推送至 Google Container Registry:

docker push $IMAGE_URI

將容器推送至 Container Registry 後,您就可以開始訓練工作。

6. 在 Vertex AI 上執行多工作站訓練工作

本研究室透過 Google Container Registry 的自訂容器使用自訂容器進行訓練,但您也可以透過預先建立的容器執行訓練工作。

首先,請前往 Cloud 控制台的 Vertex 專區,然後前往「訓練」專區:

uCAIP 選單

步驟 1:設定訓練工作

按一下「建立」,輸入訓練工作參數。

  • 在「Dataset」下方,選取「No managed dataset」
  • 接著選取「Custom training (advanced)」(自訂訓練 (進階))做為訓練方法,然後按一下「Continue」
  • 在「Model name」中輸入 multiworker-cassava (或您要稱呼模型的名稱)
  • 按一下 [Continue] (繼續)

在「容器設定」步驟中,選取「自訂容器」

自訂容器選項

在第一個方塊 (容器映像檔) 中,輸入上一節的 IMAGE_URI 變數值。這個欄位應該是 gcr.io/your-cloud-project/multiworker:cassava,並有您自己的專案 ID。將其餘欄位留空,然後按一下「Continue」

再次點選「繼續」,略過超參數步驟。

步驟 2:設定運算叢集

Vertex AI 提供 4 個工作站集區,涵蓋各種機器工作。

工作站集區 0 會設定主要、主要、排程器或「主控」設定。在 MultiWorkerMirroredStrategy 中,所有機器都會指定為 worker,也就是執行複製運算的實體機器。除了每部機器都是 worker 之外,還需要有一個 worker 負責執行額外工作,例如儲存檢查點和將摘要檔案寫入 TensorBoard。這部機器稱為首領。只有一個主要工作站,因此工作站集區 0 的工作站數量一律為 1。

在「運算和定價」部分,請保留所選區域,並按照以下方式設定「Worker pool 0」

Worker_pool_0

工作站集區 1 可讓您為叢集設定工作站。

按照下列方式設定「工作站集區 1」

Worker_pool_1

叢集現已設為有兩個僅使用 CPU 的機器。執行訓練應用程式的程式碼時,MultiWorkerMirroredStrategy 會將訓練分散在兩部機器上。

MultiWorkerMirroredStrategy 只有主工作和工作站工作類型,因此不需要設定額外的工作站資源池。不過,如果使用 TensorFlow 的 ParameterServerStrategy,則應在工作站集區 2 中設定參數伺服器。如果您想在叢集中新增評估工具,請在worker pool 3 中設定該機器。

按一下「Start training」,即可啟動超參數調整工作。在控制台的「訓練」部分,您會在「訓練管道」分頁下方看到新啟動的工作:

訓練工作

🎉 恭喜!🎉

您已瞭解如何使用 Vertex AI 執行下列作業:

  • 針對自訂容器中提供的訓練程式碼啟動多工作站訓練工作。在這個範例中,您使用 TensorFlow 模型,但可以透過自訂或內建容器,以任何架構建構的模型。

如要進一步瞭解 Vertex 的其他部分,請參閱說明文件

7. [選用] 使用 Vertex SDK

上一節說明如何透過 UI 啟動訓練工作。在本節中,您將瞭解使用 Vertex Python API 提交訓練工作的其他方法。

返回筆記本執行個體,然後從啟動器建立 TensorFlow 2 筆記本:

new_notebook

匯入 Vertex AI SDK。

from google.cloud import aiplatform

如要啟動多工作站訓練工作,您必須先定義 worker pool 規格。請注意,規格中使用 GPU 完全是可選的,如果您想要建立如前一個部分所示的 CPU 專用叢集,可以移除 accelerator_typeaccelerator_count

# The spec of the worker pools including machine type and Docker image
# Be sure to replace {YOUR-PROJECT-ID} with your project ID.
worker_pool_specs=[
     {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-8", "accelerator_type": "NVIDIA_TESLA_V100", "accelerator_count": 1
        },
        "container_spec": {"image_uri": "gcr.io/{YOUR-PROJECT-ID}/multiworker:cassava"}
      },
      {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-8", "accelerator_type": "NVIDIA_TESLA_V100", "accelerator_count": 1
        },
        "container_spec": {"image_uri": "gcr.io/{YOUR-PROJECT-ID}/multiworker:cassava"}
      }
]

接下來,請建立並執行 CustomJob。為了進行測試,您需要將 {YOUR_BUCKET} 替換為專案中的值區。您可以使用先前建立的值區。

# Replace YOUR_BUCKET
my_multiworker_job = aiplatform.CustomJob(display_name='multiworker-cassava-sdk',
                              worker_pool_specs=worker_pool_specs,
                              staging_bucket='gs://{YOUR_BUCKET}')

my_multiworker_job.run()

在控制台的「Custom JOBS」分頁標籤下方,您會看到訓練工作:

自訂工作

8. 清除

由於我們已將 Notebook 設為在閒置 60 分鐘後逾時,因此不必擔心關閉執行個體。如果您想手動關閉執行個體,請按一下控制台 Vertex AI Workbench 專區中的「停止」按鈕。如要完全刪除筆記本,請按一下「刪除」按鈕。

停止執行個體

如要刪除儲存體值區,請使用 Cloud 控制台的「導覽」選單,前往「儲存體」頁面,選取所需值區,然後按一下「刪除」:

刪除儲存空間