Prototype to Production: addestramento distribuito su Vertex AI

1. Panoramica

In questo lab utilizzerai Vertex AI per eseguire un job di addestramento distribuito su Vertex AI Training utilizzando TensorFlow.

Questo lab fa parte della serie di video Prototype to Production. Assicurati di completare i lab precedenti prima di provare questo. Per scoprire di più, puoi guardare la serie di video allegata:

.

Cosa imparerai

Imparerai a:

  • Esegui l'addestramento distribuito su una singola macchina con più GPU
  • Esegui l'addestramento distribuito su più macchine

Il costo totale per l'esecuzione di questo lab su Google Cloud è di circa 2$.

2. Introduzione a Vertex AI

Questo lab utilizza la più recente offerta di prodotti AI disponibile su Google Cloud. Vertex AI integra le offerte ML di Google Cloud in un'esperienza di sviluppo fluida. In precedenza, i modelli addestrati con AutoML e i modelli personalizzati erano accessibili tramite servizi separati. La nuova offerta combina entrambi in un'unica API, insieme ad altri nuovi prodotti. Puoi anche migrare progetti esistenti su Vertex AI.

Vertex AI include molti prodotti diversi per supportare i flussi di lavoro ML end-to-end. Questo lab è incentrato sui prodotti evidenziati di seguito: Formazione e Workbench

Panoramica del prodotto Vertex

3. Panoramica dell'addestramento distribuito

Se disponi di una singola GPU, TensorFlow utilizzerà questo acceleratore per accelerare l'addestramento del modello senza alcun intervento aggiuntivo da parte tua. Tuttavia, se vuoi ottimizzare l'uso di più GPU, devi usare tf.distribute, che è il modulo di TensorFlow per eseguire un calcolo su più dispositivi.

La prima sezione di questo lab utilizza tf.distribute.MirroredStrategy, che puoi aggiungere alle applicazioni di addestramento con poche modifiche al codice. Questa strategia crea una copia del modello su ogni GPU della tua macchina. I successivi aggiornamenti del gradiente avverranno in modo sincrono. Ciò significa che ogni GPU calcola i passaggi avanti e indietro attraverso il modello su una sezione diversa dei dati di input. I gradienti calcolati da ciascuna di queste sezioni vengono quindi aggregati in tutte le GPU e viene calcolata la media in un processo noto come all-reduce. I parametri del modello vengono aggiornati utilizzando questi gradienti calcolati in media.

Nella sezione facoltativa alla fine del lab viene utilizzato tf.distribute.MultiWorkerMirroredStrategy, che è simile a MirroredStrategy, ad eccezione del fatto che funziona su più macchine. Ognuna di queste macchine potrebbe anche avere più GPU. Ad esempio, MirroredStrategy, MultiWorkerMirroredStrategy è una strategia di parallelismo dei dati sincrona che puoi utilizzare con poche modifiche al codice. La differenza principale quando si passa dal parallelismo dei dati sincrono su una macchina a più macchine è che i gradienti alla fine di ogni passaggio ora devono essere sincronizzati su tutte le GPU di una macchina e su tutte le macchine nel cluster.

Non devi conoscere i dettagli per completare questo lab, ma se vuoi saperne di più su come funziona l'addestramento distribuito in TensorFlow, guarda il video qui sotto:

4. Configura l'ambiente

Completa i passaggi nel lab Addestramento di modelli personalizzati con Vertex AI per configurare il tuo ambiente.

5. Addestramento su una singola macchina, più GPU

Invierai il job di addestramento distribuito a Vertex AI inserendo il codice dell'applicazione di addestramento in un container Docker ed eseguendo il push di questo container a Google Artifact Registry. Con questo approccio, puoi addestrare un modello creato con qualsiasi framework.

Per iniziare, apri una finestra del terminale dal menu Avvio app del blocco note di Workbench creato nei lab precedenti.

Apri terminale nel blocco note

Passaggio 1: scrivi il codice di addestramento

Crea una nuova directory chiamata flowers-multi-gpu e accedi tramite cd:

