Vertex AI:使用 TensorFlow 进行多工作器训练和迁移学习

1. 概览

在本实验中,您将使用 Vertex AI 为 TensorFlow 模型运行一项多工作器训练作业。

学习内容

您将了解如何:

  • 修改用于多工作器训练的训练应用代码
  • 在 Vertex AI 界面中配置并启动多工作器训练作业
  • 使用 Vertex SDK 配置和启动多工作器训练作业

在 Google Cloud 上运行此实验的总费用约为 5 美元

2. Vertex AI 简介

本实验使用的是 Google Cloud 上提供的最新 AI 产品。Vertex AI 将整个 Google Cloud 的机器学习产品集成到无缝的开发体验中。以前,使用 AutoML 训练的模型和自定义模型是通过不同的服务访问的。现在,该新产品与其他新产品一起将这两种模型合并到一个 API 中。您还可以将现有项目迁移到 Vertex AI。如果您有任何反馈,请参阅支持页面

Vertex AI 包含许多不同的产品,可支持端到端机器学习工作流。本实验将重点介绍下面突出显示的产品:TrainingWorkbench

Vertex 产品概览

3. 用例概览

在本实验中,您将使用迁移学习,基于 TensorFlow 数据集中的木薯数据集训练图片分类模型。您将使用的架构是 tf.keras.applications 库中的 ResNet50 模型,该模型已预训练在 ImageNet 数据集上。

为何采用分布式训练?

如果您有一个 GPU,则 TensorFlow 会使用此加速器加快模型训练速度,您无需执行额外的操作。但是,如果您希望通过在单台机器或多台机器(每台机器可能有多个 GPU)上使用多个 GPU 进一步提升速度,则需要使用 tf.distribute,它是 TensorFlow 的库,用于在多个设备上运行计算。设备是指 TensorFlow 可以运行操作的某些机器上的 CPU 或加速器(例如 GPU 或 TPU)。

若要开始使用分布式训练,最简单的方法是使用一台配备多个 GPU 设备的机器。tf.distribute 模块中的 TensorFlow 分布策略将管理所有 GPU 上数据分布和梯度更新的协调。如果您精通单个主机训练,并且希望进一步扩容,那么向集群添加多台机器可以帮助您获得更大的性能提升。您可以使用一组仅包含 CPU 的机器,也可以使用一组每台都包含一个或多个 GPU 的机器。本实验将介绍后一种情况,并演示如何使用 MultiWorkerMirroredStrategy 在 Vertex AI 上的多台机器上分布式训练 TensorFlow 模型。

MultiWorkerMirroredStrategy 是一种同步数据并行处理策略,只需进行少量代码更改即可使用。系统会在集群中的每台设备上创建模型的副本。后续的梯度更新会以同步的方式进行。这意味着,每个工作器设备都会针对不同的输入数据切片计算通过模型的前向和后向传递。然后,来自各个切片的计算梯度会在机器上的所有设备和集群中的所有机器上聚合,并在一个称为 all-reduce 的进程中进行求和(通常为求平均值)。然后,优化器使用这些经过缩减的梯度执行参数更新,从而使设备保持同步。如需详细了解如何使用 TensorFlow 进行分布式训练,请观看以下视频:

4. 设置您的环境

您需要一个启用了结算功能的 Google Cloud Platform 项目才能运行此 Codelab。如需创建项目,请按照此处的说明操作。

第 1 步:启用 Compute Engine API

前往 Compute Engine,然后选择启用(如果尚未启用)。您需要此信息才能创建笔记本实例。

第 2 步:启用 Container Registry API

前往 Container Registry,然后选择启用(如果尚未启用)。您将使用此产品为您的自定义训练作业创建容器。

第 3 步:启用 Vertex AI API

前往 Cloud Console 的 Vertex AI 部分,然后点击启用 Vertex AI API

Vertex AI 信息中心

第 4 步:创建 Vertex AI Workbench 实例

在 Cloud Console 的 Vertex AI 部分中,点击“Workbench”:

Vertex AI 菜单

启用 Notebooks API(如果尚未启用)。

Notebook_api

启用后,点击代管式笔记本

Notebooks_UI

然后选择新建笔记本

new_notebook

为您的笔记本命名,然后点击高级设置

create_notebook

在“高级设置”下,启用空闲关闭,并将分钟数设置为 60。这意味着,您的笔记本处于未使用状态时会自动关闭,以免产生不必要的费用。

idle_timeout

