Protótipo para produção: treinamento distribuído na Vertex AI

1. Visão geral

Neste laboratório, você vai usar a Vertex AI para executar um job de treinamento distribuído no Vertex AI Training usando o TensorFlow.

Este laboratório é parte da série de vídeos Protótipo para produção. Finalize os laboratório anteriores antes de tentar este. Para saber mais, confira a série de vídeos complementar:

.

Conteúdo do laboratório

Você vai aprender a:

  • Executar treinamentos distribuídos em uma única máquina com várias GPUs.
  • Executar treinamentos distribuídos em várias máquinas.

O custo total da execução deste laboratório no Google Cloud é de aproximadamente US$ 2.

2. Introdução à Vertex AI

Este laboratório usa a mais nova oferta de produtos de IA disponível no Google Cloud. A Vertex AI integra as ofertas de ML do Google Cloud em uma experiência de desenvolvimento intuitiva. Anteriormente, modelos treinados com o AutoML e modelos personalizados eram acessíveis por serviços separados. A nova oferta combina ambos em uma única API, com outros novos produtos. Você também pode migrar projetos para a Vertex AI.

A Vertex AI inclui vários produtos diferentes para dar suporte a fluxos de trabalho integrais de ML. Os produtos destacados abaixo são o foco deste laboratório: treinamentos e Workbench.

Visão geral do produto Vertex

3. Visão geral do treinamento distribuído

Se você tiver apenas uma GPU, o TensorFlow vai usar esse acelerador no treinamento do modelo sem nenhum trabalho extra da sua parte. No entanto, se você quiser aumentar ainda mais o uso de várias GPUs, será necessário usar o tf.distribute, que é o módulo do TensorFlow para executar computação em vários dispositivos.

A primeira seção deste laboratório usa tf.distribute.MirroredStrategy, que você pode adicionar aos seus apps de treinamento com apenas algumas mudanças no código. Essa estratégia cria uma cópia do modelo em cada GPU na máquina. As próximas atualizações de gradientes ocorrerão de maneira síncrona. Isso significa que cada GPU calcula as transmissões anteriores e posteriores do modelo em uma fração diferente dos dados de entrada. Os gradientes calculados de cada uma dessas fatias são agregados em todas as GPUs e ponderados em um processo conhecido como all-reduce. Os parâmetros do modelo são atualizados com esses gradientes médios.

A seção opcional no final do laboratório usa tf.distribute.MultiWorkerMirroredStrategy, que é semelhante ao MirroredStrategy, só que funciona em várias máquinas. Cada máquina também pode ter várias GPUs. Assim como MirroredStrategy, MultiWorkerMirroredStrategy é uma estratégia síncrona de carregamento em paralelo de dados que você pode usar com apenas algumas mudanças no código. A principal diferença ao migrar da estratégia síncrona de carregamento em paralelo em uma máquina para várias é que os gradientes ao final de cada etapa agora precisam ser sincronizados em todas as GPUs em uma máquina e em todas as máquinas no cluster.

Você não precisa saber os detalhes para concluir este laboratório, mas, se quiser saber mais sobre como o treinamento distribuído funciona no TensorFlow, confira o vídeo abaixo:

4. Configurar o ambiente

Finalize as etapas no laboratório Como treinar modelos personalizados com a Vertex AI para configurar seu ambiente:

5. Uma máquina, vários treinamentos de GPU

Você vai enviar seu job de treinamento distribuído para a Vertex AI adicionado o código do aplicativo de treinamento a um contêiner do Docker e enviando esse contêiner por push para o Google Artifact Registry. Nessa abordagem, você pode treinar um modelo criado com qualquer framework.

Para começar, no menu de acesso rápido do notebook do Workbench que você criou nos laboratórios anteriores, abra uma janela do terminal.

Abrir o terminal no notebook

Etapa 1: escrever código de treinamento

Crie um novo diretório chamado flowers-multi-gpu e coloque cd nele:

mkdir flowers-multi-gpu
cd flowers-multi-gpu

Execute o comando a seguir para criar um diretório destinado ao código de treinamento e a um arquivo Python em que você vai adicionar o código abaixo:

mkdir trainer
touch trainer/task.py

Agora você deve ter o seguinte no diretório flowers-multi-gpu/:

+ trainer/
    + task.py

Depois abra o arquivo task.py que você acabou de criar e copie o código abaixo.

Você vai precisar substituir {your-gcs-bucket} em BUCKET_ROOT pelo bucket do Cloud Storage em que o conjunto de dados de flores está armazenado no laboratório 1.

import tensorflow as tf
import numpy as np
import os

## Replace {your-gcs-bucket} !!
BUCKET_ROOT='/gcs/{your-gcs-bucket}'

# Define variables
NUM_CLASSES = 5
EPOCHS=10
BATCH_SIZE = 32

IMG_HEIGHT = 180
IMG_WIDTH = 180

DATA_DIR = f'{BUCKET_ROOT}/flower_photos'

def create_datasets(data_dir, batch_size):
  '''Creates train and validation datasets.'''

  train_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="training",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  validation_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="validation",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  train_dataset = train_dataset.cache().shuffle(1000).prefetch(buffer_size=tf.data.AUTOTUNE)
  validation_dataset = validation_dataset.cache().prefetch(buffer_size=tf.data.AUTOTUNE)

  return train_dataset, validation_dataset

def create_model():
  '''Creates model.'''

  model = tf.keras.Sequential([
    tf.keras.layers.Resizing(IMG_HEIGHT, IMG_WIDTH),
    tf.keras.layers.Rescaling(1./255, input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)),
    tf.keras.layers.Conv2D(16, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(64, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')
  ])
  return model

def main():

  # Create distribution strategy
  strategy = tf.distribute.MirroredStrategy()

  # Get data
  GLOBAL_BATCH_SIZE = BATCH_SIZE * strategy.num_replicas_in_sync
  train_dataset, validation_dataset = create_datasets(DATA_DIR, BATCH_SIZE)

  # Wrap model creation and compilation within scope of strategy
  with strategy.scope():
    model = create_model()
    model.compile(optimizer=tf.keras.optimizers.Adam(),
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                  metrics=['accuracy'])

  history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=EPOCHS
  )

  model.save(f'{BUCKET_ROOT}/model_output')

if __name__ == "__main__":
    main()

Antes de criar o contêiner, vamos analisar o código mais a fundo. Há alguns componentes que são específicos do treinamento distribuído.

  • Na função main(), o objeto MirroredStrategy é criado. Depois, envolva a criação das variáveis de modelo no escopo da estratégia. Esta etapa informa ao TensorFlow quais variáveis precisam ser espelhadas nas GPUs.
  • O tamanho do lote é escalonado verticalmente pelo num_replicas_in_sync. O escalonamento do tamanho do lote é uma prática recomendada ao usar estratégias de paralelismo síncrono de dados no TensorFlow. Saiba mais neste link.

Etapa 2: criar um Dockerfile

Para conteinerizar seu código, você precisa criar um Dockerfile. No Dockerfile, você vai incluir todos os comandos necessários para executar a imagem. Ele vai instalar todas as bibliotecas necessárias e configurar o ponto de entrada do código de treinamento.

No seu terminal, crie um Dockerfile vazio na raiz do diretório de flores.

touch Dockerfile

Agora você deve ter o seguinte no diretório flowers-multi-gpu/:

+ Dockerfile
+ trainer/
    + task.py

Abra o Dockerfile e copie o seguinte nele:

FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-8

WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]

Etapa 3: criar o contêiner

No Terminal, execute o comando a seguir e defina uma variável env para o projeto. Lembre-se de substituir your-cloud-project pelo ID do projeto.

PROJECT_ID='your-cloud-project'

Crie um repositório no Artifact Registry. Vamos usar o que criamos no primeiro laboratório.

REPO_NAME='flower-app'

Defina uma variável com o URI da imagem do seu contêiner no Artifact Registry.

IMAGE_URI=us-central1-docker.pkg.dev/$PROJECT_ID/$REPO_NAME/flower_image_distributed:single_machine

Configure o Docker.

gcloud auth configure-docker \
    us-central1-docker.pkg.dev

Agora execute o comando a seguir na raiz do diretório flowers-multi-gpu para criar o diretório:

docker build ./ -t $IMAGE_URI

Por fim, envie por push para o Artifact Registry:

docker push $IMAGE_URI

Com o contêiner enviado por push para o Artifact Registry, agora está tudo pronto para você iniciar o job de treinamento.

Etapa 4: executar o job com SDK

Nesta seção, você vai saber como configurar e iniciar o job de treinamento distribuído usando o SDK da Vertex AI para Python.

Na tela de início, crie um notebook do TensorFlow 2.

new_notebook

Importe o SDK da Vertex AI.

from google.cloud import aiplatform

Defina um CustomContainerTrainingJob.

Você vai precisar substituir {PROJECT_ID} em container_uri, e {YOUR_BUCKET} em staging_bucket.

job = aiplatform.CustomContainerTrainingJob(display_name='flowers-multi-gpu',
                                            container_uri='us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:single_machine',
                                            staging_bucket='gs://{YOUR_BUCKET}')

Depois de definido, o job pode ser executado. Você vai definir o número de aceleradores como dois. Se usarmos apenas uma GPU, o treinamento não seria considerado distribuído. O treinamento distribuído em uma única máquina é quando você usa dois ou mais aceleradores.

my_custom_job.run(replica_count=1,
                  machine_type='n1-standard-4',
                  accelerator_type='NVIDIA_TESLA_V100',
                  accelerator_count=2)

No console, você pode acompanhar o progresso do job.

multigpu_job

6. [Opcional] Treinamento para vários workers

Agora que você fez o treinamento distribuído em uma única máquina com várias GPUs, pode avançar e usar o que aprendeu em várias máquinas. Para reduzir os custos, não vamos adicionar GPUs a essas máquinas, mas você pode fazer esse teste.

Abra uma nova janela do terminal na instância do notebook.

Abrir o terminal no notebook

Etapa 1: escrever código de treinamento

Crie um novo diretório chamado flowers-multi-machine e coloque cd nele:

mkdir flowers-multi-machine
cd flowers-multi-machine

Execute o comando a seguir para criar um diretório destinado ao código de treinamento e a um arquivo Python em que você vai adicionar o código abaixo:

mkdir trainer
touch trainer/task.py

Agora você deve ter o seguinte no diretório flowers-multi-machine/:

+ trainer/
    + task.py

Depois abra o arquivo task.py que você acabou de criar e copie o código abaixo.

Você vai precisar substituir {your-gcs-bucket} em BUCKET_ROOT pelo bucket do Cloud Storage em que o conjunto de dados de flores está armazenado no laboratório 1.

import tensorflow as tf
import numpy as np
import os

## Replace {your-gcs-bucket} !!
BUCKET_ROOT='/gcs/{your-gcs-bucket}'

# Define variables
NUM_CLASSES = 5
EPOCHS=10
BATCH_SIZE = 32

IMG_HEIGHT = 180
IMG_WIDTH = 180

DATA_DIR = f'{BUCKET_ROOT}/flower_photos'
SAVE_MODEL_DIR = f'{BUCKET_ROOT}/multi-machine-output'

def create_datasets(data_dir, batch_size):
  '''Creates train and validation datasets.'''

  train_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="training",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  validation_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="validation",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  train_dataset = train_dataset.cache().shuffle(1000).prefetch(buffer_size=tf.data.AUTOTUNE)
  validation_dataset = validation_dataset.cache().prefetch(buffer_size=tf.data.AUTOTUNE)

  return train_dataset, validation_dataset

def create_model():
  '''Creates model.'''

  model = tf.keras.Sequential([
    tf.keras.layers.Resizing(IMG_HEIGHT, IMG_WIDTH),
    tf.keras.layers.Rescaling(1./255, input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)),
    tf.keras.layers.Conv2D(16, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(64, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')
  ])
  return model

def _is_chief(task_type, task_id):
  '''Helper function. Determines if machine is chief.'''

  return task_type == 'chief'

def _get_temp_dir(dirpath, task_id):
  '''Helper function. Gets temporary directory for saving model.'''

  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  '''Helper function. Gets filepath to save model.'''

  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

def main():
  # Create distribution strategy
  strategy = tf.distribute.MultiWorkerMirroredStrategy()

  # Get data
  GLOBAL_BATCH_SIZE = BATCH_SIZE * strategy.num_replicas_in_sync
  train_dataset, validation_dataset = create_datasets(DATA_DIR, BATCH_SIZE)

  # Wrap variable creation within strategy scope
  with strategy.scope():
    model = create_model()
    model.compile(optimizer=tf.keras.optimizers.Adam(),
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                  metrics=['accuracy'])

  history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=EPOCHS
  )

  # Determine type and task of the machine from
  # the strategy cluster resolver
  task_type, task_id = (strategy.cluster_resolver.task_type,
                        strategy.cluster_resolver.task_id)

  # Based on the type and task, write to the desired model path
  write_model_path = write_filepath(SAVE_MODEL_DIR, task_type, task_id)
  model.save(write_model_path)

if __name__ == "__main__":
    main()

Antes de criar o contêiner, vamos analisar o código mais a fundo. Há alguns componentes no código que são necessários para seu aplicativo de treinamento funcionar com MultiWorkerMirroredStrategy.

  • Na função main(), o objeto MultiWorkerMirroredStrategy é criado. Depois, envolva a criação das variáveis de modelo no escopo da estratégia. Esta etapa crucial informa ao TensorFlow quais variáveis precisam ser espelhadas nas réplicas.
  • O tamanho do lote é escalonado verticalmente pelo num_replicas_in_sync. O escalonamento do tamanho do lote é uma prática recomendada ao usar estratégias de paralelismo síncrono de dados no TensorFlow.
  • É um pouco mais complicado salvar seu modelo no caso de vários workers porque o destino precisa ser diferente para cada um. O worker CHIEF vai ser salvo no diretório do modelo desejado, enquanto os demais vão salvar o modelo em diretórios temporários. É importante que esses diretórios temporários sejam únicos para impedir que vários workers gravem no mesmo local. O salvamento pode conter operações coletivas: todos os workers precisam salvar e não apenas o CHIEF. As funções _is_chief(), _get_temp_dir(), write_filepath() e main() incluem código boilerplate que ajuda a salvar o modelo.

Etapa 2: criar um Dockerfile

Para conteinerizar seu código, você precisa criar um Dockerfile. No Dockerfile, você vai incluir todos os comandos necessários para executar a imagem. Ele vai instalar todas as bibliotecas necessárias e configurar o ponto de entrada do código de treinamento.

No seu terminal, crie um Dockerfile vazio na raiz do diretório de flores.

touch Dockerfile

Agora você deve ter o seguinte no diretório flowers-multi-machine/:

+ Dockerfile
+ trainer/
    + task.py

Abra o Dockerfile e copie o seguinte nele:

FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-8

WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]

Etapa 3: criar o contêiner

No Terminal, execute o comando a seguir e defina uma variável env para o projeto. Lembre-se de substituir your-cloud-project pelo ID do projeto.

PROJECT_ID='your-cloud-project'

Crie um repositório no Artifact Registry. Vamos usar o que criamos no primeiro laboratório.

REPO_NAME='flower-app'

Defina uma variável com o URI da imagem do seu contêiner no Google Artifact Registry.

IMAGE_URI=us-central1-docker.pkg.dev/$PROJECT_ID/$REPO_NAME/flower_image_distributed:multi_machine

Configure o Docker.

gcloud auth configure-docker \
    us-central1-docker.pkg.dev

Agora execute o comando a seguir na raiz do diretório flowers-multi-machine para criar o diretório:

docker build ./ -t $IMAGE_URI

Por fim, envie por push para o Artifact Registry:

docker push $IMAGE_URI

Com o contêiner enviado por push para o Artifact Registry, agora está tudo pronto para você iniciar o job de treinamento.

Etapa 4: executar o job com SDK

Nesta seção, você vai saber como configurar e iniciar o job de treinamento distribuído usando o SDK da Vertex AI para Python.

Na tela de início, crie um notebook do TensorFlow 2.

new_notebook

Importe o SDK da Vertex AI.

from google.cloud import aiplatform

Depois defina worker_pool_specs.

A Vertex AI tem quatro pools de workers para cobrir diferentes tipos de tarefas de máquina.

O pool 0 configura o worker principal, CHIEF, programador ou "mestre". Em MultiWorkerMirroredStrategy, todas as máquinas são classificadas como workers, que são as máquinas físicas em que a computação replicada é executada. Além de cada máquina ser um worker, é preciso que um deles fique encarregado do trabalho extra, como salvar checkpoints e gravar arquivos de resumo no TensorBoard. Essa máquina é conhecida como CHIEF. Como há apenas um worker CHIEF, sua contagem de workers no pool 0 sempre vai ser 1.

O pool 1 é onde você configura mais workers para o cluster.

O primeiro dicionário na lista de worker_pool_specs representa o pool 0, e o segundo dicionário, o pool 1. Neste exemplo, as duas configurações são idênticas. Mas se você quiser treinar usando três máquinas, adicione outros workers ao pool 1 configurando replica_count como 2. Se você quiser adicionar GPUs, vai precisar adicionar também os argumentos accelerator_type e accelerator_count ao machine_spec nos dois pools de workers. Se você quiser usar as GPUs com MultiWorkerMirroredStrategy, cada máquina no cluster vai precisar ter um número idêntico de GPUs. Caso contrário, o job vai falhar.

Você vai precisar substituir {PROJECT_ID} em image_uri.

# The spec of the worker pools including machine type and Docker image
# Be sure to replace PROJECT_ID in the "image_uri" with your project.

worker_pool_specs=[
     {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-4",
        },
        "container_spec": {"image_uri": "us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:multi_machine"}
      },
      {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-4",
        },
        "container_spec": {"image_uri": "us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:multi_machine"}
      }
          ]

Depois, crie e execute CustomJob, substituindo {YOUR_BUCKET} em staging_bucket por um bucket no seu projeto na fase de testes.

my_custom_job = aiplatform.CustomJob(display_name='flowers-multi-worker',
                                     worker_pool_specs=worker_pool_specs,
                                     staging_bucket='gs://{YOUR_BUCKET}')

my_custom_job.run()

No console, você pode acompanhar o progresso do job.

multi_worker_job

Parabéns! 🎉

Você aprendeu a usar a Vertex AI para:

  • Executar jobs de treinamento distribuído com o TensorFlow.

Para saber mais sobre as diferentes partes da Vertex, consulte a documentação.

7. Limpeza

Como configuramos o notebook para expirar após 60 minutos de inatividade, não precisamos nos preocupar em desligar a instância. Para encerrar a instância manualmente, clique no botão "Parar" na seção "Vertex AI Workbench" do console. Se quiser excluir o notebook completamente, clique no botão "Excluir".

Interromper instância

Para excluir o bucket do Storage, use o menu de navegação do console do Cloud, acesse o Storage, selecione o bucket e clique em "Excluir":

Excluir armazenamento