プロトタイプから本番環境へ: Vertex AI での分散トレーニング

1. 概要

このラボでは、Vertex AI と TensorFlow を使用して、Vertex AI Training で分散トレーニング ジョブを実行します。

このラボは、動画シリーズ「プロトタイプから本番環境へ」の一部です。このラボに進む前に、前のラボを完了してください。詳細については、関連する動画シリーズでご確認ください。

学習内容

次の方法を学習します。

  • 単一マシンで複数の GPU を使用して分散トレーニングを実行する
  • 複数のマシンで分散トレーニングを実行する

このラボを Google Cloud で実行するための総費用は約 $2 です。

2. Vertex AI の概要

このラボでは、Google Cloud で利用できる最新の AI プロダクトを使用します。Vertex AI は Google Cloud 全体の ML サービスを統合してシームレスな開発エクスペリエンスを提供します。以前は、AutoML でトレーニングしたモデルやカスタムモデルには、個別のサービスを介してアクセスする必要がありました。Vertex AI は、これらの個別のサービスを他の新しいプロダクトとともに 1 つの API へと結合します。既存のプロジェクトを Vertex AI に移行することもできます。

Vertex AI には、エンドツーエンドの ML ワークフローをサポートするさまざまなプロダクトが含まれています。このラボでは、以下でハイライト表示されているプロダクト(TrainingWorkbench)を中心に学習します。

Vertex プロダクトの概要

3. 分散トレーニングの概要

GPU が 1 つの場合、TensorFlow はこのアクセラレータを使用して、追加作業なしでモデル トレーニングを高速化します。しかし、複数の GPU を使用することでさらなる効果を得たい場合は、複数のデバイスで計算を実行するための TensorFlow のモジュールである tf.distribute を使用する必要があります。

このラボの最初のセクションでは tf.distribute.MirroredStrategy を使用します。このコードは、わずかな変更でトレーニング アプリケーションに追加できます。この戦略では、マシン上の各 GPU にモデルのコピーを作成します。その後の勾配の更新は同期的に行われます。これは、各 GPU が、入力データの異なるスライスに対して、モデルを使用してフォワードパスとバックワード パスを計算することを意味しています。これらのスライスから計算された勾配は、すべての GPU に集約され、all-reduce と呼ばれるプロセスで平均化されます。モデル パラメータは、これらの平均勾配を使用して更新されます。

このラボの最後にある省略可能なセクションでは tf.distribute.MultiWorkerMirroredStrategy を使用しますが、これは複数のマシンで実行される点を除けば、MirroredStrategy と類似しています。各マシンで複数の GPU を使用することもできます。MirroredStrategy と同様に、MultiWorkerMirroredStrategy は、わずかなコード変更で使用できる同期型のデータ並列処理戦略です。同期型のデータ並列処理を 1 台のマシンから複数のマシンに移行する際の主な違いは、各ステップの最後で勾配の同期をマシンのすべての GPU で行うのか、クラスタ内のすべてのマシンで行うかという点です。

このラボを完了するために TensorFlow の分散トレーニングの仕組みを詳しく知る必要はありませんが、興味のある方は次の動画をご覧ください。

4.環境を設定する

Vertex AI を使用したカスタムモデルのトレーニング ラボのステップを完了して、環境を設定します。

5. 単一マシンで複数の GPU を使用したトレーニング

分散トレーニング ジョブを Vertex AI に送信しましょう。トレーニング アプリケーション コードを Docker コンテナに格納して、このコンテナを Google Artifact Registry に push します。この方法を使用すると、どのフレームワークで構築されたモデルでもトレーニングできます。

前のラボで作成した Workbench ノートブックの Launcher メニューで、ターミナル ウィンドウを開きます。

ノートブックでターミナルを開く

ステップ 1: トレーニング コードを作成する

flowers-multi-gpu という新しいディレクトリを作成し、そのディレクトリに移動します。

mkdir flowers-multi-gpu
cd flowers-multi-gpu

次のコマンドを実行して、トレーニング コードのディレクトリと、以下のコードを追加する Python ファイルを作成します。