安全性下,选择“启用终端”(如果尚未启用)。

enable_terminal

您可以保留所有其他高级设置。

接下来,点击创建。预配实例需要几分钟时间。

创建实例后,选择打开 JupyterLab

open_jupyterlab

首次使用新实例时,系统会要求您进行身份验证。为此,请按照界面中的步骤操作。

身份验证

5. 容器化训练应用代码

您将通过将训练应用代码放入 Docker 容器并将此容器推送到 Google Container Registry,将此训练作业提交给 Vertex。使用此方法,您可以训练使用任何框架构建的模型。

首先,通过“启动器”菜单在笔记本实例中打开终端窗口:

在笔记本中打开终端

创建一个名为 cassava 的新目录并通过 cd 命令进入该目录:

mkdir cassava
cd cassava

第 1 步:创建 Dockerfile

容器化代码的第一步是创建 Dockerfile。在 Dockerfile 中,您将添加运行映像所需的所有命令。它将安装所有必要的库并为训练代码设置入口点。

在终端中,创建一个空的 Dockerfile:

touch Dockerfile

打开 Dockerfile 并将以下代码复制到其中:

FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-7

WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]

此 Dockerfile 使用 Deep Learning Container TensorFlow Enterprise 2.7 GPU Docker 映像。Google Cloud 上的 Deep Learning Containers 预安装了许多常见的机器学习和数据科学框架。下载该映像后,此 Dockerfile 会为训练代码设置入口点。您尚未创建这些文件 - 在下一步中,您将添加用于训练和调整模型的代码。

第 2 步:创建 Cloud Storage 存储分区

在此训练作业中,您需要将经过训练的 TensorFlow 模型导出到 Cloud Storage 存储桶。在终端中,运行以下命令为项目定义一个环境变量,务必注意将 your-cloud-project 替换为您的项目 ID:

PROJECT_ID='your-cloud-project'

接下来,在终端中运行以下命令,以便在项目中创建一个新的存储桶。

BUCKET="gs://${PROJECT_ID}-bucket"
gsutil mb -l us-central1 $BUCKET

第 3 步:添加模型训练代码

在终端中,运行以下命令,为训练代码创建一个目录和一个 Python 文件(您将在其中添加代码):

mkdir trainer
touch trainer/task.py

您的 cassava/ 目录中现在应包含以下内容:

+ Dockerfile
+ trainer/
    + task.py

接下来,打开您刚刚创建的 task.py 文件,并复制以下代码。您需要将 {your-gcs-bucket} 替换为您刚刚创建的 Cloud Storage 存储分区的名称。

import tensorflow as tf
import tensorflow_datasets as tfds
import os


PER_REPLICA_BATCH_SIZE = 64
EPOCHS = 2

# TODO: replace {your-gcs-bucket} with the name of the Storage bucket you created earlier
BUCKET = 'gs://{your-gcs-bucket}/mwms'

def preprocess_data(image, label):
  '''Resizes and scales images.'''

  image = tf.image.resize(image, (300,300))
  return tf.cast(image, tf.float32) / 255., label


def create_dataset(batch_size):
  '''Loads Cassava dataset and preprocesses data.'''

  data, info = tfds.load(name='cassava', as_supervised=True, with_info=True)
  number_of_classes = info.features['label'].num_classes
  train_data = data['train'].map(preprocess_data,
                                 num_parallel_calls=tf.data.experimental.AUTOTUNE)
  train_data  = train_data.shuffle(1000)
  train_data  = train_data.batch(batch_size)
  train_data  = train_data.prefetch(tf.data.experimental.AUTOTUNE)

  # Set AutoShardPolicy
  options = tf.data.Options()
  options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
  train_data = train_data.with_options(options)

  return train_data, number_of_classes


def create_model(number_of_classes):
  '''Creates and compiles pretrained ResNet50 model.'''

  base_model = tf.keras.applications.ResNet50(weights='imagenet', include_top=False)
  x = base_model.output
  x = tf.keras.layers.GlobalAveragePooling2D()(x)
  x = tf.keras.layers.Dense(1016, activation='relu')(x)
  predictions = tf.keras.layers.Dense(number_of_classes, activation='softmax')(x)
  model = tf.keras.Model(inputs=base_model.input, outputs=predictions)

  model.compile(
      loss='sparse_categorical_crossentropy',
      optimizer=tf.keras.optimizers.Adam(0.0001),
      metrics=['accuracy'])

  return model


