1. 概要
このラボでは、Vertex AI と TensorFlow を使用して、Vertex AI Training で分散トレーニング ジョブを実行します。
このラボは、動画シリーズ「プロトタイプから本番環境へ」の一部です。このラボに進む前に、前のラボを完了してください。詳細については、関連する動画シリーズでご確認ください。
。
学習内容
次の方法を学習します。
- 単一マシンで複数の GPU を使用して分散トレーニングを実行する
- 複数のマシンで分散トレーニングを実行する
このラボを Google Cloud で実行するための総費用は約 $2 です。
2. Vertex AI の概要
このラボでは、Google Cloud で利用できる最新の AI プロダクトを使用します。Vertex AI は Google Cloud 全体の ML サービスを統合してシームレスな開発エクスペリエンスを提供します。以前は、AutoML でトレーニングしたモデルやカスタムモデルには、個別のサービスを介してアクセスする必要がありました。Vertex AI は、これらの個別のサービスを他の新しいプロダクトとともに 1 つの API へと結合します。既存のプロジェクトを Vertex AI に移行することもできます。
Vertex AI には、エンドツーエンドの ML ワークフローをサポートするさまざまなプロダクトが含まれています。このラボでは、以下でハイライト表示されているプロダクト(Training と Workbench)を中心に学習します。
3. 分散トレーニングの概要
GPU が 1 つの場合、TensorFlow はこのアクセラレータを使用して、追加作業なしでモデル トレーニングを高速化します。しかし、複数の GPU を使用することでさらなる効果を得たい場合は、複数のデバイスで計算を実行するための TensorFlow のモジュールである tf.distribute
を使用する必要があります。
このラボの最初のセクションでは tf.distribute.MirroredStrategy
を使用します。このコードは、わずかな変更でトレーニング アプリケーションに追加できます。この戦略では、マシン上の各 GPU にモデルのコピーを作成します。その後の勾配の更新は同期的に行われます。これは、各 GPU が、入力データの異なるスライスに対して、モデルを使用してフォワードパスとバックワード パスを計算することを意味しています。これらのスライスから計算された勾配は、すべての GPU に集約され、all-reduce と呼ばれるプロセスで平均化されます。モデル パラメータは、これらの平均勾配を使用して更新されます。
このラボの最後にある省略可能なセクションでは tf.distribute.MultiWorkerMirroredStrategy
を使用しますが、これは複数のマシンで実行される点を除けば、MirroredStrategy
と類似しています。各マシンで複数の GPU を使用することもできます。MirroredStrategy
と同様に、MultiWorkerMirroredStrategy
は、わずかなコード変更で使用できる同期型のデータ並列処理戦略です。同期型のデータ並列処理を 1 台のマシンから複数のマシンに移行する際の主な違いは、各ステップの最後で勾配の同期をマシンのすべての GPU で行うのか、クラスタ内のすべてのマシンで行うかという点です。
このラボを完了するために TensorFlow の分散トレーニングの仕組みを詳しく知る必要はありませんが、興味のある方は次の動画をご覧ください。
4.環境を設定する
Vertex AI を使用したカスタムモデルのトレーニング ラボのステップを完了して、環境を設定します。
5. 単一マシンで複数の GPU を使用したトレーニング
分散トレーニング ジョブを Vertex AI に送信しましょう。トレーニング アプリケーション コードを Docker コンテナに格納して、このコンテナを Google Artifact Registry に push します。この方法を使用すると、どのフレームワークで構築されたモデルでもトレーニングできます。
前のラボで作成した Workbench ノートブックの Launcher メニューで、ターミナル ウィンドウを開きます。
ステップ 1: トレーニング コードを作成する
flowers-multi-gpu
という新しいディレクトリを作成し、そのディレクトリに移動します。
mkdir flowers-multi-gpu
cd flowers-multi-gpu
次のコマンドを実行して、トレーニング コードのディレクトリと、以下のコードを追加する Python ファイルを作成します。
mkdir trainer
touch trainer/task.py
flowers-multi-gpu/
ディレクトリに、次のものが作成されます。
+ trainer/
+ task.py
いま作成した task.py
ファイルを開いて、次のコードをコピーします。
BUCKET_ROOT
の {your-gcs-bucket}
は、ラボ 1 で花のデータセットを保存した Cloud Storage バケットに置き換えます。
import tensorflow as tf
import numpy as np
import os
## Replace {your-gcs-bucket} !!
BUCKET_ROOT='/gcs/{your-gcs-bucket}'
# Define variables
NUM_CLASSES = 5
EPOCHS=10
BATCH_SIZE = 32
IMG_HEIGHT = 180
IMG_WIDTH = 180
DATA_DIR = f'{BUCKET_ROOT}/flower_photos'
def create_datasets(data_dir, batch_size):
'''Creates train and validation datasets.'''
train_dataset = tf.keras.utils.image_dataset_from_directory(
data_dir,
validation_split=0.2,
subset="training",
seed=123,
image_size=(IMG_HEIGHT, IMG_WIDTH),
batch_size=batch_size)
validation_dataset = tf.keras.utils.image_dataset_from_directory(
data_dir,
validation_split=0.2,
subset="validation",
seed=123,
image_size=(IMG_HEIGHT, IMG_WIDTH),
batch_size=batch_size)
train_dataset = train_dataset.cache().shuffle(1000).prefetch(buffer_size=tf.data.AUTOTUNE)
validation_dataset = validation_dataset.cache().prefetch(buffer_size=tf.data.AUTOTUNE)
return train_dataset, validation_dataset
def create_model():
'''Creates model.'''
model = tf.keras.Sequential([
tf.keras.layers.Resizing(IMG_HEIGHT, IMG_WIDTH),
tf.keras.layers.Rescaling(1./255, input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)),
tf.keras.layers.Conv2D(16, 3, padding='same', activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Conv2D(64, 3, padding='same', activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')
])
return model
def main():
# Create distribution strategy
strategy = tf.distribute.MirroredStrategy()
# Get data
GLOBAL_BATCH_SIZE = BATCH_SIZE * strategy.num_replicas_in_sync
train_dataset, validation_dataset = create_datasets(DATA_DIR, BATCH_SIZE)
# Wrap model creation and compilation within scope of strategy
with strategy.scope():
model = create_model()
model.compile(optimizer=tf.keras.optimizers.Adam(),
loss=tf.keras.losses.SparseCategoricalCrossentropy(),
metrics=['accuracy'])
history = model.fit(
train_dataset,
validation_data=validation_dataset,
epochs=EPOCHS
)
model.save(f'{BUCKET_ROOT}/model_output')
if __name__ == "__main__":
main()
コンテナをビルドする前に、コードを詳しく見ておきましょう。分散トレーニングに固有のコンポーネントがいくつかあります。
main()
関数で、MirroredStrategy
オブジェクトが作成されます。次に、モデル変数の作成を戦略の範囲内に収めます。このステップでは、GPU 間でミラーリングする変数を TensorFlow に指定します。- バッチサイズは
num_replicas_in_sync
でスケールアップされます。バッチサイズのスケーリングは、TensorFlow で同期データ並列処理戦略を使用する場合のベスト プラクティスです。詳しくはこちらをご覧ください。
ステップ 2: Dockerfile を作成する
コードをコンテナ化するには、Dockerfile ファイルを作成する必要があります。Dockerfile には、イメージの実行に必要なすべてのコマンドを含めます。必要なライブラリがすべてインストールされ、トレーニング コードのエントリ ポイントが設定されます。
ターミナルで、flowers ディレクトリのルートに空の Dockerfile を作成します。
touch Dockerfile
flowers-multi-gpu/
ディレクトリに、次のものが作成されます。
+ Dockerfile
+ trainer/
+ task.py
Dockerfile を開き、次の内容をコピーします。
FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-8
WORKDIR /
# Copies the trainer code to the docker image.
COPY trainer /trainer
# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]
ステップ 3: コンテナをビルドする
ターミナルで以下のように実行して、プロジェクトの環境変数を定義します。その際、your-cloud-project
は実際のプロジェクト ID で置き換えてください。
PROJECT_ID='your-cloud-project'
Artifact Registry にリポジトリを作成します。最初のラボで作成したリポジトリを使用します。
REPO_NAME='flower-app'
Artifact Registry 内のコンテナ イメージの URI を示す変数を定義します。
IMAGE_URI=us-central1-docker.pkg.dev/$PROJECT_ID/$REPO_NAME/flower_image_distributed:single_machine
Docker を構成します。
gcloud auth configure-docker \
us-central1-docker.pkg.dev
続いて、flowers-multi-gpu
ディレクトリのルートで次のように実行してコンテナをビルドします。
docker build ./ -t $IMAGE_URI
最後に、コンテナを Artifact Registry に push します。
docker push $IMAGE_URI
コンテナが Artifact Registry に push され、トレーニング ジョブを開始する準備ができました。
ステップ 4: SDK を使用してジョブを実行する
このセクションでは、Vertex AI Python SDK で分散トレーニング ジョブを構成して起動する方法を説明します。
Launcher から TensorFlow 2 ノートブックを作成します。
Vertex AI SDK をインポートします。
from google.cloud import aiplatform
CustomContainerTrainingJob
を定義します。
container_uri
の {PROJECT_ID}
と staging_bucket
の {YOUR_BUCKET}
は置き換える必要があります。
job = aiplatform.CustomContainerTrainingJob(display_name='flowers-multi-gpu',
container_uri='us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:single_machine',
staging_bucket='gs://{YOUR_BUCKET}')
ジョブを定義したら、ジョブを実行できます。アクセラレータの数を 2 に設定します。1 つの GPU しか使用していない場合、分散トレーニングにはなりません。1 台のマシンで分散トレーニングを行う場合は、2 つ以上のアクセラレータを使用します。
my_custom_job.run(replica_count=1,
machine_type='n1-standard-4',
accelerator_type='NVIDIA_TESLA_V100',
accelerator_count=2)
コンソールでジョブの進行状況を確認できます。
6. [省略可] マルチワーカー トレーニング
1 台のマシンで複数の GPU を使用して分散トレーニングを試しました。では、スキルレベルを上げるため、複数のマシンで分散トレーニングを実行してみましょう。ここでは、費用を抑えるためにマシンに GPU を追加しませんが、興味があれば GPU を追加して試してみてください。
ノートブック インスタンスで新しいターミナル ウィンドウを開きます。
ステップ 1: トレーニング コードを作成する
flowers-multi-machine
という新しいディレクトリを作成し、そのディレクトリに移動します。
mkdir flowers-multi-machine
cd flowers-multi-machine
次のコマンドを実行して、トレーニング コードのディレクトリと、以下のコードを追加する Python ファイルを作成します。
mkdir trainer
touch trainer/task.py
flowers-multi-machine/
ディレクトリに、次のものが作成されます。
+ trainer/
+ task.py
いま作成した task.py
ファイルを開いて、次のコードをコピーします。
BUCKET_ROOT
の {your-gcs-bucket}
は、ラボ 1 で花のデータセットを保存した Cloud Storage バケットに置き換えます。
import tensorflow as tf
import numpy as np
import os
## Replace {your-gcs-bucket} !!
BUCKET_ROOT='/gcs/{your-gcs-bucket}'
# Define variables
NUM_CLASSES = 5
EPOCHS=10
BATCH_SIZE = 32
IMG_HEIGHT = 180
IMG_WIDTH = 180
DATA_DIR = f'{BUCKET_ROOT}/flower_photos'
SAVE_MODEL_DIR = f'{BUCKET_ROOT}/multi-machine-output'
def create_datasets(data_dir, batch_size):
'''Creates train and validation datasets.'''
train_dataset = tf.keras.utils.image_dataset_from_directory(
data_dir,
validation_split=0.2,
subset="training",
seed=123,
image_size=(IMG_HEIGHT, IMG_WIDTH),
batch_size=batch_size)
validation_dataset = tf.keras.utils.image_dataset_from_directory(
data_dir,
validation_split=0.2,
subset="validation",
seed=123,
image_size=(IMG_HEIGHT, IMG_WIDTH),
batch_size=batch_size)
train_dataset = train_dataset.cache().shuffle(1000).prefetch(buffer_size=tf.data.AUTOTUNE)
validation_dataset = validation_dataset.cache().prefetch(buffer_size=tf.data.AUTOTUNE)
return train_dataset, validation_dataset
def create_model():
'''Creates model.'''
model = tf.keras.Sequential([
tf.keras.layers.Resizing(IMG_HEIGHT, IMG_WIDTH),
tf.keras.layers.Rescaling(1./255, input_shape=(IMG_HEIGHT, IMG_WIDTH, 3)),
tf.keras.layers.Conv2D(16, 3, padding='same', activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Conv2D(32, 3, padding='same', activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Conv2D(64, 3, padding='same', activation='relu'),
tf.keras.layers.MaxPooling2D(),
tf.keras.layers.Flatten(),
tf.keras.layers.Dense(128, activation='relu'),
tf.keras.layers.Dense(NUM_CLASSES, activation='softmax')
])
return model
def _is_chief(task_type, task_id):
'''Helper function. Determines if machine is chief.'''
return task_type == 'chief'
def _get_temp_dir(dirpath, task_id):
'''Helper function. Gets temporary directory for saving model.'''
base_dirpath = 'workertemp_' + str(task_id)
temp_dir = os.path.join(dirpath, base_dirpath)
tf.io.gfile.makedirs(temp_dir)
return temp_dir
def write_filepath(filepath, task_type, task_id):
'''Helper function. Gets filepath to save model.'''
dirpath = os.path.dirname(filepath)
base = os.path.basename(filepath)
if not _is_chief(task_type, task_id):
dirpath = _get_temp_dir(dirpath, task_id)
return os.path.join(dirpath, base)
def main():
# Create distribution strategy
strategy = tf.distribute.MultiWorkerMirroredStrategy()
# Get data
GLOBAL_BATCH_SIZE = BATCH_SIZE * strategy.num_replicas_in_sync
train_dataset, validation_dataset = create_datasets(DATA_DIR, BATCH_SIZE)
# Wrap variable creation within strategy scope
with strategy.scope():
model = create_model()
model.compile(optimizer=tf.keras.optimizers.Adam(),
loss=tf.keras.losses.SparseCategoricalCrossentropy(),
metrics=['accuracy'])
history = model.fit(
train_dataset,
validation_data=validation_dataset,
epochs=EPOCHS
)
# Determine type and task of the machine from
# the strategy cluster resolver
task_type, task_id = (strategy.cluster_resolver.task_type,
strategy.cluster_resolver.task_id)
# Based on the type and task, write to the desired model path
write_model_path = write_filepath(SAVE_MODEL_DIR, task_type, task_id)
model.save(write_model_path)
if __name__ == "__main__":
main()
コンテナをビルドする前に、コードを詳しく見ておきましょう。コードには、トレーニング アプリケーションを MultiWorkerMirroredStrategy
で機能させるために必要なコンポーネントがいくつかあります。
main()
関数で、MultiWorkerMirroredStrategy
オブジェクトが作成されます。次に、モデル変数の作成を戦略の範囲内に収めます。この重要なステップでは、レプリカ間でミラーリングする変数を TensorFlow に指定します。- バッチサイズは
num_replicas_in_sync
でスケールアップされます。バッチサイズのスケーリングは、TensorFlow で同期データ並列処理戦略を使用する場合のベスト プラクティスです。 - マルチワーカーの場合、ワーカーごとに宛先を別にする必要があるため、モデルの保存は若干複雑になります。チーフワーカーはモデルを目的のモデル ディレクトリに保存しますが、他のワーカーは一時ディレクトリに保存します。複数のワーカーが同じ場所に書き込みを行わないように、これらの一時ディレクトリは一意にする必要があります。保存にはグループ演算が含まれることがあります。これは、チーフだけでなく、すべてのワーカーが保存を実行する必要があることを意味します。
_is_chief()
、_get_temp_dir()
、write_filepath()
関数とmain()
関数には、モデルの保存に役立つボイラープレート コードが含まれています。
ステップ 2: Dockerfile を作成する
コードをコンテナ化するには、Dockerfile ファイルを作成する必要があります。Dockerfile には、イメージの実行に必要なすべてのコマンドを含めます。必要なライブラリがすべてインストールされ、トレーニング コードのエントリ ポイントが設定されます。
ターミナルで、flowers ディレクトリのルートに空の Dockerfile を作成します。
touch Dockerfile
flowers-multi-machine/
ディレクトリに、次のものが作成されます。
+ Dockerfile
+ trainer/
+ task.py
Dockerfile を開き、次の内容をコピーします。
FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-8
WORKDIR /
# Copies the trainer code to the docker image.
COPY trainer /trainer
# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]
ステップ 3: コンテナをビルドする
ターミナルで以下のように実行して、プロジェクトの環境変数を定義します。その際、your-cloud-project
は実際のプロジェクト ID で置き換えてください。
PROJECT_ID='your-cloud-project'
Artifact Registry にリポジトリを作成します。最初のラボで作成したリポジトリを使用します。
REPO_NAME='flower-app'
Google Artifact Registry 内のコンテナ イメージの URI を示す変数を定義します。
IMAGE_URI=us-central1-docker.pkg.dev/$PROJECT_ID/$REPO_NAME/flower_image_distributed:multi_machine
Docker を構成します。
gcloud auth configure-docker \
us-central1-docker.pkg.dev
続いて、flowers-multi-machine
ディレクトリのルートで次のように実行してコンテナをビルドします。
docker build ./ -t $IMAGE_URI
最後に、コンテナを Artifact Registry に push します。
docker push $IMAGE_URI
コンテナが Artifact Registry に push され、トレーニング ジョブを開始する準備ができました。
ステップ 4: SDK を使用してジョブを実行する
このセクションでは、Vertex AI Python SDK で分散トレーニング ジョブを構成して起動する方法を説明します。
Launcher から TensorFlow 2 ノートブックを作成します。
Vertex AI SDK をインポートします。
from google.cloud import aiplatform
worker_pool_specs
を定義します。
Vertex AI は、さまざまなタイプのマシンタスクに対応するため、4 つのワーカープールを提供します。
ワーカープール 0 は、プライマリ、チーフ、スケジューラ、または「マスター」を構成します。MultiWorkerMirroredStrategy
では、すべてのマシンがワーカーに指定されています。これは物理マシンで、複製された計算が実行されます。すべてのマシンがワーカーになるだけでなく、1 つのワーカーではチェックポイントの保存やサマリー ファイルの TensorBoard への書き込みなどの余分な作業を行う必要があります。このマシンをチーフといいます。チーフワーカーは 1 つしかないので、ワーカープール 0 のワーカー数は必ず 1 になります。
ワーカープール 1 で、クラスタの他のワーカーを構成します。
worker_pool_specs
リストの最初のディレクトリはワーカープール 0 を表し、2 番目のディレクトリはワーカープール 1 を表します。このサンプルで 2 つの構成はまったく同じです。ただし、3 台のマシンでトレーニングを行う場合は、replica_count
を 2 に設定して、ワーカープール 1 にワーカーを追加します。GPU を追加する場合は、両方のワーカープールで引数 accelerator_type
と accelerator_count
に machine_spec
を追加する必要があります。MultiWorkerMirroredStrategy
で GPU を使用する場合は、クラスタ内の各マシンが同じ数の GPU を使用する必要があります。そうでないと、ジョブが失敗します。
image_uri
の {PROJECT_ID}
は置き換える必要があります。
# The spec of the worker pools including machine type and Docker image
# Be sure to replace PROJECT_ID in the "image_uri" with your project.
worker_pool_specs=[
{
"replica_count": 1,
"machine_spec": {
"machine_type": "n1-standard-4",
},
"container_spec": {"image_uri": "us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:multi_machine"}
},
{
"replica_count": 1,
"machine_spec": {
"machine_type": "n1-standard-4",
},
"container_spec": {"image_uri": "us-central1-docker.pkg.dev/{PROJECT_ID}/flower-app/flower_image_distributed:multi_machine"}
}
]
次に、CustomJob
を作成して実行します。staging_bucket
の {YOUR_BUCKET}
は、ステージング用プロジェクトのバケットで置き換えます。
my_custom_job = aiplatform.CustomJob(display_name='flowers-multi-worker',
worker_pool_specs=worker_pool_specs,
staging_bucket='gs://{YOUR_BUCKET}')
my_custom_job.run()
コンソールでジョブの進行状況を確認できます。
お疲れさまでした
Vertex AI を使って次のことを行う方法を学びました。
- TensorFlow で分散トレーニング ジョブを実行する
Vertex のさまざまな部分の説明については、ドキュメントをご覧ください。
7. クリーンアップ
ノートブックは、アイドル状態で 60 分が経過するとタイムアウトするように構成されています。このため、インスタンスのシャットダウンを心配する必要はありません。インスタンスを手動でシャットダウンする場合は、Console で [Vertex AI] の [ワークベンチ] セクションにある [停止] ボタンをクリックします。ノートブックを完全に削除する場合は、[削除] ボタンをクリックします。
ストレージ バケットを削除するには、Cloud コンソールのナビゲーション メニューで [ストレージ] に移動してバケットを選択し、[削除] をクリックします。