Vertex AI:使用 TensorFlow 进行多工作器训练和迁移学习

1. 概览

在本实验中,您将使用 Vertex AI,为一个 TensorFlow 模型运行多工作器训练作业。

学习内容

您将了解如何:

  • 修改多工作器训练的训练应用代码
  • 通过 Vertex AI 界面配置和启动多工作器训练作业
  • 使用 Vertex SDK 配置并启动多工作器训练作业

在 Google Cloud 上运行此实验的总费用约为 $5

2. Vertex AI 简介

本实验使用的是 Google Cloud 上提供的最新 AI 产品。Vertex AI 将整个 Google Cloud 的机器学习产品集成到无缝的开发体验中。以前,可通过单独的服务访问使用 AutoML 训练的模型和自定义模型。新产品与其他新产品结合在一起,形成了一个 API。您还可以将现有项目迁移到 Vertex AI。如果您有任何反馈,请参阅支持页面

Vertex AI 包含许多不同的产品,可支持端到端机器学习工作流。本实验将重点介绍下列产品:培训Workbench

Vertex 产品概览

3.使用场景概览

在本实验中,您将使用迁移学习,基于 TensorFlow Datasets 中的 cassava 数据集来训练图片分类模型。您将使用的架构是基于 Imagenet 数据集预训练的 tf.keras.applications 库中的 ResNet50 模型。

为什么使用分布式训练?

如果您有一个 GPU,TensorFlow 会使用此加速器加快模型训练速度,您无需执行额外的操作。但是,如果您希望在单台或多台计算机上使用多个 GPU 进一步提升效果(每台计算机可能有多个 GPU),则需要使用 tf.distribute(TensorFlow 的库,用于跨计算运行) 。设备是指 TensorFlow 可以在其上运行运算的计算机上的 CPU 或加速器,例如 GPU 或 TPU。

要开始进行分布式训练,最简单的方法是使用多个 GPU 设备。tf.distribute 模块的 TensorFlow 分布策略将负责管理所有 GPU 之间的数据分布和梯度更新。如果您精通单个主机训练并希望进一步扩展,那么向集群添加多台机器可以帮助您进一步提升性能。您可以使用一组仅支持 CPU 的机器,或者每台机器具有一个或多个 GPU。本实验涵盖后一种情况,并演示如何使用 MultiWorkerMirroredStrategy 在 Vertex AI 上将 TensorFlow 模型的训练过程分布在多台计算机上。

MultiWorkerMirroredStrategy 是一种同步数据并行策略,只需对代码进行少量更改即可使用。系统会在集群中的每台设备上创建模型的副本。后续的渐变更新会以同步的方式进行。这意味着,每个工作器设备都会针对不同的输入数据切片计算前向和后向传递模型。然后,从这些切片中计算的梯度会汇总一台计算机上的所有设备以及集群中所有机器,并在一个称为“全部还原”的进程中对其进行缩减(通常是平均值)。然后,优化程序会使用这些减小的梯度执行参数更新,从而使设备保持同步。如需详细了解如何使用 TensorFlow 进行分布式训练,请观看以下视频:

4.设置环境

您需要一个启用了结算功能的 Google Cloud Platform 项目才能运行此 Codelab。如需创建项目,请按照此处的说明操作。

第 1 步:启用 Compute Engine API

转到 Compute Engine,然后选择启用(如果尚未启用)。创建笔记本实例时需要用到此文件。

第 2 步:启用 Container Registry API

转到 Container Registry,然后选择启用(如果尚未启用)。您将使用此文件为您的自定义训练作业创建容器。

第 3 步:启用 Vertex AI API

转到 Cloud Console 的 Vertex AI 部分,然后点击启用 Vertex AI API

Vertex AI 信息中心

第 4 步:创建 Vertex AI Workbench 实例

在 Cloud Console 的 Vertex AI 部分中,点击 Workbench:

Vertex AI 菜单

启用 Notebooks API(如果尚未启用)。

Notebook_api

启用后,点击 MANAGED NOTES(代管式笔记本):

Notebooks_UI

然后选择新建笔记本

新笔记本

为您的笔记本命名,然后点击高级设置

create_notebook

在“高级设置”下,启用空闲关停,并将分钟数设置为 60。这意味着,当您的笔记本处于未使用状态时,会自动关停,以免产生不必要的费用。

idle_timeout