def _is_chief(task_type, task_id):
  '''Helper function. Determines if machine is chief.'''

  return task_type == 'chief'


def _get_temp_dir(dirpath, task_id):
  '''Helper function. Gets temporary directory for saving model.'''

  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir


def write_filepath(filepath, task_type, task_id):
  '''Helper function. Gets filepath to save model.'''

  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)


def main():
  # Create strategy
  strategy = tf.distribute.MultiWorkerMirroredStrategy()

  # Get data
  global_batch_size = PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync
  train_data, number_of_classes = create_dataset(global_batch_size)

  # Wrap variable creation within strategy scope
  with strategy.scope():
    model = create_model(number_of_classes)

  model.fit(train_data, epochs=EPOCHS)

  # Determine type and task of the machine from
  # the strategy cluster resolver
  task_type, task_id = (strategy.cluster_resolver.task_type,
                        strategy.cluster_resolver.task_id)

  # Based on the type and task, write to the desired model path
  write_model_path = write_filepath(BUCKET, task_type, task_id)
  model.save(write_model_path)

if __name__ == "__main__":
    main()

在构建容器之前,我们先来深入了解一下该代码,该代码使用了 tf.distribute.Strategy API 中的 MultiWorkerMirroredStrategy

代码中有一些组件是您的代码使用 MultiWorkerMirroredStrategy 所必需的。

  1. 数据需要分片,这意味着系统会为每个工作器分配整个数据集的一个子集。因此,在每个步骤中,每个工作器都会处理大小为非重叠数据集元素的全球批处理。此分片是通过 tf.data.experimental.AutoShardPolicy 自动进行的,该值可以设置为 FILEDATA。在此示例中,create_dataset() 函数将 AutoShardPolicy 设置为 DATA,因为 Cassava 数据集不会下载为多个文件。不过,如果您未将此政策设为 DATA,系统会启用默认的 AUTO 政策,最终结果也相同。如需详细了解如何使用 MultiWorkerMirroredStrategy 对数据集进行分片,请点击此处。
  2. main() 函数中,相关代码创建了 MultiWorkerMirroredStrategy 对象。接下来,您将模型变量的创建过程封装在策略的范围内。这一关键步骤指示 TensorFlow 应在副本中镜像哪些变量。
  3. 批次大小按 num_replicas_in_sync 扩容。这样可确保每个副本在每个步骤中处理的示例数量相同。在 TensorFlow 中使用同步数据并行策略时,扩缩批次大小是最佳实践。
  4. 在多工作器情况下,保存模型稍微复杂一些,因为每个工作器的目的地必须有所不同。主工作器将保存到所需的模型目录,而其他工作器将将模型保存到临时目录。请务必确保这些临时目录是唯一的,以防止多个工作器写入同一位置。保存操作可以包含集体操作,这意味着所有工作器都必须保存,而不仅仅是主工作器。函数 _is_chief()_get_temp_dir()write_filepath()main() 均包含有助于保存模型的样板代码。

请注意,如果您在其他环境中使用过 MultiWorkerMirroredStrategy,则可能已设置 TF_CONFIG 环境变量。Vertex AI 会自动为您设置 TF_CONFIG,因此您无需在集群中的每台机器上定义此变量。

第 4 步:构建容器

在终端中,运行以下命令为项目定义一个环境变量,务必注意将 your-cloud-project 替换为您的项目 ID:

PROJECT_ID='your-cloud-project'

使用 Google Container Registry 中容器映像的 URI 定义变量:

IMAGE_URI="gcr.io/$PROJECT_ID/multiworker:cassava"

配置 Docker

gcloud auth configure-docker

然后,从 cassava 目录的根目录运行以下命令来构建容器:

docker build ./ -t $IMAGE_URI

最后,将其推送到 Google Container Registry:

docker push $IMAGE_URI

在将容器推送到 Container Registry 后,您就可以启动训练作业了。

6. 在 Vertex AI 上运行多工作器训练作业

本实验通过 Google Container Registry 上的自定义容器进行自定义训练,但您也可以使用预构建容器运行训练作业。

首先,请前往 Cloud 控制台的“Vertex”部分中的训练部分:

uCAIP 菜单

第 1 步:配置训练作业

点击创建,输入训练作业的参数。

  • 数据集下,选择无代管式数据集
  • 然后选择自定义训练(高级)作为训练方法,并点击继续
  • 输入 multiworker-cassava(或您要调用模型的任何内容)作为模型名称
  • 点击继续

