在生产环境中进行原型设计:在 Vertex AI 上执行分布式训练

1. 概览

在本实验中,您将使用 Vertex AI 来通过 TensorFlow 在 Vertex AI Training 上运行分布式训练作业。

本实验是对生产环境进行原型设计视频系列的一部分。请务必先完成之前的实验,然后再尝试此实验。您可以观看随附的视频系列以了解详情:

学习内容

您将了解如何:

  • 在具有多个 GPU 的单台机器上运行分布式训练
  • 在多个机器上运行分布式训练

在 Google Cloud 上运行此实验的总费用约为 2 美元

2. Vertex AI 简介

本实验使用的是 Google Cloud 上提供的最新 AI 产品。Vertex AI 将整个 Google Cloud 的机器学习产品集成到无缝的开发体验中。以前,使用 AutoML 训练的模型和自定义模型是通过不同的服务访问的。现在,该新产品与其他新产品一起将这两种模型合并到一个 API 中。您还可以将现有项目迁移到 Vertex AI。

Vertex AI 包含许多不同的产品,可支持端到端机器学习工作流。本实验将重点介绍下面突出显示的产品:TrainingWorkbench

Vertex 产品概览

3. 分布式训练概览

如果您有一个 GPU,则 TensorFlow 会使用此加速器加快模型训练速度,您无需执行额外的操作。但是,如果您希望使用多个 GPU 进一步提升速度,则需要使用 tf.distribute,它是 TensorFlow 的模块,用于在多个设备上运行计算。

本实验的第一部分使用 tf.distribute.MirroredStrategy,您只需对代码进行一些更改,即可将其添加到训练应用中。此策略会在机器的每个 GPU 上创建模型的副本。后续的梯度更新会以同步的方式进行。这意味着,每个 GPU 都会针对不同的输入数据切片计算通过模型的前向和后向传递。然后,来自各个切片的计算梯度会在所有 GPU 上聚合,并在一个称为 all-reduce 的进程中取平均值。模型参数会使用这些平均梯度进行更新。

本实验末尾的可选部分使用 tf.distribute.MultiWorkerMirroredStrategy,它与 MirroredStrategy 类似,不同之处在于它可以在多台机器上运行。其中每个机器可能还具有多个 GPU。与 MirroredStrategy 类似,MultiWorkerMirroredStrategy 是一种同步数据并行策略,只需进行一些代码更改即可使用。从一台机器上的同步数据并行转移到多台机器上的主要区别在于,现在每一步结束时的梯度都需要在机器中的所有 GPU 和集群中的所有机器上同步。

您无需了解相关详细信息即可完成本实验,但如果您想详细了解分布式训练在 TensorFlow 中的工作原理,请观看以下视频:

4. 设置您的环境

完成使用 Vertex AI 训练自定义模型实验中的步骤以设置您的环境。

5. 单机器、多 GPU 训练

如要将分布式训练作业提交到 Vertex AI,您需要将训练应用代码放在 Docker 容器中,然后将此容器推送到 Google Artifact Registry。使用此方法,您可以训练使用任何框架构建的模型。

首先,通过您在之前的实验中创建的 Workbench 笔记本的“启动器”菜单,打开一个终端窗口。

在笔记本中打开终端

第 1 步:编写训练代码

创建一个名为 flowers-multi-gpu 的新目录并通过 cd 命令进入该目录:

mkdir flowers-multi-gpu
cd flowers-multi-gpu

运行以下命令,为训练代码创建一个目录和一个 Python 文件(您将在其中添加下面的代码)。

mkdir trainer
touch trainer/task.py

您的 flowers-multi-gpu/ 目录中现在应包含以下内容:

+ trainer/
    + task.py

接下来,打开您刚刚创建的 task.py 文件并复制下面的代码。

您需要将 BUCKET_ROOT 中的 {your-gcs-bucket} 替换为您在实验 1 中存储花卉数据集的 Cloud Storage 存储桶。

import tensorflow as tf
import numpy as np
import os

## Replace {your-gcs-bucket} !!
BUCKET_ROOT='/gcs/{your-gcs-bucket}'

# Define variables
NUM_CLASSES = 5
EPOCHS=10
BATCH_SIZE = 32

IMG_HEIGHT = 180
IMG_WIDTH = 180

DATA_DIR = f'{BUCKET_ROOT}/flower_photos'

def create_datasets(data_dir, batch_size):
  '''Creates train and validation datasets.'''

  train_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="training",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  validation_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="validation",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  train_dataset = train_dataset.cache().shuffle(1000).prefetch(buffer_size=tf.data.AUTOTUNE)
  validation_dataset = validation_dataset.cache().prefetch(buffer_size=tf.data.AUTOTUNE)

  return train_dataset, validation_dataset

def create_model():
  '''Creates model.'''

  model = tf.keras.Sequential([
    tf.keras.layers.Resizing(IMG_HEIGHT, IMG_WIDTH),
    tf.keras.layers.Rescaling(1./255, input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)),
    tf.keras.layers.Conv2D(16, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(64, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')
  ])
  return model

def main():

  # Create distribution strategy
  strategy = tf.distribute.MirroredStrategy()

  # Get data
  GLOBAL_BATCH_SIZE = BATCH_SIZE * strategy.num_replicas_in_sync
  train_dataset, validation_dataset = create_datasets(DATA_DIR, BATCH_SIZE)

  # Wrap model creation and compilation within scope of strategy
  with strategy.scope():
    model = create_model()
    model.compile(optimizer=tf.keras.optimizers.Adam(),
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                  metrics=['accuracy'])

  history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=EPOCHS
  )

  model.save(f'{BUCKET_ROOT}/model_output')

if __name__ == "__main__":
    main()

在构建容器之前,我们先来深入了解一下代码。有一些组件专用于使用分布式训练。

  • main() 函数中,相关代码创建了 MirroredStrategy 对象。接下来,您将模型变量的创建过程封装在策略的范围内。此步骤指示 TensorFlow 应在 GPU 中镜像哪些变量。
  • 批次大小按 num_replicas_in_sync 扩容。在 TensorFlow 中使用同步数据并行策略时,扩缩批次大小是最佳实践。您可以点击此处了解详情

第 2 步:创建 Dockerfile

如需将代码容器化,您需要创建一个 Dockerfile。在 Dockerfile 中,您将添加运行映像所需的所有命令。它将安装所有必要的库并为训练代码设置入口点。

通过终端,在 flowers 目录的根目录下创建一个空的 Dockerfile:

touch Dockerfile

您的 flowers-multi-gpu/ 目录中现在应包含以下内容:

+ Dockerfile
+ trainer/
    + task.py

打开 Dockerfile 并将以下代码复制到其中:

FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-8

WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]

第 3 步:构建容器

在终端中,运行以下命令为项目定义一个环境变量,务必注意将 your-cloud-project 替换为您的项目 ID:

PROJECT_ID='your-cloud-project'

在 Artifact Registry 中创建代码库。我们将使用在第一个实验中创建的代码库。

REPO_NAME='flower-app'

使用 Artifact Registry 中容器映像的 URI 定义变量:

IMAGE_URI=us-central1-docker.pkg.dev/$PROJECT_ID/$REPO_NAME/flower_image_distributed:single_machine

配置 Docker

gcloud auth configure-docker \
    us-central1-docker.pkg.dev

然后,从 flowers-multi-gpu 目录的根目录运行以下命令来构建容器:

docker build ./ -t $IMAGE_URI

最后,将其推送到 Artifact Registry:

docker push $IMAGE_URI

在将容器推送到 Artifact Registry 后,您就可以启动训练作业了。

第 4 步:使用 SDK 运行作业

在本部分中,您将了解如何使用 Vertex AI Python SDK 配置和启动分布式训练作业。

在启动器中,创建一个 TensorFlow 2 笔记本。

new_notebook

导入 Vertex AI SDK。

from google.cloud import aiplatform

然后,定义 CustomContainerTrainingJob

您需要替换 container_uri 中的 {PROJECT_ID}staging_bucket 中的 {YOUR_BUCKET}

job = aiplatform.CustomContainerTrainingJob(display_name='flowers-multi-gpu',
                                            container_uri='us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:single_machine',
                                            staging_bucket='gs://{YOUR_BUCKET}')

定义作业后,您便可以运行作业。您要将加速器数量设为 2。如果我们只使用 1 个 GPU,这不会被视为分布式训练。在一台机器上进行分布式训练是指使用 2 个或更多加速器。

my_custom_job.run(replica_count=1,
                  machine_type='n1-standard-4',
                  accelerator_type='NVIDIA_TESLA_V100',
                  accelerator_count=2)

在控制台中,您可以查看作业的进度。

multigpu_job

6. [可选] 多工作器训练

