1. Overview
In this lab, you'll use Vertex AI to run a multi-worker training job for a TensorFlow model.
What you learn
You'll learn how to:
- Modify training application code for multi-worker training
- Configure and launch a multi-worker training job from the Vertex AI UI
- Configure and launch a multi-worker training job with the Vertex SDK
The total cost to run this lab on Google Cloud is about $5.
2. Intro to Vertex AI
This lab uses the newest AI product offering available on Google Cloud. Vertex AI integrates the ML offerings across Google Cloud into a seamless development experience. Previously, models trained with AutoML and custom models were accessible via separate services. The new offering combines both into a single API, along with other new products. You can also migrate existing projects to Vertex AI. If you have any feedback, please see the support page.
Vertex AI includes many different products to support end-to-end ML workflows. This lab will focus on the products highlighted below: Training and Workbench
3. Use Case Overview
In this lab you'll use transfer learning to train an image classification model on the cassava dataset from TensorFlow Datasets. The architecture you'll use is a ResNet50 model from the tf.keras.applications
library pretrained on the Imagenet dataset.
Why Distributed Training?
If you have a single GPU, TensorFlow will use this accelerator to speed up model training with no extra work on your part. However, if you want to get an additional boost from using multiple GPUs on a single machine or multiple machines (each with potentially multiple GPUs), then you'll need to use tf.distribute
, which is TensorFlow's library for running a computation across multiple devices. A device refers to a CPU or accelerator, such as GPUs or TPUs, on some machine that TensorFlow can run operations on.
The simplest way to get started with distributed training is a single machine with multiple GPU devices. A TensorFlow distribution strategy from the tf.distribute
module will manage the coordination of data distribution and gradient updates across all of the GPUs. If you've mastered single host training and are looking to scale even further, then adding multiple machines to your cluster can help you get an even greater performance boost. You can make use of a cluster of machines that are CPU only, or that each have one or more GPUs. This lab covers the latter case and demonstrates how to use MultiWorkerMirroredStrategy
to distribute training of a TensorFlow model across multiple machines on Vertex AI.
MultiWorkerMirroredStrategy
is a synchronous data parallelism strategy that you can use with only a few code changes. A copy of the model is created on each device in your cluster. The subsequent gradient updates will happen in a synchronous manner. This means that each worker device computes the forward and backward passes through the model on a different slice of the input data. The computed gradients from each of these slices are then aggregated across all of the devices on a machine and all machines in the cluster and reduced (usually an average) in a process known as all-reduce. The optimizer then performs the parameter updates with these reduced gradients thereby keeping the devices in sync. To learn more about distributed training with TensorFlow, check out the video below:
4. Set up your environment
You'll need a Google Cloud Platform project with billing enabled to run this codelab. To create a project, follow the instructions here.
Step 1: Enable the Compute Engine API
Navigate to Compute Engine and select Enable if it isn't already enabled. You'll need this to create your notebook instance.
Step 2: Enable the Container Registry API
Navigate to the Container Registry and select Enable if it isn't already. You'll use this to create a container for your custom training job.
Step 3: Enable the Vertex AI API
Navigate to the Vertex AI section of your Cloud Console and click Enable Vertex AI API.
Step 4: Create a Vertex AI Workbench instance
From the Vertex AI section of your Cloud Console, click on Workbench:
Enable the Notebooks API if it isn't already.
Once enabled, click MANAGED NOTEBOOKS:
Then select NEW NOTEBOOK.
Give your notebook a name, and then click Advanced Settings.
Under Advanced Settings, enable idle shutdown and set the number of minutes to 60. This means your notebook will shutdown automatically when not in use so you don't incur unnecessary costs.
Under Security select "Enable terminal" if it is not already enabled.
You can leave all of the other advanced settings as is.
Next, click Create. The instance will take a couple minutes to be provisioned.
Once the instance has been created, select Open JupyterLab.
The first time you use a new instance, you'll be asked to authenticate. Follow the steps in the UI to do so.
5. Containerize training application code
You'll submit this training job to Vertex by putting your training application code in a Docker container and pushing this container to Google Container Registry. Using this approach, you can train a model built with any framework.
To start, from the Launcher menu, open a Terminal window in your notebook instance:
Create a new directory called cassava
and cd into it:
mkdir cassava
cd cassava
Step 1: Create a Dockerfile
The first step in containerizing your code is to create a Dockerfile. In the Dockerfile you'll include all the commands needed to run the image. It'll install all the necessary libraries and set up the entry point for the training code.
From your Terminal, create an empty Dockerfile:
touch Dockerfile
Open the Dockerfile and copy the following into it:
FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-7
WORKDIR /
# Copies the trainer code to the docker image.
COPY trainer /trainer
# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]
This Dockerfile uses the Deep Learning Container TensorFlow Enterprise 2.7 GPU Docker image. The Deep Learning Containers on Google Cloud come with many common ML and data science frameworks pre-installed. After downloading that image, this Dockerfile sets up the entrypoint for the training code. You haven't created these files yet – in the next step, you'll add the code for training and tuning the model.
Step 2: Create a Cloud Storage bucket
In this training job, you'll export the trained TensorFlow model to a Cloud Storage Bucket. From your Terminal, run the following to define an env variable for your project, making sure to replace your-cloud-project
with the ID of your project:
PROJECT_ID='your-cloud-project'
Next, run the following in your Terminal to create a new bucket in your project.
BUCKET="gs://${PROJECT_ID}-bucket"
gsutil mb -l us-central1 $BUCKET
Step 3: Add model training code
From your Terminal, run the following to create a directory for the training code and a Python file where you'll add the code:
mkdir trainer
touch trainer/task.py
You should now have the following in your cassava/
directory:
+ Dockerfile
+ trainer/
+ task.py
Next, open the task.py
file you just created and copy the code below. You'll need to replace {your-gcs-bucket}
with the name of the Cloud Storage bucket you just created.
import tensorflow as tf
import tensorflow_datasets as tfds
import os
PER_REPLICA_BATCH_SIZE = 64
EPOCHS = 2
# TODO: replace {your-gcs-bucket} with the name of the Storage bucket you created earlier
BUCKET = 'gs://{your-gcs-bucket}/mwms'
def preprocess_data(image, label):
'''Resizes and scales images.'''
image = tf.image.resize(image, (300,300))
return tf.cast(image, tf.float32) / 255., label
def create_dataset(batch_size):
'''Loads Cassava dataset and preprocesses data.'''
data, info = tfds.load(name='cassava', as_supervised=True, with_info=True)
number_of_classes = info.features['label'].num_classes
train_data = data['train'].map(preprocess_data,
num_parallel_calls=tf.data.experimental.AUTOTUNE)
train_data = train_data.shuffle(1000)
train_data = train_data.batch(batch_size)
train_data = train_data.prefetch(tf.data.experimental.AUTOTUNE)
# Set AutoShardPolicy
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
train_data = train_data.with_options(options)
return train_data, number_of_classes
def create_model(number_of_classes):
'''Creates and compiles pretrained ResNet50 model.'''
base_model = tf.keras.applications.ResNet50(weights='imagenet', include_top=False)
x = base_model.output
x = tf.keras.layers.GlobalAveragePooling2D()(x)
x = tf.keras.layers.Dense(1016, activation='relu')(x)
predictions = tf.keras.layers.Dense(number_of_classes, activation='softmax')(x)
model = tf.keras.Model(inputs=base_model.input, outputs=predictions)
model.compile(
loss='sparse_categorical_crossentropy',
optimizer=tf.keras.optimizers.Adam(0.0001),
metrics=['accuracy'])
return model
def _is_chief(task_type, task_id):
'''Helper function. Determines if machine is chief.'''
return task_type == 'chief'
def _get_temp_dir(dirpath, task_id):
'''Helper function. Gets temporary directory for saving model.'''
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id):
'''Helper function. Gets filepath to save model.'''
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
def main():
# Create strategy
strategy = tf.distribute.MultiWorkerMirroredStrategy()
# Get data
global_batch_size = PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync
train_data, number_of_classes = create_dataset(global_batch_size)
# Wrap variable creation within strategy scope
with strategy.scope():
model = create_model(number_of_classes)
model.fit(train_data, epochs=EPOCHS)
# Determine type and task of the machine from
# the strategy cluster resolver
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
# Based on the type and task, write to the desired model path
write_model_path = write_filepath(BUCKET, task_type, task_id)
model.save(write_model_path)
if __name__ == "__main__":
main()
Before you build the container, let's take a deeper look at the code, which uses MultiWorkerMirroredStrategy
from the tf.distribute.Strategy
API.
There are a few components in the code that are necessary for your code to work with MultiWorkerMirroredStrategy
.
- The data needs to be sharded, meaning that each worker is assigned a subset of the entire dataset. Therefore, at each step a global batch size of non overlapping dataset elements will be processed by each worker. This sharding happens automatically with
tf.data.experimental.AutoShardPolicy
, which can be set toFILE
orDATA
. In this example, thecreate_dataset()
function sets theAutoShardPolicy
toDATA
because the the cassava dataset is not downloaded as multiple files. However, if you did not set the policy toDATA
, the defaultAUTO
policy would kick in and the end result would be the same. You can learn more about dataset sharding withMultiWorkerMirroredStrategy
here. - In the
main()
function, theMultiWorkerMirroredStrategy
object is created. Next, you wrap the creation of your model variables within the strategy's scope. This crucial step tells TensorFlow which variables should be mirrored across the replicas. - The batch size is scaled up by the
num_replicas_in_sync
. This ensures that each replica processes the same number of examples on each step. Scaling the batch size is a best practice when using synchronous data parallelism strategies in TensorFlow. - Saving your model is slightly more complicated in the multi-worker case because the destination needs to be different for each of the workers. The chief worker will save to the desired model directory, while the other workers will save the model to temporary directories. It's important that these temporary directories are unique in order to prevent multiple workers from writing to the same location. Saving can contain collective operations, meaning that all workers must save and not just the chief. The functions
_is_chief()
,_get_temp_dir()
,write_filepath()
, as well as themain()
function all include boilerplate code that help save the model.
Note that if you have used MultiWorkerMirroredStrategy
in a different environment, you might have set up the TF_CONFIG
environment variable. Vertex AI sets TF_CONFIG
for you automatically, so you do not need to define this variable on each machine in your cluster.
Step 4: Build the container
From your Terminal, run the following to define an env variable for your project, making sure to replace your-cloud-project
with the ID of your project:
PROJECT_ID='your-cloud-project'
Define a variable with the URI of your container image in Google Container Registry:
IMAGE_URI="gcr.io/$PROJECT_ID/multiworker:cassava"
Configure docker
gcloud auth configure-docker
Then, build the container by running the following from the root of your cassava
directory:
docker build ./ -t $IMAGE_URI
Lastly, push it to Google Container Registry:
docker push $IMAGE_URI
With the container pushed to Container Registry, you're now ready to kick off a the training job.
6. Run a multi-worker training job on Vertex AI
This lab uses custom training via a custom container on Google Container Registry, but you can also run a training job with the Pre-built containers.
To start, navigate to the Training section in the Vertex section of your Cloud console:
Step 1: Configure training job
Click Create to enter the parameters for your training job.
- Under Dataset, select No managed dataset
- Then select Custom training (advanced) as your training method and click Continue.
- Enter
multiworker-cassava
(or whatever you'd like to call your model) for Model name - Click Continue
In the Container settings step, select Custom container:
In the first box (Container image), enter the value of your IMAGE_URI
variable from the previous section. It should be: gcr.io/your-cloud-project/multiworker:cassava
, with your own project ID. Leave the rest of the fields blank and click Continue.
Skip the Hhyperparameters step by clicking Continue again.
Step 2: Configure compute cluster
Vertex AI provides 4 worker pools to cover the different types of machine tasks.
Worker pool 0 configures the Primary, chief, scheduler, or "master". In MultiWorkerMirroredStrategy
, all machines are designated as workers, which are the physical machines on which the replicated computation is executed. In addition to each machine being a worker, there needs to be one worker that takes on some extra work such as saving checkpoints and writing summary files to TensorBoard. This machine is known as the chief. There is only ever one chief worker, so your worker count for worker pool 0 will always be 1.
In Compute and pricing, leave the selected region as-is and configure Worker pool 0 as follows:
Worker pool 1 is where you configure the workers for your cluster.
Configure Worker pool 1 as follows:
The cluster is now configured to have two CPU only machines. When the training application code is run, MultiWorkerMirroredStrategy
will distribute the training across both machines.
MultiWorkerMirroredStrategy
only has the chief and worker task types, so there is no need to configure the additional Worker Pools. However, if you were to use TensorFlow's ParameterServerStrategy
, you would configure your parameter servers in worker pool 2. And if you wanted to add an evaluator to your cluster, you would configure that machine in worker pool 3.
Click Start training to kick off the hyperparameter tuning job. In the Training section of your console under the TRAINING PIPELINES tab you'll see your newly launched job:
🎉 Congratulations! 🎉
You've learned how to use Vertex AI to:
- Launch a multi-worker training job for training code provided in a custom container. You used a TensorFlow model in this example, but you can train a model built with any framework using custom or built-in containers.
To learn more about different parts of Vertex, check out the documentation.
7. [Optional] Use the Vertex SDK
The previous section showed how to launch the training job via the UI. In this section, you'll see an alternative way to submit the training job by using the Vertex Python API.
Return to your notebook instance, and create a create a TensorFlow 2 notebook from the Launcher:
Import the Vertex AI SDK.
from google.cloud import aiplatform
To launch the multi-worker training job, you first need to define the worker pool spec. Note that the use of GPUs in the spec is completely optional and you can remove accelerator_type
and accelerator_count
if you would like a CPU only cluster as shown in the previous section.
# The spec of the worker pools including machine type and Docker image
# Be sure to replace {YOUR-PROJECT-ID} with your project ID.
worker_pool_specs=[
{
"replica_count": 1,
"machine_spec": {
"machine_type": "n1-standard-8", "accelerator_type": "NVIDIA_TESLA_V100", "accelerator_count": 1
},
"container_spec": {"image_uri": "gcr.io/{YOUR-PROJECT-ID}/multiworker:cassava"}
},
{
"replica_count": 1,
"machine_spec": {
"machine_type": "n1-standard-8", "accelerator_type": "NVIDIA_TESLA_V100", "accelerator_count": 1
},
"container_spec": {"image_uri": "gcr.io/{YOUR-PROJECT-ID}/multiworker:cassava"}
}
]
Next, create and run a CustomJob
. You'll need to replace {YOUR_BUCKET}
with a bucket in your project for staging. You can use the same bucket you created earlier.
# Replace YOUR_BUCKET
my_multiworker_job = aiplatform.CustomJob(display_name='multiworker-cassava-sdk',
worker_pool_specs=worker_pool_specs,
staging_bucket='gs://{YOUR_BUCKET}')
my_multiworker_job.run()
In the Training section of your console under the CUSTOM JOBS tab you'll see your training job:
8. Cleanup
Because we configured the notebook to time out after 60 idle minutes, we don't need to worry about shutting the instance down. If you would like to manually shut down the instance, click the Stop button on the Vertex AI Workbench section of the console. If you'd like to delete the notebook entirely, click the Delete button.
To delete the Storage Bucket, using the Navigation menu in your Cloud Console, browse to Storage, select your bucket, and click Delete: