Vertex AI: TensorFlow を使用したマルチワーカー トレーニングと転移学習

1. 概要

このラボでは、Vertex AI を使用して TensorFlow モデル用のマルチワーカー トレーニング ジョブを実行します。

学習内容

次の方法を学習します。

  • マルチワーカー トレーニング用にトレーニング アプリケーション コードを変更する
  • Vertex AI UI でマルチワーカー トレーニング ジョブを構成して起動する
  • Vertex SDK を使用してマルチワーカー トレーニング ジョブを構成して起動する

このラボを Google Cloud で実行するための総費用は約 $5 です。

2. Vertex AI の概要

このラボでは、Google Cloud で利用できる最新の AI プロダクトを使用します。Vertex AI は Google Cloud 全体の ML サービスを統合してシームレスな開発エクスペリエンスを提供します。以前は、AutoML でトレーニングしたモデルやカスタムモデルには、個別のサービスを介してアクセスする必要がありました。Vertex AI は、これらの個別のサービスを他の新しいプロダクトとともに 1 つの API へと結合します。既存のプロジェクトを Vertex AI に移行することもできます。ご意見やご質問がありましたら、サポートページからお寄せください。

Vertex AI には、エンドツーエンドの ML ワークフローをサポートするさまざまなプロダクトが含まれています。このラボでは、以下でハイライト表示されているプロダクト(TrainingWorkbench)を中心に学習します。

Vertex プロダクトの概要

3. ユースケースの概要

このラボでは、転移学習を使用して TensorFlow データセットから cassava データセットで画像分類モデルをトレーニングします。使用するアーキテクチャは、Imagenet データセットで事前トレーニング済みの tf.keras.applications ライブラリの ResNet50 モデルです。

分散トレーニングのメリット

GPU が 1 つの場合、TensorFlow はこのアクセラレータを使用して、追加作業なしでモデル トレーニングを高速化します。ただし、1 台のマシンまたは複数のマシン(それぞれに複数の GPU が搭載されている可能性がある)で複数の GPU を使用することで、さらなる効果を得たい場合は、複数のデバイスで計算を実行するための TensorFlow のライブラリである tf.distribute を使用する必要があります。デバイスとは、TensorFlow がオペレーションを実行できるマシン上の CPU またはアクセラレータ(GPU や TPU など)を指します。

分散トレーニングを始めるための最もシンプルな方法は、1 台のマシンに複数の GPU デバイスを搭載することです。tf.distribute モジュールの TensorFlow 分散戦略は、すべての GPU にわたるデータ分散と勾配更新の調整を管理します。シングルホスト トレーニングに習熟し、さらに拡張したいと考えている場合は、クラスタに複数のマシンを追加すると、パフォーマンスをさらに向上させることができます。CPU のみを備えたマシンのクラスタ、またはそれぞれが 1 つ以上の GPU を持つマシンのクラスタを利用することもできます。このラボでは後者について取り上げ、MultiWorkerMirroredStrategy を使用して Vertex AI 上の複数のマシンに TensorFlow モデルのトレーニングを分散する方法について説明します。

MultiWorkerMirroredStrategy は、わずかなコード変更で使用できる同期型のデータ並列処理ストラテジーです。モデルのコピーがクラスタ内の各デバイスに作成されます。その後の勾配の更新は同期的に行われます。これは、各ワーカー デバイスが、入力データの異なるスライスに対して、モデルを使用してフォワードパスとバックワード パスを計算することを意味しています。これらのスライスから計算された勾配は、マシン上のすべてのデバイスとクラスタ内のすべてのマシンに集約され、all-reduce と呼ばれるプロセスで低減(通常は平均化)されます。オプティマイザーは、これらの低減された勾配でパラメータの更新を行うことで、デバイスの同期を維持します。TensorFlow を使用した分散トレーニングの詳細については、次の動画をご覧ください。

4. 環境の設定

この Codelab を実行するには、課金が有効になっている Google Cloud Platform プロジェクトが必要です。プロジェクトを作成するには、こちらの手順を行ってください。

ステップ 1: Compute Engine API を有効にする

