Del prototipo a la producción: Entrenamiento distribuido en Vertex AI

1. Descripción general

En este lab, usarás Vertex AI para ejecutar un trabajo de entrenamiento distribuido en Vertex AI Training mediante TensorFlow.

Este lab forma parte de la serie de videos Del prototipo a la producción. Asegúrate de completar los labs anteriores antes de realizar este. Puedes mirar la serie de videos complementaria para obtener más información:

.

Qué aprenderás

Aprenderás a hacer lo siguiente:

  • Ejecutar entrenamiento distribuido en una única máquina con múltiples GPUs
  • Ejecutar entrenamiento distribuido en múltiples máquinas

El costo total de la ejecución de este lab en Google Cloud es de aproximadamente $2.

2. Introducción a Vertex AI

En este lab, se utiliza la oferta de productos de IA más reciente de Google Cloud. Vertex AI integra las ofertas de AA de Google Cloud en una experiencia de desarrollo fluida. Anteriormente, se podía acceder a los modelos personalizados y a los entrenados con AutoML mediante servicios independientes. La nueva oferta combina ambos en una sola API, junto con otros productos nuevos. También puedes migrar proyectos existentes a Vertex AI.

Vertex AI incluye muchos productos distintos para respaldar flujos de trabajo de AA de extremo a extremo. Este lab se enfocará en los productos que se destacan a continuación: Training y Workbench

Descripción general del producto Vertex

3. Descripción general del entrenamiento distribuido

Si tienes una sola GPU, TensorFlow usará este acelerador para incrementar la velocidad del entrenamiento de modelos sin que debas realizar ninguna acción adicional. Sin embargo, si quieres ir más rápido mediante el uso de varias GPUs, deberás usar tf.distribute, que es el módulo de TensorFlow para procesamiento en varios dispositivos.

En la primera sección de este lab, se usa tf.distribute.MirroredStrategy, que puedes agregar a tus aplicaciones de entrenamiento solo con unos pocos cambios en el código. Esta estrategia crea una copia del modelo en cada GPU de tu máquina. Las actualizaciones de gradientes se realizarán de forma síncrona. Esto significa que cada GPU procesa los modelos de propagación y retropropagación en el modelo en una porción diferente de los datos de entrada. Luego, los gradientes procesados de cada una de estas porciones se agregan en todas las GPU y se promedian en un proceso conocido como all‑reduce. Los parámetros del modelo se actualizan con estos gradientes promediados.

En la sección opcional al final de este lab, se usa tf.distribute.MultiWorkerMirroredStrategy, que es similar a MirroredStrategy, con la excepción de que funciona en múltiples máquinas. Cada una de estas máquinas también puede tener múltiples GPUs. Al igual que MirroredStrategy, MultiWorkerMirroredStrategy es una estrategia de paralelismo de datos síncronos que puedes usar con solo algunos cambios en el código. La principal diferencia cuando se pasa del paralelismo de datos síncronos en una máquina a muchas es que los gradientes al final de cada paso ahora necesitan sincronizarse en todas las GPUs de una máquina y en todas las máquinas del clúster.

No necesitas conocer todos los detalles para completar este lab, pero si quieres obtener más información sobre el funcionamiento del entrenamiento distribuido en TensorFlow, mira el siguiente video:

4. Configura tu entorno

Completa los pasos en el lab Entrenamiento de modelos personalizados con Vertex AI para configurar tu entorno.

5. Entrenamiento de una sola máquina con múltiples GPUs

Para enviar tu trabajo de entrenamiento distribuido a Vertex AI, deberás colocar el código de la aplicación de entrenamiento en un contenedor de Docker y enviar el contenedor a Google Artifact Registry. Mediante este enfoque, puedes entrenar un modelo compilado con cualquier framework.

Para comenzar, en el menú Selector (Launcher) del notebook de Workbench que creaste en los labs anteriores, abre una ventana de terminal.

Abrir terminal en el notebook

Paso 1: Escribe el código de entrenamiento

Crea un directorio nuevo llamado flowers-multi-gpu y ábrelo con el comando cd:

mkdir flowers-multi-gpu
cd flowers-multi-gpu

Ejecuta el siguiente comando para crear un directorio para el código de entrenamiento y un archivo de Python en el que agregarás el siguiente código.

mkdir trainer
touch trainer/task.py

Ahora, deberías tener lo siguiente en el directorio flowers-multi-gpu/:

+ trainer/
    + task.py

A continuación, abre el archivo task.py que acabas de crear y copia el código que aparece más abajo.

Deberás reemplazar {your-gcs-bucket} en BUCKET_ROOT por el bucket de Cloud Storage en el que almacenaste el conjunto de datos de flores en el Lab 1.

import tensorflow as tf
import numpy as np
import os

## Replace {your-gcs-bucket} !!
BUCKET_ROOT='/gcs/{your-gcs-bucket}'

# Define variables
NUM_CLASSES = 5
EPOCHS=10
BATCH_SIZE = 32

IMG_HEIGHT = 180
IMG_WIDTH = 180

DATA_DIR = f'{BUCKET_ROOT}/flower_photos'

def create_datasets(data_dir, batch_size):
  '''Creates train and validation datasets.'''

  train_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="training",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  validation_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="validation",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  train_dataset = train_dataset.cache().shuffle(1000).prefetch(buffer_size=tf.data.AUTOTUNE)
  validation_dataset = validation_dataset.cache().prefetch(buffer_size=tf.data.AUTOTUNE)

  return train_dataset, validation_dataset

def create_model():
  '''Creates model.'''

  model = tf.keras.Sequential([
    tf.keras.layers.Resizing(IMG_HEIGHT, IMG_WIDTH),
    tf.keras.layers.Rescaling(1./255, input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)),
    tf.keras.layers.Conv2D(16, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(64, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')
  ])
  return model

def main():

  # Create distribution strategy
  strategy = tf.distribute.MirroredStrategy()

  # Get data
  GLOBAL_BATCH_SIZE = BATCH_SIZE * strategy.num_replicas_in_sync
  train_dataset, validation_dataset = create_datasets(DATA_DIR, BATCH_SIZE)

  # Wrap model creation and compilation within scope of strategy
  with strategy.scope():
    model = create_model()
    model.compile(optimizer=tf.keras.optimizers.Adam(),
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                  metrics=['accuracy'])

  history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=EPOCHS
  )

  model.save(f'{BUCKET_ROOT}/model_output')

if __name__ == "__main__":
    main()

Antes de crear el contenedor, analicemos el código en detalle. Hay algunos componentes que son específicos para el uso del entrenamiento distribuido.

  • En la función main(), se crea el objeto MirroredStrategy. A continuación, agrega la creación de las variables de tu modelo al alcance de la estrategia. En este paso, se indica a TensorFlow qué variables se deben duplicar en las GPU.
  • El tamaño del lote se escala verticalmente según num_replicas_in_sync. Escalar el tamaño del lote es una práctica recomendada cuando se usan estrategias de paralelismo de datos síncronos en TensorFlow. Puedes obtener más información aquí.

Paso 2: Crea un Dockerfile

Para alojar tu código en contenedores, deberás crear un Dockerfile. En él, incluirás todos los comandos necesarios para ejecutar la imagen. Se instalarán todas las bibliotecas necesarias y se configurará el punto de entrada para el código de entrenamiento.

En la terminal, crea un Dockerfile vacío en la raíz del directorio de flores:

touch Dockerfile

Ahora, deberías tener lo siguiente en el directorio flowers-multi-gpu/:

+ Dockerfile
+ trainer/
    + task.py

Abre el Dockerfile y copia lo siguiente:

FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-8

WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]

Paso 3: Compila el contenedor

Desde tu terminal, ejecuta lo siguiente para definir una variable de entorno para tu proyecto y asegúrate de reemplazar your-cloud-project con el ID de tu proyecto:

PROJECT_ID='your-cloud-project'

Crea un repositorio en Artifact Registry. Usaremos el repositorio que creamos en el primer lab.

REPO_NAME='flower-app'

Define una variable con el URI de la imagen de contenedor en Artifact Registry:

IMAGE_URI=us-central1-docker.pkg.dev/$PROJECT_ID/$REPO_NAME/flower_image_distributed:single_machine

Configura el Docker.

gcloud auth configure-docker \
    us-central1-docker.pkg.dev

Luego, ejecuta el siguiente comando para compilar el contenedor desde la raíz de tu directorio flowers-multi-gpu:

docker build ./ -t $IMAGE_URI

Por último, envíalo a Artifact Registry:

docker push $IMAGE_URI

Ahora que ya enviaste el contenedor a Artifact Registry, puedes iniciar el trabajo de entrenamiento.

Paso 4: Ejecuta el trabajo con el SDK

En esta sección, verás cómo configurar y, luego, iniciar el trabajo de entrenamiento distribuido mediante el SDK de Vertex AI para Python.

En el Selector (Launcher), crea un notebook de TensorFlow 2.

new_notebook

Importa el SDK de Vertex AI.

from google.cloud import aiplatform

Luego, define un CustomContainerTrainingJob.

Deberás reemplazar {PROJECT_ID} en container_uri y {YOUR_BUCKET} en staging_bucket.

job = aiplatform.CustomContainerTrainingJob(display_name='flowers-multi-gpu',
                                            container_uri='us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:single_machine',
                                            staging_bucket='gs://{YOUR_BUCKET}')

Cuando el trabajo esté definido, podrás ejecutarlo. Deberás establecer la cantidad de aceleradores en 2. Si usáramos solo 1 GPU, esto no se consideraría entrenamiento distribuido. El entrenamiento distribuido en una única máquina ocurre cuando usas 2 o más aceleradores.

my_custom_job.run(replica_count=1,
                  machine_type='n1-standard-4',
                  accelerator_type='NVIDIA_TESLA_V100',
                  accelerator_count=2)

En la consola, podrás ver el progreso del trabajo.

multigpu_job

6. Entrenamiento de varios trabajadores [opcional]

Ahora que probaste el entrenamiento distribuido en una única máquina con múltiples GPUs, puedes llevar las habilidades que adquiriste al siguiente nivel entrenando en múltiples máquinas. Para mantener los costos bajos, no agregaremos ninguna GPU a esas máquinas, pero, si quieres, puedes experimentar agregando GPUs.

Abre una nueva ventana de terminal en tu instancia de notebook:

Abrir terminal en el notebook

Paso 1: Escribe el código de entrenamiento

Crea un directorio nuevo llamado flowers-multi-machine y ábrelo con el comando cd:

mkdir flowers-multi-machine
cd flowers-multi-machine

Ejecuta el siguiente comando para crear un directorio para el código de entrenamiento y un archivo de Python en el que agregarás el siguiente código.

mkdir trainer
touch trainer/task.py

Ahora, deberías tener lo siguiente en el directorio flowers-multi-machine/:

+ trainer/
    + task.py

A continuación, abre el archivo task.py que acabas de crear y copia el código que aparece más abajo.

Deberás reemplazar {your-gcs-bucket} en BUCKET_ROOT por el bucket de Cloud Storage en el que almacenaste el conjunto de datos de flores en el Lab 1.

import tensorflow as tf
import numpy as np
import os

## Replace {your-gcs-bucket} !!
BUCKET_ROOT='/gcs/{your-gcs-bucket}'

# Define variables
NUM_CLASSES = 5
EPOCHS=10
BATCH_SIZE = 32

IMG_HEIGHT = 180
IMG_WIDTH = 180

DATA_DIR = f'{BUCKET_ROOT}/flower_photos'
SAVE_MODEL_DIR = f'{BUCKET_ROOT}/multi-machine-output'

def create_datasets(data_dir, batch_size):
  '''Creates train and validation datasets.'''

  train_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="training",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  validation_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="validation",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  train_dataset = train_dataset.cache().shuffle(1000).prefetch(buffer_size=tf.data.AUTOTUNE)
  validation_dataset = validation_dataset.cache().prefetch(buffer_size=tf.data.AUTOTUNE)

  return train_dataset, validation_dataset

