Vertex AI: TensorFlow を使用したマルチワーカーのトレーニングと転移学習

1. 概要

このラボでは、Vertex AI を使用して TensorFlow モデル用のマルチワーカー トレーニング ジョブを実行します。

学習内容

次の方法を学習します。

  • マルチワーカー トレーニング用にトレーニング アプリケーション コードを変更する
  • Vertex AI UI からマルチワーカー トレーニング ジョブを構成して起動する
  • Vertex SDK を使用してマルチワーカー トレーニング ジョブを構成して起動する

Google Cloud でこのラボを実行するための合計費用は約 $5 です。

2. Vertex AI の概要

このラボでは、Google Cloud で利用できる最新の AI プロダクトを使用します。Vertex AI は Google Cloud 全体の ML サービスを統合してシームレスな開発エクスペリエンスを提供できるプロダクトです。これまで、AutoML でトレーニングしたモデルやカスタムモデルには、個別のサービスを介してアクセスする必要がありました。この新しいサービスは、それらを他の新しいプロダクトとともに 1 つの API へと結合します。既存のプロジェクトを Vertex AI に移行することもできます。ご意見やご質問がありましたら、サポートページからご連絡ください。

Vertex AI には、エンドツーエンドの ML ワークフローをサポートするさまざまなプロダクトが含まれています。このラボでは、以下に示す [Training] と [Workbench] の内容に重点を置きます。

Vertex プロダクトの概要

3. ユースケースの概要

このラボでは、TensorFlow Datasets に含まれる cassava データセットを使用して、転移学習を使用して画像分類モデルをトレーニングします。使用するアーキテクチャは、Imagenet データセットで事前トレーニング済みの tf.keras.applications ライブラリの ResNet50 モデルです。

分散トレーニングを選ぶ理由

GPU が 1 つしかない場合、TensorFlow はこのアクセラレータを使用して、追加作業なしでモデル トレーニングを高速化します。ただし、1 台のマシンまたは複数のマシン(それぞれが複数の GPU を持つ可能性がある)で複数の GPU を使用することでさらなる効果を得たい場合は、計算を実行するための TensorFlow のライブラリである tf.distribute を使用する必要があります。複数のデバイス デバイスとは、GPU や TPU などの、CPU またはアクセラレータで、TensorFlow が処理を実行できるマシン上のものです。

分散トレーニングを始めるための最もシンプルな方法は、1 台のマシンに複数の GPU デバイスを搭載することです。tf.distribute モジュールの TensorFlow 配信戦略では、すべての GPU にわたるデータ分散と勾配の更新の調整を行います。単一ホストのトレーニングをマスターし、さらにスケールすることを検討している場合は、クラスタに複数のマシンを追加すると、さらにパフォーマンスが向上します。CPU のみのマシンクラスタ、またはそれぞれに 1 つ以上の GPU を持つマシンのクラスタを利用できます。このラボでは、後者の場合について説明し、MultiWorkerMirroredStrategy を使用して TensorFlow AI モデルのトレーニングを Vertex AI 上の複数のマシンに分散する方法を示します。

MultiWorkerMirroredStrategy は、わずかなコード変更で使用できる同期型のデータ並列処理戦略です。モデルのコピーは、クラスタ内の各デバイスに作成されます。その後の勾配の更新は同期的に行われます。これは、各ワーカー デバイスが、入力データの異なるスライスに対して、モデルを使用してフォワードパスとバックワード パスを計算することを意味しています。これらのスライスから計算された勾配は、1 台のマシンとクラスタ内のすべてのマシン上のすべてのデバイスで集計され、all-reduce と呼ばれるプロセスでリダクション(通常は平均)されます。オプティマイザーは、これらの低減された勾配でパラメータを更新し、デバイスの同期を維持します。TensorFlow を使用した分散トレーニングの詳細については、次の動画をご覧ください。

4.環境の設定

この Codelab を実行するには、課金が有効になっている Google Cloud Platform プロジェクトが必要です。プロジェクトを作成するには、こちらの手順に従ってください。

ステップ 1: Compute Engine API を有効にする

