Vertex AI:使用 TensorFlow 进行多工作器训练和迁移学习

1. 概览

在本实验中,您将使用 Vertex AI 为 TensorFlow 模型运行一项包含多项工作器的训练作业。

学习内容

您将了解如何:

  • 修改用于多工作器训练的训练应用代码
  • 从 Vertex AI 界面配置和启动多工作器训练作业
  • 使用 Vertex SDK 配置和启动多工作器训练作业

在 Google Cloud 上运行本实验的总费用约为 $5

2. Vertex AI 简介

本实验使用的是 Google Cloud 上提供的最新 AI 产品。Vertex AI 将整个 Google Cloud 的机器学习产品集成到无缝的开发体验中。以前,使用 AutoML 训练的模型和自定义模型是通过不同的服务访问的。现在,该新产品与其他新产品一起将这两种模型合并到一个 API 中。您还可以将现有项目迁移到 Vertex AI。如果您有任何反馈,请参阅支持页面

Vertex AI 包含许多不同的产品,可支持端到端机器学习工作流。本实验将重点介绍下面突出显示的产品:TrainingWorkbench

Vertex 产品概览

3. 用例概览

在本实验中,您将使用迁移学习基于 TensorFlow 数据集中的 cassava 数据集训练图片分类模型。您将使用的架构是基于 Imagenet 数据集预训练的 tf.keras.applications 库中的 ResNet50 模型。

为什么要进行分布式训练?

如果您有一个 GPU,则 TensorFlow 会使用此加速器加快模型训练速度,您无需执行额外的操作。但是,如果您希望在单台机器或多台机器(每台机器可能都有多个 GPU)上使用多个 GPU 来进一步提升性能,则需要使用 tf.distribute,它是 TensorFlow 的库,用于跨多个设备运行计算。设备是指 TensorFlow 可在其上运行运算的某些机器上的 CPU 或加速器(例如 GPU 或 TPU)。

开始进行分布式训练的最简单方法是在单台机器上启动多个 GPU 设备。tf.distribute 模块中的 TensorFlow 分布策略将管理所有 GPU 上数据分布和梯度更新的协调。如果您精通单个主机训练,并且想要进一步扩容,那么向集群添加多台机器可以帮助您获得更大的性能提升。您可以利用一组仅支持 CPU 的机器,也可以让每台机器有一个或多个 GPU。本实验涵盖了后一种情况,并演示了如何使用 MultiWorkerMirroredStrategy 在 Vertex AI 上的多台机器上分布 TensorFlow 模型的训练。

MultiWorkerMirroredStrategy 是一种同步数据并行处理策略,只需进行少量代码更改即可使用。系统会在集群中的每台设备上创建模型的副本。后续的梯度更新会以同步的方式进行。这意味着每个工作器设备会根据不同的输入数据切片计算通过模型的前向和后向传递。然后,从每个切片计算的梯度会在一台机器和集群中的所有机器的所有设备上汇总,并通过一个名为 all-reduce 的过程降低(通常是平均值)。然后,优化器会使用减少的梯度执行参数更新,从而使设备保持同步。如需详细了解如何使用 TensorFlow 进行分布式训练,请观看以下视频:

4. 设置您的环境

您需要一个启用了结算功能的 Google Cloud Platform 项目才能运行此 Codelab。如需创建项目,请按照此处的说明操作。

第 1 步:启用 Compute Engine API

前往 Compute Engine,然后选择启用(如果尚未启用)。您需要用它来创建笔记本实例。

第 2 步:启用 Container Registry API

前往 Container Registry,然后选择启用(如果尚未启用)。您将使用此产品为您的自定义训练作业创建容器。

第 3 步:启用 Vertex AI API

前往 Cloud Console 的 Vertex AI 部分,然后点击启用 Vertex AI API

Vertex AI 信息中心

第 4 步:创建 Vertex AI Workbench 实例

在 Cloud Console 的 Vertex AI 部分中,点击“Workbench”:

Vertex AI 菜单

启用 Notebooks API(如果尚未启用)。

Notebook_api

启用后,点击代管式笔记本

Notebooks_UI

然后选择新建笔记本

new_notebook

为您的笔记本命名,然后点击高级设置

create_notebook

在“高级设置”下,启用空闲关闭,并将分钟数设置为 60。这意味着,您的笔记本处于未使用状态时会自动关闭,以免产生不必要的费用。

idle_timeout