def create_model():
  '''Creates model.'''

  model = tf.keras.Sequential([
    tf.keras.layers.Resizing(IMG_HEIGHT, IMG_WIDTH),
    tf.keras.layers.Rescaling(1./255, input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)),
    tf.keras.layers.Conv2D(16, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(64, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')
  ])
  return model

def _is_chief(task_type, task_id):
  '''Helper function. Determines if machine is chief.'''

  return task_type == 'chief'

def _get_temp_dir(dirpath, task_id):
  '''Helper function. Gets temporary directory for saving model.'''

  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  '''Helper function. Gets filepath to save model.'''

  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

def main():
  # Create distribution strategy
  strategy = tf.distribute.MultiWorkerMirroredStrategy()

  # Get data
  GLOBAL_BATCH_SIZE = BATCH_SIZE * strategy.num_replicas_in_sync
  train_dataset, validation_dataset = create_datasets(DATA_DIR, BATCH_SIZE)

  # Wrap variable creation within strategy scope
  with strategy.scope():
    model = create_model()
    model.compile(optimizer=tf.keras.optimizers.Adam(),
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                  metrics=['accuracy'])

  history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=EPOCHS
  )

  # Determine type and task of the machine from
  # the strategy cluster resolver
  task_type, task_id = (strategy.cluster_resolver.task_type,
                        strategy.cluster_resolver.task_id)

  # Based on the type and task, write to the desired model path
  write_model_path = write_filepath(SAVE_MODEL_DIR, task_type, task_id)
  model.save(write_model_path)

if __name__ == "__main__":
    main()

Antes de crear el contenedor, analicemos el código en detalle. Hay algunos componente del código que son necesarios para que la aplicación de entrenamiento funcione con MultiWorkerMirroredStrategy.

  • En la función main(), se crea el objeto MultiWorkerMirroredStrategy. A continuación, agrega la creación de las variables de tu modelo al alcance de la estrategia. En este paso crucial, se indica a TensorFlow qué variables se deben duplicar en las réplicas.
  • El tamaño del lote se escala verticalmente según num_replicas_in_sync. Escalar el tamaño del lote es una práctica recomendada cuando se usan estrategias de paralelismo de datos síncronos en TensorFlow.
  • Guardar el modelo es un poco más complicado en el caso de varios trabajadores porque el destino necesita ser diferente para cada uno de ellos. El trabajador principal se guardará en el directorio de modelo deseado, mientras que los otros trabajadores guardarán el modelo en directorios temporales. Es importante que estos directorios temporales sean únicos para evitar que varios trabajadores escriban en la misma ubicación. La acción de guardar puede contener operaciones colectivas, lo que significa que todos los trabajadores deben realizarla, no solo el principal. Las funciones _is_chief(), _get_temp_dir(), write_filepath(), al igual que la función main() incluyen código estándar que ayuda a guardar el modelo.

Paso 2: Crea un Dockerfile

Para alojar tu código en contenedores, deberás crear un Dockerfile. En él, incluirás todos los comandos necesarios para ejecutar la imagen. Se instalarán todas las bibliotecas necesarias y se configurará el punto de entrada para el código de entrenamiento.

En la terminal, crea un Dockerfile vacío en la raíz del directorio de flores:

touch Dockerfile

Ahora, deberías tener lo siguiente en el directorio flowers-multi-machine/:

+ Dockerfile
+ trainer/
    + task.py

Abre el Dockerfile y copia lo siguiente:

FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-8

WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]

Paso 3: Compila el contenedor

Desde tu terminal, ejecuta lo siguiente para definir una variable de entorno para tu proyecto y asegúrate de reemplazar your-cloud-project con el ID de tu proyecto:

PROJECT_ID='your-cloud-project'

Crea un repositorio en Artifact Registry. Usaremos el repositorio que creamos en el primer lab.

REPO_NAME='flower-app'

Define una variable con el URI de la imagen de contenedor en Google Artifact Registry:

IMAGE_URI=us-central1-docker.pkg.dev/$PROJECT_ID/$REPO_NAME/flower_image_distributed:multi_machine

Configura el Docker.