関連項目Compute Engine実現] をクリックします。これは、ノートブック インスタンスを作成する際に必要になります。

ステップ 2: Container Registry API を有効にする

Container Registry に移動し、[有効にする] を選択します(まだの場合)。カスタム トレーニング ジョブのコンテナを作成するために使用します。

ステップ 3: Vertex AI API を有効にする

Cloud Console の [Vertex AI] セクションに移動し、[Vertex AI API を有効にする] をクリックします。

Vertex AI ダッシュボード

ステップ 4: Vertex AI Workbench インスタンスを作成する

Cloud Console の Vertex AI セクションで Workbench をクリックします。

Vertex AI メニュー

Notebooks API をまだ有効にしていない場合は、有効にします。

ノートブック ID

有効にしたら、[マネージド ノートブック] をクリックします。

Notebooks_UI

[新しいノートブック] を選択します。

new_notebook

ノートブックに名前を付けて、[Advanced Settings] をクリックします。

create_notebook

[詳細設定] で、アイドル状態でのシャットダウンを有効にし、分数を 60 に設定します。つまり、使用していないときはノートブックが自動的にシャットダウンされるため、不要なコストは発生しません。

idle_timeout

[セキュリティ] で、[ターミナルを有効にする] を選択します(有効になっていない場合)。

enable_terminal

その他の詳細設定はすべてそのままで構いません。

次に、[作成] をクリックします。インスタンスがプロビジョニングされるまでに数分かかります。

インスタンスが作成されたら、[JupyterLab を開く] を選択します。

open_jupyterlab

新しいインスタンスを初めて使用するときは、認証を求められます。これを行うには、UI の手順に従います。

認証

5. トレーニング アプリケーション コードのコンテナ化

このトレーニング ジョブを Vertex に送信するには、トレーニング アプリケーション コードを Docker コンテナに配置し、このコンテナを Google Container Registry に push します。このアプローチを使用すると、任意のフレームワークで構築されたモデルをトレーニングできます。

まず、ランチャー メニューから、ノートブック インスタンスでターミナル ウィンドウを開きます。

ノートブックでターミナルを開く

cassava という新しいディレクトリを作成し、そのディスクに cd します。

mkdir cassava
cd cassava

ステップ 1: Dockerfile を作成する

コードをコンテナ化する最初のステップは、Dockerfile を作成することです。Dockerfile には、イメージの実行に必要なすべてのコマンドを含めます。必要なライブラリがすべてインストールされ、トレーニング コードのエントリ ポイントが設定されます。

ターミナルで、空の Dockerfile を作成します。

touch Dockerfile

Dockerfile を開き、次の内容をコピーします。

FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-7

WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]

この Dockerfile は、Deep Learning Container TensorFlow Enterprise 2.7 GPU Docker イメージを使用します。Google Cloud の Deep Learning Containers には、ML やデータ サイエンスの一般的なフレームワークが多数プリインストールされています。このイメージのダウンロード後、この Dockerfile でトレーニング コードのエントリ ポイントを設定します。これらのファイルはまだ作成されていません。次のステップでは、モデルのトレーニングと調整のためのコードを追加します。

ステップ 2: Cloud Storage バケットを作成する

このトレーニング ジョブでは、トレーニング済みの TensorFlow モデルを Cloud Storage バケットにエクスポートします。ターミナルで次のコマンドを実行し、プロジェクトの環境変数を定義します。your-cloud-project は、プロジェクトの ID に置き換えてください。

PROJECT_ID='your-cloud-project'

次に、ターミナルで次のコマンドを実行して、プロジェクトに新しいバケットを作成します。

BUCKET="gs://${PROJECT_ID}-bucket"
gsutil mb -l us-central1 $BUCKET

ステップ 3: モデルのトレーニング コードを追加する

ターミナルで次のコマンドを実行して、トレーニング コードのディレクトリと、コードを追加する Python ファイルを作成します。

mkdir trainer
touch trainer/task.py

cassava/ ディレクトリに、次の行が作成されます。

+ Dockerfile
+ trainer/
    + task.py

次に、作成した task.py ファイルを開き、以下のコードをコピーします。{your-gcs-bucket} は、作成した Cloud Storage バケットの名前に置き換える必要があります。