mkdir trainer
touch trainer/task.py

flowers-multi-gpu/ ディレクトリに、次のものが作成されます。

+ trainer/
    + task.py

いま作成した task.py ファイルを開いて、次のコードをコピーします。

BUCKET_ROOT{your-gcs-bucket} は、ラボ 1 で花のデータセットを保存した Cloud Storage バケットに置き換えます。

import tensorflow as tf
import numpy as np
import os

## Replace {your-gcs-bucket} !!
BUCKET_ROOT='/gcs/{your-gcs-bucket}'

# Define variables
NUM_CLASSES = 5
EPOCHS=10
BATCH_SIZE = 32

IMG_HEIGHT = 180
IMG_WIDTH = 180

DATA_DIR = f'{BUCKET_ROOT}/flower_photos'

def create_datasets(data_dir, batch_size):
  '''Creates train and validation datasets.'''

  train_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="training",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  validation_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="validation",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  train_dataset = train_dataset.cache().shuffle(1000).prefetch(buffer_size=tf.data.AUTOTUNE)
  validation_dataset = validation_dataset.cache().prefetch(buffer_size=tf.data.AUTOTUNE)

  return train_dataset, validation_dataset

def create_model():
  '''Creates model.'''

  model = tf.keras.Sequential([
    tf.keras.layers.Resizing(IMG_HEIGHT, IMG_WIDTH),
    tf.keras.layers.Rescaling(1./255, input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)),
    tf.keras.layers.Conv2D(16, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(64, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')
  ])
  return model

def main():

  # Create distribution strategy
  strategy = tf.distribute.MirroredStrategy()

  # Get data
  GLOBAL_BATCH_SIZE = BATCH_SIZE * strategy.num_replicas_in_sync
  train_dataset, validation_dataset = create_datasets(DATA_DIR, BATCH_SIZE)

  # Wrap model creation and compilation within scope of strategy
  with strategy.scope():
    model = create_model()
    model.compile(optimizer=tf.keras.optimizers.Adam(),
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                  metrics=['accuracy'])

  history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=EPOCHS
  )

  model.save(f'{BUCKET_ROOT}/model_output')

if __name__ == "__main__":
    main()

コンテナをビルドする前に、コードを詳しく見ておきましょう。分散トレーニングに固有のコンポーネントがいくつかあります。

  • main() 関数で、MirroredStrategy オブジェクトが作成されます。次に、モデル変数の作成を戦略の範囲内に収めます。このステップでは、GPU 間でミラーリングする変数を TensorFlow に指定します。
  • バッチサイズは num_replicas_in_sync でスケールアップされます。バッチサイズのスケーリングは、TensorFlow で同期データ並列処理戦略を使用する場合のベスト プラクティスです。詳しくはこちらをご覧ください

ステップ 2: Dockerfile を作成する

コードをコンテナ化するには、Dockerfile ファイルを作成する必要があります。Dockerfile には、イメージの実行に必要なすべてのコマンドを含めます。必要なライブラリがすべてインストールされ、トレーニング コードのエントリ ポイントが設定されます。

ターミナルで、flowers ディレクトリのルートに空の Dockerfile を作成します。

touch Dockerfile

flowers-multi-gpu/ ディレクトリに、次のものが作成されます。

+ Dockerfile
+ trainer/
    + task.py

Dockerfile を開き、次の内容をコピーします。

FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-8

WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]

ステップ 3: コンテナをビルドする

ターミナルで以下のように実行して、プロジェクトの環境変数を定義します。その際、your-cloud-project は実際のプロジェクト ID で置き換えてください。

PROJECT_ID='your-cloud-project'

Artifact Registry にリポジトリを作成します。最初のラボで作成したリポジトリを使用します。

REPO_NAME='flower-app'

Artifact Registry 内のコンテナ イメージの URI を示す変数を定義します。

IMAGE_URI=us-central1-docker.pkg.dev/$PROJECT_ID/$REPO_NAME/flower_image_distributed:single_machine

