Prototype to Production: addestramento distribuito su Vertex AI

1. Panoramica

In questo lab utilizzerai Vertex AI per eseguire un job di addestramento distribuito su Vertex AI Training utilizzando TensorFlow.

Questo lab fa parte della serie di video Prototype to Production. Assicurati di completare i lab precedenti prima di provare questo. Per saperne di più, puoi guardare la serie di video di accompagnamento:

.

Cosa imparerai

Al termine del corso sarai in grado di:

  • Esegui l'addestramento distribuito su una singola macchina con più GPU
  • Esegui l'addestramento distribuito su più macchine

Il costo totale per eseguire questo lab su Google Cloud è di circa 2$.

2. Introduzione a Vertex AI

Questo lab utilizza la più recente offerta di prodotti AI disponibile su Google Cloud. Vertex AI integra le offerte ML di Google Cloud in un'esperienza di sviluppo fluida. In precedenza, i modelli addestrati con AutoML e i modelli personalizzati erano accessibili tramite servizi separati. La nuova offerta combina entrambi in un'unica API, insieme ad altri nuovi prodotti. Puoi anche migrare progetti esistenti su Vertex AI.

Vertex AI include molti prodotti diversi per supportare i flussi di lavoro ML end-to-end. Questo lab si concentrerà sui prodotti evidenziati di seguito: Training e Workbench

Panoramica del prodotto Vertex

3. Panoramica dell'addestramento distribuito

Se hai una sola GPU, TensorFlow utilizzerà questo acceleratore per velocizzare l'addestramento del modello senza alcun lavoro aggiuntivo da parte tua. Tuttavia, se vuoi ottenere un ulteriore miglioramento utilizzando più GPU, devi utilizzare tf.distribute, il modulo di TensorFlow per eseguire un calcolo su più dispositivi.

La prima sezione di questo lab utilizza tf.distribute.MirroredStrategy, che puoi aggiungere alle tue applicazioni di addestramento con poche modifiche al codice. Questa strategia crea una copia del modello su ogni GPU della macchina. Gli aggiornamenti successivi del gradiente verranno eseguiti in modo sincrono. Ciò significa che ogni GPU calcola i passaggi in avanti e indietro attraverso il modello su una sezione diversa dei dati di input. I gradienti calcolati da ciascuna di queste sezioni vengono poi aggregati in tutte le GPU e calcolati in media in un processo noto come riduzione collettiva. I parametri del modello vengono aggiornati utilizzando questi gradienti mediati.

La sezione facoltativa alla fine del lab utilizza tf.distribute.MultiWorkerMirroredStrategy, che è simile a MirroredStrategy, tranne per il fatto che funziona su più macchine. Ciascuna di queste macchine potrebbe anche avere più GPU. Ad esempio, MirroredStrategy, MultiWorkerMirroredStrategy è una strategia di parallelismo dei dati sincrono che puoi utilizzare con poche modifiche al codice. La differenza principale quando si passa dal parallelismo dei dati sincrono su una macchina a più macchine è che i gradienti alla fine di ogni passaggio devono ora essere sincronizzati su tutte le GPU di una macchina e su tutte le macchine del cluster.

Non è necessario conoscere i dettagli per completare questo lab, ma se vuoi saperne di più su come funziona l'addestramento distribuito in TensorFlow, guarda il video qui sotto:

4. Configura l'ambiente

Completa i passaggi del lab Addestramento di modelli personalizzati con Vertex AI per configurare l'ambiente.

5. Addestramento su una singola macchina con più GPU

Invierai il job di addestramento distribuito a Vertex AI inserendo il codice dell'applicazione di addestramento in un container Docker ed eseguendo il push di questo container a Google Artifact Registry. Utilizzando questo approccio, puoi addestrare un modello creato con qualsiasi framework.

Per iniziare, apri una finestra del terminale dal menu Avvio app del notebook Workbench che hai creato nei lab precedenti.

Apri il terminale nel notebook

Passaggio 1: scrivi il codice di addestramento

Crea una nuova directory chiamata flowers-multi-gpu e accedi tramite cd:

mkdir flowers-multi-gpu
cd flowers-multi-gpu

Esegui quanto segue per creare una directory per il codice di addestramento e un file Python in cui aggiungerai il codice riportato di seguito.

mkdir trainer
touch trainer/task.py

Ora dovresti avere quanto segue nella directory flowers-multi-gpu/:

+ trainer/
    + task.py

Successivamente, apri il file task.py che hai appena creato e copia il codice riportato di seguito.

Devi sostituire {your-gcs-bucket} in BUCKET_ROOT con il bucket Cloud Storage in cui hai archiviato il set di dati sui fiori nel lab 1.

import tensorflow as tf
import numpy as np
import os

## Replace {your-gcs-bucket} !!
BUCKET_ROOT='/gcs/{your-gcs-bucket}'

# Define variables
NUM_CLASSES = 5
EPOCHS=10
BATCH_SIZE = 32

IMG_HEIGHT = 180
IMG_WIDTH = 180

DATA_DIR = f'{BUCKET_ROOT}/flower_photos'

def create_datasets(data_dir, batch_size):
  '''Creates train and validation datasets.'''

  train_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="training",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  validation_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="validation",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  train_dataset = train_dataset.cache().shuffle(1000).prefetch(buffer_size=tf.data.AUTOTUNE)
  validation_dataset = validation_dataset.cache().prefetch(buffer_size=tf.data.AUTOTUNE)

  return train_dataset, validation_dataset


def create_model():
  '''Creates model.'''

  model = tf.keras.Sequential([
    tf.keras.layers.Resizing(IMG_HEIGHT, IMG_WIDTH),
    tf.keras.layers.Rescaling(1./255, input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)),
    tf.keras.layers.Conv2D(16, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(64, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')
  ])
  return model

def main():  

  # Create distribution strategy
  strategy = tf.distribute.MirroredStrategy()

  # Get data
  GLOBAL_BATCH_SIZE = BATCH_SIZE * strategy.num_replicas_in_sync
  train_dataset, validation_dataset = create_datasets(DATA_DIR, BATCH_SIZE)

  # Wrap model creation and compilation within scope of strategy
  with strategy.scope():
    model = create_model()
    model.compile(optimizer=tf.keras.optimizers.Adam(),
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                  metrics=['accuracy'])

  history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=EPOCHS
  )

  model.save(f'{BUCKET_ROOT}/model_output')


if __name__ == "__main__":
    main()

Prima di creare il container, diamo uno sguardo più approfondito al codice. Esistono alcuni componenti specifici dell'utilizzo dell'addestramento distribuito.

  • Nella funzione main() viene creato l'oggetto MirroredStrategy. Successivamente, includi la creazione delle variabili del modello nell'ambito della strategia. Questo passaggio indica a TensorFlow quali variabili devono essere replicate nelle GPU.
  • La dimensione del batch viene aumentata di num_replicas_in_sync. La scalabilità della dimensione del batch è una best practice quando si utilizzano strategie di parallelismo dei dati sincrone in TensorFlow. Puoi scoprire di più qui.

Passaggio 2: crea un Dockerfile

Per containerizzare il codice, devi creare un Dockerfile. Nel Dockerfile includerai tutti i comandi necessari per eseguire l'immagine. Installerà tutte le librerie necessarie e configurerà il punto di accesso per il codice di addestramento.

Dal terminale, crea un Dockerfile vuoto nella radice della directory flowers:

touch Dockerfile

Ora dovresti avere quanto segue nella directory flowers-multi-gpu/:

+ Dockerfile
+ trainer/
    + task.py

Apri il Dockerfile e copia al suo interno quanto segue:

FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-8

WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]

Passaggio 3: crea il contenitore

Dal terminale, esegui quanto segue per definire una variabile env per il tuo progetto, assicurandoti di sostituire your-cloud-project con l'ID del tuo progetto:

PROJECT_ID='your-cloud-project'

Crea un repository in Artifact Registry. Utilizzeremo il repository creato nel primo lab.