安全下方,选择“启用终端”(如果尚未启用)。

Enable_terminal

其他所有高级设置无需更改即可。

接下来,点击创建。预配实例需要几分钟时间。

创建实例后,选择打开 JupyterLab

open_jupyterlab

首次使用新实例时,系统会要求您进行身份验证。为此,请按照界面中的步骤操作。

身份验证

5. 将训练应用代码容器化

要将训练作业提交到 Vertex,您需要将训练应用代码放入 Docker 容器中,并将该容器推送到 Google Container Registry。使用此方法,您可以训练使用任何框架构建的模型。

首先,在“启动器”菜单中,在笔记本实例中打开终端窗口:

在笔记本中打开终端

新建一个名为 cassava 的目录,并放入该目录:

mkdir cassava
cd cassava

第 1 步:创建 Dockerfile

将代码容器化的第一步是创建 Dockerfile。在 Dockerfile 中,您将添加运行映像所需的所有命令。它将安装所有必要的库并为训练代码设置入口点。

在终端中创建一个空的 Dockerfile:

touch Dockerfile

打开 Dockerfile 并将以下内容复制到其中:

FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-7

WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]

此 Dockerfile 使用 Deep Learning Container TensorFlow Enterprise 2.7 GPU Docker 映像。Google Cloud 上的 Deep Learning Containers 预安装了许多常见的机器学习和数据科学框架。下载该映像后,此 Dockerfile 会为训练代码设置入口点。您尚未创建这些文件 - 在下一步中,您将添加用于训练和调整模型的代码。

第 2 步:创建 Cloud Storage 存储分区

在此训练作业中,您需要将经过训练的 TensorFlow 模型导出到 Cloud Storage 存储分区。在终端中,运行以下命令为项目定义一个环境变量,确保将 your-cloud-project 替换为您的项目 ID:

PROJECT_ID='your-cloud-project'

接下来,在终端运行以下命令,在项目中创建一个新的存储分区。

BUCKET="gs://${PROJECT_ID}-bucket"
gsutil mb -l us-central1 $BUCKET

第 3 步:添加模型训练代码

从终端运行以下命令,为训练代码创建一个目录,以及要在其中添加代码的 Python 文件:

mkdir trainer
touch trainer/task.py

您的 cassava/ 目录中现在应包含以下代码:

+ Dockerfile
+ trainer/
    + task.py

接下来,打开您刚刚创建的 task.py 文件,并复制下面的代码。您需要将 {your-gcs-bucket} 替换为您刚刚创建的 Cloud Storage 存储分区的名称。

import tensorflow as tf
import tensorflow_datasets as tfds
import os

PER_REPLICA_BATCH_SIZE = 64
EPOCHS = 2

# TODO: replace {your-gcs-bucket} with the name of the Storage bucket you created earlier
BUCKET = 'gs://{your-gcs-bucket}/mwms'

def preprocess_data(image, label):
  '''Resizes and scales images.'''

  image = tf.image.resize(image, (300,300))
  return tf.cast(image, tf.float32) / 255., label

def create_dataset(batch_size):
  '''Loads Cassava dataset and preprocesses data.'''

  data, info = tfds.load(name='cassava', as_supervised=True, with_info=True)
  number_of_classes = info.features['label'].num_classes
  train_data = data['train'].map(preprocess_data,
                                 num_parallel_calls=tf.data.experimental.AUTOTUNE)
  train_data  = train_data.shuffle(1000)
  train_data  = train_data.batch(batch_size)
  train_data  = train_data.prefetch(tf.data.experimental.AUTOTUNE)

  # Set AutoShardPolicy
  options = tf.data.Options()
  options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
  train_data = train_data.with_options(options)

  return train_data, number_of_classes

def create_model(number_of_classes):
  '''Creates and compiles pretrained ResNet50 model.'''

  base_model = tf.keras.applications.ResNet50(weights='imagenet', include_top=False)
  x = base_model.output
  x = tf.keras.layers.GlobalAveragePooling2D()(x)
  x = tf.keras.layers.Dense(1016, activation='relu')(x)
  predictions = tf.keras.layers.Dense(number_of_classes, activation='softmax')(x)
  model = tf.keras.Model(inputs=base_model.input, outputs=predictions)

  model.compile(
      loss='sparse_categorical_crossentropy',
      optimizer=tf.keras.optimizers.Adam(0.0001),
      metrics=['accuracy'])

  return model