まだ有効になっていない場合は、[Compute Engine] に移動して [有効にする] を選択します。これはノートブック インスタンスを作成するために必要です。

ステップ 2: Container Registry API を有効にする

まだ有効になっていない場合は、[Container Registry] に移動して [有効にする] を選択します。これは、カスタム トレーニング ジョブのコンテナを作成する際に使用します。

ステップ 3: Vertex AI API を有効にする

Cloud コンソールの [Vertex AI] セクションに移動し、[Vertex AI API を有効にする] をクリックします。

Vertex AI ダッシュボード

ステップ 4: Vertex AI Workbench インスタンスを作成する

Cloud Console の [Vertex AI] セクションで [ワークベンチ] をクリックします。

Vertex AI メニュー

Notebooks API をまだ有効にしていない場合は、有効にします。

Notebook_api

有効にしたら、[マネージド ノートブック] をクリックします。

Notebooks_UI

[新しいノートブック] を選択します。

new_notebook

ノートブックに名前を付けて、[詳細設定] をクリックします。

create_notebook

[詳細設定] で、アイドル状態でのシャットダウンを有効にして、シャットダウンまでの時間(分)を 60 に設定します。これにより、使用されていないノートブックが自動的にシャットダウンされるため、不要なコストが発生しません。

idle_timeout

まだ有効になっていない場合は、[セキュリティ] で、[Enable terminal] を選択します。

enable_terminal

詳細設定のその他の設定はそのままで構いません。

[作成] をクリックします。インスタンスがプロビジョニングされるまでに数分かかります。

インスタンスが作成されたら、[JUPYTERLAB を開く] を選択します。

open_jupyterlab

新しいインスタンスを初めて使用するときに、認証が求められます。その場合は、UI で手順を行います。

authenticate

5. トレーニング アプリケーション コードをコンテナ化する

このトレーニング ジョブを Vertex に送信するには、トレーニング アプリケーション コードを Docker コンテナに格納して、このコンテナを Google Container Registry に push します。この方法を使用すると、どのフレームワークで構築されたモデルでもトレーニングできます。

まず、Launcher メニューから、ノートブック インスタンスでターミナル ウィンドウを開きます。

ノートブックでターミナルを開く

cassava という新しいディレクトリを作成し、そのディレクトリに移動します。

mkdir cassava
cd cassava

ステップ 1: Dockerfile を作成する

コードをコンテナ化する最初のステップは、Dockerfile の作成です。Dockerfile には、イメージの実行に必要なすべてのコマンドを含めます。必要なライブラリがすべてインストールされ、トレーニング コードのエントリ ポイントが設定されます。

ターミナルで、空の Dockerfile を作成します。

touch Dockerfile

Dockerfile を開き、次の内容をコピーします。

FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-7

WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]

この Dockerfile は Deep Learning Container TensorFlow Enterprise 2.7 GPU Docker イメージを使用します。Google Cloud の Deep Learning Containers には一般的な ML およびデータ サイエンスのフレームワークが数多くプリインストールされています。該当するイメージのダウンロード後、この Dockerfile はトレーニング コードのエントリポイントを設定します。これらのファイルはまだ作成していません。次のステップで、モデルのトレーニングおよび調整用のコードを追加します。

ステップ 2: Cloud Storage バケットを作成する

このトレーニング ジョブでは、トレーニング済みの TensorFlow モデルを Cloud Storage バケットにエクスポートします。ターミナルで以下のように実行して、プロジェクトの環境変数を定義します。その際、your-cloud-project は実際のプロジェクト ID で置き換えてください。

PROJECT_ID='your-cloud-project'

次に、ターミナルで次のコマンドを実行して、プロジェクトに新しいバケットを作成します。

BUCKET="gs://${PROJECT_ID}-bucket"
gsutil mb -l us-central1 $BUCKET

ステップ 3: モデルのトレーニング コードを追加する

ターミナルで、次のように実行してトレーニング コード用のディレクトリと、コードを追加する Python ファイルを作成します。

mkdir trainer
touch trainer/task.py

cassava/ ディレクトリに、次のものが作成されます。

+ Dockerfile
+ trainer/
    + task.py

次に、作成した task.py ファイルを開き、以下のコードをコピーします。{your-gcs-bucket} は、作成した Cloud Storage バケットの名前で置き換えます。

import tensorflow as tf
import tensorflow_datasets as tfds
import os


PER_REPLICA_BATCH_SIZE = 64
EPOCHS = 2

# TODO: replace {your-gcs-bucket} with the name of the Storage bucket you created earlier
BUCKET = 'gs://{your-gcs-bucket}/mwms'

def preprocess_data(image, label):
  '''Resizes and scales images.'''

  image = tf.image.resize(image, (300,300))
  return tf.cast(image, tf.float32) / 255., label


def create_dataset(batch_size):
  '''Loads Cassava dataset and preprocesses data.'''

  data, info = tfds.load(name='cassava', as_supervised=True, with_info=True)
  number_of_classes = info.features['label'].num_classes
  train_data = data['train'].map(preprocess_data,
                                 num_parallel_calls=tf.data.experimental.AUTOTUNE)
  train_data  = train_data.shuffle(1000)
  train_data  = train_data.batch(batch_size)
  train_data  = train_data.prefetch(tf.data.experimental.AUTOTUNE)

  # Set AutoShardPolicy
  options = tf.data.Options()
  options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
  train_data = train_data.with_options(options)

  return train_data, number_of_classes


def create_model(number_of_classes):
  '''Creates and compiles pretrained ResNet50 model.'''

  base_model = tf.keras.applications.ResNet50(weights='imagenet', include_top=False)
  x = base_model.output
  x = tf.keras.layers.GlobalAveragePooling2D()(x)
  x = tf.keras.layers.Dense(1016, activation='relu')(x)
  predictions = tf.keras.layers.Dense(number_of_classes, activation='softmax')(x)
  model = tf.keras.Model(inputs=base_model.input, outputs=predictions)

  model.compile(
      loss='sparse_categorical_crossentropy',
      optimizer=tf.keras.optimizers.Adam(0.0001),
      metrics=['accuracy'])

  return model


def _is_chief(task_type, task_id):
  '''Helper function. Determines if machine is chief.'''

  return task_type == 'chief'


def _get_temp_dir(dirpath, task_id):
  '''Helper function. Gets temporary directory for saving model.'''

  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir


def write_filepath(filepath, task_type, task_id):
  '''Helper function. Gets filepath to save model.'''

  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)


def main():
  # Create strategy
  strategy = tf.distribute.MultiWorkerMirroredStrategy()

  # Get data
  global_batch_size = PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync
  train_data, number_of_classes = create_dataset(global_batch_size)

  # Wrap variable creation within strategy scope
  with strategy.scope():
    model = create_model(number_of_classes)

  model.fit(train_data, epochs=EPOCHS)

  # Determine type and task of the machine from
  # the strategy cluster resolver
  task_type, task_id = (strategy.cluster_resolver.task_type,
                        strategy.cluster_resolver.task_id)

  # Based on the type and task, write to the desired model path
  write_model_path = write_filepath(BUCKET, task_type, task_id)
  model.save(write_model_path)

if __name__ == "__main__":
    main()

コンテナをビルドする前に、tf.distribute.Strategy API の MultiWorkerMirroredStrategy を使用するコードを詳しく見てみましょう。