现在,您已经尝试在配备了多个 GPU 的单台机器上进行分布式训练,接下来您可以利用多台机器训练,让分布式训练技能更上一层楼。为了降低费用,我们不会向这些机器添加任何 GPU,但您可以根据需要尝试添加 GPU。

在笔记本实例中打开一个新终端窗口:

在笔记本中打开终端

第 1 步:编写训练代码

创建一个名为 flowers-multi-machine 的新目录并通过 cd 命令进入该目录:

mkdir flowers-multi-machine
cd flowers-multi-machine

运行以下命令,为训练代码创建一个目录和一个 Python 文件(您将在其中添加下面的代码)。

mkdir trainer
touch trainer/task.py

您的 flowers-multi-machine/ 目录中现在应包含以下内容:

+ trainer/
    + task.py

接下来,打开您刚刚创建的 task.py 文件并复制下面的代码。

您需要将 BUCKET_ROOT 中的 {your-gcs-bucket} 替换为您在实验 1 中存储花卉数据集的 Cloud Storage 存储桶。

import tensorflow as tf
import numpy as np
import os

## Replace {your-gcs-bucket} !!
BUCKET_ROOT='/gcs/{your-gcs-bucket}'

# Define variables
NUM_CLASSES = 5
EPOCHS=10
BATCH_SIZE = 32

IMG_HEIGHT = 180
IMG_WIDTH = 180

DATA_DIR = f'{BUCKET_ROOT}/flower_photos'
SAVE_MODEL_DIR = f'{BUCKET_ROOT}/multi-machine-output'

def create_datasets(data_dir, batch_size):
  '''Creates train and validation datasets.'''

  train_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="training",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  validation_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="validation",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  train_dataset = train_dataset.cache().shuffle(1000).prefetch(buffer_size=tf.data.AUTOTUNE)
  validation_dataset = validation_dataset.cache().prefetch(buffer_size=tf.data.AUTOTUNE)

  return train_dataset, validation_dataset

def create_model():
  '''Creates model.'''

  model = tf.keras.Sequential([
    tf.keras.layers.Resizing(IMG_HEIGHT, IMG_WIDTH),
    tf.keras.layers.Rescaling(1./255, input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)),
    tf.keras.layers.Conv2D(16, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(64, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')
  ])
  return model

def _is_chief(task_type, task_id):
  '''Helper function. Determines if machine is chief.'''

  return task_type == 'chief'

def _get_temp_dir(dirpath, task_id):
  '''Helper function. Gets temporary directory for saving model.'''

  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  '''Helper function. Gets filepath to save model.'''

  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

def main():
  # Create distribution strategy
  strategy = tf.distribute.MultiWorkerMirroredStrategy()

  # Get data
  GLOBAL_BATCH_SIZE = BATCH_SIZE * strategy.num_replicas_in_sync
  train_dataset, validation_dataset = create_datasets(DATA_DIR, BATCH_SIZE)

  # Wrap variable creation within strategy scope
  with strategy.scope():
    model = create_model()
    model.compile(optimizer=tf.keras.optimizers.Adam(),
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                  metrics=['accuracy'])

  history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=EPOCHS
  )

  # Determine type and task of the machine from
  # the strategy cluster resolver
  task_type, task_id = (strategy.cluster_resolver.task_type,
                        strategy.cluster_resolver.task_id)

  # Based on the type and task, write to the desired model path
  write_model_path = write_filepath(SAVE_MODEL_DIR, task_type, task_id)
  model.save(write_model_path)

if __name__ == "__main__":
    main()

在构建容器之前,我们先来深入了解一下代码。代码中有一些组件是您的训练应用使用 MultiWorkerMirroredStrategy 所必需的。

  • main() 函数中,相关代码创建了 MultiWorkerMirroredStrategy 对象。接下来,您将模型变量的创建过程封装在策略的范围内。这一关键步骤指示 TensorFlow 应在副本中镜像哪些变量。
  • 批次大小按 num_replicas_in_sync 扩容。在 TensorFlow 中使用同步数据并行策略时,扩缩批次大小是最佳实践。
  • 在多工作器情况下,保存模型稍微复杂一些,因为每个工作器的目的地必须有所不同。主工作器将保存到所需的模型目录,其他工作器会将模型保存到临时目录。为防止多个工作器写入同一位置,这些临时目录必须具有唯一性。保存操作可以包含集体操作,这意味着所有工作器都必须保存,而不仅仅是主工作器。函数 _is_chief()_get_temp_dir()write_filepath()main() 均包含有助于保存模型的样板代码。

第 2 步:创建 Dockerfile

