1. 概览
在本实验中,您将使用 Vertex AI 来通过 TensorFlow 在 Vertex AI Training 上运行分布式训练作业。
本实验是对生产环境进行原型设计视频系列的一部分。请务必先完成之前的实验,然后再尝试此实验。您可以观看随附的视频系列以了解详情:
。
学习内容
您将了解如何:
- 在具有多个 GPU 的单台机器上运行分布式训练
- 在多个机器上运行分布式训练
在 Google Cloud 上运行此实验的总费用约为 2 美元。
2. Vertex AI 简介
本实验使用的是 Google Cloud 上提供的最新 AI 产品。Vertex AI 将整个 Google Cloud 的机器学习产品集成到无缝的开发体验中。以前,使用 AutoML 训练的模型和自定义模型是通过不同的服务访问的。现在,该新产品与其他新产品一起将这两种模型合并到一个 API 中。您还可以将现有项目迁移到 Vertex AI。
Vertex AI 包含许多不同的产品,可支持端到端机器学习工作流。本实验将重点介绍下面突出显示的产品:Training 和 Workbench
3. 分布式训练概览
如果您有一个 GPU,则 TensorFlow 会使用此加速器加快模型训练速度,您无需执行额外的操作。但是,如果您希望使用多个 GPU 进一步提升速度,则需要使用 tf.distribute
,它是 TensorFlow 的模块,用于在多个设备上运行计算。
本实验的第一部分使用 tf.distribute.MirroredStrategy
,您只需对代码进行一些更改,即可将其添加到训练应用中。此策略会在机器的每个 GPU 上创建模型的副本。后续的梯度更新会以同步的方式进行。这意味着,每个 GPU 都会针对不同的输入数据切片计算通过模型的前向和后向传递。然后,来自各个切片的计算梯度会在所有 GPU 上聚合,并在一个称为 all-reduce 的进程中取平均值。模型参数会使用这些平均梯度进行更新。
本实验末尾的可选部分使用 tf.distribute.MultiWorkerMirroredStrategy
,它与 MirroredStrategy
类似,不同之处在于它可以在多台机器上运行。其中每个机器可能还具有多个 GPU。与 MirroredStrategy
类似,MultiWorkerMirroredStrategy
是一种同步数据并行策略,只需进行一些代码更改即可使用。从一台机器上的同步数据并行转移到多台机器上的主要区别在于,现在每一步结束时的梯度都需要在机器中的所有 GPU 和集群中的所有机器上同步。
您无需了解相关详细信息即可完成本实验,但如果您想详细了解分布式训练在 TensorFlow 中的工作原理,请观看以下视频:
4. 设置您的环境
完成使用 Vertex AI 训练自定义模型实验中的步骤以设置您的环境。
5. 单机器、多 GPU 训练
如要将分布式训练作业提交到 Vertex AI,您需要将训练应用代码放在 Docker 容器中,然后将此容器推送到 Google Artifact Registry。使用此方法,您可以训练使用任何框架构建的模型。
首先,通过您在之前的实验中创建的 Workbench 笔记本的“启动器”菜单,打开一个终端窗口。
第 1 步:编写训练代码
创建一个名为 flowers-multi-gpu
的新目录并通过 cd 命令进入该目录:
mkdir flowers-multi-gpu
cd flowers-multi-gpu
运行以下命令,为训练代码创建一个目录和一个 Python 文件(您将在其中添加下面的代码)。
mkdir trainer
touch trainer/task.py
您的 flowers-multi-gpu/
目录中现在应包含以下内容:
+ trainer/
+ task.py
接下来,打开您刚刚创建的 task.py
文件并复制下面的代码。
您需要将 BUCKET_ROOT
中的 {your-gcs-bucket}
替换为您在实验 1 中存储花卉数据集的 Cloud Storage 存储桶。
import tensorflow as tf
import numpy as np
import os
## Replace {your-gcs-bucket} !!
BUCKET_ROOT='/gcs/{your-gcs-bucket}'
# Define variables
NUM_CLASSES = 5
EPOCHS=10
BATCH_SIZE = 32
IMG_HEIGHT = 180
IMG_WIDTH = 180
DATA_DIR = f'{BUCKET_ROOT}/flower_photos'
def create_datasets(data_dir, batch_size):
'''Creates train and validation datasets.'''
train_dataset = tf.keras.utils.image_dataset_from_directory(
data_dir,
validation_split=0.2,
subset="training",
seed=123,
image_size=(IMG_HEIGHT, IMG_WIDTH),
batch_size=batch_size)
validation_dataset = tf.keras.utils.image_dataset_from_directory(
data_dir,
validation_split=0.2,
subset="validation",
seed=123,
image_size=(IMG_HEIGHT, IMG_WIDTH),
batch_size=batch_size)
train_dataset = train_dataset.cache().shuffle(1000).prefetch(buffer_size=tf.data.AUTOTUNE)
validation_dataset = validation_dataset.cache().prefetch(buffer_size=tf.data.AUTOTUNE)
return train_dataset, validation_dataset
def create_model():
'''Creates model.'''
model = tf.keras.Sequential([
tf.keras.layers.Resizing(IMG_HEIGHT, IMG_WIDTH),
tf.keras.layers.Rescaling(1./255, input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)),
tf.keras.layers.Conv2D(16, 3, padding='same', activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Conv2D(64, 3, padding='same', activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')
])
return model
def main():
# Create distribution strategy
strategy = tf.distribute.MirroredStrategy()
# Get data
GLOBAL_BATCH_SIZE = BATCH_SIZE * strategy.num_replicas_in_sync
train_dataset, validation_dataset = create_datasets(DATA_DIR, BATCH_SIZE)
# Wrap model creation and compilation within scope of strategy
with strategy.scope():
model = create_model()
model.compile(optimizer=tf.keras.optimizers.Adam(),
loss=tf.keras.losses.SparseCategoricalCrossentropy(),
metrics=['accuracy'])
history = model.fit(
train_dataset,
validation_data=validation_dataset,
epochs=EPOCHS
)
model.save(f'{BUCKET_ROOT}/model_output')
if __name__ == "__main__":
main()
在构建容器之前,我们先来深入了解一下代码。有一些组件专用于使用分布式训练。
- 在
main()
函数中,相关代码创建了MirroredStrategy
对象。接下来,您将模型变量的创建过程封装在策略的范围内。此步骤指示 TensorFlow 应在 GPU 中镜像哪些变量。 - 批次大小按
num_replicas_in_sync
扩容。在 TensorFlow 中使用同步数据并行策略时,扩缩批次大小是最佳实践。您可以点击此处了解详情。
第 2 步:创建 Dockerfile
如需将代码容器化,您需要创建一个 Dockerfile。在 Dockerfile 中,您将添加运行映像所需的所有命令。它将安装所有必要的库并为训练代码设置入口点。
通过终端,在 flowers 目录的根目录下创建一个空的 Dockerfile:
touch Dockerfile
您的 flowers-multi-gpu/
目录中现在应包含以下内容:
+ Dockerfile
+ trainer/
+ task.py
打开 Dockerfile 并将以下代码复制到其中:
FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-8
WORKDIR /
# Copies the trainer code to the docker image.
COPY trainer /trainer
# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]
第 3 步:构建容器
在终端中,运行以下命令为项目定义一个环境变量,务必注意将 your-cloud-project
替换为您的项目 ID:
PROJECT_ID='your-cloud-project'
在 Artifact Registry 中创建代码库。我们将使用在第一个实验中创建的代码库。
REPO_NAME='flower-app'
使用 Artifact Registry 中容器映像的 URI 定义变量:
IMAGE_URI=us-central1-docker.pkg.dev/$PROJECT_ID/$REPO_NAME/flower_image_distributed:single_machine
配置 Docker
gcloud auth configure-docker \
us-central1-docker.pkg.dev
然后,从 flowers-multi-gpu
目录的根目录运行以下命令来构建容器:
docker build ./ -t $IMAGE_URI
最后,将其推送到 Artifact Registry:
docker push $IMAGE_URI
在将容器推送到 Artifact Registry 后,您就可以启动训练作业了。
第 4 步:使用 SDK 运行作业
在本部分中,您将了解如何使用 Vertex AI Python SDK 配置和启动分布式训练作业。
在启动器中,创建一个 TensorFlow 2 笔记本。
导入 Vertex AI SDK。
from google.cloud import aiplatform
然后,定义 CustomContainerTrainingJob
。
您需要替换 container_uri
中的 {PROJECT_ID}
和 staging_bucket
中的 {YOUR_BUCKET}
。
job = aiplatform.CustomContainerTrainingJob(display_name='flowers-multi-gpu',
container_uri='us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:single_machine',
staging_bucket='gs://{YOUR_BUCKET}')
定义作业后,您便可以运行作业。您要将加速器数量设为 2。如果我们只使用 1 个 GPU,这不会被视为分布式训练。在一台机器上进行分布式训练是指使用 2 个或更多加速器。
my_custom_job.run(replica_count=1,
machine_type='n1-standard-4',
accelerator_type='NVIDIA_TESLA_V100',
accelerator_count=2)
在控制台中,您可以查看作业的进度。
6. [可选] 多工作器训练
现在,您已经尝试在配备了多个 GPU 的单台机器上进行分布式训练,接下来您可以利用多台机器训练,让分布式训练技能更上一层楼。为了降低费用,我们不会向这些机器添加任何 GPU,但您可以根据需要尝试添加 GPU。
在笔记本实例中打开一个新终端窗口:
第 1 步:编写训练代码
创建一个名为 flowers-multi-machine
的新目录并通过 cd 命令进入该目录:
mkdir flowers-multi-machine
cd flowers-multi-machine
运行以下命令,为训练代码创建一个目录和一个 Python 文件(您将在其中添加下面的代码)。
mkdir trainer
touch trainer/task.py
您的 flowers-multi-machine/
目录中现在应包含以下内容:
+ trainer/
+ task.py
接下来,打开您刚刚创建的 task.py
文件并复制下面的代码。
您需要将 BUCKET_ROOT
中的 {your-gcs-bucket}
替换为您在实验 1 中存储花卉数据集的 Cloud Storage 存储桶。
import tensorflow as tf
import numpy as np
import os
## Replace {your-gcs-bucket} !!
BUCKET_ROOT='/gcs/{your-gcs-bucket}'
# Define variables
NUM_CLASSES = 5
EPOCHS=10
BATCH_SIZE = 32
IMG_HEIGHT = 180
IMG_WIDTH = 180
DATA_DIR = f'{BUCKET_ROOT}/flower_photos'
SAVE_MODEL_DIR = f'{BUCKET_ROOT}/multi-machine-output'
def create_datasets(data_dir, batch_size):
'''Creates train and validation datasets.'''
train_dataset = tf.keras.utils.image_dataset_from_directory(
data_dir,
validation_split=0.2,
subset="training",
seed=123,
image_size=(IMG_HEIGHT, IMG_WIDTH),
batch_size=batch_size)
validation_dataset = tf.keras.utils.image_dataset_from_directory(
data_dir,
validation_split=0.2,
subset="validation",
seed=123,
image_size=(IMG_HEIGHT, IMG_WIDTH),
batch_size=batch_size)
train_dataset = train_dataset.cache().shuffle(1000).prefetch(buffer_size=tf.data.AUTOTUNE)
validation_dataset = validation_dataset.cache().prefetch(buffer_size=tf.data.AUTOTUNE)
return train_dataset, validation_dataset
def create_model():
'''Creates model.'''
model = tf.keras.Sequential([
tf.keras.layers.Resizing(IMG_HEIGHT, IMG_WIDTH),
tf.keras.layers.Rescaling(1./255, input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)),
tf.keras.layers.Conv2D(16, 3, padding='same', activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Conv2D(64, 3, padding='same', activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')
])
return model
def _is_chief(task_type, task_id):
'''Helper function. Determines if machine is chief.'''
return task_type == 'chief'
def _get_temp_dir(dirpath, task_id):
'''Helper function. Gets temporary directory for saving model.'''
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id):
'''Helper function. Gets filepath to save model.'''
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
def main():
# Create distribution strategy
strategy = tf.distribute.MultiWorkerMirroredStrategy()
# Get data
GLOBAL_BATCH_SIZE = BATCH_SIZE * strategy.num_replicas_in_sync
train_dataset, validation_dataset = create_datasets(DATA_DIR, BATCH_SIZE)
# Wrap variable creation within strategy scope
with strategy.scope():
model = create_model()
model.compile(optimizer=tf.keras.optimizers.Adam(),
loss=tf.keras.losses.SparseCategoricalCrossentropy(),
metrics=['accuracy'])
history = model.fit(
train_dataset,
validation_data=validation_dataset,
epochs=EPOCHS
)
# Determine type and task of the machine from
# the strategy cluster resolver
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
# Based on the type and task, write to the desired model path
write_model_path = write_filepath(SAVE_MODEL_DIR, task_type, task_id)
model.save(write_model_path)
if __name__ == "__main__":
main()
在构建容器之前,我们先来深入了解一下代码。代码中有一些组件是您的训练应用使用 MultiWorkerMirroredStrategy
所必需的。
- 在
main()
函数中,相关代码创建了MultiWorkerMirroredStrategy
对象。接下来,您将模型变量的创建过程封装在策略的范围内。这一关键步骤指示 TensorFlow 应在副本中镜像哪些变量。 - 批次大小按
num_replicas_in_sync
扩容。在 TensorFlow 中使用同步数据并行策略时,扩缩批次大小是最佳实践。 - 在多工作器情况下,保存模型稍微复杂一些,因为每个工作器的目的地必须有所不同。主工作器将保存到所需的模型目录,其他工作器会将模型保存到临时目录。为防止多个工作器写入同一位置,这些临时目录必须具有唯一性。保存操作可以包含集体操作,这意味着所有工作器都必须保存,而不仅仅是主工作器。函数
_is_chief()
、_get_temp_dir()
、write_filepath()
和main()
均包含有助于保存模型的样板代码。
第 2 步:创建 Dockerfile
如需将代码容器化,您需要创建一个 Dockerfile。在 Dockerfile 中,您将添加运行映像所需的所有命令。它将安装所有必要的库并为训练代码设置入口点。
通过终端,在 flowers 目录的根目录下创建一个空的 Dockerfile:
touch Dockerfile
您的 flowers-multi-machine/
目录中现在应包含以下内容:
+ Dockerfile
+ trainer/
+ task.py
打开 Dockerfile 并将以下代码复制到其中:
FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-8
WORKDIR /
# Copies the trainer code to the docker image.
COPY trainer /trainer
# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]
第 3 步:构建容器
在终端中,运行以下命令为项目定义一个环境变量,务必注意将 your-cloud-project
替换为您的项目 ID:
PROJECT_ID='your-cloud-project'
在 Artifact Registry 中创建代码库。我们将使用在第一个实验中创建的代码库。
REPO_NAME='flower-app'
使用 Google Artifact Registry 中容器映像的 URI 定义变量:
IMAGE_URI=us-central1-docker.pkg.dev/$PROJECT_ID/$REPO_NAME/flower_image_distributed:multi_machine
配置 Docker
gcloud auth configure-docker \
us-central1-docker.pkg.dev
然后,从 flowers-multi-machine
目录的根目录运行以下命令来构建容器:
docker build ./ -t $IMAGE_URI
最后,将其推送到 Artifact Registry:
docker push $IMAGE_URI
在将容器推送到 Artifact Registry 后,您就可以启动训练作业了。
第 4 步:使用 SDK 运行作业
在本部分中,您将了解如何使用 Vertex AI Python SDK 配置和启动分布式训练作业。
在启动器中,创建一个 TensorFlow 2 笔记本。
导入 Vertex AI SDK。
from google.cloud import aiplatform
然后,定义 worker_pool_specs
。
Vertex AI 提供 4 个工作器池,可处理不同类型的机器任务。
工作器池 0 会配置主实例、主帐号、调度器或“主客户端”。在 MultiWorkerMirroredStrategy
中,所有机器都被指定为工作器,工作器是执行复制计算的物理机器。除了将每台机器用作工作器之外,还需要有一个工作器接受一些额外的工作,例如保存检查点,以及将摘要文件写入 TensorBoard。此机器被称为主工作器。由于只有一个主工作器,因此工作器池 0 的工作器计数始终为 1。
工作器池 1 用于为集群配置其他工作器。
worker_pool_specs
列表中的第一个字典表示工作器池 0,第二个字典表示工作器池 1。在此示例中,这两个配置是相同的。但是,如果您想在 3 台机器上训练,则需要将 replica_count
设为 2,从而向工作器池 1 添加其他工作器。如果您想添加 GPU,则需要将参数 accelerator_type
和 accelerator_count
添加到两个工作器池的 machine_spec
中。请注意,如果您想要将 GPU 与 MultiWorkerMirroredStrategy
搭配使用,集群中的每个机器都必须具有相同的 GPU 数量。否则,作业将失败。
您需要替换 image_uri
中的 {PROJECT_ID}
。
# The spec of the worker pools including machine type and Docker image
# Be sure to replace PROJECT_ID in the "image_uri" with your project.
worker_pool_specs=[
{
"replica_count": 1,
"machine_spec": {
"machine_type": "n1-standard-4",
},
"container_spec": {"image_uri": "us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:multi_machine"}
},
{
"replica_count": 1,
"machine_spec": {
"machine_type": "n1-standard-4",
},
"container_spec": {"image_uri": "us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:multi_machine"}
}
]
接下来,创建并运行 CustomJob
,注意将 staging_bucket
中的 {YOUR_BUCKET}
替换为您项目中用于暂存的存储桶。
my_custom_job = aiplatform.CustomJob(display_name='flowers-multi-worker',
worker_pool_specs=worker_pool_specs,
staging_bucket='gs://{YOUR_BUCKET}')
my_custom_job.run()
在控制台中,您可以查看作业的进度。
🎉 恭喜!🎉
您学习了如何使用 Vertex AI 执行以下操作:
- 使用 TensorFlow 运行分布式训练作业
如需详细了解 Vertex 的不同部分,请参阅相关文档。
7. 清理
因为我们将笔记本配置为在空闲 60 分钟后超时,所以不必担心关停实例。如果您要手动关停实例,请点击控制台的 Vertex AI Workbench 部分中的“停止”按钮。如果您想完全删除该笔记本,请点击“删除”按钮。
如需删除存储桶,请使用 Cloud Console 中的导航菜单,浏览到“存储空间”,选择您的存储桶,然后点击“删除”: