Vertex AI:透過 TensorFlow 進行多工作站訓練和遷移學習

1. 總覽

在本研究室中,您將使用 Vertex AI 為 TensorFlow 模型執行多工作站訓練工作。

課程內容

您將學習下列內容:

  • 為多工作站訓練修改訓練應用程式程式碼
  • 透過 Vertex AI UI 設定及啟動多工作站訓練工作
  • 使用 Vertex SDK 設定及啟動多工作站訓練工作

在 Google Cloud 中執行這個研究室的總費用約為 $5 美元。

2. Vertex AI 簡介

這個研究室使用 Google Cloud 最新的 AI 產品服務。Vertex AI 將 Google Cloud 中的機器學習產品整合到流暢的開發體驗中。先前使用 AutoML 訓練的模型和自訂模型,都能透過不同的服務存取。這項新產品會與其他新產品一起合併為一個 API。您也可以將現有專案遷移至 Vertex AI。如有任何意見,請參閱支援頁面

Vertex AI 提供許多不同的產品,可支援端對端機器學習工作流程。本研究室將著重介紹下列產品:訓練Workbench

Vertex 產品總覽

3. 用途總覽

在本研究室中,您會使用遷移學習,在 TensorFlow 資料集中的 cassava 資料集訓練圖片分類模型。您要使用的架構是從 tf.keras.applications 程式庫預先訓練的 Imagenet 資料集

為什麼要使用分散式訓練?

如果您使用單一 GPU,TensorFlow 會使用這個加速器來加快模型訓練速度,您不須執行額外操作。但是,如果您想在單部機器或多部機器上使用多個 GPU (每部機器可能具備多個 GPU) 獲得額外優勢,就必須使用 tf.distribute,這是 TensorFlow 的程式庫,用於在多部裝置上執行運算。裝置是指在部分可執行 TensorFlow 作業的機器中,CPU 或加速器,例如 GPU 或 TPU。

如要開始使用分散式訓練,最簡單的方法是使用一部搭載多部 GPU 裝置的機器。tf.distribute 模組的 TensorFlow 發布策略,會管理所有 GPU 的資料分佈和梯度更新協調作業。如果您已精通單一主機訓練,並希望進一步擴充規模,將多部機器新增至叢集,就能進一步提高效能。您使用的機器叢集只能使用 CPU,或每個機器都含有一或多個 GPU。本研究室涵蓋第二個案例,示範如何在 Vertex AI 中,使用 MultiWorkerMirroredStrategy 將 TensorFlow 模型的訓練發布在多部機器上。

MultiWorkerMirroredStrategy 是同步資料平行處理策略,只應變更少量程式碼即可使用。系統會在叢集的每部裝置上建立模型副本。後續的漸層更新則會以同步方式進行。也就是說,每個工作站裝置都會根據輸入資料的不同部分,計算向前和向後傳遞模型。從每個配量計算出的梯度後,系統會匯總單一機器上所有裝置和叢集內所有機器的梯度,然後透過「全部縮減」程序進行降低 (通常為平均)。接著,最佳化工具會以這些減少的漸層執行參數更新,藉此讓裝置保持同步。如要進一步瞭解如何使用 TensorFlow 進行分散式訓練,請觀看以下影片:

4. 設定環境

您需要已啟用計費功能的 Google Cloud Platform 專案,才能執行這個程式碼研究室。如要建立專案,請按照這裡的操作說明進行。

步驟 1:啟用 Compute Engine API

前往「Compute Engine」,並選取「啟用」 (如果尚未啟用)。建立筆記本執行個體時會用到。

步驟 2:啟用 Container Registry API

前往「Container Registry」,然後選取「Enable」(啟用) (如果尚未啟用)。您將使用這個選項為自訂訓練工作建立容器。

步驟 3:啟用 Vertex AI API

前往 Cloud 控制台的 Vertex AI 專區,然後按一下「啟用 Vertex AI API」

Vertex AI 資訊主頁

步驟 4:建立 Vertex AI Workbench 執行個體

在 Cloud 控制台的 Vertex AI 專區中,按一下 Workbench:

Vertex AI 選單

啟用 Notebooks API (如果尚未啟用)。

Notebook_api

啟用後,按一下「代管的筆記本」

Notebooks_UI

然後選取「新增筆記本」

new_notebook

為筆記本命名,然後按一下「進階設定」

create_notebook

在進階設定下方啟用閒置關閉,並將時間長度設為 60。也就是說,您的筆記本在未使用時自動關閉,因此不會產生不必要的費用。

idle_timeout

選取「安全性」下方的「啟用終端機」(如果尚未啟用)。

enable_terminal

其他進階設定皆可保持原樣。

接著點選「建立」。執行個體的佈建作業會在幾分鐘內完成。

執行個體建立完成後,請選取「Open JupyterLab」

open_jupyterlab

首次使用新的執行個體時,系統會要求您進行驗證。請按照 UI 中的步驟操作。

驗證

5. 將訓練應用程式程式碼容器化

您必須將訓練應用程式程式碼放入 Docker 容器,並將這個容器推送至 Google Container Registry,藉此將這項訓練工作提交至 Vertex。透過這種做法,您可以訓練以任何架構建構的模型。

如要開始使用,請透過「啟動器」選單開啟筆記本執行個體中的「終端機」視窗:

在筆記本中開啟終端機

建立名為 cassava 的新目錄,並使用 cd 加入該目錄:

mkdir cassava
cd cassava

步驟 1:建立 Dockerfile

將程式碼容器化的第一步,就是建立 Dockerfile。而 Dockerfile 包含執行映像檔所需的所有指令。其會安裝所有必要的程式庫,並設定訓練程式碼的進入點。

在終端機中建立空白的 Dockerfile:

touch Dockerfile

開啟 Dockerfile,並將下列內容複製到該檔案中:

FROM gcr.io/deeplearning-platform-release/tf2-gpu.2-7

WORKDIR /

# Copies the trainer code to the docker image.
COPY trainer /trainer

# Sets up the entry point to invoke the trainer.
ENTRYPOINT ["python", "-m", "trainer.task"]

這個 Dockerfile 使用深度學習容器 TensorFlow 企業版 2.7 GPU Docker 映像檔。Google Cloud 中的深度學習容器已預先安裝許多常見的機器學習和數據資料學架構。下載該映像檔後,這個 Dockerfile 會設定訓練程式碼的進入點。您尚未建立這些檔案。在下一個步驟中,您將新增程式碼來訓練和調整模型。

步驟 2:建立 Cloud Storage 值區

在這項訓練工作中,您會將經過訓練的 TensorFlow 模型匯出至 Cloud Storage 值區。在終端機執行下列指令,定義專案的環境變數;請務必將 your-cloud-project 替換為您的專案 ID:

PROJECT_ID='your-cloud-project'

接著,在終端機中執行下列指令,在專案中建立新值區。

BUCKET="gs://${PROJECT_ID}-bucket"
gsutil mb -l us-central1 $BUCKET

步驟 3:新增模型訓練程式碼

在終端機執行下列指令,為訓練程式碼建立目錄,以及您要新增程式碼的 Python 檔案:

mkdir trainer
touch trainer/task.py

現在 cassava/ 目錄中應會顯示以下內容:

+ Dockerfile
+ trainer/
    + task.py

接著,開啟剛剛建立的 task.py 檔案,並複製以下程式碼。請將 {your-gcs-bucket} 替換為剛建立的 Cloud Storage 值區名稱。

import tensorflow as tf
import tensorflow_datasets as tfds
import os


PER_REPLICA_BATCH_SIZE = 64
EPOCHS = 2

# TODO: replace {your-gcs-bucket} with the name of the Storage bucket you created earlier
BUCKET = 'gs://{your-gcs-bucket}/mwms'

def preprocess_data(image, label):
  '''Resizes and scales images.'''

  image = tf.image.resize(image, (300,300))
  return tf.cast(image, tf.float32) / 255., label


def create_dataset(batch_size):
  '''Loads Cassava dataset and preprocesses data.'''

  data, info = tfds.load(name='cassava', as_supervised=True, with_info=True)
  number_of_classes = info.features['label'].num_classes
  train_data = data['train'].map(preprocess_data,
                                 num_parallel_calls=tf.data.experimental.AUTOTUNE)
  train_data  = train_data.shuffle(1000)
  train_data  = train_data.batch(batch_size)
  train_data  = train_data.prefetch(tf.data.experimental.AUTOTUNE)

  # Set AutoShardPolicy
  options = tf.data.Options()
  options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
  train_data = train_data.with_options(options)

  return train_data, number_of_classes


def create_model(number_of_classes):
  '''Creates and compiles pretrained ResNet50 model.'''

  base_model = tf.keras.applications.ResNet50(weights='imagenet', include_top=False)
  x = base_model.output
  x = tf.keras.layers.GlobalAveragePooling2D()(x)
  x = tf.keras.layers.Dense(1016, activation='relu')(x)
  predictions = tf.keras.layers.Dense(number_of_classes, activation='softmax')(x)
  model = tf.keras.Model(inputs=base_model.input, outputs=predictions)

  model.compile(
      loss='sparse_categorical_crossentropy',
      optimizer=tf.keras.optimizers.Adam(0.0001),
      metrics=['accuracy'])

  return model


def _is_chief(task_type, task_id):
  '''Helper function. Determines if machine is chief.'''

  return task_type == 'chief'