在“容器设置”步骤中,选择自定义容器

“自定义容器”选项

在第一个框(容器映像)中,输入上一部分中的 IMAGE_URI 变量的值。它应该为 gcr.io/your-cloud-project/multiworker:cassava(包含您自己的项目 ID)。将其余字段留空,然后点击继续

再次点击继续,跳过 H 超参数步骤。

第 2 步:配置计算集群

Vertex AI 提供 4 个工作器池,以涵盖不同类型的机器任务。

工作器池 0 用于配置主要、首要、调度程序或“主实例”。在 MultiWorkerMirroredStrategy 中,所有机器都被指定为工作器,即执行复制计算的物理机器。除了将每台机器用作工作器之外,还需要有一个工作器接受一些额外的工作,例如保存检查点,以及将摘要文件写入 TensorBoard。此机器被称为主工作器。由于只有一个主工作器,因此工作器池 0 的工作器计数始终为 1。

计算和价格中,保留所选区域不变,然后按如下方式配置工作器池 0

Worker_pool_0

工作器池 1 用于为集群配置工作器。

按如下方式配置工作器池 1

Worker_pool_1

集群现在配置为包含两个仅使用 CPU 的机器。当训练应用代码运行时,MultiWorkerMirroredStrategy 会在两台机器上分配训练。

MultiWorkerMirroredStrategy 只有主任务类型和工作器任务类型,因此无需配置额外的工作器池。不过,如果您要使用 TensorFlow 的 ParameterServerStrategy,则需要在工作器池 2 中配置参数服务器。如果要向集群添加评估器,您可以在工作器池 3 中配置该机器。

点击开始训练以启动超参数调节作业。在控制台的“训练”部分中的训练流水线标签页下,您会看到刚启动的作业:

训练作业

🎉 恭喜!🎉

您学习了如何使用 Vertex AI 执行以下操作:

  • 启动一个多工作器训练作业,用于训练自定义容器中提供的代码。您在本示例中使用了 TensorFlow 模型,但可以使用自定义或内置容器训练通过任何框架构建的模型。

如需详细了解 Vertex 的不同部分,请参阅相关文档

7. [可选] 使用 Vertex SDK

上一部分介绍了如何通过界面启动训练作业。在本部分中,您将了解使用 Vertex Python API 提交训练作业的另一种方法。

返回您的笔记本实例,然后通过启动器创建一个 TensorFlow 2 笔记本:

new_notebook

导入 Vertex AI SDK。

from google.cloud import aiplatform

如需启动多工作器训练作业,您首先需要定义工作器池规范。请注意,在本规范中使用 GPU 是完全可选的操作,如果您希望使用仅支持 CPU 的集群(如上一部分所示),可以移除 accelerator_typeaccelerator_count

# The spec of the worker pools including machine type and Docker image
# Be sure to replace {YOUR-PROJECT-ID} with your project ID.
worker_pool_specs=[
     {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-8", "accelerator_type": "NVIDIA_TESLA_V100", "accelerator_count": 1
        },
        "container_spec": {"image_uri": "gcr.io/{YOUR-PROJECT-ID}/multiworker:cassava"}
      },
      {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-8", "accelerator_type": "NVIDIA_TESLA_V100", "accelerator_count": 1
        },
        "container_spec": {"image_uri": "gcr.io/{YOUR-PROJECT-ID}/multiworker:cassava"}
      }
]

接下来,创建并运行 CustomJob。您需要将 {YOUR_BUCKET} 替换为项目中用于暂存的存储分区。您可以使用之前创建的同一存储分区。

# Replace YOUR_BUCKET
my_multiworker_job = aiplatform.CustomJob(display_name='multiworker-cassava-sdk',
                              worker_pool_specs=worker_pool_specs,
                              staging_bucket='gs://{YOUR_BUCKET}')

my_multiworker_job.run()

在控制台的自定义作业标签页下的“训练”部分中,您将看到您的训练作业:

自定义作业

8. 清理

因为我们将笔记本配置为在空闲 60 分钟后超时,所以不必担心关停实例。如果您要手动关停实例,请点击控制台的 Vertex AI Workbench 部分中的“停止”按钮。如果您想完全删除该笔记本,请点击“删除”按钮。

可停止实例

如需删除存储桶,请使用 Cloud Console 中的导航菜单,浏览到“存储空间”,选择您的存储桶,然后点击“删除”:

删除存储空间