import tensorflow as tf
import tensorflow_datasets as tfds
import os

PER_REPLICA_BATCH_SIZE = 64
EPOCHS = 2

# TODO: replace {your-gcs-bucket} with the name of the Storage bucket you created earlier
BUCKET = 'gs://{your-gcs-bucket}/mwms'

def preprocess_data(image, label):
  '''Resizes and scales images.'''

  image = tf.image.resize(image, (300,300))
  return tf.cast(image, tf.float32) / 255., label

def create_dataset(batch_size):
  '''Loads Cassava dataset and preprocesses data.'''

  data, info = tfds.load(name='cassava', as_supervised=True, with_info=True)
  number_of_classes = info.features['label'].num_classes
  train_data = data['train'].map(preprocess_data,
                                 num_parallel_calls=tf.data.experimental.AUTOTUNE)
  train_data  = train_data.shuffle(1000)
  train_data  = train_data.batch(batch_size)
  train_data  = train_data.prefetch(tf.data.experimental.AUTOTUNE)

  # Set AutoShardPolicy
  options = tf.data.Options()
  options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
  train_data = train_data.with_options(options)

  return train_data, number_of_classes

def create_model(number_of_classes):
  '''Creates and compiles pretrained ResNet50 model.'''

  base_model = tf.keras.applications.ResNet50(weights='imagenet', include_top=False)
  x = base_model.output
  x = tf.keras.layers.GlobalAveragePooling2D()(x)
  x = tf.keras.layers.Dense(1016, activation='relu')(x)
  predictions = tf.keras.layers.Dense(number_of_classes, activation='softmax')(x)
  model = tf.keras.Model(inputs=base_model.input, outputs=predictions)

  model.compile(
      loss='sparse_categorical_crossentropy',
      optimizer=tf.keras.optimizers.Adam(0.0001),
      metrics=['accuracy'])

  return model

def _is_chief(task_type, task_id):
  '''Helper function. Determines if machine is chief.'''

  return task_type == 'chief'

def _get_temp_dir(dirpath, task_id):
  '''Helper function. Gets temporary directory for saving model.'''

  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir

def write_filepath(filepath, task_type, task_id):
  '''Helper function. Gets filepath to save model.'''

  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)

def main():
  # Create strategy
  strategy = tf.distribute.MultiWorkerMirroredStrategy()

  # Get data
  global_batch_size = PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync
  train_data, number_of_classes = create_dataset(global_batch_size)

  # Wrap variable creation within strategy scope
  with strategy.scope():
    model = create_model(number_of_classes)

  model.fit(train_data, epochs=EPOCHS)

  # Determine type and task of the machine from
  # the strategy cluster resolver
  task_type, task_id = (strategy.cluster_resolver.task_type,
                        strategy.cluster_resolver.task_id)

  # Based on the type and task, write to the desired model path
  write_model_path = write_filepath(BUCKET, task_type, task_id)
  model.save(write_model_path)

if __name__ == "__main__":
    main()

コンテナを構築する前に、tf.distribute.Strategy API の MultiWorkerMirroredStrategy を使用するコードを詳しく見てみましょう。