コードには、コードを MultiWorkerMirroredStrategy で機能させるために必要なコンポーネントがいくつかあります。

  1. データをシャーディングする必要があります。つまり、各ワーカーにデータセット全体のサブセットが割り当てられます。したがって、各ステップで、重複しないデータセット要素のグローバル バッチサイズが各ワーカーによって処理されます。このシャーディングは tf.data.experimental.AutoShardPolicy で自動的に行われ、FILE または DATA に設定できます。この例では、キャッサバ データセットが複数のファイルとしてダウンロードされないため、create_dataset() 関数は AutoShardPolicyDATA に設定します。ただし、ポリシーを DATA に設定しなかった場合、デフォルトの AUTO ポリシーが適用され、最終的な結果は同じになります。MultiWorkerMirroredStrategy を使用したデータセット シャーディングの詳細については、こちらをご覧ください。
  2. main() 関数で、MultiWorkerMirroredStrategy オブジェクトが作成されます。次に、モデル変数の作成を戦略の範囲内に収めます。この重要なステップでは、レプリカ間でミラーリングする変数を TensorFlow に指定します。
  3. バッチサイズは num_replicas_in_sync でスケールアップされます。これにより、各レプリカが各ステップで同じ数の例を処理します。バッチサイズのスケーリングは、TensorFlow で同期データ並列処理戦略を使用する場合のベスト プラクティスです。
  4. マルチワーカーの場合、ワーカーごとに宛先を別にする必要があるため、モデルの保存は若干複雑になります。チーフワーカーはモデルを目的のモデル ディレクトリに保存しますが、他のワーカーは一時ディレクトリに保存します。複数のワーカーが同じ場所に書き込みを行わないように、これらの一時ディレクトリは一意にする必要があります。保存にはグループ演算が含まれることがあります。これは、チーフだけでなく、すべてのワーカーが保存を実行する必要があることを意味します。_is_chief()_get_temp_dir()write_filepath() 関数と main() 関数には、モデルの保存に役立つボイラープレート コードが含まれています。

MultiWorkerMirroredStrategy を別の環境で使用している場合は、TF_CONFIG 環境変数が設定されていることがあります。Vertex AI は TF_CONFIG を自動的に設定するため、この変数をクラスタ内の各マシンで定義する必要はありません。

ステップ 4: コンテナをビルドする

ターミナルで以下のように実行して、プロジェクトの環境変数を定義します。その際、your-cloud-project は実際のプロジェクト ID で置き換えてください。

PROJECT_ID='your-cloud-project'

Google Container Registry 内のコンテナ イメージの URI を示す変数を定義します。

IMAGE_URI="gcr.io/$PROJECT_ID/multiworker:cassava"

Docker を構成します。

gcloud auth configure-docker

続いて、cassava ディレクトリのルートで次のように実行してコンテナをビルドします。

docker build ./ -t $IMAGE_URI

最後に、これを Google Container Registry に push します。

docker push $IMAGE_URI

コンテナが Container Registry に push され、トレーニング ジョブを開始する準備ができました。

6. Vertex AI でマルチワーカー トレーニング ジョブを実行する

このラボでは、Google Container Registry のカスタム コンテナによるカスタム トレーニングを使用します。トレーニング ジョブは、ビルド済みコンテナを使って実行することもできます。

まず、Cloud コンソールの Vertex で [トレーニング] セクションに移動します。

uCAIP メニュー

ステップ 1: トレーニング ジョブを構成する

[作成] をクリックして、トレーニング ジョブのパラメータを入力します。

  • [Dataset] で [マネージド データセットなし] を選択します。
  • トレーニング方法として [カスタム トレーニング(上級者向け)] を選択し、[続行] をクリックします。
  • [モデル名] に「multiworker-cassava」(または任意のモデル名)を入力します。
  • [続行] をクリックする

コンテナの設定ステップで、[カスタム コンテナ] を選択します。

カスタム コンテナ オプション

最初のボックス(コンテナ イメージ)で、前のセクションの IMAGE_URI 変数の値を入力します。これは gcr.io/your-cloud-project/multiworker:cassava のようになります(プロジェクト ID はご自分のプロジェクトの ID になります)。その他のフィールドは空白のままにして、[続行] をクリックします。

もう一度 [続行] をクリックして、ハイパーパラメータのステップをスキップします。

ステップ 2: コンピューティング クラスタを構成する

Vertex AI は、さまざまなタイプのマシンタスクに対応するため、4 つのワーカープールを提供します。

ワーカープール 0 は、プライマリ、チーフ、スケジューラ、または「マスター」を構成します。MultiWorkerMirroredStrategy では、すべてのマシンがワーカーに指定されています。これは物理マシンで、複製された計算が実行されます。すべてのマシンがワーカーになるだけでなく、1 つのワーカーではチェックポイントの保存やサマリー ファイルの TensorBoard への書き込みなどの余分な作業を行う必要があります。このマシンをチーフといいます。チーフワーカーは 1 つしかないので、ワーカープール 0 のワーカー数は必ず 1 になります。

