Vertex AI Workbench: BigQuery のデータで TensorFlow モデルをトレーニングする

1. 概要

このラボでは、データ探索と ML モデルのトレーニングに Vertex AI Workbench を使用する方法について学習します。

学習内容

次の方法を学習します。

  • Vertex AI Workbench インスタンスを作成して構成する
  • Vertex AI Workbench BigQuery コネクタを使用する
  • Vertex AI Workbench カーネルでモデルをトレーニングする

このラボを Google Cloud で実行するための総費用は約 $1 です。

2. Vertex AI の概要

このラボでは、Google Cloud で利用できる最新の AI プロダクトを使用します。Vertex AI は Google Cloud 全体の ML サービスを統合してシームレスな開発エクスペリエンスを提供します。以前は、AutoML でトレーニングしたモデルやカスタムモデルには、個別のサービスを介してアクセスする必要がありました。Vertex AI は、これらの個別のサービスを他の新しいプロダクトとともに 1 つの API へと結合します。既存のプロジェクトを Vertex AI に移行することもできます。

Vertex AI には、エンドツーエンドの ML ワークフローをサポートするさまざまなプロダクトが含まれています。このラボでは、Vertex AI Workbench について学習します。

Vertex AI Workbench は、データサービス(Dataproc、Dataflow、BigQuery、Dataplex など)や Vertex AI との緊密なインテグレーションを通じて、ノートブック ベースのエンドツーエンドのワークフローをすばやく構築するのに便利です。データ サイエンティストは Vertex AI Workbench を使用して、GCP データサービスへの接続、データセットの分析、各種モデリング手法のテスト、トレーニング済みモデルの本番環境への導入、モデル ライフサイクルを通した MLOps の管理を行うことができます。

3. ユースケースの概要

このラボでは、London Bicycles Hire データセットを探索します。このデータには、2011 年以降のロンドンの公共自転車シェアリング プログラムの自転車利用状況に関する情報が含まれています。まず、Vertex AI Workbench BigQuery コネクタを使用して、BigQuery でこのデータセットを探索します。次に、pandas を使用してデータを Jupyter ノートブックに読み込み、TensorFlow モデルをトレーニングして、サイクリング トリップの発生日時とサイクリングした距離に基づいてサイクリング トリップの所要時間を予測します。

このラボでは、Keras 前処理レイヤを使用して、モデル トレーニング用の入力データを変換して準備します。この API を使用すると、前処理を TensorFlow モデルグラフに直接構築できます。これにより、トレーニング データとサービング データが同じ変換を受けるようにすることで、トレーニング/サービング スキューのリスクを軽減できます。TensorFlow 2.6 時点では、この API は安定しています。古いバージョンの TensorFlow を使用している場合は、試験運用版のシンボルをインポートする必要があります。

4. 環境の設定

この Codelab を実行するには、課金が有効になっている Google Cloud Platform プロジェクトが必要です。プロジェクトを作成するには、こちらの手順を行ってください。

ステップ 1: Compute Engine API を有効にする

まだ有効になっていない場合は、[Compute Engine] に移動して [有効にする] を選択します。

ステップ 2: Vertex AI API を有効にする

Cloud コンソールの [Vertex AI] セクションに移動し、[Vertex AI API を有効にする] をクリックします。

Vertex AI ダッシュボード

ステップ 3: Vertex AI Workbench インスタンスを作成する

Cloud Console の [Vertex AI] セクションで [ワークベンチ] をクリックします。

Vertex AI メニュー

Notebooks API をまだ有効にしていない場合は、有効にします。

Notebook_api

有効にしたら、[マネージド ノートブック] をクリックします。

Notebooks_UI

[新しいノートブック] を選択します。

new_notebook

ノートブックの名前を指定して、[権限] で [Service account] を選択します。

service_account

[詳細設定] を選択します。

まだ有効になっていない場合は、[セキュリティ] で、[Enable terminal] を選択します。

enable_terminal

詳細設定のその他の設定はそのままで構いません。

[作成] をクリックします。

インスタンスが作成されたら、[JUPYTERLAB を開く] を選択します。

enable_terminal

5. BigQuery でデータセットを探索する

Vertex AI Workbench インスタンスで左側に移動し、[BigQuery in Notebooks] コネクタをクリックします。

BQ コネクタ

BigQuery コネクタを使用すると、BigQuery データセットを簡単に探索してクエリできます。プロジェクト内のデータセットに加えて、[プロジェクトを追加] ボタンをクリックして、他のプロジェクト内のデータセットを探索することもできます。

固定

このラボでは、BigQuery の一般公開データセットのデータを使用します。london_bicycles データセットが見つかるまで下にスクロールします。このデータセットには、cycle_hirecycle_stations の 2 つのテーブルがあります。それぞれ見ていきましょう

london_bike_ds

まず、cycle_hire テーブルをダブルクリックします。テーブルが新しいタブとして開き、テーブルのスキーマと、行数やサイズなどのメタデータが表示されます。

cycle_hire_ds

[プレビュー] タブをクリックすると、データのサンプルが表示されます。簡単なクエリを実行して、人気のルートを確認しましょう。まず、[テーブルをクエリ] ボタンをクリックします。

cycle_hire_preview_ds

次に、SQL エディタに以下を貼り付け、[Submit Query] をクリックします。

SELECT
  start_station_name,
  end_station_name,
  IF(start_station_name = end_station_name,
    TRUE,
    FALSE) same_station,
  AVG(duration) AS avg_duration,
  COUNT(*) AS total_rides
FROM
  `bigquery-public-data.london_bicycles.cycle_hire`
GROUP BY
  start_station_name,
  end_station_name,
  same_station
ORDER BY
  total_rides DESC

クエリの結果から、Hyde Park Corner 駅との間の自転車ルートが一番人気であることがわかります。

journey_query_results

次に、各ステーションに関する情報を提供する cycle_stations テーブルをダブルクリックします。

cycle_hire テーブルと cycle_stations テーブルを結合します。cycle_stations テーブルには、各ステーションの緯度と経度が含まれています。この情報を使用して、出発地と目的地の間の距離を計算し、各自転車ルートで走行した距離を推定します。

この計算を行うには、BigQuery の地理関数を使用します。具体的には、各緯度/経度の文字列を ST_GEOGPOINT に変換し、ST_DISTANCE 関数を使用して 2 点間の直線距離をメートル単位で計算します。この値は、各自転車ルートでの移動距離の代用として使用します。

次のクエリを SQL エディタにコピーし、[クエリを送信] をクリックします。サイクルの出発地と終点の両方の緯度と経度を取得するには、stations テーブルを 2 回結合する必要があるため、JOIN 条件に 3 つのテーブルが含まれていることに注意してください。

WITH staging AS (
    SELECT
        STRUCT(
            start_stn.name,
            ST_GEOGPOINT(start_stn.longitude, start_stn.latitude) AS POINT,
            start_stn.docks_count,
            start_stn.install_date
        ) AS starting,
        STRUCT(
            end_stn.name,
            ST_GEOGPOINT(end_stn.longitude, end_stn.latitude) AS point,
            end_stn.docks_count,
            end_stn.install_date
        ) AS ending,
        STRUCT(
            rental_id,
            bike_id,
            duration, --seconds
            ST_DISTANCE(
                ST_GEOGPOINT(start_stn.longitude, start_stn.latitude),
                ST_GEOGPOINT(end_stn.longitude, end_stn.latitude)
            ) AS distance, --meters
            start_date,
            end_date
        ) AS bike
        FROM `bigquery-public-data.london_bicycles.cycle_stations` AS start_stn
        LEFT JOIN `bigquery-public-data.london_bicycles.cycle_hire` as b
        ON start_stn.id = b.start_station_id
        LEFT JOIN `bigquery-public-data.london_bicycles.cycle_stations` AS end_stn
        ON end_stn.id = b.end_station_id
        LIMIT 700000)

SELECT * from STAGING

6. TensorFlow カーネルで ML モデルをトレーニングする

Vertex AI Workbench には、TensorFlow、PySpark、R などのカーネルを 1 つのノートブック インスタンスからすべて起動できるコンピューティング互換レイヤがあります。このラボでは、TensorFlow カーネルを使用してノートブックを作成します。

DataFrame を作成する

クエリが実行されたら、[DataFrame のコードのコピー] をクリックします。これにより、BigQuery クライアントに接続し、このデータを pandas DataFrame として抽出する Python コードをノートブックに貼り付けることができます。

copy_for_df

次に、Launcher に戻って TensorFlow 2 ノートブックを作成します。

tf_kernel

クエリエディタからコピーしたコードをノートブックの最初のセルに貼り付けます。次のようになります。

# The following two lines are only necessary to run once.
# Comment out otherwise for speed-up.
from google.cloud.bigquery import Client, QueryJobConfig
client = Client()

query = """WITH staging AS (
    SELECT
        STRUCT(
            start_stn.name,
            ST_GEOGPOINT(start_stn.longitude, start_stn.latitude) AS POINT,
            start_stn.docks_count,
            start_stn.install_date
        ) AS starting,
        STRUCT(
            end_stn.name,
            ST_GEOGPOINT(end_stn.longitude, end_stn.latitude) AS point,
            end_stn.docks_count,
            end_stn.install_date
        ) AS ending,
        STRUCT(
            rental_id,
            bike_id,
            duration, --seconds
            ST_DISTANCE(
                ST_GEOGPOINT(start_stn.longitude, start_stn.latitude),
                ST_GEOGPOINT(end_stn.longitude, end_stn.latitude)
            ) AS distance, --meters
            start_date,
            end_date
        ) AS bike
        FROM `bigquery-public-data.london_bicycles.cycle_stations` AS start_stn
        LEFT JOIN `bigquery-public-data.london_bicycles.cycle_hire` as b 
        ON start_stn.id = b.start_station_id
        LEFT JOIN `bigquery-public-data.london_bicycles.cycle_stations` AS end_stn
        ON end_stn.id = b.end_station_id
        LIMIT 700000)

SELECT * from STAGING"""
job = client.query(query)
df = job.to_dataframe()

このラボでは、トレーニング時間を短縮するために、データセットを 700, 000 に制限します。クエリを変更して、データセット全体を試すこともできます。

次に、必要なライブラリをインポートします。

from datetime import datetime
import pandas as pd
import tensorflow as tf

次のコードを実行して、この演習の ML 部分に必要な列のみを含む縮小されたデータフレームを作成します。

values = df['bike'].values
duration = list(map(lambda a: a['duration'], values))
distance = list(map(lambda a: a['distance'], values))
dates = list(map(lambda a: a['start_date'], values))
data = pd.DataFrame(data={'duration': duration, 'distance': distance, 'start_date':dates})
data = data.dropna()

start_date 列は Python の datetime です。この datetime をモデルで直接使用する代わりに、自転車の移動が発生した曜日と時刻を示す 2 つの新しい特徴量を作成します。

data['weekday'] = data['start_date'].apply(lambda a: a.weekday())
data['hour'] = data['start_date'].apply(lambda a: a.time().hour)
data = data.drop(columns=['start_date'])

最後に、わかりやすくなるように [時間] 列を秒から分に変換します。

data['duration'] = data['duration'].apply(lambda x:float(x / 60))

フォーマットされた DataFrame の最初の数行を確認します。これで、自転車ルートごとに、ルートが行われた曜日と時刻、および移動距離に関するデータが手に入ります。この情報から、旅行に要した時間を予測します。

data.head()

data_head

モデルを作成してトレーニングする前に、データをトレーニング セットと検証セットに分割する必要があります。

# Use 80/20 train/eval split
train_size = int(len(data) * .8)
print ("Train size: %d" % train_size)
print ("Evaluation size: %d" % (len(data) - train_size))

# Split data into train and test sets
train_data = data[:train_size]
val_data = data[train_size:]

TensorFlow モデルを作成する

Keras Functional API を使用して TensorFlow モデルを作成します。入力データを前処理するには、Keras 前処理レイヤ API を使用します。

次のユーティリティ関数は、pandas DataFrame から tf.data.Dataset を作成します。

def df_to_dataset(dataframe, label, shuffle=True, batch_size=32):
  dataframe = dataframe.copy()
  labels = dataframe.pop(label)
  ds = tf.data.Dataset.from_tensor_slices((dict(dataframe), labels))
  if shuffle:
    ds = ds.shuffle(buffer_size=len(dataframe))
  ds = ds.batch(batch_size)
  ds = ds.prefetch(batch_size)
  return ds

上記の関数を使用して、トレーニング用と検証用の 2 つの tf.data.Dataset を作成します。警告が表示されることがありますが、無視してかまいません。

train_dataset = df_to_dataset(train_data, 'duration')
validation_dataset = df_to_dataset(val_data, 'duration')

モデルでは、次の前処理レイヤを使用します。

  • Normalization レイヤ: 入力特徴を特徴単位で正規化します。
  • IntegerLookup レイヤ: 整数カテゴリ値を整数インデックスに変換します。
  • CategoryEncoding レイヤ: 整数カテゴリ特徴量をワンホット、マルチホット、TF-IDF 密表現に変換します。

これらのレイヤはトレーニング不可です。代わりに、adapt() メソッドを介して前処理レイヤをトレーニング データに公開することで、前処理レイヤの状態を設定します。

次の関数は、距離特徴で使用できる正規化レイヤを作成します。モデルをフィットする前に、トレーニング データに adapt() メソッドを使用して状態を設定します。これにより、正規化に使用する平均と分散が計算されます。後で検証データセットをモデルに渡すときに、トレーニング データで計算されたこの平均と分散が検証データのスケーリングに使用されます。

def get_normalization_layer(name, dataset):
  # Create a Normalization layer for our feature.
  normalizer = tf.keras.layers.Normalization(axis=None)

  # Prepare a Dataset that only yields our feature.
  feature_ds = dataset.map(lambda x, y: x[name])

  # Learn the statistics of the data.
  normalizer.adapt(feature_ds)

  return normalizer