コードには、コードが MultiWorkerMirroredStrategy と連携するために必要なコンポーネントがいくつかあります。

  1. データをシャーディングする必要がある(つまり、各ワーカーにデータセット全体のサブセットが割り当てられている)。したがって、重複しないデータセット要素のグローバル バッチサイズが各ステップで各ワーカーによって処理されます。このシャーディングは、tf.data.experimental.AutoShardPolicy で自動的に行われます。これは FILE または DATA に設定できます。この例では、キャッサバ データセットが複数のファイルとしてダウンロードされないため、create_dataset() 関数は AutoShardPolicyDATA に設定します。ただし、ポリシーを DATA に設定しないと、デフォルトの AUTO ポリシーが起動し、最終結果が同じになります。MultiWorkerMirroredStrategy を使用したデータセットのシャーディングについて詳しくは、こちらをご覧ください。
  2. main() 関数で、MultiWorkerMirroredStrategy オブジェクトが作成されます。次に、モデル変数の作成をストラテジーの範囲内に収めます。この重要なステップは、どの変数をレプリカ間でミラーリングするかを TensorFlow に指示します。
  3. バッチサイズは num_replicas_in_sync でスケールアップされます。これにより、各レプリカは各ステップで同じ数のサンプルを処理します。バッチサイズのスケーリングは、TensorFlow で同期データ並列処理戦略を使用する場合のベスト プラクティスです。
  4. マルチワーカーの場合、各ワーカーにデスティネーションが異なる必要があるため、モデルの保存が若干複雑になります。チーフワーカーは目的のモデル ディレクトリに保存しますが、他のワーカーはモデルを一時ディレクトリに保存します。複数のワーカーが同じ場所に書き込むのを防ぐため、これらの一時ディレクトリは一意である必要があります。保存操作には一括オペレーションを含めることができます。つまり、すべてのワーカーで、チーフだけでなく保存も行う必要があります。_is_chief() 関数、_get_temp_dir() 関数、write_filepath() 関数、main() 関数には、モデルを保存するためのボイラープレート コードが含まれています。

別の環境で MultiWorkerMirroredStrategy を使用したことがある場合は、TF_CONFIG 環境変数を設定している可能性があります。Vertex AI は自動的に TF_CONFIG を設定するため、クラスタ内の各マシンでこの変数を定義する必要はありません。

ステップ 4: コンテナをビルドする

ターミナルで次のコマンドを実行し、プロジェクトの環境変数を定義します。your-cloud-project は、プロジェクトの ID に置き換えてください。

PROJECT_ID='your-cloud-project'

Google Container Registry でコンテナ イメージの URI を使用して変数を定義します。

IMAGE_URI="gcr.io/$PROJECT_ID/multiworker:cassava"

次に、cassava ディレクトリのルートから次のコマンドを実行してコンテナをビルドします。

docker build ./ -t $IMAGE_URI

最後に、Google Container Registry に push します。

docker push $IMAGE_URI

コンテナが Container Registry に push されたら、トレーニング ジョブを開始する準備が整いました。

6. Vertex AI でマルチワーカー トレーニング ジョブを実行する

このラボでは、Google Container Registry のカスタム コンテナによるカスタム トレーニングを使用しますが、ビルド済みコンテナを使用してトレーニング ジョブを実行することもできます。

まず、Cloud Console の [Vertex] セクションの [トレーニング] セクションに移動します。

uCAIP メニュー

ステップ 1: トレーニング ジョブを構成する

[作成] をクリックして、トレーニング ジョブのパラメータを入力します。

  • [データセット] で [管理されているデータセットなし] を選択します。
  • 次に、トレーニング方法として [Custom training (advanced)] を選択し、[Continue] をクリックします。
  • [モデル名] に「multiworker-cassava」(またはモデルに付ける名前)を入力します。
  • [続行] をクリックする

[コンテナの設定] ステップで、[Custom container] を選択します。

カスタム コンテナのオプション

最初のボックス(コンテナ イメージ)に、前のセクションの IMAGE_URI 変数の値を入力します。gcr.io/your-cloud-project/multiworker:cassava は自分のプロジェクト ID になっているはずです。その他のフィールドは空白のままにして、[続行] をクリックします。

[続行] をもう一度クリックして、ハイパーパラメータのステップをスキップします。

ステップ 2: コンピューティング クラスタを構成する

Vertex AI には、さまざまな種類のマシンタスクに対応する 4 つのワーカープールが用意されています。

ワーカープール 0 は、プライマリ、チーフ、スケジューラ、または「マスター」を構成します。MultiWorkerMirroredStrategy では、すべてのマシンがワーカーとして指定されます。ワーカーは、レプリケートされた計算が実行される物理マシンです。各マシンがワーカーであることに加えて、チェックポイントの保存や TensorBoard へのサマリー ファイルの書き込みなどの追加の作業を引き受ける 1 つのワーカーが必要です。このマシンをチーフと呼びます。チーフワーカーは 1 つしかないため、ワーカープール 0 のワーカー数は常に 1 になります。