安全性下,选择“启用终端”(如果尚未启用)。

enable_terminal

您可以保留所有其他高级设置。

接下来,点击创建。预配实例需要几分钟时间。

创建实例后,选择打开 JupyterLab

open_jupyterlab

首次使用新实例时,系统会要求您进行身份验证。为此,请按照界面中的步骤操作。

身份验证

5. 容器化训练应用代码

要将此训练作业提交到 Vertex,您需要将训练应用代码放在 Docker 容器中,并将该容器推送到 Google Container Registry。使用此方法,您可以训练使用任何框架构建的模型。

首先,通过“启动器”菜单在笔记本实例中打开终端窗口:

在笔记本中打开终端

创建一个名为 cassava 的新目录并通过 cd 命令进入该目录:

mkdir cassava
cd cassava

第 1 步:创建 Dockerfile

容器化代码的第一步是创建 Dockerfile。在 Dockerfile 中,您将添加运行映像所需的所有命令。它将安装所有必要的库并为训练代码设置入口点。

在终端中,创建一个空的 Dockerfile:

touch Dockerfile

打开 Dockerfile 并将以下代码复制到其中:

FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-7

WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]

此 Dockerfile 使用 Deep Learning Container TensorFlow Enterprise 2.7 GPU Docker 映像。Google Cloud 上的 Deep Learning Containers 预安装了许多常见的机器学习和数据科学框架。下载该映像后,此 Dockerfile 会为训练代码设置入口点。您尚未创建这些文件。在下一步中,您将添加用于训练和调优模型的代码。

第 2 步:创建 Cloud Storage 存储分区

在此训练作业中,您需要将经过训练的 TensorFlow 模型导出到 Cloud Storage 存储桶。在终端中,运行以下命令为项目定义一个环境变量,务必注意将 your-cloud-project 替换为您的项目 ID:

PROJECT_ID='your-cloud-project'

接下来,在终端中运行以下命令,以便在项目中创建一个新的存储桶。

BUCKET="gs://${PROJECT_ID}-bucket"
gsutil mb -l us-central1 $BUCKET

第 3 步:添加模型训练代码

从终端运行以下命令,为训练代码创建一个目录,并创建一个用于添加代码的 Python 文件:

mkdir trainer
touch trainer/task.py

您的 cassava/ 目录中现在应包含以下内容:

+ Dockerfile
+ trainer/
    + task.py

接下来,打开您刚刚创建的 task.py 文件,并复制以下代码。您需要将 {your-gcs-bucket} 替换为您刚刚创建的 Cloud Storage 存储分区的名称。

import tensorflow as tf
import tensorflow_datasets as tfds
import os


PER_REPLICA_BATCH_SIZE = 64
EPOCHS = 2

# TODO: replace {your-gcs-bucket} with the name of the Storage bucket you created earlier
BUCKET = 'gs://{your-gcs-bucket}/mwms'

def preprocess_data(image, label):
  '''Resizes and scales images.'''

  image = tf.image.resize(image, (300,300))
  return tf.cast(image, tf.float32) / 255., label


def create_dataset(batch_size):
  '''Loads Cassava dataset and preprocesses data.'''

  data, info = tfds.load(name='cassava', as_supervised=True, with_info=True)
  number_of_classes = info.features['label'].num_classes
  train_data = data['train'].map(preprocess_data,
                                 num_parallel_calls=tf.data.experimental.AUTOTUNE)
  train_data  = train_data.shuffle(1000)
  train_data  = train_data.batch(batch_size)
  train_data  = train_data.prefetch(tf.data.experimental.AUTOTUNE)

  # Set AutoShardPolicy
  options = tf.data.Options()
  options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
  train_data = train_data.with_options(options)

  return train_data, number_of_classes


def create_model(number_of_classes):
  '''Creates and compiles pretrained ResNet50 model.'''

  base_model = tf.keras.applications.ResNet50(weights='imagenet', include_top=False)
  x = base_model.output
  x = tf.keras.layers.GlobalAveragePooling2D()(x)
  x = tf.keras.layers.Dense(1016, activation='relu')(x)
  predictions = tf.keras.layers.Dense(number_of_classes, activation='softmax')(x)
  model = tf.keras.Model(inputs=base_model.input, outputs=predictions)

  model.compile(
      loss='sparse_categorical_crossentropy',
      optimizer=tf.keras.optimizers.Adam(0.0001),
      metrics=['accuracy'])

  return model