同様に、次の関数は、時間および平日の特徴で使用するカテゴリのエンコードを作成します。

def get_category_encoding_layer(name, dataset, dtype, max_tokens=None):
  index = tf.keras.layers.IntegerLookup(max_tokens=max_tokens)

  # Prepare a Dataset that only yields our feature
  feature_ds = dataset.map(lambda x, y: x[name])

  # Learn the set of possible values and assign them a fixed integer index.
  index.adapt(feature_ds)

  # Create a Discretization for our integer indices.
  encoder = tf.keras.layers.CategoryEncoding(num_tokens=index.vocabulary_size())

  # Apply one-hot encoding to our indices. The lambda function captures the
  # layer so we can use them, or include them in the functional model later.
  return lambda feature: encoder(index(feature))

次に、モデルの前処理部分を作成します。まず、各特徴に tf.keras.Input レイヤを作成します。

# Create a Keras input layer for each feature
numeric_col = tf.keras.Input(shape=(1,), name='distance')
hour_col = tf.keras.Input(shape=(1,), name='hour', dtype='int64')
weekday_col = tf.keras.Input(shape=(1,), name='weekday', dtype='int64')

次に、正規化レイヤとカテゴリ エンコード レイヤを作成し、リストに保存します。

all_inputs = []
encoded_features = []

# Pass 'distance' input to normalization layer
normalization_layer = get_normalization_layer('distance', train_dataset)
encoded_numeric_col = normalization_layer(numeric_col)
all_inputs.append(numeric_col)
encoded_features.append(encoded_numeric_col)

# Pass 'hour' input to category encoding layer
encoding_layer = get_category_encoding_layer('hour', train_dataset, dtype='int64')
encoded_hour_col = encoding_layer(hour_col)
all_inputs.append(hour_col)
encoded_features.append(encoded_hour_col)

# Pass 'weekday' input to category encoding layer
encoding_layer = get_category_encoding_layer('weekday', train_dataset, dtype='int64')
encoded_weekday_col = encoding_layer(weekday_col)
all_inputs.append(weekday_col)
encoded_features.append(encoded_weekday_col)

前処理レイヤを定義したら、残りのモデルを定義できます。すべての入力特徴を連結して、密結合レイヤに渡します。回帰問題であるため、出力レイヤは単一のユニットです。

all_features = tf.keras.layers.concatenate(encoded_features)
x = tf.keras.layers.Dense(64, activation="relu")(all_features)
output = tf.keras.layers.Dense(1)(x)
model = tf.keras.Model(all_inputs, output)

最後に、モデルをコンパイルします。

model.compile(optimizer = tf.keras.optimizers.Adam(0.001),
              loss='mean_squared_logarithmic_error')

モデルの定義が完了したので、アーキテクチャを可視化できます。

tf.keras.utils.plot_model(model, show_shapes=True, rankdir="LR")

keras_model

この単純なデータセットでは、このモデルはかなり複雑です。これはデモ用です。

コードが実行されることを確認するため、1 エポックをトレーニングします。

model.fit(train_dataset, validation_data = validation_dataset, epochs = 1)

GPU を使用したモデルのトレーニング

次に、モデルのトレーニング期間を延長し、ハードウェア スイッチャーを使用してトレーニングを高速化します。Vertex AI Workbench では、インスタンスをシャットダウンすることなくハードウェアを変更できます。GPU は必要なときにのみ追加することで、費用を抑えることができます。

ハードウェア プロファイルを変更するには、右上のマシンタイプをクリックして [ハードウェアを変更] を選択します。

modify_hardware

[GPU を接続] を選択し、NVIDIA T4 Tensor Core GPU を選択します。

add_gpu

ハードウェアの構成には約 5 分ほどかかります。処理が完了したら、モデルをもう少しトレーニングします。各エポックにかかる時間が短くなりました。

model.fit(train_dataset, validation_data = validation_dataset, epochs = 5)

お疲れさまでした

Vertex AI Workbench を使用して次のことを行う方法を学びました。

  • BigQuery でデータを探索する
  • BigQuery クライアントを使用して Python にデータを読み込む
  • Keras 前処理レイヤと GPU を使用して TensorFlow モデルをトレーニングする

Vertex AI のさまざまな機能の詳細については、こちらのドキュメントをご覧ください。

7. クリーンアップ

ノートブックは、アイドル状態で 60 分が経過するとタイムアウトするように構成されています。このため、インスタンスのシャットダウンを心配する必要はありません。インスタンスを手動でシャットダウンする場合は、Console で [Vertex AI] の [ワークベンチ] セクションにある [停止] ボタンをクリックします。ノートブックを完全に削除する場合は、[削除] ボタンをクリックします。

delete