Docker を構成します。

gcloud auth configure-docker \
    us-central1-docker.pkg.dev

続いて、flowers-multi-gpu ディレクトリのルートで次のように実行してコンテナをビルドします。

docker build ./ -t $IMAGE_URI

最後に、コンテナを Artifact Registry に push します。

docker push $IMAGE_URI

コンテナが Artifact Registry に push され、トレーニング ジョブを開始する準備ができました。

ステップ 4: SDK を使用してジョブを実行する

このセクションでは、Vertex AI Python SDK で分散トレーニング ジョブを構成して起動する方法を説明します。

Launcher から TensorFlow 2 ノートブックを作成します。

new_notebook

Vertex AI SDK をインポートします。

from google.cloud import aiplatform

CustomContainerTrainingJob を定義します。

container_uri{PROJECT_ID}staging_bucket{YOUR_BUCKET} は置き換える必要があります。

job = aiplatform.CustomContainerTrainingJob(display_name='flowers-multi-gpu',
                                            container_uri='us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:single_machine',
                                            staging_bucket='gs://{YOUR_BUCKET}')

ジョブを定義したら、ジョブを実行できます。アクセラレータの数を 2 に設定します。1 つの GPU しか使用していない場合、分散トレーニングにはなりません。1 台のマシンで分散トレーニングを行う場合は、2 つ以上のアクセラレータを使用します。

my_custom_job.run(replica_count=1,
                  machine_type='n1-standard-4',
                  accelerator_type='NVIDIA_TESLA_V100',
                  accelerator_count=2)

コンソールでジョブの進行状況を確認できます。

multigpu_job

6. [省略可] マルチワーカー トレーニング

1 台のマシンで複数の GPU を使用して分散トレーニングを試しました。では、スキルレベルを上げるため、複数のマシンで分散トレーニングを実行してみましょう。ここでは、費用を抑えるためにマシンに GPU を追加しませんが、興味があれば GPU を追加して試してみてください。

ノートブック インスタンスで新しいターミナル ウィンドウを開きます。

ノートブックでターミナルを開く

ステップ 1: トレーニング コードを作成する

flowers-multi-machine という新しいディレクトリを作成し、そのディレクトリに移動します。

mkdir flowers-multi-machine
cd flowers-multi-machine

次のコマンドを実行して、トレーニング コードのディレクトリと、以下のコードを追加する Python ファイルを作成します。

mkdir trainer
touch trainer/task.py

flowers-multi-machine/ ディレクトリに、次のものが作成されます。

+ trainer/
    + task.py

いま作成した task.py ファイルを開いて、次のコードをコピーします。

BUCKET_ROOT{your-gcs-bucket} は、ラボ 1 で花のデータセットを保存した Cloud Storage バケットに置き換えます。

import tensorflow as tf
import numpy as np
import os

## Replace {your-gcs-bucket} !!
BUCKET_ROOT='/gcs/{your-gcs-bucket}'

# Define variables
NUM_CLASSES = 5
EPOCHS=10
BATCH_SIZE = 32

IMG_HEIGHT = 180
IMG_WIDTH = 180

DATA_DIR = f'{BUCKET_ROOT}/flower_photos'
SAVE_MODEL_DIR = f'{BUCKET_ROOT}/multi-machine-output'

def create_datasets(data_dir, batch_size):
  '''Creates train and validation datasets.'''

  train_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="training",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  validation_dataset = tf.keras.utils.image_dataset_from_directory(
    data_dir,
    validation_split=0.2,
    subset="validation",
    seed=123,
    image_size=(IMG_HEIGHT, IMG_WIDTH),
    batch_size=batch_size)

  train_dataset = train_dataset.cache().shuffle(1000).prefetch(buffer_size=tf.data.AUTOTUNE)
  validation_dataset = validation_dataset.cache().prefetch(buffer_size=tf.data.AUTOTUNE)

  return train_dataset, validation_dataset

