Vertex AI: Multi-Worker Training and Transfer Learning with TensorFlow

In this lab, you'll use Vertex AI to run a multi-worker training job for a TensorFlow model.

What you'll learn

You'll learn how to:

  • Modify training application code for multi-worker training
  • Configure and launch a multi-worker training job from the Vertex AI UI
  • Configure and launch a multi-worker training job with the Vertex SDK

The total cost to run this lab on Google Cloud is about $5.

This lab uses the newest AI product offering available on Google Cloud. Vertex AI integrates the ML offerings across Google Cloud into a seamless development experience. Previously, models trained with AutoML and custom models were accessible via separate services. The new offering combines both into a single API, along with other new products. You can also migrate existing projects to Vertex AI. If you have any feedback, please see the support page.

Vertex AI includes many different products to support end-to-end ML workflows. This lab focuses on the products highlighted below: Training and Notebooks.

Vertex product overview

In this lab, you'll use transfer learning to train an image classification model on the cassava dataset from TensorFlow Datasets. The architecture you'll use is a ResNet50 model from the tf.keras.applications library, pretrained on the ImageNet dataset.

Why Distributed Training?

If you have a single GPU, TensorFlow will use this accelerator to speed up model training with no extra work on your part. However, if you want to get an additional boost from using multiple GPUs on a single machine or multiple machines (each with potentially multiple GPUs), then you'll need to use tf.distribute, which is TensorFlow's library for running a computation across multiple devices. A device refers to a CPU or accelerator, such as GPUs or TPUs, on some machine that TensorFlow can run operations on.

The simplest way to get started with distributed training is a single machine with multiple GPU devices. A TensorFlow distribution strategy from the tf.distribute module will manage the coordination of data distribution and gradient updates across all of the GPUs. If you've mastered single host training and are looking to scale even further, then adding multiple machines to your cluster can help you get an even greater performance boost. You can make use of a cluster of machines that are CPU only, or that each have one or more GPUs. This lab covers the latter case and demonstrates how to use MultiWorkerMirroredStrategy to distribute training of a TensorFlow model across multiple machines on Vertex AI.

MultiWorkerMirroredStrategy is a synchronous data parallelism strategy that you can use with only a few code changes. A copy of the model is created on each device in your cluster. The subsequent gradient updates will happen in a synchronous manner. This means that each worker device computes the forward and backward passes through the model on a different slice of the input data. The computed gradients from each of these slices are then aggregated across all of the devices on a machine and all machines in the cluster and reduced (usually an average) in a process known as all-reduce. The optimizer then performs the parameter updates with these reduced gradients thereby keeping the devices in sync. To learn more about distributed training with TensorFlow, see this guide.
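To make the synchronous update concrete, here is a minimal sketch in plain Python rather than TensorFlow. It assumes a toy one-parameter model (y = w * x) with a squared-error loss, and the helper names (local_gradient, all_reduce_mean, train_step) are hypothetical. The slicing used to split the batch is only an illustration, not a description of how tf.distribute shards data internally.

```python
# Toy illustration of synchronous data parallelism with a mean all-reduce.
# Pure Python, one scalar weight, model y = w * x, squared-error loss.

def local_gradient(w, shard):
    """Mean gradient of (w*x - y)**2 with respect to w over one worker's shard."""
    return sum(2 * (w * x - y) * x for x, y in shard) / len(shard)


def all_reduce_mean(values):
    """Stand-in for all-reduce: every worker ends up with the same average."""
    return sum(values) / len(values)


def train_step(w, global_batch, num_workers, learning_rate):
    # Each worker takes a non-overlapping slice of the global batch.
    shards = [global_batch[i::num_workers] for i in range(num_workers)]
    # The forward and backward passes run independently on each worker.
    grads = [local_gradient(w, shard) for shard in shards]
    # Gradients are reduced, then every replica applies the same update,
    # so all copies of the weight stay identical.
    reduced = all_reduce_mean(grads)
    return w - learning_rate * reduced


global_batch = [(1.0, 2.0), (2.0, 4.0), (3.0, 6.0), (4.0, 8.0)]  # samples of y = 2x
w = 0.0
for _ in range(50):
    w = train_step(w, global_batch, num_workers=2, learning_rate=0.05)
print(round(w, 3))
```

Because both shards have the same size here, averaging the per-worker gradients gives the same value as computing the gradient over the whole global batch on one device.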

You'll need a Google Cloud Platform project with billing enabled to run this codelab. To create a project, follow the instructions here.

Step 1: Enable the Compute Engine API

Navigate to Compute Engine and select Enable if it isn't already enabled. You'll need this to create your notebook instance.

Step 2: Enable the Vertex AI API

Navigate to the Vertex AI section of your Cloud Console and click Enable Vertex AI API.

Vertex AI dashboard

Step 3: Enable the Container Registry API

Navigate to the Container Registry and select Enable if it isn't already enabled. You'll use this to create a container for your custom training job.

Step 4: Create a Vertex Notebooks instance

From the Vertex AI section of your Cloud Console, click on Notebooks:

Vertex AI menu

From there, select New Instance. Then select the TensorFlow Enterprise 2.5 instance type without GPUs:

TFE instance

Use the default options and then click Create. Once the instance has been created, select Open JupyterLab:

Open CAIP Notebook

You'll submit this training job to Vertex by putting your training application code in a Docker container and pushing this container to Google Container Registry. Using this approach, you can train a model built with any framework.

To start, from the Launcher menu, open a Terminal window in your notebook instance:

Open terminal in notebook

Create a new directory called cassava and cd into it:

mkdir cassava
cd cassava

Step 1: Create a Dockerfile

The first step in containerizing your code is to create a Dockerfile. In the Dockerfile you'll include all the commands needed to run the image. It'll install all the necessary libraries and set up the entry point for the training code.

From your Terminal, create an empty Dockerfile:

touch Dockerfile

Open the Dockerfile and copy the following into it:

FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-5

WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]

This Dockerfile uses the Deep Learning Container TensorFlow Enterprise 2.5 GPU Docker image. The Deep Learning Containers on Google Cloud come with many common ML and data science frameworks pre-installed. After downloading that image, this Dockerfile copies the trainer code into the image and sets up the entrypoint for the training code. You haven't created these files yet. You'll add the training code in Step 3.

Step 2: Create a Cloud Storage bucket

In this training job, you'll export the trained TensorFlow model to a Cloud Storage bucket. From your Terminal, run the following to define an environment variable for your project, making sure to replace your-cloud-project with the ID of your project:

PROJECT_ID='your-cloud-project'

Next, run the following in your Terminal to create a new bucket in your project.

BUCKET="gs://${PROJECT_ID}-bucket"
gsutil mb -l us-central1 $BUCKET

Step 3: Add model training code

From your Terminal, run the following to create a directory for the training code and a Python file where you'll add the code:

mkdir trainer
touch trainer/task.py

You should now have the following in your cassava/ directory:

+ Dockerfile
+ trainer/
    + task.py

Next, open the task.py file you just created and copy the code below.

import tensorflow as tf
import tensorflow_datasets as tfds
import os


PER_REPLICA_BATCH_SIZE = 64
EPOCHS = 2

# TODO: replace {your-gcs-bucket} with the name of the Storage bucket you created earlier
BUCKET = 'gs://{your-gcs-bucket}/mwms'

def preprocess_data(image, label):
  '''Resizes and scales images.'''

  image = tf.image.resize(image, (300,300))
  return tf.cast(image, tf.float32) / 255., label


def create_dataset(batch_size):
  '''Loads Cassava dataset and preprocesses data.'''

  data, info = tfds.load(name='cassava', as_supervised=True, with_info=True)
  number_of_classes = info.features['label'].num_classes
  train_data = data['train'].map(preprocess_data,
                                 num_parallel_calls=tf.data.experimental.AUTOTUNE)
  train_data = train_data.shuffle(1000)
  train_data = train_data.batch(batch_size)
  train_data = train_data.prefetch(tf.data.experimental.AUTOTUNE)

  # Set AutoShardPolicy
  options = tf.data.Options()
  options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
  train_data = train_data.with_options(options)

  return train_data, number_of_classes


def create_model(number_of_classes):
  '''Creates and compiles pretrained ResNet50 model.'''

  base_model = tf.keras.applications.ResNet50(weights='imagenet', include_top=False)
  x = base_model.output
  x = tf.keras.layers.GlobalAveragePooling2D()(x)
  x = tf.keras.layers.Dense(1016, activation='relu')(x)
  predictions = tf.keras.layers.Dense(number_of_classes, activation='softmax')(x)
  model = tf.keras.Model(inputs=base_model.input, outputs=predictions)

  model.compile(
      loss='sparse_categorical_crossentropy',
      optimizer=tf.keras.optimizers.Adam(0.0001),
      metrics=['accuracy'])

  return model


def _is_chief(task_type, task_id):
  '''Helper function. Determines if machine is chief.'''

  return task_type == 'chief'


def _get_temp_dir(dirpath, task_id):
  '''Helper function. Gets temporary directory for saving model.'''

  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir


def write_filepath(filepath, task_type, task_id):
  '''Helper function. Gets filepath to save model.'''

  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)


def main():
  # Create strategy
  strategy = tf.distribute.MultiWorkerMirroredStrategy()

  # Get data
  global_batch_size = PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync
  train_data, number_of_classes = create_dataset(global_batch_size)

  # Wrap variable creation within strategy scope
  with strategy.scope():
    model = create_model(number_of_classes)

  model.fit(train_data, epochs=EPOCHS)

  # Determine type and task of the machine from
  # the strategy cluster resolver
  task_type, task_id = (strategy.cluster_resolver.task_type,
                        strategy.cluster_resolver.task_id)

  # Based on the type and task, write to the desired model path
  write_model_path = write_filepath(BUCKET, task_type, task_id)
  model.save(write_model_path)

if __name__ == "__main__":
    main()

Before you build the container, let's take a deeper look at the code, which uses MultiWorkerMirroredStrategy from the tf.distribute.Strategy API.

There are a few components in the code that are necessary for your code to work with MultiWorkerMirroredStrategy.

  1. The data needs to be sharded, meaning that each worker is assigned a subset of the entire dataset. At each step, the workers together process one global batch of non-overlapping dataset elements, with each worker handling its own share. This sharding happens automatically according to the tf.data.experimental.AutoShardPolicy, which can be set to FILE or DATA. In this example, the create_dataset() function sets the AutoShardPolicy to DATA because the cassava dataset is not downloaded as multiple files. However, if you did not set the policy to DATA, the default AUTO policy would kick in and the end result would be the same. You can learn more about dataset sharding with MultiWorkerMirroredStrategy here.
  2. In the main() function, the MultiWorkerMirroredStrategy object is created. Next, you wrap the creation of your model variables within the strategy's scope. This crucial step tells TensorFlow which variables should be mirrored across the replicas.
  3. The batch size is scaled up by the num_replicas_in_sync. This ensures that each replica processes the same number of examples on each step. Scaling the batch size is a best practice when using synchronous data parallelism strategies in TensorFlow.
  4. Saving your model is slightly more complicated in the multi-worker case because the destination needs to be different for each of the workers. The chief worker will save to the desired model directory, while the other workers will save the model to temporary directories. It's important that these temporary directories are unique in order to prevent multiple workers from writing to the same location. Saving can contain collective operations, meaning that all workers must save and not just the chief. The functions _is_chief(), _get_temp_dir(), write_filepath(), as well as the main() function all include boilerplate code that helps save the model.
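The save-path logic from point 4 can be tried out without TensorFlow or a real bucket. The sketch below mirrors write_filepath() from task.py but makes two assumptions: it uses posixpath (which is what os.path resolves to on Linux) and it skips the tf.io.gfile.makedirs call, so no directories are actually created. The bucket name is hypothetical.

```python
import posixpath


def is_chief(task_type):
    """True only for the machine whose task type is 'chief'."""
    return task_type == 'chief'


def write_filepath(filepath, task_type, task_id):
    """Returns the final path for the chief and a unique temp path for workers."""
    dirpath = posixpath.dirname(filepath)
    base = posixpath.basename(filepath)
    if not is_chief(task_type):
        # Non-chief workers write under a directory keyed by their task id.
        dirpath = posixpath.join(dirpath, 'workertemp_' + str(task_id))
    return posixpath.join(dirpath, base)


bucket = 'gs://my-example-bucket/mwms'  # hypothetical bucket name
for task_type, task_id in [('chief', 0), ('worker', 0), ('worker', 1)]:
    print(task_type, task_id, write_filepath(bucket, task_type, task_id))
```

Only the chief's path is the real export location. The worker paths exist so that every machine can take part in the save without overwriting the chief's copy.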

Note that if you have used MultiWorkerMirroredStrategy in a different environment, you might have set up the TF_CONFIG environment variable. Vertex AI sets TF_CONFIG for you automatically, so you do not need to define this variable on each machine in your cluster.
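If you are curious what TF_CONFIG contains, the following sketch parses an example value with the standard library. The hostnames, ports, and the describe_task helper are hypothetical, and the JSON that Vertex AI actually sets may include additional fields. Only the general "cluster" and "task" layout used by tf.distribute is assumed here.

```python
import json
import os

# Hypothetical TF_CONFIG, as seen by the non-chief machine of a two-machine cluster.
example_tf_config = {
    "cluster": {
        "chief": ["host-0.example:2222"],
        "worker": ["host-1.example:2222"],
    },
    "task": {"type": "worker", "index": 0},
}


def describe_task(tf_config_json):
    """Returns (task_type, task_index, number_of_machines) from a TF_CONFIG string."""
    config = json.loads(tf_config_json)
    cluster = config.get("cluster", {})
    task = config.get("task", {})
    num_machines = sum(len(hosts) for hosts in cluster.values())
    return task.get("type"), task.get("index"), num_machines


# Inside a training job you would read the real variable. Outside of one,
# this falls back to the example value above.
raw = os.environ.get("TF_CONFIG", json.dumps(example_tf_config))
print(describe_task(raw))
```

In task.py you do not need this parsing, because the strategy's cluster_resolver exposes the same task type and index.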

Step 4: Build the container

From your Terminal, run the following to define an env variable for your project, making sure to replace your-cloud-project with the ID of your project:

PROJECT_ID='your-cloud-project'

Define a variable with the URI of your container image in Google Container Registry:

IMAGE_URI="gcr.io/$PROJECT_ID/multiworker:cassava"

Then, build the container by running the following from the root of your cassava directory:

docker build ./ -t $IMAGE_URI

Lastly, push it to Google Container Registry:

docker push $IMAGE_URI

With the container pushed to Container Registry, you're now ready to kick off the training job.

This lab uses custom training via a custom container on Google Container Registry, but you can also run a training job with the pre-built containers.

To start, navigate to the Models section in the Vertex section of your Cloud console:

uCAIP menu

Step 1: Configure training job

Click Create to enter the parameters for your custom training job.

  • Under Dataset, select No managed dataset
  • Then select Custom training (advanced) as your training method and click Continue.
  • Enter multiworker-cassava (or whatever you'd like to call your model) for Model name
  • Click Continue

In the Container settings step, select Custom container:

Custom container option

In the first box (Container image), enter the value of your IMAGE_URI variable from the previous section. It should be: gcr.io/your-cloud-project/multiworker:cassava, with your own project ID. Leave the rest of the fields blank and click Continue.

Step 2: Configure compute cluster

Vertex AI provides 4 worker pools to cover the different types of machine tasks.

Worker pool 0 configures the Primary, chief, scheduler, or "master". In MultiWorkerMirroredStrategy, all machines are designated as workers, which are the physical machines on which the replicated computation is executed. In addition to each machine being a worker, there needs to be one worker that takes on some extra work such as saving checkpoints and writing summary files to TensorBoard. This machine is known as the chief. There is only ever one chief worker, so your worker count for worker pool 0 will always be 1.

In Compute and pricing, leave the selected region as-is and configure Worker pool 0 as follows:

Worker_pool_0

Worker pool 1 is where you configure the workers for your cluster.

Configure Worker pool 1 as follows:

Worker_pool_1

The cluster is now configured to have two CPU-only machines. When the training application code is run, MultiWorkerMirroredStrategy will distribute the training across both machines.

MultiWorkerMirroredStrategy only has the chief and worker task types, so there is no need to configure the additional worker pools. However, if you were to use TensorFlow's ParameterServerStrategy, you would configure your parameter servers in worker pool 2. And if you wanted to add an evaluator to your cluster, you would configure that machine in worker pool 3.
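The pool-to-role mapping described above can be summarized in a few lines of Python. This is only a sketch for reasoning about a cluster layout. POOL_TASK_TYPES and cluster_layout are hypothetical names, not part of any Vertex AI API, and the task-type strings "ps" and "evaluator" are assumed to follow TensorFlow's usual naming.

```python
# Worker pool index -> task type, following the description in this section.
POOL_TASK_TYPES = ["chief", "worker", "ps", "evaluator"]


def cluster_layout(replica_counts):
    """Maps per-pool replica counts (pool 0 first) to a {task_type: count} dict."""
    if len(replica_counts) > len(POOL_TASK_TYPES):
        raise ValueError("there are at most 4 worker pools")
    if not replica_counts or replica_counts[0] != 1:
        raise ValueError("worker pool 0 must contain exactly one chief")
    return {
        task_type: count
        for task_type, count in zip(POOL_TASK_TYPES, replica_counts)
        if count > 0
    }


# The cluster configured in this lab: one chief plus one additional worker.
print(cluster_layout([1, 1]))
```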

Click Start training to kick off the training job. In the Training section of your console under the TRAINING PIPELINES tab you'll see your newly launched job:

Training jobs

🎉 Congratulations! 🎉

You've learned how to use Vertex AI to:

  • Launch a multi-worker training job for training code provided in a custom container. You used a TensorFlow model in this example, but you can train a model built with any framework using custom or built-in containers.

To learn more about different parts of Vertex, check out the documentation.

The previous section showed how to launch the training job via the UI. In this section, you'll see an alternative way to submit the training job by using the Vertex AI SDK for Python. Go back to your notebook instance, and create a Python 3 notebook from the Launcher:

new_notebook

In your notebook, run the following in a cell to install the Vertex AI SDK. Once the cell finishes, restart the Kernel.

!pip3 install google-cloud-aiplatform --upgrade --user

After restarting the Kernel, import the SDK:

from google.cloud import aiplatform

To launch the multi-worker training job, you first need to define the worker pool spec. Note that the use of GPUs in the spec is completely optional and you can remove accelerator_type and accelerator_count if you would like a CPU only cluster as shown in the previous section.

# The spec of the worker pools including machine type and Docker image
# Be sure to replace {YOUR-CLOUD-PROJECT} with your project ID.
worker_pool_specs = [
    # Worker pool 0: the chief
    {
        "replica_count": 1,
        "machine_spec": {
            "machine_type": "n1-standard-8",
            "accelerator_type": "NVIDIA_TESLA_T4",
            "accelerator_count": 1,
        },
        "container_spec": {"image_uri": "gcr.io/{YOUR-CLOUD-PROJECT}/multiworker:cassava"},
    },
    # Worker pool 1: the additional worker
    {
        "replica_count": 1,
        "machine_spec": {
            "machine_type": "n1-standard-8",
            "accelerator_type": "NVIDIA_TESLA_T4",
            "accelerator_count": 1,
        },
        "container_spec": {"image_uri": "gcr.io/{YOUR-CLOUD-PROJECT}/multiworker:cassava"},
    },
]
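If you expect to switch between GPU and CPU-only clusters, a small helper can build the same structure for you. This is a sketch, not part of the SDK: build_worker_pool_specs is a hypothetical function, and it only emits the dictionary keys already used in this lab (replica_count, machine_spec, container_spec).

```python
def build_worker_pool_specs(image_uri, worker_count=1,
                            machine_type="n1-standard-8",
                            accelerator_type=None, accelerator_count=1):
    """Builds specs for worker pool 0 (one chief) and worker pool 1 (workers)."""
    machine_spec = {"machine_type": machine_type}
    if accelerator_type is not None:
        # Accelerator fields are only added when a GPU type is requested.
        machine_spec["accelerator_type"] = accelerator_type
        machine_spec["accelerator_count"] = accelerator_count

    def pool(replica_count):
        return {
            "replica_count": replica_count,
            "machine_spec": dict(machine_spec),
            "container_spec": {"image_uri": image_uri},
        }

    # Pool 0 always holds exactly one chief. Pool 1 holds the remaining workers.
    return [pool(1), pool(worker_count)]


# Replace your-cloud-project with your project ID.
image = "gcr.io/your-cloud-project/multiworker:cassava"
cpu_specs = build_worker_pool_specs(image)
gpu_specs = build_worker_pool_specs(image, accelerator_type="NVIDIA_TESLA_T4")
print(cpu_specs[0]["machine_spec"])
print(gpu_specs[0]["machine_spec"])
```

Calling the helper without accelerator_type produces a CPU-only cluster like the one configured in the UI, and passing accelerator_type produces a GPU spec with the same keys as the one written by hand above.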

Next, create and run a CustomJob. You'll need to replace {YOUR_BUCKET} with a bucket in your project for staging.

# Replace {YOUR_BUCKET} with a bucket in your project
my_multiworker_job = aiplatform.CustomJob(display_name='multiworker-cassava-sdk',
                              worker_pool_specs=worker_pool_specs,
                              staging_bucket='{YOUR_BUCKET}')

my_multiworker_job.run()

In the Training section of your console under the CUSTOM JOBS tab you'll see your training job:

Custom jobs

If you'd like to continue using the notebook you created in this lab, it is recommended that you turn it off when not in use. From the Notebooks UI in your Cloud Console, select the notebook and then select Stop:

Stop instance

If you'd like to delete the notebook entirely, simply click the Delete button in the top right.

To delete the Storage Bucket, using the Navigation menu in your Cloud Console, browse to Storage, select your bucket, and click Delete:

Delete storage