從原型轉為正式環境:在 Vertex AI 中進行分散式訓練

1. 總覽

在本實驗室中,您將使用 Vertex AI,透過 TensorFlow 在 Vertex AI Training 上執行分散式訓練工作。

這個實驗室屬於「從原型設計到投入實際工作環境」系列影片。請務必先完成先前的實驗室,再試用這個實驗室。歡迎觀看相關系列影片,瞭解更多資訊:

課程內容

學習重點:

  • 在單一機器上執行分散式訓練,並使用多個 GPU
  • 在多台機器上執行分散式訓練

在 Google Cloud 中執行這個實驗室的總費用約為 $2 美元。

2. Vertex AI 簡介

這個研究室使用 Google Cloud 最新的 AI 產品服務。Vertex AI 整合了 Google Cloud 中的機器學習產品,提供流暢的開發體驗。先前,使用 AutoML 訓練的模型和自訂模型必須透過不同的服務存取。新產品將這兩項功能與其他新產品整合至單一 API。您也可以將現有專案遷移至 Vertex AI。

Vertex AI 包含許多不同產品,可支援端對端機器學習工作流程。本研究室將著重於下列產品:訓練工作台

Vertex 產品總覽

3. 分散式訓練總覽

如果您使用單一 GPU,TensorFlow 會使用這個加速器來加快模型訓練速度,您不須執行額外操作。不過,如果您想透過使用多個 GPU 進一步提升效能,就必須使用 tf.distribute,這是 TensorFlow 的模組,可在多個裝置上執行運算。

本實驗室的第一部分會使用 tf.distribute.MirroredStrategy,您只需變更幾行程式碼,即可將其加入訓練應用程式。這項策略會在電腦上的每個 GPU 上建立模型副本。後續的漸層更新則會以同步方式進行。也就是說,每個 GPU 會在輸入資料的不同切片上,透過模型計算正向和反向傳遞。然後,從每個切片計算的梯度會在所有 GPU 中匯總,並在稱為「all-reduce」的程序中求平均。模型參數會使用這些平均梯度更新。

實驗室課程結尾的選修部分會使用 tf.distribute.MultiWorkerMirroredStrategy,這與 MirroredStrategy 類似,但可跨多台機器運作。這些機器也可能有多個 GPU。MultiWorkerMirroredStrategyMirroredStrategy 一樣,是同步資料平行處理策略,只需進行少許程式碼變更即可使用。從單一機器的同步資料並行處理移至多個機器時,主要差異在於,現在需要在每個步驟結束時,跨越一台機器的所有 GPU 和叢集中的所有機器同步處理梯度。

您不需要瞭解詳細資訊,也能完成本實驗室的課程。不過,如果想進一步瞭解 TensorFlow 中的分散式訓練運作方式,請觀看下方影片:

4. 設定環境

完成「使用 Vertex AI 訓練自訂模型」實驗室的步驟,設定環境。

5. 單一機器、多 GPU 訓練

您可以將訓練應用程式程式碼放入 Docker 容器,然後將該容器推送至 Google Artifact Registry,藉此將分散式訓練工作提交至 Vertex AI。您可以使用這種方法訓練任何架構建立的模型。

首先,請在先前實驗室中建立的 Workbench 筆記本中,透過 Launcher 選單開啟終端機視窗。

在筆記本中開啟終端機

步驟 1:編寫訓練程式碼

建立名為 flowers-multi-gpu 的新目錄,然後切換至該目錄:

mkdir flowers-multi-gpu
cd flowers-multi-gpu

執行下列指令,建立訓練程式碼的目錄,以及您要新增下方程式碼的 Python 檔案。

mkdir trainer
touch trainer/task.py

flowers-multi-gpu/ 目錄現在應該會包含以下內容:

+ trainer/
    + task.py

接著,開啟剛剛建立的 task.py 檔案,並複製以下程式碼。

您需要將 BUCKET_ROOT 中的 {your-gcs-bucket} 替換為 實驗室 1 中儲存花卉資料集的 Cloud Storage 值區。

import tensorflow as tf
import numpy as np
import os

## Replace {your-gcs-bucket} !!
BUCKET_ROOT='/gcs/{your-gcs-bucket}'

# Define variables
NUM_CLASSES = 5
EPOCHS=10
BATCH_SIZE = 32

IMG_HEIGHT = 180
IMG_WIDTH = 180

DATA_DIR = f'{BUCKET_ROOT}/flower_photos'

def create_datasets(data_dir, batch_size):
  '''Creates train and validation datasets.'''

  train_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="training",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  validation_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="validation",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  train_dataset = train_dataset.cache().shuffle(1000).prefetch(buffer_size=tf.data.AUTOTUNE)
  validation_dataset = validation_dataset.cache().prefetch(buffer_size=tf.data.AUTOTUNE)

  return train_dataset, validation_dataset


def create_model():
  '''Creates model.'''

  model = tf.keras.Sequential([
    tf.keras.layers.Resizing(IMG_HEIGHT, IMG_WIDTH),
    tf.keras.layers.Rescaling(1./255, input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)),
    tf.keras.layers.Conv2D(16, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(64, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')
  ])
  return model

def main():  

  # Create distribution strategy
  strategy = tf.distribute.MirroredStrategy()

  # Get data
  GLOBAL_BATCH_SIZE = BATCH_SIZE * strategy.num_replicas_in_sync
  train_dataset, validation_dataset = create_datasets(DATA_DIR, BATCH_SIZE)

  # Wrap model creation and compilation within scope of strategy
  with strategy.scope():
    model = create_model()
    model.compile(optimizer=tf.keras.optimizers.Adam(),
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                  metrics=['accuracy'])

  history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=EPOCHS
  )

  model.save(f'{BUCKET_ROOT}/model_output')


if __name__ == "__main__":
    main()

在建構容器之前,讓我們進一步瞭解程式碼。以下是幾個分散式訓練專用的元件。

  • main() 函式中,系統會建立 MirroredStrategy 物件。接下來,您需要在策略的範圍內包裝模型變數建立作業。這個步驟會告知 TensorFlow 應在 GPU 上鏡像哪些變數。
  • 批次大小會由 num_replicas_in_sync 向上調整。在 TensorFlow 中使用同步資料平行處理策略時,調整批次大小是最佳做法。如要進一步瞭解,請參閱這篇文章

步驟 2:建立 Dockerfile

如要將程式碼容器化,您需要建立 Dockerfile。您會在 Dockerfile 中加入執行映像檔所需的所有指令。它會安裝所有必要的程式庫,並設定訓練程式碼的進入點。

在終端機中,於花卉目錄的根目錄中建立空白的 Dockerfile:

touch Dockerfile

flowers-multi-gpu/ 目錄現在應該會包含以下內容:

+ Dockerfile
+ trainer/
    + task.py

開啟 Dockerfile,並將下列內容複製到該檔案中:

FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-8

WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]

步驟 3:建構容器

在終端機中執行以下指令,為專案定義 env 變數,請務必將 your-cloud-project 替換為專案 ID:

PROJECT_ID='your-cloud-project'

在 Artifact Registry 中建立存放區。我們會使用在第一個實驗室中建立的存放區。

REPO_NAME='flower-app'

在 Artifact Registry 中,使用容器映像檔的 URI 定義變數:

IMAGE_URI=us-central1-docker.pkg.dev/$PROJECT_ID/$REPO_NAME/flower_image_distributed:single_machine

設定 Docker

gcloud auth configure-docker \
    us-central1-docker.pkg.dev

接著,從 flowers-multi-gpu 目錄根層級執行下列指令,建構容器:

docker build ./ -t $IMAGE_URI

最後,將其推送至 Artifact Registry:

docker push $IMAGE_URI

將容器推送至 Artifact Registry 後,您就可以開始訓練工作。

步驟 4:使用 SDK 執行工作

在本節中,您將瞭解如何使用 Vertex AI Python SDK 設定及啟動分散式訓練工作。

在 Launcher 中建立 TensorFlow 2 筆記本。

new_notebook

匯入 Vertex AI SDK。

from google.cloud import aiplatform

然後定義 CustomContainerTrainingJob

您需要在 container_uri 中替換 {PROJECT_ID},並在 staging_bucket 中替換 {YOUR_BUCKET}

job = aiplatform.CustomContainerTrainingJob(display_name='flowers-multi-gpu',
                                            container_uri='us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:single_machine',
                                            staging_bucket='gs://{YOUR_BUCKET}')

定義工作後,您就可以執行工作。將加速器數量設為 2。如果我們只使用 1 個 GPU,這就屬於分散式訓練。使用 2 個以上的加速器時,在同一部機器上安排分散式訓練。

my_custom_job.run(replica_count=1,
                  machine_type='n1-standard-4',
                  accelerator_type='NVIDIA_TESLA_V100',
                  accelerator_count=2)

您可以在控制台中查看工作進度。

multigpu_job

6. [選用] 多工作站訓練

您已嘗試在單一機器上使用多個 GPU 進行分散式訓練,現在可以透過多台機器的訓練,將分散式訓練技巧提升到更高層級。為降低費用,我們不會為這些機器新增任何 GPU,但您可以視需求新增 GPU 進行實驗。