mkdir flowers-multi-gpu
cd flowers-multi-gpu

Esegui il comando riportato di seguito per creare una directory per il codice di addestramento e un file Python in cui aggiungerai il codice riportato di seguito.

mkdir trainer
touch trainer/task.py

Ora dovresti avere quanto segue nella directory flowers-multi-gpu/:

+ trainer/
    + task.py

Successivamente, apri il file task.py che hai appena creato e copia il codice seguente.

Dovrai sostituire {your-gcs-bucket} in BUCKET_ROOT con il bucket Cloud Storage in cui hai archiviato il set di dati sui fiori nel Lab 1.

import tensorflow as tf
import numpy as np
import os

## Replace {your-gcs-bucket} !!
BUCKET_ROOT='/gcs/{your-gcs-bucket}'

# Define variables
NUM_CLASSES = 5
EPOCHS=10
BATCH_SIZE = 32

IMG_HEIGHT = 180
IMG_WIDTH = 180

DATA_DIR = f'{BUCKET_ROOT}/flower_photos'

def create_datasets(data_dir, batch_size):
  '''Creates train and validation datasets.'''

  train_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="training",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  validation_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="validation",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  train_dataset = train_dataset.cache().shuffle(1000).prefetch(buffer_size=tf.data.AUTOTUNE)
  validation_dataset = validation_dataset.cache().prefetch(buffer_size=tf.data.AUTOTUNE)

  return train_dataset, validation_dataset


def create_model():
  '''Creates model.'''

  model = tf.keras.Sequential([
    tf.keras.layers.Resizing(IMG_HEIGHT, IMG_WIDTH),
    tf.keras.layers.Rescaling(1./255, input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)),
    tf.keras.layers.Conv2D(16, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(64, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')
  ])
  return model

def main():  

  # Create distribution strategy
  strategy = tf.distribute.MirroredStrategy()

  # Get data
  GLOBAL_BATCH_SIZE = BATCH_SIZE * strategy.num_replicas_in_sync
  train_dataset, validation_dataset = create_datasets(DATA_DIR, BATCH_SIZE)

  # Wrap model creation and compilation within scope of strategy
  with strategy.scope():
    model = create_model()
    model.compile(optimizer=tf.keras.optimizers.Adam(),
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                  metrics=['accuracy'])

  history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=EPOCHS
  )

  model.save(f'{BUCKET_ROOT}/model_output')


if __name__ == "__main__":
    main()

Prima di creare il container, diamo uno sguardo più approfondito al codice. Esistono alcuni componenti specifici dell'utilizzo dell'addestramento distribuito.

  • Nella funzione main() viene creato l'oggetto MirroredStrategy. A questo punto, completerai la creazione delle variabili del modello nell'ambito della strategia. Questo passaggio indica a TensorFlow quali variabili devono essere sottoposte a mirroring tra le GPU.
  • La dimensione del batch viene scalata dall'num_replicas_in_sync. La scalabilità della dimensione del batch è una best practice quando si utilizzano strategie di parallelismo dei dati sincroni in TensorFlow. Puoi scoprire di più qui.

Passaggio 2: crea un Dockerfile

Per containerizzare il codice, dovrai creare un Dockerfile. Nel Dockerfile includerai tutti i comandi necessari per eseguire l'immagine. Installerà tutte le librerie necessarie e configurerà il punto di accesso per il codice di addestramento.

Dal tuo terminale, crea un Dockerfile vuoto nella radice della directory flowers:

touch Dockerfile

Ora dovresti avere quanto segue nella directory flowers-multi-gpu/:

+ Dockerfile
+ trainer/
    + task.py

Apri il Dockerfile e copia al suo interno quanto segue:

FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-8

WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]

Passaggio 3: crea il container

Dal tuo terminale, esegui quanto segue per definire una variabile env per il tuo progetto, assicurandoti di sostituire your-cloud-project con l'ID del tuo progetto:

PROJECT_ID='your-cloud-project'

Crea un repository in Artifact Registry. Useremo il repository che abbiamo creato nel primo lab.