def _get_temp_dir(dirpath, task_id):
  '''Helper function. Gets temporary directory for saving model.'''

  base_dirpath = 'workertemp_' + str(task_id)
  temp_dir = os.path.join(dirpath, base_dirpath)
  tf.io.gfile.makedirs(temp_dir)
  return temp_dir


def write_filepath(filepath, task_type, task_id):
  '''Helper function. Gets filepath to save model.'''

  dirpath = os.path.dirname(filepath)
  base = os.path.basename(filepath)
  if not _is_chief(task_type, task_id):
    dirpath = _get_temp_dir(dirpath, task_id)
  return os.path.join(dirpath, base)


def main():
  # Create strategy
  strategy = tf.distribute.MultiWorkerMirroredStrategy()

  # Get data
  global_batch_size = PER_REPLICA_BATCH_SIZE * strategy.num_replicas_in_sync
  train_data, number_of_classes = create_dataset(global_batch_size)

  # Wrap variable creation within strategy scope
  with strategy.scope():
    model = create_model(number_of_classes)

  model.fit(train_data, epochs=EPOCHS)

  # Determine type and task of the machine from
  # the strategy cluster resolver
  task_type, task_id = (strategy.cluster_resolver.task_type,
                        strategy.cluster_resolver.task_id)

  # Based on the type and task, write to the desired model path
  write_model_path = write_filepath(BUCKET, task_type, task_id)
  model.save(write_model_path)

if __name__ == "__main__":
    main()

建構容器之前,讓我們來進一步瞭解使用 tf.distribute.Strategy API 中的 MultiWorkerMirroredStrategy 的程式碼。

程式碼必須加入幾個元件,才能搭配 MultiWorkerMirroredStrategy 使用。

  1. 資料需要進行資料分割,也就是說,系統會將整個資料集的一部分指派給每個工作站。因此,在每個步驟中,每個工作站都會處理全域的非重疊資料集元素的全域批次大小。這個資料分割會自動以 tf.data.experimental.AutoShardPolicy 執行,可設為 FILEDATA。在這個範例中,create_dataset() 函式會將 AutoShardPolicy 設為 DATA,因為 cassava 資料集不會以多個檔案的形式下載。不過,如果您沒有將這項政策設為 DATA,系統就會套用預設的 AUTO 政策,最終結果也會相同。如要進一步瞭解使用 MultiWorkerMirroredStrategy 的資料集資料分割,請參閱本文
  2. main() 函式中建立 MultiWorkerMirroredStrategy 物件。接下來,您要將模型變數建立作業納入策略範圍內。這個關鍵步驟會告知 TensorFlow 應在備用資源間鏡像哪些變數。
  3. 批量會由 num_replicas_in_sync 向上擴充。這可確保每個備用資源在各個步驟中處理的範例數量相同。在 TensorFlow 中使用同步資料平行處理策略時,調整批次大小是最佳做法。
  4. 在需要多工作站的情況下,模型的儲存方式會稍微複雜一點,因為每個工作站的目的地都必須不同。主要工作站將儲存至所需模型目錄,其他 worker 則會將模型儲存至臨時目錄。這些臨時目錄不能重複,以免多個工作站寫入同一個位置。保存能夠包括集體作業,意味著所有工作人員必須儲存,而不只是主要業務。_is_chief()_get_temp_dir()write_filepath()main() 函式全都包含有助於儲存模型的樣板程式碼。

請注意,如果您在其他環境中使用 MultiWorkerMirroredStrategy,則可能已經設定 TF_CONFIG 環境變數。Vertex AI 會自動為您設定 TF_CONFIG,因此您不必在叢集內的每部機器上定義這個變數。

步驟 4:建構容器

在終端機執行下列指令,定義專案的環境變數;請務必將 your-cloud-project 替換為您的專案 ID:

PROJECT_ID='your-cloud-project'

使用 Google Container Registry 中容器映像檔的 URI 定義變數:

IMAGE_URI="gcr.io/$PROJECT_ID/multiworker:cassava"

設定 Docker

gcloud auth configure-docker

接著,從 cassava 目錄的根目錄執行下列指令,以建構容器:

docker build ./ -t $IMAGE_URI

最後,將其推送至 Google Container Registry:

docker push $IMAGE_URI

將容器推送至 Container Registry 後,您就可以開始執行訓練工作。

6. 在 Vertex AI 中執行多工作站訓練工作

本研究室透過 Google Container Registry 上的自訂容器使用自訂容器,但您也可以透過預先建立的容器執行訓練工作。

首先,請前往 Cloud 控制台「Vertex」專區中的「訓練」部分:

uCAIP 選單

步驟 1:設定訓練工作

按一下「建立」,輸入訓練工作的參數。

  • 在「資料集」下方,選取「沒有代管資料集」
  • 接著選取「自訂訓練 (進階)」做為訓練方法,然後按一下「繼續」
  • 在「Model name」(模型名稱) 中輸入 multiworker-cassava (或是您想要呼叫模型的任何字詞)
  • 按一下 [Continue] (繼續)