在筆記本執行個體中開啟新的終端機視窗:

在筆記本中開啟終端機

步驟 1:編寫訓練程式碼

建立名為 flowers-multi-machine 的新目錄,然後切換至該目錄:

mkdir flowers-multi-machine
cd flowers-multi-machine

執行下列指令,建立訓練程式碼的目錄,以及您要新增下方程式碼的 Python 檔案。

mkdir trainer
touch trainer/task.py

現在 flowers-multi-machine/ 目錄中應會顯示以下內容:

+ trainer/
    + task.py

接著,開啟您剛才建立的 task.py 檔案,然後複製下方程式碼。

您需要將 BUCKET_ROOT 中的 {your-gcs-bucket} 替換為 實驗室 1 中儲存花卉資料集的 Cloud Storage 值區。

import tensorflow as tf
import numpy as np
import os

## Replace {your-gcs-bucket} !!
BUCKET_ROOT='/gcs/{your-gcs-bucket}'

# Define variables
NUM_CLASSES = 5
EPOCHS=10
BATCH_SIZE = 32

IMG_HEIGHT = 180
IMG_WIDTH = 180

DATA_DIR = f'{BUCKET_ROOT}/flower_photos'
SAVE_MODEL_DIR = f'{BUCKET_ROOT}/multi-machine-output'

def create_datasets(data_dir, batch_size):
  '''Creates train and validation datasets.'''

  train_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="training",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  validation_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="validation",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  train_dataset = train_dataset.cache().shuffle(1000).prefetch(buffer_size=tf.data.AUTOTUNE)
  validation_dataset = validation_dataset.cache().prefetch(buffer_size=tf.data.AUTOTUNE)

  return train_dataset, validation_dataset


def create_model():
  '''Creates model.'''

  model = tf.keras.Sequential([
    tf.keras.layers.Resizing(IMG_HEIGHT, IMG_WIDTH),
    tf.keras.layers.Rescaling(1./255, input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)),
    tf.keras.layers.Conv2D(16, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(64, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')
  ])
  return model

def _is_chief(task_type, task_id):
  '''Helper function. Determines if machine is chief.'''

  return task_type == 'chief'


def _get_temp_dir(dirpath, task_id):
  '''Helper function. Gets temporary directory for saving model.'''

  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir


def write_filepath(filepath, task_type, task_id):
  '''Helper function. Gets filepath to save model.'''

  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

def main():
  # Create distribution strategy
  strategy = tf.distribute.MultiWorkerMirroredStrategy()

  # Get data
  GLOBAL_BATCH_SIZE = BATCH_SIZE * strategy.num_replicas_in_sync
  train_dataset, validation_dataset = create_datasets(DATA_DIR, BATCH_SIZE)

  # Wrap variable creation within strategy scope
  with strategy.scope():
    model = create_model()
    model.compile(optimizer=tf.keras.optimizers.Adam(),
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                  metrics=['accuracy'])

  history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=EPOCHS
  )

  # Determine type and task of the machine from
  # the strategy cluster resolver
  task_type, task_id = (strategy.cluster_resolver.task_type,
                        strategy.cluster_resolver.task_id)

  # Based on the type and task, write to the desired model path
  write_model_path = write_filepath(SAVE_MODEL_DIR, task_type, task_id)
  model.save(write_model_path)

if __name__ == "__main__":
    main()

在建構容器之前,讓我們進一步瞭解程式碼。程式碼中有一些元件是訓練應用程式與 MultiWorkerMirroredStrategy 搭配運作時所需的。

  • main() 函式中建立 MultiWorkerMirroredStrategy 物件。接下來,您需要在策略的範圍內包裝模型變數建立作業。這個關鍵步驟會告知 TensorFlow 應在備用資源間鏡像哪些變數。
  • 批次大小會由 num_replicas_in_sync 向上調整。在 TensorFlow 中使用同步資料平行處理策略時,調整批次大小是最佳做法。
  • 在多工作站的情況下,儲存模型會稍微複雜一些,因為每個 worker 的儲存位置都必須不同。主要工作站將儲存至所需模型目錄,其他 worker 則會將模型儲存至臨時目錄。這些暫存目錄必須是唯一的,才能避免多個 worker 寫入相同位置。儲存作業可包含集體作業,也就是說,所有工作站都必須儲存,而非只有主工作站。_is_chief()_get_temp_dir()write_filepath()main() 函式都包含可協助儲存模型的樣板程式碼。

步驟 2:建立 Dockerfile