def _is_chief(task_type, task_id):
  '''Helper function. Determines if machine is chief.'''

  return task_type == 'chief'

def _get_temp_dir(dirpath, task_id):
  '''Helper function. Gets temporary directory for saving model.'''

  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  '''Helper function. Gets filepath to save model.'''

  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

def main():
  # Create strategy
  strategy = tf.distribute.MultiWorkerMirroredStrategy()

  # Get data
  global_batch_size = PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync
  train_data, number_of_classes = create_dataset(global_batch_size)

  # Wrap variable creation within strategy scope
  with strategy.scope():
    model = create_model(number_of_classes)

  model.fit(train_data, epochs=EPOCHS)

  # Determine type and task of the machine from
  # the strategy cluster resolver
  task_type, task_id = (strategy.cluster_resolver.task_type,
                        strategy.cluster_resolver.task_id)

  # Based on the type and task, write to the desired model path
  write_model_path = write_filepath(BUCKET, task_type, task_id)
  model.save(write_model_path)

if __name__ == "__main__":
    main()

在构建容器之前,让我们来深入了解代码,该代码使用 tf.distribute.Strategy API 中的 MultiWorkerMirroredStrategy

代码中有一些组件是 MultiWorkerMirroredStrategy 正常运行所必需的。

  1. 数据需要分片,这意味着每个工作器会分配整个数据集的一个子集。因此,在每个步骤中,每个工作器都会处理非重叠数据集元素的全局批量大小。分片会借助 tf.data.experimental.AutoShardPolicy(可以设置为 FILEDATA)自动进行。在此示例中,create_dataset() 函数将 AutoShardPolicy 设置为 DATA,因为 Cassava 数据集不会下载为多个文件。不过,如果您未将此政策设为 DATA,系统会启动默认的 AUTO 政策,并使最终结果相同。您可以在此处详细了解使用 MultiWorkerMirroredStrategy 进行的数据集分片。
  2. main() 函数中,创建 MultiWorkerMirroredStrategy 对象。接下来,您需要将模型变量的创建过程封装在策略的作用域内。这个关键步骤会告知 TensorFlow 应在副本之间镜像哪些变量。
  3. 批次大小按 num_replicas_in_sync 按比例增加。这样可确保每个副本在每个步骤处理的样本数量相同。在 TensorFlow 中使用同步数据并行策略时,扩缩批次大小是最佳做法。
  4. 在多工作器情况下,模型保存会稍微复杂一些,因为每个工作器的目标位置需要不同。主工作器将保存到所需的模型目录,而其他工作器会将模型保存到临时目录。这些临时目录必须唯一,以防止多个工作器写入同一位置。储蓄可能包含集体行动,这意味着所有员工都必须拯救行动,而不仅仅是局长。函数 _is_chief()_get_temp_dir()write_filepath()main() 函数都包含帮助保存模型的样板代码。

请注意,如果您在其他环境中使用 MultiWorkerMirroredStrategy,则可能设置了 TF_CONFIG 环境变量。Vertex AI 会自动为您设置 TF_CONFIG,因此您无需在集群中的每个机器上定义此变量。

第 4 步:构建容器

在终端中,运行以下命令为项目定义一个环境变量,确保将 your-cloud-project 替换为您的项目 ID:

PROJECT_ID='your-cloud-project'

使用 Google Container Registry 中容器映像的 URI 定义变量:

IMAGE_URI="gcr.io/$PROJECT_ID/multiworker:cassava"

然后,从 cassava 目录的根目录下运行以下命令来构建容器:

docker build ./ -t $IMAGE_URI

最后,将其推送到 Google Container Registry:

docker push $IMAGE_URI

将容器推送到 Container Registry 后,您便可以启动训练作业。

6.在 Vertex AI 上运行多工作器训练作业

本实验通过 Google Container Registry 上的自定义容器使用自定义训练,但您也可以使用预构建的容器运行训练作业。

首先,转到 Cloud Console 的“顶点”部分中的训练部分:

uCAIP 菜单

第 1 步:配置训练作业

点击创建,为您的训练作业输入参数。

  • 数据集下,选择无代管式数据集
  • 然后选择自定义训练(高级)作为训练方法,并点击继续
  • 输入 multiworker-cassava(或您希望调用模型的任何名称)作为模型名称
  • 点击继续