[コンピューティングと料金] で、選択したリージョンをそのままにして [ワーカープール 0] を次のように構成します。

Worker_pool_0

ワーカープール 1 で、クラスタのワーカーを構成します。

ワーカープール 1 を次のように構成します。

Worker_pool_1

これで、クラスタは 2 台の CPU のみのマシンを持つように構成されました。トレーニング アプリケーション コードが実行されると、MultiWorkerMirroredStrategy によって両方のマシンにトレーニングが分散されます。

MultiWorkerMirroredStrategy にはチーフタスクとワーカー タスクのタイプしかないため、追加のワーカープールを構成する必要はありません。ただし、TensorFlow の ParameterServerStrategy を使用する場合は、ワーカー プール 2 でパラメータ サーバーを構成します。 クラスタに評価ツールを追加する場合、ワーカープール 3 でそのマシンを構成します。

[トレーニングを開始] をクリックして、ハイパーパラメータ調整ジョブを開始します。コンソールの [トレーニング] セクションの [TRAINING PIPELINES] タブに、新しく開始したジョブが表示されます。

トレーニング ジョブ

お疲れさまでした

Vertex AI を使用して以下のことを行う方法を学習しました。

  • カスタム コンテナで提供されるトレーニング コード用のマルチワーカー トレーニング ジョブを起動します。この例では TensorFlow モデルを使用しましたが、カスタム コンテナや組み込みコンテナを使用して任意のフレームワークで構築したモデルをトレーニングできます。

Vertex のさまざまな部分について詳しくは、こちらのドキュメントをご覧ください。

7. (省略可)Vertex SDK を使用する

前のセクションでは、UI を使用してトレーニング ジョブを起動する方法を説明しました。このセクションでは、Vertex Python API を使用してトレーニング ジョブを送信する別の方法を紹介します。

ノートブック インスタンスに戻り、ランチャーから TensorFlow 2 ノートブックを作成します。

new_notebook

Vertex AI SDK をインポートします。

from google.cloud import aiplatform

マルチワーカー トレーニング ジョブを開始するには、まずワーカープールの仕様を定義する必要があります。仕様での GPU の使用は任意であり、前のセクションに示されているように、CPU のみのクラスタを使用したい場合は、accelerator_typeaccelerator_count を削除できます。

# The spec of the worker pools including machine type and Docker image
# Be sure to replace {YOUR-PROJECT-ID} with your project ID.
worker_pool_specs=[
     {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-8", "accelerator_type": "NVIDIA_TESLA_V100", "accelerator_count": 1
        },
        "container_spec": {"image_uri": "gcr.io/{YOUR-PROJECT-ID}/multiworker:cassava"}
      },
      {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-8", "accelerator_type": "NVIDIA_TESLA_V100", "accelerator_count": 1
        },
        "container_spec": {"image_uri": "gcr.io/{YOUR-PROJECT-ID}/multiworker:cassava"}
      }
]

次に、CustomJob を作成して実行します。ステージングするには、{YOUR_BUCKET} をプロジェクト内のバケットに置き換える必要があります。先ほど作成したバケットと同じバケットを使用できます。

# Replace YOUR_BUCKET
my_multiworker_job = aiplatform.CustomJob(display_name='multiworker-cassava-sdk',
                              worker_pool_specs=worker_pool_specs,
                              staging_bucket='gs://{YOUR_BUCKET}')

my_multiworker_job.run()

コンソールの [トレーニング] セクションにある [カスタムジョブ] タブに、トレーニング ジョブが表示されます。

カスタムジョブ

8. クリーンアップ

ノートブックは、アイドル状態の 60 分後にタイムアウトするように構成されているため、インスタンスのシャットダウンを心配する必要はありません。インスタンスを手動でシャットダウンする場合は、コンソールの Vertex AI Workbench セクションにある [停止] ボタンをクリックします。ノートブックを完全に削除する場合は、[削除] ボタンをクリックします。

インスタンスの停止

ストレージ バケットを削除するには、Cloud Console のナビゲーション メニューで [ストレージ] を探してバケットを選択し、[削除] をクリックします。

ストレージを削除