REPO_NAME='flower-app'

Definisci una variabile con l'URI dell'immagine container in Artifact Registry:

IMAGE_URI=us-central1-docker.pkg.dev/$PROJECT_ID/$REPO_NAME/flower_image_distributed:single_machine

Configura Docker

gcloud auth configure-docker \
    us-central1-docker.pkg.dev

Quindi, crea il container eseguendo quanto segue dalla radice della directory flowers-multi-gpu:

docker build ./ -t $IMAGE_URI

Infine, invialo ad Artifact Registry:

docker push $IMAGE_URI

Con il push del container in Artifact Registry, ora puoi avviare il job di addestramento.

Passaggio 4: esegui il job con l'SDK

In questa sezione vedrai come configurare e avviare il job di addestramento distribuito utilizzando l'SDK Vertex AI Python.

Da Avvio app, crea un notebook TensorFlow 2.

new_notebook

Importa l'SDK Vertex AI.

from google.cloud import aiplatform

Poi, definisci un CustomContainerTrainingJob.

Dovrai sostituire {PROJECT_ID} in container_uri e {YOUR_BUCKET} in staging_bucket.

job = aiplatform.CustomContainerTrainingJob(display_name='flowers-multi-gpu',
                                            container_uri='us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:single_machine',
                                            staging_bucket='gs://{YOUR_BUCKET}')

Una volta definito il job, puoi eseguirlo. Imposta il numero di acceleratori su 2. Se utilizzassimo una sola GPU, questo non sarebbe considerato addestramento distribuito. L'addestramento distribuito su una singola macchina si verifica quando utilizzi due o più acceleratori.

my_custom_job.run(replica_count=1,
                  machine_type='n1-standard-4',
                  accelerator_type='NVIDIA_TESLA_V100',
                  accelerator_count=2)

Nella console potrai visualizzare l'avanzamento del job.

multigpu_job

6. [Facoltativo] Addestramento di più lavoratori

Ora che hai provato l'addestramento distribuito su una singola macchina con più GPU, puoi portare le tue competenze di addestramento distribuito al livello successivo eseguendo l'addestramento su più macchine. Per contenere i costi, non aggiungeremo GPU a queste macchine, ma puoi fare degli esperimenti aggiungendo GPU se vuoi.

Apri una nuova finestra del terminale nell'istanza del notebook:

Apri il terminale nel notebook

Passaggio 1: scrivi il codice di addestramento

Crea una nuova directory chiamata flowers-multi-machine e accedi tramite cd:

mkdir flowers-multi-machine
cd flowers-multi-machine

Esegui quanto segue per creare una directory per il codice di addestramento e un file Python in cui aggiungerai il codice riportato di seguito.

mkdir trainer
touch trainer/task.py

Ora dovresti avere quanto segue nella directory flowers-multi-machine/:

+ trainer/
    + task.py

Successivamente, apri il file task.py che hai appena creato e copia il codice riportato di seguito.

Devi sostituire {your-gcs-bucket} in BUCKET_ROOT con il bucket Cloud Storage in cui hai archiviato il set di dati sui fiori nel lab 1.

import tensorflow as tf
import numpy as np
import os

## Replace {your-gcs-bucket} !!
BUCKET_ROOT='/gcs/{your-gcs-bucket}'

# Define variables
NUM_CLASSES = 5
EPOCHS=10
BATCH_SIZE = 32

IMG_HEIGHT = 180
IMG_WIDTH = 180

DATA_DIR = f'{BUCKET_ROOT}/flower_photos'
SAVE_MODEL_DIR = f'{BUCKET_ROOT}/multi-machine-output'

def create_datasets(data_dir, batch_size):
  '''Creates train and validation datasets.'''

  train_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="training",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  validation_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="validation",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  train_dataset = train_dataset.cache().shuffle(1000).prefetch(buffer_size=tf.data.AUTOTUNE)
  validation_dataset = validation_dataset.cache().prefetch(buffer_size=tf.data.AUTOTUNE)

  return train_dataset, validation_dataset