在「容器設定」步驟中,選取「自訂容器」

自訂容器選項

在第一個方塊 (「容器映像檔」) 中輸入上一節的 IMAGE_URI 變數值。這個欄位應該是 gcr.io/your-cloud-project/multiworker:cassava,並有您自己的專案 ID。將其餘欄位留白,並按一下「繼續」

再次點選「繼續」來略過超參數步驟。

步驟 2:設定運算叢集

Vertex AI 提供 4 個工作站集區,可處理不同類型的機器工作。

工作站集區 0:設定主要、主要、排程器或「主要執行個體」。在 MultiWorkerMirroredStrategy 中,所有機器都會指派為工作站,這些機器就是要執行複製運算的實體機器。除了每部機器都是工作站,也需要一個工作站,以便完成一些額外的工作,例如儲存查核點以及將摘要檔案寫入 TensorBoard。這部機器稱為首領。由於只有一個主要工作站,工作站集區 0 的工作站數量永遠是 1。

在「運算和定價」中,維持所選區域的原樣,然後設定「工作站集區 0」,如下所示:

Worker_pool_0

工作站集區 1 可讓您為叢集設定工作站。

按照下列方式設定「工作站集區 1」

Worker_pool_1

叢集現已設為有兩個僅使用 CPU 的機器。執行訓練應用程式的程式碼時,MultiWorkerMirroredStrategy 會將訓練分散在兩部機器上。

MultiWorkerMirroredStrategy 只有主要工作和工作站工作類型,因此不需要設定額外的工作站集區。不過,如果使用 TensorFlow 的 ParameterServerStrategy,則應在工作站集區 2 中設定參數伺服器。如果您要將評估器新增至叢集,可以在工作站集區 3 中設定該機器。

按一下「開始訓練」,開始執行超參數調整工作。在控制台「訓練管道」分頁的「訓練」部分中,您會看到新啟動的工作:

訓練工作

🎉 恭喜!🎉

您已瞭解如何使用 Vertex AI 執行下列作業:

  • 針對自訂容器中提供的訓練程式碼啟動多工作站訓練工作。在這個範例中,您使用 TensorFlow 模型,但可以透過自訂或內建容器,以任何架構建構的模型。

如要進一步瞭解 Vertex 的其他部分,請參閱說明文件

7. [選用] 使用 Vertex SDK

上一節說明如何透過 UI 啟動訓練工作。在本節中,您將學習使用 Vertex Python API 提交訓練工作的替代方式。

返回筆記本執行個體,然後從啟動器建立 TensorFlow 2 筆記本:

new_notebook

匯入 Vertex AI SDK。

from google.cloud import aiplatform

如要啟動多工作站訓練工作,您必須先定義工作站集區規格。請注意,在規格中使用 GPU 完全是選擇性的,如果您想使用僅限 CPU 的叢集,可以移除 accelerator_typeaccelerator_count (如上一節所示)。

# The spec of the worker pools including machine type and Docker image
# Be sure to replace {YOUR-PROJECT-ID} with your project ID.
worker_pool_specs=[
     {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-8", "accelerator_type": "NVIDIA_TESLA_V100", "accelerator_count": 1
        },
        "container_spec": {"image_uri": "gcr.io/{YOUR-PROJECT-ID}/multiworker:cassava"}
      },
      {
        "replica_count": 1,
        "machine_spec": {
          "machine_type": "n1-standard-8", "accelerator_type": "NVIDIA_TESLA_V100", "accelerator_count": 1
        },
        "container_spec": {"image_uri": "gcr.io/{YOUR-PROJECT-ID}/multiworker:cassava"}
      }
]

接著,請建立並執行 CustomJob。為了進行測試,您需要將 {YOUR_BUCKET} 替換為專案中的值區。您可以使用先前建立的值區。

# Replace YOUR_BUCKET
my_multiworker_job = aiplatform.CustomJob(display_name='multiworker-cassava-sdk',
                              worker_pool_specs=worker_pool_specs,
                              staging_bucket='gs://{YOUR_BUCKET}')

my_multiworker_job.run()

在控制台的「Custom JOBS」分頁標籤下方,您會看到訓練工作:

自訂工作

8. 清除

因為我們設定了筆記本在 60 分鐘閒置後逾時,所以我們不必擔心如何關閉執行個體。如要手動關閉執行個體,請前往控制台的「Vertex AI Workbench」專區,然後按一下「Stop」按鈕。如想完全刪除筆記本,請按一下「刪除」按鈕。

停止執行個體

如要刪除 Storage 值區,請使用 Cloud 控制台中的導覽選單前往「Storage」(儲存空間)、選取值區,然後點選「Delete」(刪除):

刪除儲存空間