REPO_NAME='flower-app'

Definisci una variabile con l'URI dell'immagine container in Artifact Registry:

IMAGE_URI=us-central1-docker.pkg.dev/$PROJECT_ID/$REPO_NAME/flower_image_distributed:single_machine

Configura Docker

gcloud auth configure-docker \
    us-central1-docker.pkg.dev

Quindi, crea il container eseguendo quanto segue dalla radice della directory flowers-multi-gpu:

docker build ./ -t $IMAGE_URI

Infine, esegui il push in Artifact Registry:

docker push $IMAGE_URI

Dopo il push del container in Artifact Registry, ora puoi avviare un job di addestramento.

Passaggio 4: esegui il job con l'SDK

In questa sezione vedrai come configurare e avviare il job di addestramento distribuito utilizzando l'SDK Vertex AI per Python.

Da Avvio app, crea un blocco note TensorFlow 2.

new_notebook

Importa l'SDK Vertex AI.

from google.cloud import aiplatform

Poi definisci un valore CustomContainerTrainingJob.

Dovrai sostituire {PROJECT_ID} in container_uri e {YOUR_BUCKET} in staging_bucket.

job = aiplatform.CustomContainerTrainingJob(display_name='flowers-multi-gpu',
                                            container_uri='us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:single_machine',
                                            staging_bucket='gs://{YOUR_BUCKET}')

Dopo aver definito il job, puoi eseguirlo. Imposterai il numero di acceleratori su 2. Se utilizzassimo una sola GPU, non viene considerato addestramento distribuito. L'addestramento distribuito su una singola macchina si verifica quando si utilizzano 2 o più acceleratori.

my_custom_job.run(replica_count=1,
                  machine_type='n1-standard-4',
                  accelerator_type='NVIDIA_TESLA_V100',
                  accelerator_count=2)

Nella console potrai visualizzare l'avanzamento del job.

multigpu_job

6. [Facoltativo] Formazione per più lavoratori

Ora che hai provato l'addestramento distribuito su una singola macchina con più GPU, puoi migliorare le tue competenze di addestramento distribuito su più macchine. Per contenere i costi, non aggiungeremo GPU a queste macchine, ma potresti sperimentare aggiungendo GPU se vuoi.

Apri una nuova finestra del terminale nell'istanza del blocco note:

Apri terminale nel blocco note

Passaggio 1: scrivi il codice di addestramento

Crea una nuova directory chiamata flowers-multi-machine e accedi tramite cd:

mkdir flowers-multi-machine
cd flowers-multi-machine

Esegui il comando riportato di seguito per creare una directory per il codice di addestramento e un file Python in cui aggiungerai il codice riportato di seguito.

mkdir trainer
touch trainer/task.py

Ora dovresti avere quanto segue nella directory flowers-multi-machine/:

+ trainer/
    + task.py

Successivamente, apri il file task.py che hai appena creato e copia il codice seguente.

Dovrai sostituire {your-gcs-bucket} in BUCKET_ROOT con il bucket Cloud Storage in cui hai archiviato il set di dati sui fiori nel Lab 1.

import tensorflow as tf
import numpy as np
import os

## Replace {your-gcs-bucket} !!
BUCKET_ROOT='/gcs/{your-gcs-bucket}'

# Define variables
NUM_CLASSES = 5
EPOCHS=10
BATCH_SIZE = 32

IMG_HEIGHT = 180
IMG_WIDTH = 180

DATA_DIR = f'{BUCKET_ROOT}/flower_photos'
SAVE_MODEL_DIR = f'{BUCKET_ROOT}/multi-machine-output'

def create_datasets(data_dir, batch_size):
  '''Creates train and validation datasets.'''

  train_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="training",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  validation_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="validation",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  train_dataset = train_dataset.cache().shuffle(1000).prefetch(buffer_size=tf.data.AUTOTUNE)
  validation_dataset = validation_dataset.cache().prefetch(buffer_size=tf.data.AUTOTUNE)

  return train_dataset, validation_dataset