def create_model():
  '''Creates model.'''

  model = tf.keras.Sequential([
    tf.keras.layers.Resizing(IMG_HEIGHT, IMG_WIDTH),
    tf.keras.layers.Rescaling(1./255, input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)),
    tf.keras.layers.Conv2D(16, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Conv2D(64, 3, padding='same', activation='relu'),
    tf.keras.layers.MaxPooling2D(),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')
  ])
  return model

def _is_chief(task_type, task_id):
  '''Helper function. Determines if machine is chief.'''

  return task_type == 'chief'

def _get_temp_dir(dirpath, task_id):
  '''Helper function. Gets temporary directory for saving model.'''

  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  '''Helper function. Gets filepath to save model.'''

  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

def main():
  # Create distribution strategy
  strategy = tf.distribute.MultiWorkerMirroredStrategy()

  # Get data
  GLOBAL_BATCH_SIZE = BATCH_SIZE * strategy.num_replicas_in_sync
  train_dataset, validation_dataset = create_datasets(DATA_DIR, BATCH_SIZE)

  # Wrap variable creation within strategy scope
  with strategy.scope():
    model = create_model()
    model.compile(optimizer=tf.keras.optimizers.Adam(),
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(),
                  metrics=['accuracy'])

  history = model.fit(
    train_dataset,
    validation_data=validation_dataset,
    epochs=EPOCHS
  )

  # Determine type and task of the machine from
  # the strategy cluster resolver
  task_type, task_id = (strategy.cluster_resolver.task_type,
                        strategy.cluster_resolver.task_id)

  # Based on the type and task, write to the desired model path
  write_model_path = write_filepath(SAVE_MODEL_DIR, task_type, task_id)
  model.save(write_model_path)

if __name__ == "__main__":
    main()

コンテナをビルドする前に、コードを詳しく見ておきましょう。コードには、トレーニング アプリケーションを MultiWorkerMirroredStrategy で機能させるために必要なコンポーネントがいくつかあります。

  • main() 関数で、MultiWorkerMirroredStrategy オブジェクトが作成されます。次に、モデル変数の作成を戦略の範囲内に収めます。この重要なステップでは、レプリカ間でミラーリングする変数を TensorFlow に指定します。
  • バッチサイズは num_replicas_in_sync でスケールアップされます。バッチサイズのスケーリングは、TensorFlow で同期データ並列処理戦略を使用する場合のベスト プラクティスです。
  • マルチワーカーの場合、ワーカーごとに宛先を別にする必要があるため、モデルの保存は若干複雑になります。チーフワーカーはモデルを目的のモデル ディレクトリに保存しますが、他のワーカーは一時ディレクトリに保存します。複数のワーカーが同じ場所に書き込みを行わないように、これらの一時ディレクトリは一意にする必要があります。保存にはグループ演算が含まれることがあります。これは、チーフだけでなく、すべてのワーカーが保存を実行する必要があることを意味します。_is_chief()_get_temp_dir()write_filepath() 関数と main() 関数には、モデルの保存に役立つボイラープレート コードが含まれています。

ステップ 2: Dockerfile を作成する

コードをコンテナ化するには、Dockerfile ファイルを作成する必要があります。Dockerfile には、イメージの実行に必要なすべてのコマンドを含めます。必要なライブラリがすべてインストールされ、トレーニング コードのエントリ ポイントが設定されます。

ターミナルで、flowers ディレクトリのルートに空の Dockerfile を作成します。

touch Dockerfile

flowers-multi-machine/ ディレクトリに、次のものが作成されます。

+ Dockerfile
+ trainer/
    + task.py

Dockerfile を開き、次の内容をコピーします。

FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-8

WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]

ステップ 3: コンテナをビルドする

ターミナルで以下のように実行して、プロジェクトの環境変数を定義します。その際、your-cloud-project は実際のプロジェクト ID で置き換えてください。

PROJECT_ID='your-cloud-project'

Artifact Registry にリポジトリを作成します。最初のラボで作成したリポジトリを使用します。

REPO_NAME='flower-app'

Google Artifact Registry 内のコンテナ イメージの URI を示す変数を定義します。