def create_model():
  '''Creates model.'''

  model = tf.keras.Sequential([
    tf.keras.layers.Resizing(IMG_HEIGHT, IMG_WIDTH),
    tf.keras.layers.Rescaling(1./255, input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)),
    tf.keras.layers.Conv2D(16, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(64, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')
  ])
  return model

def _is_chief(task_type, task_id):
  '''Helper function. Determines if machine is chief.'''

  return task_type == 'chief'


def _get_temp_dir(dirpath, task_id):
  '''Helper function. Gets temporary directory for saving model.'''

  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir


def write_filepath(filepath, task_type, task_id):
  '''Helper function. Gets filepath to save model.'''

  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

def main():
  # Create distribution strategy
  strategy = tf.distribute.MultiWorkerMirroredStrategy()

  # Get data
  GLOBAL_BATCH_SIZE = BATCH_SIZE * strategy.num_replicas_in_sync
  train_dataset, validation_dataset = create_datasets(DATA_DIR, BATCH_SIZE)

  # Wrap variable creation within strategy scope
  with strategy.scope():
    model = create_model()
    model.compile(optimizer=tf.keras.optimizers.Adam(),
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                  metrics=['accuracy'])

  history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=EPOCHS
  )

  # Determine type and task of the machine from
  # the strategy cluster resolver
  task_type, task_id = (strategy.cluster_resolver.task_type,
                        strategy.cluster_resolver.task_id)

  # Based on the type and task, write to the desired model path
  write_model_path = write_filepath(SAVE_MODEL_DIR, task_type, task_id)
  model.save(write_model_path)

if __name__ == "__main__":
    main()

Prima di creare il container, diamo uno sguardo più approfondito al codice. Nel codice sono presenti alcuni componenti necessari per il funzionamento dell'applicazione di addestramento con MultiWorkerMirroredStrategy.

  • Nella funzione main() viene creato l'oggetto MultiWorkerMirroredStrategy. Successivamente, includi la creazione delle variabili del modello nell'ambito della strategia. Questo passaggio cruciale indica a TensorFlow quali variabili devono essere replicate.
  • La dimensione del batch viene aumentata di num_replicas_in_sync. La scalabilità della dimensione del batch è una best practice quando si utilizzano strategie di parallelismo dei dati sincrone in TensorFlow.
  • Il salvataggio del modello è leggermente più complicato nel caso di più worker perché la destinazione deve essere diversa per ciascuno di essi. Il worker principale salverà il modello nella directory desiderata, mentre gli altri worker lo salveranno in directory temporanee. È importante che queste directory temporanee siano uniche per impedire a più worker di scrivere nella stessa posizione. Il salvataggio può contenere operazioni collettive, il che significa che tutti i lavoratori devono salvare e non solo il capo. Le funzioni _is_chief(), _get_temp_dir(), write_filepath() e main() includono tutte un codice boilerplate che consente di salvare il modello.

Passaggio 2: crea un Dockerfile

Per containerizzare il codice, devi creare un Dockerfile. Nel Dockerfile includerai tutti i comandi necessari per eseguire l'immagine. Installerà tutte le librerie necessarie e configurerà il punto di accesso per il codice di addestramento.

Dal terminale, crea un Dockerfile vuoto nella radice della directory flowers:

touch Dockerfile

Ora dovresti avere quanto segue nella directory flowers-multi-machine/:

+ Dockerfile
+ trainer/
    + task.py

Apri il Dockerfile e copia al suo interno quanto segue:

FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-8

WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]

Passaggio 3: crea il contenitore

Dal terminale, esegui quanto segue per definire una variabile env per il tuo progetto, assicurandoti di sostituire your-cloud-project con l'ID del tuo progetto:

PROJECT_ID='your-cloud-project'

Crea un repository in Artifact Registry. Utilizzeremo il repository creato nel primo lab.

REPO_NAME='flower-app'

Definisci una variabile con l'URI dell'immagine container in Google Artifact Registry:

IMAGE_URI=us-central1-docker.pkg.dev/$PROJECT_ID/$REPO_NAME/flower_image_distributed:multi_machine