如要將程式碼容器化,您需要建立 Dockerfile。您會在 Dockerfile 中加入執行映像檔所需的所有指令。它會安裝所有必要的程式庫,並設定訓練程式碼的進入點。

在終端機中,在 flowers 目錄的根目錄中建立空白 Dockerfile:

touch Dockerfile

flowers-multi-machine/ 目錄現在應該會包含以下內容:

+ Dockerfile
+ trainer/
    + task.py

開啟 Dockerfile,並將以下內容複製到檔案中:

FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-8

WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]

步驟 3:建構容器

在終端機中執行以下指令,為專案定義 env 變數,請務必將 your-cloud-project 替換為專案 ID:

PROJECT_ID='your-cloud-project'

在 Artifact Registry 中建立存放區。我們會使用在第一個實驗室中建立的存放區。

REPO_NAME='flower-app'

在 Google Artifact Registry 中,使用容器映像檔的 URI 定義變數:

IMAGE_URI=us-central1-docker.pkg.dev/$PROJECT_ID/$REPO_NAME/flower_image_distributed:multi_machine

設定 Docker

gcloud auth configure-docker \
    us-central1-docker.pkg.dev

接著,從 flowers-multi-machine 目錄根層級執行下列指令,建構容器:

docker build ./ -t $IMAGE_URI

最後,將其推送至 Artifact Registry:

docker push $IMAGE_URI

將容器推送至 Artifact Registry 後,您就能開始執行訓練工作。

步驟 4:使用 SDK 執行工作

在本節中,您將瞭解如何使用 Vertex AI Python SDK 設定及啟動分散式訓練工作。

在 Launcher 中建立 TensorFlow 2 筆記本。

new_notebook

匯入 Vertex AI SDK。

from google.cloud import aiplatform

接著定義 worker_pool_specs

Vertex AI 提供 4 個工作站集區,涵蓋各種機器工作。

工作站集區 0 會設定主要、主要、排程器或「主機」。在 MultiWorkerMirroredStrategy 中,所有機器都會指派為工作站,這些機器是執行複製運算的實體機器。除了每部機器都是工作站,也需要一個工作站,以便完成一些額外的工作,例如儲存查核點以及將摘要檔案寫入 TensorBoard。這個機器稱為主機。由於工作站集區只有一個主要工作站,工作站集區 0 的工作站數量永遠會是 1。

您可以在 worker 集區 1 中設定叢集的其他 worker。

worker_pool_specs 清單中的第一個字典代表 worker pool 0,第二個字典代表 worker pool 1。在這個範例中,兩個設定檔完全相同。不過,如果您想在 3 部機器上訓練,請將 replica_count 設為 2,藉此在 Worker pool 1 中新增其他 worker。如果您想新增 GPU,請為兩個工作站集區的 machine_spec 新增 accelerator_typeaccelerator_count 參數。請注意,如果您想搭配 MultiWorkerMirroredStrategy 使用 GPU,叢集中的每部機器都必須有相同數量的 GPU。否則工作會失敗。

您需要在 image_uri 中取代 {PROJECT_ID}

# The spec of the worker pools including machine type and Docker image
# Be sure to replace PROJECT_ID in the "image_uri" with your project.

worker_pool_specs=[
     {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-4",
        },
        "container_spec": {"image_uri": "us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:multi_machine"}
      },
      {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-4",
        },
        "container_spec": {"image_uri": "us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:multi_machine"}
      }
          ]

接著,請建立並執行 CustomJob,將 staging_bucket 中的 {YOUR_BUCKET} 替換為專案中用於建構階段的值區。

my_custom_job = aiplatform.CustomJob(display_name='flowers-multi-worker',
                                     worker_pool_specs=worker_pool_specs,
                                     staging_bucket='gs://{YOUR_BUCKET}')

my_custom_job.run()

您可以在控制台中查看工作進度。

multi_worker_job

🎉 恭喜!🎉

您已瞭解如何使用 Vertex AI 執行下列作業:

  • 使用 TensorFlow 執行分散式訓練工作

如要進一步瞭解 Vertex 的其他部分,請參閱說明文件

7. 清理

由於我們已將 Notebook 設為在閒置 60 分鐘後逾時,因此不必擔心關閉執行個體。如果您想手動關閉執行個體,請按一下控制台 Vertex AI Workbench 專區中的「停止」按鈕。如想完全刪除筆記本,請按一下「刪除」按鈕。

停止執行個體

如要刪除 Storage 值區,請使用 Cloud 控制台中的導覽選單前往「Storage」(儲存空間)、選取值區,然後點選「Delete」(刪除):

刪除儲存空間