IMAGE_URI=us-central1-docker.pkg.dev/$PROJECT_ID/$REPO_NAME/flower_image_distributed:multi_machine

Docker を構成します。

gcloud auth configure-docker \
    us-central1-docker.pkg.dev

続いて、flowers-multi-machine ディレクトリのルートで次のように実行してコンテナをビルドします。

docker build ./ -t $IMAGE_URI

最後に、コンテナを Artifact Registry に push します。

docker push $IMAGE_URI

コンテナが Artifact Registry に push され、トレーニング ジョブを開始する準備ができました。

ステップ 4: SDK を使用してジョブを実行する

このセクションでは、Vertex AI Python SDK で分散トレーニング ジョブを構成して起動する方法を説明します。

Launcher から TensorFlow 2 ノートブックを作成します。

new_notebook

Vertex AI SDK をインポートします。

from google.cloud import aiplatform

worker_pool_specs を定義します。

Vertex AI は、さまざまなタイプのマシンタスクに対応するため、4 つのワーカープールを提供します。

ワーカープール 0 は、プライマリ、チーフ、スケジューラ、または「マスター」を構成します。MultiWorkerMirroredStrategy では、すべてのマシンがワーカーに指定されています。これは物理マシンで、複製された計算が実行されます。すべてのマシンがワーカーになるだけでなく、1 つのワーカーではチェックポイントの保存やサマリー ファイルの TensorBoard への書き込みなどの余分な作業を行う必要があります。このマシンをチーフといいます。チーフワーカーは 1 つしかないので、ワーカープール 0 のワーカー数は必ず 1 になります。

ワーカープール 1 で、クラスタの他のワーカーを構成します。

worker_pool_specs リストの最初のディレクトリはワーカープール 0 を表し、2 番目のディレクトリはワーカープール 1 を表します。このサンプルで 2 つの構成はまったく同じです。ただし、3 台のマシンでトレーニングを行う場合は、replica_count を 2 に設定して、ワーカープール 1 にワーカーを追加します。GPU を追加する場合は、両方のワーカープールで引数 accelerator_typeaccelerator_countmachine_spec を追加する必要があります。MultiWorkerMirroredStrategy で GPU を使用する場合は、クラスタ内の各マシンが同じ数の GPU を使用する必要があります。そうでないと、ジョブが失敗します。

image_uri{PROJECT_ID} は置き換える必要があります。

# The spec of the worker pools including machine type and Docker image
# Be sure to replace PROJECT_ID in the "image_uri" with your project.

worker_pool_specs=[
     {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-4",
        },
        "container_spec": {"image_uri": "us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:multi_machine"}
      },
      {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-4",
        },
        "container_spec": {"image_uri": "us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:multi_machine"}
      }
          ]

次に、CustomJob を作成して実行します。staging_bucket{YOUR_BUCKET} は、ステージング用プロジェクトのバケットで置き換えます。

my_custom_job = aiplatform.CustomJob(display_name='flowers-multi-worker',
                                     worker_pool_specs=worker_pool_specs,
                                     staging_bucket='gs://{YOUR_BUCKET}')

my_custom_job.run()

コンソールでジョブの進行状況を確認できます。

multi_worker_job

お疲れさまでした

Vertex AI を使って次のことを行う方法を学びました。

  • TensorFlow で分散トレーニング ジョブを実行する

Vertex のさまざまな部分の説明については、ドキュメントをご覧ください。

7. クリーンアップ

ノートブックは、アイドル状態で 60 分が経過するとタイムアウトするように構成されています。このため、インスタンスのシャットダウンを心配する必要はありません。インスタンスを手動でシャットダウンする場合は、Console で [Vertex AI] の [ワークベンチ] セクションにある [停止] ボタンをクリックします。ノートブックを完全に削除する場合は、[削除] ボタンをクリックします。

インスタンスの停止

ストレージ バケットを削除するには、Cloud コンソールのナビゲーション メニューで [ストレージ] に移動してバケットを選択し、[削除] をクリックします。

ストレージを削除