[コンピューティングと料金] で、選択されているリージョンはそのままにしておき、[ワーカープール 0] を次のように構成します。

Worker_pool_0

ワーカープール 1 で、クラスタのワーカーを構成します。

次のようにワーカープール 1 を構成します。

Worker_pool_1

これで、CPU のみのマシンが 2 つあるようにクラスタが構成されました。トレーニング アプリケーションのコードを実行すると、MultiWorkerMirroredStrategy によってトレーニングが両方のマシンに分散されます。

MultiWorkerMirroredStrategy にはチーフタスクとワーカータスクのタイプしかないため、追加のワーカープールを構成する必要はありません。ただし、TensorFlow の ParameterServerStrategy を使用する場合は、ワーカープール 2 でパラメータ サーバーを構成します。クラスタにエバリュエータを追加する場合は、そのマシンをワーカープール 3 に構成します。

[トレーニングを開始] をクリックして、ハイパーパラメータ調整ジョブを開始します。コンソールの [トレーニング] セクションで、[TRAINING PIPELINES] タブの下に、新たに起動したジョブが表示されます。

トレーニング ジョブ

お疲れさまでした

Vertex AI を使って次のことを行う方法を学びました。

  • カスタム コンテナに用意されているトレーニング コード用のマルチワーカー トレーニング ジョブを起動する。ここでは例として TensorFlow モデルを使用しましたが、カスタム コンテナまたはビルド済みコンテナを使って任意のフレームワークで構築されたモデルをトレーニングすることもできます。

Vertex のさまざまな部分の説明については、ドキュメントをご覧ください。

7. [省略可] Vertex SDK を使用する

前のセクションでは、UI からトレーニング ジョブを起動しました。このセクションでは、Vertex Python API を使用してトレーニング ジョブを送信する別の方法について説明します。

ノートブック インスタンスに戻り、Launcher から TensorFlow 2 ノートブックを作成します。

new_notebook

Vertex AI SDK をインポートします。

from google.cloud import aiplatform

マルチワーカー トレーニング ジョブを起動するには、まずワーカープールの仕様を定義する必要があります。仕様での GPU の使用は完全に任意です。前のセクションで説明したように、CPU のみのクラスタが必要な場合は、accelerator_typeaccelerator_count を削除できます。

# The spec of the worker pools including machine type and Docker image
# Be sure to replace {YOUR-PROJECT-ID} with your project ID.
worker_pool_specs=[
     {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-8", "accelerator_type": "NVIDIA_TESLA_V100", "accelerator_count": 1
        },
        "container_spec": {"image_uri": "gcr.io/{YOUR-PROJECT-ID}/multiworker:cassava"}
      },
      {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-8", "accelerator_type": "NVIDIA_TESLA_V100", "accelerator_count": 1
        },
        "container_spec": {"image_uri": "gcr.io/{YOUR-PROJECT-ID}/multiworker:cassava"}
      }
]

次に、CustomJob を作成して実行します。{YOUR_BUCKET} は、ステージング用のプロジェクトのバケットに置き換える必要があります。前に作成したバケットを使用できます。

# Replace YOUR_BUCKET
my_multiworker_job = aiplatform.CustomJob(display_name='multiworker-cassava-sdk',
                              worker_pool_specs=worker_pool_specs,
                              staging_bucket='gs://{YOUR_BUCKET}')

my_multiworker_job.run()

コンソールの [トレーニング] セクションにある [カスタムジョブ] タブに、トレーニング ジョブが表示されます。

カスタムジョブ

8. クリーンアップ

ノートブックは、アイドル状態で 60 分が経過するとタイムアウトするように構成されています。このため、インスタンスのシャットダウンを心配する必要はありません。インスタンスを手動でシャットダウンする場合は、Console で [Vertex AI] の [ワークベンチ] セクションにある [停止] ボタンをクリックします。ノートブックを完全に削除する場合は、[削除] ボタンをクリックします。

インスタンスの停止

ストレージ バケットを削除するには、Cloud コンソールのナビゲーション メニューで [ストレージ] に移動してバケットを選択し、[削除] をクリックします。

ストレージを削除