def create_model():
  '''Creates model.'''

  model = tf.keras.Sequential([
    tf.keras.layers.Resizing(IMG_HEIGHT, IMG_WIDTH),
    tf.keras.layers.Rescaling(1./255, input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)),
    tf.keras.layers.Conv2D(16, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(64, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')
  ])
  return model

def _is_chief(task_type, task_id):
  '''Helper function. Determines if machine is chief.'''

  return task_type == 'chief'


def _get_temp_dir(dirpath, task_id):
  '''Helper function. Gets temporary directory for saving model.'''

  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir


def write_filepath(filepath, task_type, task_id):
  '''Helper function. Gets filepath to save model.'''

  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

def main():
  # Create distribution strategy
  strategy = tf.distribute.MultiWorkerMirroredStrategy()

  # Get data
  GLOBAL_BATCH_SIZE = BATCH_SIZE * strategy.num_replicas_in_sync
  train_dataset, validation_dataset = create_datasets(DATA_DIR, BATCH_SIZE)

  # Wrap variable creation within strategy scope
  with strategy.scope():
    model = create_model()
    model.compile(optimizer=tf.keras.optimizers.Adam(),
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                  metrics=['accuracy'])

  history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=EPOCHS
  )

  # Determine type and task of the machine from
  # the strategy cluster resolver
  task_type, task_id = (strategy.cluster_resolver.task_type,
                        strategy.cluster_resolver.task_id)

  # Based on the type and task, write to the desired model path
  write_model_path = write_filepath(SAVE_MODEL_DIR, task_type, task_id)
  model.save(write_model_path)

if __name__ == "__main__":
    main()

Prima di creare il container, diamo uno sguardo più approfondito al codice. Il codice contiene alcuni componenti necessari affinché l'applicazione di addestramento funzioni con MultiWorkerMirroredStrategy.

  • Nella funzione main() viene creato l'oggetto MultiWorkerMirroredStrategy. A questo punto, completerai la creazione delle variabili del modello nell'ambito della strategia. Questo passaggio fondamentale indica a TensorFlow quali variabili devono essere sottoposte a mirroring tra le repliche.
  • La dimensione del batch viene scalata dall'num_replicas_in_sync. La scalabilità della dimensione del batch è una best practice quando si utilizzano strategie di parallelismo dei dati sincroni in TensorFlow.
  • Salvare il modello è leggermente più complicato nel caso con più worker perché la destinazione deve essere diversa per ogni worker. Il chief worker salverà il modello nella directory del modello desiderata, mentre gli altri worker salvano il modello in directory temporanee. È importante che queste directory temporanee siano univoche per evitare che più worker scrivano nella stessa posizione. Il risparmio può contenere operazioni collettive, cioè tutti i lavoratori devono salvare, non solo il capo. Le funzioni _is_chief(), _get_temp_dir(), write_filepath() e la funzione main() includono tutte il codice boilerplate che consente di salvare il modello.

Passaggio 2: crea un Dockerfile

Per containerizzare il codice, dovrai creare un Dockerfile. Nel Dockerfile includerai tutti i comandi necessari per eseguire l'immagine. Installerà tutte le librerie necessarie e configurerà il punto di accesso per il codice di addestramento.

Dal tuo terminale, crea un Dockerfile vuoto nella radice della directory flowers:

touch Dockerfile

Ora dovresti avere quanto segue nella directory flowers-multi-machine/:

+ Dockerfile
+ trainer/
    + task.py

Apri il Dockerfile e copia al suo interno quanto segue:

FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-8

WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]

Passaggio 3: crea il container

Dal tuo terminale, esegui quanto segue per definire una variabile env per il tuo progetto, assicurandoti di sostituire your-cloud-project con l'ID del tuo progetto:

PROJECT_ID='your-cloud-project'

Crea un repository in Artifact Registry. Useremo il repository che abbiamo creato nel primo lab.

REPO_NAME='flower-app'

Definisci una variabile con l'URI dell'immagine container in Google Artifact Registry:

IMAGE_URI=us-central1-docker.pkg.dev/$PROJECT_ID/$REPO_NAME/flower_image_distributed:multi_machine