def _is_chief(task_type, task_id):
  '''Helper function. Determines if machine is chief.'''

  return task_type == 'chief'


def _get_temp_dir(dirpath, task_id):
  '''Helper function. Gets temporary directory for saving model.'''

  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir


def write_filepath(filepath, task_type, task_id):
  '''Helper function. Gets filepath to save model.'''

  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)


def main():
  # Create strategy
  strategy = tf.distribute.MultiWorkerMirroredStrategy()

  # Get data
  global_batch_size = PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync
  train_data, number_of_classes = create_dataset(global_batch_size)

  # Wrap variable creation within strategy scope
  with strategy.scope():
    model = create_model(number_of_classes)

  model.fit(train_data, epochs=EPOCHS)

  # Determine type and task of the machine from
  # the strategy cluster resolver
  task_type, task_id = (strategy.cluster_resolver.task_type,
                        strategy.cluster_resolver.task_id)

  # Based on the type and task, write to the desired model path
  write_model_path = write_filepath(BUCKET, task_type, task_id)
  model.save(write_model_path)

if __name__ == "__main__":
    main()

在构建容器之前,我们先来深入了解一下使用 tf.distribute.Strategy API 中的 MultiWorkerMirroredStrategy 的代码。

代码中有一些组件是代码与 MultiWorkerMirroredStrategy 配合使用所必需的。

  1. 数据需要分片,这意味着系统会为每个工作器分配整个数据集的一个子集。因此,在每个步骤中,每个工作器都会处理一个非重叠数据集元素的全局批量大小。此分片会自动通过 tf.data.experimental.AutoShardPolicy(可以设置为 FILEDATA)进行。在此示例中,create_dataset() 函数将 AutoShardPolicy 设置为 DATA,因为 Cassava 数据集不会下载为多个文件。不过,如果您未将此政策设为 DATA,系统会启用默认的 AUTO 政策,最终结果也相同。您可以点击此处,详细了解如何使用 MultiWorkerMirroredStrategy 进行数据集分片。
  2. main() 函数中,相关代码创建了 MultiWorkerMirroredStrategy 对象。接下来,您将模型变量的创建过程封装在策略的范围内。这一关键步骤指示 TensorFlow 应在副本中镜像哪些变量。
  3. 批次大小按 num_replicas_in_sync 扩容。这可确保每个副本在每个步骤中处理相同数量的示例。在 TensorFlow 中使用同步数据并行策略时,扩缩批次大小是最佳实践。
  4. 在多工作器情况下,保存模型稍微复杂一些,因为每个工作器的目的地必须有所不同。主工作器会将模型保存到所需的模型目录,而其他工作器则会将模型保存到临时目录中。这些临时目录必须具有唯一性,以防止多个工作器写入同一位置。保存操作可以包含集体操作,这意味着所有工作器都必须保存,而不仅仅是主工作器。函数 _is_chief()_get_temp_dir()write_filepath() 以及 main() 函数都包含有助于保存模型的样板代码。

请注意,如果您在其他环境中使用过 MultiWorkerMirroredStrategy,则可能已设置 TF_CONFIG 环境变量。Vertex AI 会自动为您设置 TF_CONFIG,因此您无需在集群中的每台机器上定义此变量。

第 4 步:构建容器

在终端中,运行以下命令为项目定义一个环境变量,务必注意将 your-cloud-project 替换为您的项目 ID:

PROJECT_ID='your-cloud-project'

使用 Google Container Registry 中容器映像的 URI 定义变量:

IMAGE_URI="gcr.io/$PROJECT_ID/multiworker:cassava"

配置 Docker

gcloud auth configure-docker

然后,从 cassava 目录的根目录运行以下命令来构建容器:

docker build ./ -t $IMAGE_URI

最后,将其推送到 Google Container Registry:

docker push $IMAGE_URI

将容器推送到 Container Registry 后,您现在可以开始训练作业了。

6. 在 Vertex AI 上运行多工作器训练作业

本实验通过 Google Container Registry 上的自定义容器进行自定义训练,但您也可以使用预构建容器运行训练作业。

首先,请前往 Cloud 控制台的“Vertex”部分中的训练部分:

uCAIP 菜单

第 1 步:配置训练作业

点击创建,为训练作业输入参数。

  • 数据集下,选择无代管式数据集
  • 然后选择自定义训练(高级)作为训练方法,并点击继续
  • 模型名称部分输入 multiworker-cassava(或您要调用模型的任何内容)
  • 点击继续