gcloud auth configure-docker \
    us-central1-docker.pkg.dev

Luego, ejecuta el siguiente comando para compilar el contenedor desde la raíz de tu directorio flowers-multi-machine:

docker build ./ -t $IMAGE_URI

Por último, envíalo a Artifact Registry:

docker push $IMAGE_URI

Ahora que ya enviaste el contenedor a Artifact Registry, puedes iniciar el trabajo de entrenamiento.

Paso 4: Ejecuta el trabajo con el SDK

En esta sección, verás cómo configurar y, luego, iniciar el trabajo de entrenamiento distribuido mediante el SDK de Vertex AI para Python.

En el Selector (Launcher), crea un notebook de TensorFlow 2.

new_notebook

Importa el SDK de Vertex AI.

from google.cloud import aiplatform

A continuación, define worker_pool_specs.

Vertex AI proporciona 4 grupos de trabajadores para abarcar los diferentes tipos de tareas de máquinas.

El grupo de trabajadores 0 configura el trabajador principal o programador. En MultiWorkerMirroredStrategy, todas las máquinas se designan como trabajadores, que son las máquinas físicas en las que se ejecuta el procesamiento replicado. Además de que cada máquina sea un trabajador, debe haber un trabajador que realice tareas adicionales, como guardar los puntos de control y escribir archivos de resumen en TensorBoard. Esta máquina se conoce como principal. Solo hay un trabajador principal, por lo que la cantidad de trabajadores en el grupo 0 siempre será 1.

En el grupo de trabajadores 1, se configuran los trabajadores adicionales del clúster.

El primer diccionario de la lista worker_pool_specs representa al grupo de trabajadores 0 y el segundo diccionario, al grupo 1. En esta muestra, las dos configuraciones son idénticas. Sin embargo, si quisieras realizar el entrenamiento en 3 máquinas, deberías agregar trabajadores adicionales al grupo de trabajadores 1 estableciendo replica_count en 2. Si quisieras agregar GPUs, deberías agregar los argumentos accelerator_type y accelerator_count a machine_spec en ambos grupos de trabajadores. Ten en cuenta que si quieres usar GPUs con MultiWorkerMirroredStrategy, cada máquina del clúster debe tener una cantidad idéntica de GPUs. De lo contrario, el trabajo fallará.

Tendrás que reemplazar {PROJECT_ID} en image_uri.

# The spec of the worker pools including machine type and Docker image
# Be sure to replace PROJECT_ID in the "image_uri" with your project.

worker_pool_specs=[
     {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-4",
        },
        "container_spec": {"image_uri": "us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:multi_machine"}
      },
      {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-4",
        },
        "container_spec": {"image_uri": "us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:multi_machine"}
      }
          ]

Luego, crea y ejecuta un CustomJob y reemplaza {YOUR_BUCKET} en staging_bucket por un bucket de tu proyecto para la etapa de pruebas.

my_custom_job = aiplatform.CustomJob(display_name='flowers-multi-worker',
                                     worker_pool_specs=worker_pool_specs,
                                     staging_bucket='gs://{YOUR_BUCKET}')

my_custom_job.run()

En la consola, podrás ver el progreso del trabajo.

multi_worker_job

🎉 ¡Felicitaciones! 🎉

Aprendiste a usar Vertex AI para hacer lo siguiente:

  • Ejecutar trabajos de entrenamiento distribuido con TensorFlow

Para obtener más información sobre las distintas partes de Vertex, consulta la documentación.

7. Realiza una limpieza

Debido a que configuramos el notebook para que se agote el tiempo de espera después de 60 minutos de inactividad, no tenemos que preocuparnos por cerrar la instancia. Si quieres cerrar la instancia de forma manual, haz clic en el botón Detener (Stop) en la sección Vertex AI Workbench de la consola. Si quieres borrar el notebook por completo, haz clic en el botón Borrar (Delete).

Detener instancias

Para borrar el bucket de almacenamiento, en el menú de navegación de la consola de Cloud, navega a Almacenamiento, selecciona tu bucket y haz clic en Borrar (Delete):

Borrar almacenamiento