在“容器设置”步骤中,选择自定义容器

自定义容器选项

在第一个框(容器映像)中,输入上一部分中的 IMAGE_URI 变量的值。应该为:gcr.io/your-cloud-project/multiworker:cassava(您自己的项目 ID)。将其余字段留空,然后点击继续

再次点击继续以跳过 H 超参数步骤。

第 2 步:配置计算集群

Vertex AI 提供 4 个工作器池,用于处理不同类型的机器任务。

工作器池 0 用于配置主要、主、调度程序或“主实例”。在 MultiWorkerMirroredStrategy 中,所有机器都被指定为工作器,即执行复制计算的物理机器。除了每台机器是一个工作器之外,还需要一个工作器来完成一些额外的工作,例如保存检查点和将摘要文件写入 TensorBoard。这台机器称为主机。只有一个主工作器,因此工作器池 0 的工作器计数将始终为 1。

计算和价格中,保持所选区域不变,并按以下方式配置工作器池 0

工作器池_0

工作器池 1 是为集群配置工作器。

按如下所示配置工作器池 1

工作器池_1

集群现已配置为具有两台仅支持 CPU 的机器。运行训练应用代码后,MultiWorkerMirroredStrategy 会将训练分配到这两台机器上。

MultiWorkerMirroredStrategy 只有主设备和任务任务类型,因此无需配置其他工作器池。但是,如果要使用 TensorFlow 的 ParameterServerStrategy,应在工作器池 2 中配置参数服务器。如果要将评估器添加到集群,请在工作器池 3 中配置该机器。

点击开始训练以启动超参数调节作业。在控制台的“训练”部分,点击训练流水线标签页后,您会看到新发布的作业:

训练作业

🎉? 恭喜!🎉

您学习了如何使用 Vertex AI 执行以下操作:

  • 启动多工作器训练作业,以使用自定义容器中提供的训练代码。您在本示例中使用了 TensorFlow 模型,但可以使用自定义或内置容器训练使用任何框架构建的模型。

如需详细了解 Vertex 的不同部分,请参阅文档

7. [可选] 使用 Vertex SDK

上一部分介绍了如何通过界面启动训练作业。在本部分中,您将看到使用 Vertex Python API 提交训练作业的替代方法。

返回笔记本实例,通过启动器创建 TensorFlow 2 笔记本:

新笔记本

导入 Vertex AI SDK。

from google.cloud import aiplatform

如需启动多工作器训练作业,您首先需要定义工作器池规范。请注意,该规范中的 GPU 的使用是完全可行的,如果您希望使用上一部分中所示的仅支持 CPU 的集群,则可以移除 accelerator_typeaccelerator_count

# The spec of the worker pools including machine type and Docker image
# Be sure to replace {YOUR-PROJECT-ID} with your project ID.
worker_pool_specs=[
     {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-8", "accelerator_type": "NVIDIA_TESLA_V100", "accelerator_count": 1
        },
        "container_spec": {"image_uri": "gcr.io/{YOUR-PROJECT-ID}/multiworker:cassava"}
      },
      {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-8", "accelerator_type": "NVIDIA_TESLA_V100", "accelerator_count": 1
        },
        "container_spec": {"image_uri": "gcr.io/{YOUR-PROJECT-ID}/multiworker:cassava"}
      }
]

接下来,创建并运行 CustomJob。您需要将 {YOUR_BUCKET} 替换为项目中的存储分区以进行暂存。您可以使用之前创建的同一存储分区。

# Replace YOUR_BUCKET
my_multiworker_job = aiplatform.CustomJob(display_name='multiworker-cassava-sdk',
                              worker_pool_specs=worker_pool_specs,
                              staging_bucket='gs://{YOUR_BUCKET}')

my_multiworker_job.run()

在控制台的“自定义作业”标签页下的“训练”部分,您会看到您的训练作业:

自定义作业

8. 清理

因为我们将笔记本配置为在 60 分钟空闲后超时,所以我们不必担心关停实例。如果您要手动关停实例,请点击控制台的 Vertex AI Workbench 部分中的“停止”按钮。如果您想完全删除笔记本,请点击“删除”按钮。

停止实例

如需删除存储分区,请使用 Cloud Console 中的“导航”菜单,转到“存储”,选择您的存储分区,然后点击“删除”:

删除存储空间