Configura Docker

gcloud auth configure-docker \
    us-central1-docker.pkg.dev

Quindi, crea il container eseguendo quanto segue dalla radice della directory flowers-multi-machine:

docker build ./ -t $IMAGE_URI

Infine, invialo ad Artifact Registry:

docker push $IMAGE_URI

Con il push del container in Artifact Registry, ora puoi avviare il job di addestramento.

Passaggio 4: esegui il job con l'SDK

In questa sezione vedrai come configurare e avviare il job di addestramento distribuito utilizzando l'SDK Vertex AI Python.

Da Avvio app, crea un notebook TensorFlow 2.

new_notebook

Importa l'SDK Vertex AI.

from google.cloud import aiplatform

Poi, definisci worker_pool_specs.

Vertex AI fornisce 4 pool di worker per coprire i diversi tipi di attività della macchina.

Il worker pool 0 configura il worker principale, il worker principale, lo scheduler o il "master". In MultiWorkerMirroredStrategy, tutte le macchine sono designate come worker, ovvero le macchine fisiche su cui viene eseguito il calcolo replicato. Oltre a ogni macchina che funge da worker, deve esserci un worker che si occupi di alcune attività aggiuntive, come il salvataggio dei checkpoint e la scrittura dei file di riepilogo in TensorBoard. Questa macchina è nota come il capo. Esiste un solo worker principale, quindi il numero di worker per il pool di worker 0 sarà sempre 1.

Il worker pool 1 è il luogo in cui configuri i worker aggiuntivi per il tuo cluster.

Il primo dizionario nell'elenco worker_pool_specs rappresenta il worker pool 0, mentre il secondo dizionario rappresenta il worker pool 1. In questo esempio, le due configurazioni sono identiche. Tuttavia, se vuoi eseguire l'addestramento su tre macchine, devi aggiungere altri worker al pool di worker 1 impostando replica_count su 2. Se vuoi aggiungere GPU, devi aggiungere gli argomenti accelerator_type e accelerator_count a machine_spec per entrambi i pool di worker. Tieni presente che se vuoi utilizzare le GPU con MultiWorkerMirroredStrategy, ogni macchina del cluster deve avere un numero identico di GPU. In caso contrario, il job non andrà a buon fine.

Dovrai sostituire {PROJECT_ID} in image_uri.

# The spec of the worker pools including machine type and Docker image
# Be sure to replace PROJECT_ID in the "image_uri" with your project.

worker_pool_specs=[
     {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-4",
        },
        "container_spec": {"image_uri": "us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:multi_machine"}
      },
      {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-4",
        },
        "container_spec": {"image_uri": "us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:multi_machine"}
      }
          ]

Poi crea ed esegui un CustomJob, sostituendo {YOUR_BUCKET} in staging_bucket con un bucket nel tuo progetto per lo staging.

my_custom_job = aiplatform.CustomJob(display_name='flowers-multi-worker',
                                     worker_pool_specs=worker_pool_specs,
                                     staging_bucket='gs://{YOUR_BUCKET}')

my_custom_job.run()

Nella console potrai visualizzare l'avanzamento del job.

multi_worker_job

🎉 Congratulazioni! 🎉

Hai imparato come utilizzare Vertex AI per:

  • Esegui job di addestramento distribuito con TensorFlow

Per saperne di più sulle diverse parti di Vertex, consulta la documentazione.

7. Esegui la pulizia

Poiché abbiamo configurato il notebook in modo che scada il timeout dopo 60 minuti di inattività, non dobbiamo preoccuparci di arrestare l'istanza. Se vuoi arrestare manualmente l'istanza, fai clic sul pulsante Arresta nella sezione Vertex AI Workbench della console. Se vuoi eliminare completamente il blocco note, fai clic sul pulsante Elimina.

Arresta istanza

Per eliminare il bucket di archiviazione, utilizza il menu di navigazione nella console Google Cloud, vai a Storage, seleziona il bucket e fai clic su Elimina:

Elimina spazio di archiviazione