如需将代码容器化,您需要创建一个 Dockerfile。在 Dockerfile 中,您将添加运行映像所需的所有命令。它将安装所有必要的库并为训练代码设置入口点。

通过终端,在 flowers 目录的根目录下创建一个空的 Dockerfile:

touch Dockerfile

您的 flowers-multi-machine/ 目录中现在应包含以下内容:

+ Dockerfile
+ trainer/
    + task.py

打开 Dockerfile 并将以下代码复制到其中:

FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-8

WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]

第 3 步:构建容器

在终端中,运行以下命令为项目定义一个环境变量,务必注意将 your-cloud-project 替换为您的项目 ID:

PROJECT_ID='your-cloud-project'

在 Artifact Registry 中创建代码库。我们将使用在第一个实验中创建的代码库。

REPO_NAME='flower-app'

使用 Google Artifact Registry 中容器映像的 URI 定义变量:

IMAGE_URI=us-central1-docker.pkg.dev/$PROJECT_ID/$REPO_NAME/flower_image_distributed:multi_machine

配置 Docker

gcloud auth configure-docker \
    us-central1-docker.pkg.dev

然后,从 flowers-multi-machine 目录的根目录运行以下命令来构建容器:

docker build ./ -t $IMAGE_URI

最后,将其推送到 Artifact Registry:

docker push $IMAGE_URI

在将容器推送到 Artifact Registry 后,您就可以启动训练作业了。

第 4 步:使用 SDK 运行作业

在本部分中,您将了解如何使用 Vertex AI Python SDK 配置和启动分布式训练作业。

在启动器中,创建一个 TensorFlow 2 笔记本。

new_notebook

导入 Vertex AI SDK。

from google.cloud import aiplatform

然后,定义 worker_pool_specs

Vertex AI 提供 4 个工作器池,可处理不同类型的机器任务。

工作器池 0 会配置主实例、主帐号、调度器或“主客户端”。在 MultiWorkerMirroredStrategy 中,所有机器都被指定为工作器,工作器是执行复制计算的物理机器。除了将每台机器用作工作器之外,还需要有一个工作器接受一些额外的工作,例如保存检查点,以及将摘要文件写入 TensorBoard。此机器被称为主工作器。由于只有一个主工作器,因此工作器池 0 的工作器计数始终为 1。

工作器池 1 用于为集群配置其他工作器。

worker_pool_specs 列表中的第一个字典表示工作器池 0,第二个字典表示工作器池 1。在此示例中,这两个配置是相同的。但是,如果您想在 3 台机器上训练,则需要将 replica_count 设为 2,从而向工作器池 1 添加其他工作器。如果您想添加 GPU,则需要将参数 accelerator_typeaccelerator_count 添加到两个工作器池的 machine_spec 中。请注意,如果您想要将 GPU 与 MultiWorkerMirroredStrategy 搭配使用,集群中的每个机器都必须具有相同的 GPU 数量。否则,作业将失败。

您需要替换 image_uri 中的 {PROJECT_ID}

# The spec of the worker pools including machine type and Docker image
# Be sure to replace PROJECT_ID in the "image_uri" with your project.

worker_pool_specs=[
     {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-4",
        },
        "container_spec": {"image_uri": "us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:multi_machine"}
      },
      {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-4",
        },
        "container_spec": {"image_uri": "us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:multi_machine"}
      }
          ]

接下来,创建并运行 CustomJob,注意将 staging_bucket 中的 {YOUR_BUCKET} 替换为您项目中用于暂存的存储桶。

my_custom_job = aiplatform.CustomJob(display_name='flowers-multi-worker',
                                     worker_pool_specs=worker_pool_specs,
                                     staging_bucket='gs://{YOUR_BUCKET}')

my_custom_job.run()

在控制台中,您可以查看作业的进度。

multi_worker_job

🎉 恭喜!🎉

您学习了如何使用 Vertex AI 执行以下操作:

  • 使用 TensorFlow 运行分布式训练作业

如需详细了解 Vertex 的不同部分,请参阅相关文档

7. 清理

因为我们将笔记本配置为在空闲 60 分钟后超时,所以不必担心关停实例。如果您要手动关停实例,请点击控制台的 Vertex AI Workbench 部分中的“停止”按钮。如果您想完全删除该笔记本,请点击“删除”按钮。

可停止实例

如需删除存储桶,请使用 Cloud Console 中的导航菜单,浏览到“存储空间”,选择您的存储桶,然后点击“删除”:

删除存储空间