在“容器设置”步骤中,选择自定义容器

自定义容器选项

在第一个框(容器映像)中,输入上一部分中的 IMAGE_URI 变量的值。它应该为 gcr.io/your-cloud-project/multiworker:cassava(包含您自己的项目 ID)。将其余字段留空,然后点击继续

再次点击继续,跳过 H 超参数步骤。

第 2 步:配置计算集群

Vertex AI 提供 4 个工作器池,以涵盖不同类型的机器任务。

工作器池 0 用于配置主实例、主池、调度器或“主实例”。在 MultiWorkerMirroredStrategy 中,所有机器都被指定为工作器,即用于执行复制计算的物理机器。除了将每台机器用作工作器之外,还需要有一个工作器接受一些额外的工作,例如保存检查点,以及将摘要文件写入 TensorBoard。此机器被称为主工作器。只有一个主工作器,因此工作器池 0 的工作器计数将始终为 1。

计算和价格中,让所选区域保持不变,并按如下方式配置工作器池 0

Worker_pool_0

工作器池 1 用于为集群配置工作器。

按如下方式配置工作器池 1

Worker_pool_1

该集群现已配置为两台仅支持 CPU 的机器。当训练应用代码运行时,MultiWorkerMirroredStrategy 会在两台机器上分配训练。

MultiWorkerMirroredStrategy 只有主任务类型和工作器任务类型,因此无需配置额外的工作器池。但是,如果要使用 TensorFlow 的 ParameterServerStrategy,您可以在工作器池 2 中配置参数服务器。如果要向集群添加评估器,您可以在工作器池 3 中配置该机器。

点击开始训练以启动超参数调节作业。在控制台的“训练”部分中的训练流水线标签页下,您会看到刚启动的作业:

训练作业

🎉 恭喜!🎉

您学习了如何使用 Vertex AI 执行以下操作:

  • 启动一个多工作器训练作业,用于训练自定义容器中提供的代码。您在本示例中使用了 TensorFlow 模型,但可以使用自定义或内置容器训练通过任何框架构建的模型。

如需详细了解 Vertex 的不同部分,请参阅相关文档

7. [可选] 使用 Vertex SDK

上一部分介绍了如何通过界面启动训练作业。在本部分中,您将看到使用 Vertex Python API 提交训练作业的另一种方法。

返回笔记本实例,然后从启动器创建 TensorFlow 2 笔记本:

new_notebook

导入 Vertex AI SDK。

from google.cloud import aiplatform

如需启动多工作器训练作业,您首先需要定义工作器池规范。请注意,在本规范中使用 GPU 是完全可选的操作,如果您希望使用仅支持 CPU 的集群(如上一部分所示),可以移除 accelerator_typeaccelerator_count

# The spec of the worker pools including machine type and Docker image
# Be sure to replace {YOUR-PROJECT-ID} with your project ID.
worker_pool_specs=[
     {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-8", "accelerator_type": "NVIDIA_TESLA_V100", "accelerator_count": 1
        },
        "container_spec": {"image_uri": "gcr.io/{YOUR-PROJECT-ID}/multiworker:cassava"}
      },
      {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-8", "accelerator_type": "NVIDIA_TESLA_V100", "accelerator_count": 1
        },
        "container_spec": {"image_uri": "gcr.io/{YOUR-PROJECT-ID}/multiworker:cassava"}
      }
]

接下来,创建并运行 CustomJob。您需要将 {YOUR_BUCKET} 替换为项目中用于暂存的存储分区。您可以使用之前创建的存储分区。

# Replace YOUR_BUCKET
my_multiworker_job = aiplatform.CustomJob(display_name='multiworker-cassava-sdk',
                              worker_pool_specs=worker_pool_specs,
                              staging_bucket='gs://{YOUR_BUCKET}')

my_multiworker_job.run()

在控制台的“训练”部分中的自定义作业标签页下,您将看到您的训练作业:

自定义作业

8. 清理

因为我们将笔记本配置为在空闲 60 分钟后超时,所以不必担心关停实例。如果您要手动关停实例,请点击控制台的 Vertex AI Workbench 部分中的“停止”按钮。如果您想完全删除该笔记本,请点击“删除”按钮。

可停止实例

如需删除存储桶,请使用 Cloud Console 中的导航菜单,浏览到“存储空间”,选择您的存储桶,然后点击“删除”:

删除存储空间