Configura Docker

gcloud auth configure-docker \
    us-central1-docker.pkg.dev

Quindi, crea il container eseguendo quanto segue dalla radice della directory flowers-multi-machine:

docker build ./ -t $IMAGE_URI

Infine, esegui il push in Artifact Registry:

docker push $IMAGE_URI

Dopo il push del container in Artifact Registry, ora puoi avviare un job di addestramento.

Passaggio 4: esegui il job con l'SDK

In questa sezione vedrai come configurare e avviare il job di addestramento distribuito utilizzando l'SDK Vertex AI per Python.

Da Avvio app, crea un blocco note TensorFlow 2.

new_notebook

Importa l'SDK Vertex AI.

from google.cloud import aiplatform

Poi definisci il valore worker_pool_specs.

Vertex AI fornisce 4 pool di worker per coprire i diversi tipi di attività delle macchine.

Il pool di worker 0 configura il principale, principale, scheduler o "master". In MultiWorkerMirroredStrategy, tutte le macchine sono designate come worker, ovvero le macchine fisiche su cui viene eseguito il calcolo replicato. Oltre che ogni macchina sia un worker, deve esserci un worker che svolge alcune attività extra come il salvataggio dei checkpoint e la scrittura dei file di riepilogo su TensorBoard. Questa macchina è nota come "Chief". Esiste sempre un solo Chief worker, quindi il conteggio dei worker per il pool di worker 0 sarà sempre 1.

Il pool di worker 1 è il luogo in cui configuri i worker aggiuntivi per il cluster.

Il primo dizionario nell'elenco worker_pool_specs rappresenta il pool di worker 0 e il secondo dizionario rappresenta il pool di worker 1. In questo esempio, le due configurazioni sono identiche. Tuttavia, se volessi eseguire l'addestramento su 3 macchine, potresti aggiungere ulteriori worker al pool di worker 1 impostando replica_count su 2. Se vuoi aggiungere GPU, dovrai aggiungere gli argomenti accelerator_type e accelerator_count a machine_spec per entrambi i pool di worker. Tieni presente che se vuoi utilizzare GPU con MultiWorkerMirroredStrategy, ogni macchina nel cluster deve avere un numero identico di GPU. In caso contrario, il job non andrà a buon fine.

Dovrai sostituire {PROJECT_ID} in image_uri.

# The spec of the worker pools including machine type and Docker image
# Be sure to replace PROJECT_ID in the "image_uri" with your project.

worker_pool_specs=[
     {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-4",
        },
        "container_spec": {"image_uri": "us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:multi_machine"}
      },
      {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-4",
        },
        "container_spec": {"image_uri": "us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:multi_machine"}
      }
          ]

Poi crea ed esegui un CustomJob, sostituendo {YOUR_BUCKET} in staging_bucket con un bucket nel tuo progetto per la gestione temporanea.

my_custom_job = aiplatform.CustomJob(display_name='flowers-multi-worker',
                                     worker_pool_specs=worker_pool_specs,
                                     staging_bucket='gs://{YOUR_BUCKET}')

my_custom_job.run()

Nella console potrai visualizzare l'avanzamento del job.

multi_worker_job

🎉 Complimenti! 🎉

Hai imparato come utilizzare Vertex AI per:

  • Esegui job di addestramento distribuito con TensorFlow

Per scoprire di più sulle diverse parti di Vertex, consulta la documentazione.

7. Esegui la pulizia

Poiché abbiamo configurato il timeout del blocco note dopo 60 minuti di inattività, non dobbiamo preoccuparci di arrestare l'istanza. Se vuoi arrestare manualmente l'istanza, fai clic sul pulsante Arresta nella sezione Vertex AI Workbench della console. Per eliminare completamente il blocco note, fai clic sul pulsante Elimina.

Arresta istanza

Per eliminare il bucket di archiviazione, utilizzando il menu di navigazione nella console Cloud, vai a Storage, seleziona il bucket e fai clic su Elimina:

Elimina spazio di archiviazione